[Checkins] SVN: relstorage/trunk/relstorage/ Checkpoint: refactoring adapters from inheritance to composition.

Shane Hathaway shane at hathawaymix.org
Wed Sep 23 17:12:58 EDT 2009


Log message for revision 104464:
  Checkpoint: refactoring adapters from inheritance to composition.
  

Changed:
  D   relstorage/trunk/relstorage/adapters/abstract.py
  A   relstorage/trunk/relstorage/adapters/connmanager.py
  A   relstorage/trunk/relstorage/adapters/dbiter.py
  D   relstorage/trunk/relstorage/adapters/historyfree.py
  D   relstorage/trunk/relstorage/adapters/historypreserving.py
  A   relstorage/trunk/relstorage/adapters/interfaces.py
  A   relstorage/trunk/relstorage/adapters/loadstore.py
  A   relstorage/trunk/relstorage/adapters/locker.py
  U   relstorage/trunk/relstorage/adapters/mysql.py
  U   relstorage/trunk/relstorage/adapters/oracle.py
  A   relstorage/trunk/relstorage/adapters/packundo.py
  A   relstorage/trunk/relstorage/adapters/poller.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  A   relstorage/trunk/relstorage/adapters/schema.py
  A   relstorage/trunk/relstorage/adapters/scriptrunner.py
  A   relstorage/trunk/relstorage/adapters/stats.py
  A   relstorage/trunk/relstorage/adapters/txncontrol.py
  U   relstorage/trunk/relstorage/relstorage.py

-=-
Deleted: relstorage/trunk/relstorage/adapters/abstract.py
===================================================================
--- relstorage/trunk/relstorage/adapters/abstract.py	2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/abstract.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -1,414 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Code common to most adapters."""
-
-import logging
-import time
-
-try:
-    from hashlib import md5
-except ImportError:
-    from md5 import new as md5
-
-
-log = logging.getLogger(__name__)
-
-# Notes about adapters:
-#
-# An adapter must not hold a connection, cursor, or database state, because
-# RelStorage opens multiple concurrent connections using a single adapter
-# instance.
-# Within the context of an adapter, all OID and TID values are integers,
-# not binary strings, except as noted.
-
-class AbstractAdapter(object):
-    """Common code for a database adapter.
-    """
-
-    keep_history = None  # True or False
-    verify_sane_database = False
-
-    # _script_vars contains replacements for statements in scripts.
-    # These are correct for PostgreSQL and MySQL but not for Oracle.
-    _script_vars = {
-        'TRUE':         'TRUE',
-        'FALSE':        'FALSE',
-        'OCTET_LENGTH': 'OCTET_LENGTH',
-        'TRUNCATE':     'TRUNCATE',
-        'oid':          '%(oid)s',
-        'tid':          '%(tid)s',
-        'pack_tid':     '%(pack_tid)s',
-        'undo_tid':     '%(undo_tid)s',
-        'self_tid':     '%(self_tid)s',
-        'min_tid':      '%(min_tid)s',
-        'max_tid':      '%(max_tid)s',
-    }
-
-    def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
-        """Execute a statement from a script with the given parameters.
-
-        params should be either an empty tuple (no parameters) or
-        a map.
-
-        Subclasses may override this.
-        The input statement is generic and needs to be transformed
-        into a database-specific statement.
-        """
-        stmt = generic_stmt % self._script_vars
-        try:
-            cursor.execute(stmt, generic_params)
-        except:
-            log.warning("script statement failed: %r; parameters: %r",
-                stmt, generic_params)
-            raise
-
-    def _run_script(self, cursor, script, params=()):
-        """Execute a series of statements in the database.
-
-        params should be either an empty tuple (no parameters) or
-        a map.
-
-        The statements are transformed by _run_script_stmt
-        before execution.
-        """
-        lines = []
-        for line in script.split('\n'):
-            line = line.strip()
-            if not line or line.startswith('--'):
-                continue
-            if line.endswith(';'):
-                line = line[:-1]
-                lines.append(line)
-                stmt = '\n'.join(lines)
-                self._run_script_stmt(cursor, stmt, params)
-                lines = []
-            else:
-                lines.append(line)
-        if lines:
-            stmt = '\n'.join(lines)
-            self._run_script_stmt(cursor, stmt, params)
-
-    def _run_many(self, cursor, stmt, items):
-        """Execute a statement repeatedly.  Items should be a list of tuples.
-
-        stmt should use '%s' parameter format.  Overridden by adapters
-        that use a different parameter format.
-        """
-        cursor.executemany(stmt, items)
-
-    def _open_and_call(self, callback):
-        """Call a function with an open connection and cursor.
-
-        If the function returns, commits the transaction and returns the
-        result returned by the function.
-        If the function raises an exception, aborts the transaction
-        then propagates the exception.
-        """
-        conn, cursor = self.open()
-        try:
-            try:
-                res = callback(conn, cursor)
-            except:
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-                return res
-        finally:
-            self.close(conn, cursor)
-
-    def md5sum(self, data):
-        if data is not None:
-            return md5(data).hexdigest()
-        else:
-            # George Bailey object
-            return None
-
-    def iter_objects(self, cursor, tid):
-        """Iterate over object states in a transaction.
-
-        Yields (oid, prev_tid, state) for each object state.
-        """
-        stmt = """
-        SELECT zoid, state
-        FROM object_state
-        WHERE tid = %(tid)s
-        ORDER BY zoid
-        """
-        self._run_script_stmt(cursor, stmt, {'tid': tid})
-        for oid, state in cursor:
-            if hasattr(state, 'read'):
-                # Oracle
-                state = state.read()
-            yield oid, state
-
-    def open_for_pre_pack(self):
-        """Open a connection to be used for the pre-pack phase.
-        Returns (conn, cursor).
-
-        Subclasses may override this.
-        """
-        return self.open()
-
-    def _hold_commit_lock(self, cursor):
-        """Hold the commit lock for packing"""
-        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
-
-    def _release_commit_lock(self, cursor):
-        """Release the commit lock during packing"""
-        # no action needed
-        pass
-
-
-    def fill_object_refs(self, conn, cursor, get_references):
-        """Update the object_refs table by analyzing new transactions."""
-        if self.keep_history:
-            stmt = """
-            SELECT transaction.tid
-            FROM transaction
-                LEFT JOIN object_refs_added
-                    ON (transaction.tid = object_refs_added.tid)
-            WHERE object_refs_added.tid IS NULL
-            ORDER BY transaction.tid
-            """
-        else:
-            stmt = """
-            SELECT transaction.tid
-            FROM (SELECT DISTINCT tid FROM object_state) AS transaction
-                LEFT JOIN object_refs_added
-                    ON (transaction.tid = object_refs_added.tid)
-            WHERE object_refs_added.tid IS NULL
-            ORDER BY transaction.tid
-            """
-
-        self._run_script_stmt(cursor, stmt)
-        tids = [tid for (tid,) in cursor]
-        if tids:
-            added = 0
-            log.info("discovering references from objects in %d "
-                "transaction(s)" % len(tids))
-            for tid in tids:
-                added += self._add_refs_for_tid(cursor, tid, get_references)
-                if added >= 10000:
-                    # save the work done so far
-                    conn.commit()
-                    added = 0
-            if added:
-                conn.commit()
-
-    def _add_refs_for_tid(self, cursor, tid, get_references):
-        """Fill object_refs with all states for a transaction.
-
-        Returns the number of references added.
-        """
-        log.debug("pre_pack: transaction %d: computing references ", tid)
-        from_count = 0
-
-        stmt = """
-        SELECT zoid, state
-        FROM object_state
-        WHERE tid = %(tid)s
-        """
-        self._run_script_stmt(cursor, stmt, {'tid': tid})
-
-        add_rows = []  # [(from_oid, tid, to_oid)]
-        for from_oid, state in cursor:
-            if hasattr(state, 'read'):
-                # Oracle
-                state = state.read()
-            if state:
-                from_count += 1
-                try:
-                    to_oids = get_references(str(state))
-                except:
-                    log.error("pre_pack: can't unpickle "
-                        "object %d in transaction %d; state length = %d" % (
-                        from_oid, tid, len(state)))
-                    raise
-                if self.keep_history:
-                    for to_oid in to_oids:
-                        add_rows.append((from_oid, tid, to_oid))
-                else:
-                    for to_oid in to_oids:
-                        add_rows.append((from_oid, to_oid))
-
-        if self.keep_history:
-            stmt = """
-            INSERT INTO object_ref (zoid, tid, to_zoid)
-            VALUES (%s, %s, %s)
-            """
-            self._run_many(cursor, stmt, add_rows)
-
-        else:
-            stmt = """
-            DELETE FROM object_ref
-            WHERE zoid in (
-                SELECT zoid
-                FROM object_state
-                WHERE tid = %(tid)s
-                )
-            """
-            self._run_script(cursor, stmt, {'tid': tid})
-
-            stmt = """
-            INSERT INTO object_ref (zoid, to_zoid)
-            VALUES (%s, %s)
-            """
-            self._run_many(cursor, stmt, add_rows)
-
-        # The references have been computed for this transaction.
-        stmt = """
-        INSERT INTO object_refs_added (tid)
-        VALUES (%(tid)s)
-        """
-        self._run_script_stmt(cursor, stmt, {'tid': tid})
-
-        to_count = len(add_rows)
-        log.debug("pre_pack: transaction %d: has %d reference(s) "
-            "from %d object(s)", tid, to_count, from_count)
-        return to_count
-
-
-    def _visit_all_references(self, cursor):
-        """Visit all references in pack_object and set the keep flags.
-        """
-        # Each of the objects to be kept might refer to other objects.
-        # Mark the referenced objects to be kept as well. Do this
-        # repeatedly until all references have been satisfied.
-        pass_num = 1
-        while True:
-            log.info("pre_pack: following references, pass %d", pass_num)
-
-            # Make a list of all parent objects that still need to be
-            # visited. Then set pack_object.visited for all pack_object
-            # rows with keep = true.
-            stmt = """
-            %(TRUNCATE)s temp_pack_visit;
-
-            INSERT INTO temp_pack_visit (zoid, keep_tid)
-            SELECT zoid, keep_tid
-            FROM pack_object
-            WHERE keep = %(TRUE)s
-                AND visited = %(FALSE)s;
-
-            UPDATE pack_object SET visited = %(TRUE)s
-            WHERE keep = %(TRUE)s
-                AND visited = %(FALSE)s
-            """
-            self._run_script(cursor, stmt)
-            visit_count = cursor.rowcount
-
-            if self.verify_sane_database:
-                # Verify the update actually worked.
-                # MySQL 5.1.23 fails this test; 5.1.24 passes.
-                stmt = """
-                SELECT 1
-                FROM pack_object
-                WHERE keep = %(TRUE)s AND visited = %(FALSE)s
-                """
-                self._run_script_stmt(cursor, stmt)
-                if list(cursor):
-                    raise AssertionError(
-                        "database failed to update pack_object")
-
-            log.debug("pre_pack: checking references from %d object(s)",
-                visit_count)
-
-            # Visit the children of all parent objects that were
-            # just visited.
-            stmt = self._scripts['pre_pack_follow_child_refs']
-            self._run_script(cursor, stmt)
-            found_count = cursor.rowcount
-
-            log.debug("pre_pack: found %d more referenced object(s) in "
-                "pass %d", found_count, pass_num)
-            if not found_count:
-                # No new references detected.
-                break
-            else:
-                pass_num += 1
-
-
-    def _pause_pack(self, sleep, options, start):
-        """Pause packing to allow concurrent commits."""
-        elapsed = time.time() - start
-        if elapsed == 0.0:
-            # Compensate for low timer resolution by
-            # assuming that at least 10 ms elapsed.
-            elapsed = 0.01
-        duty_cycle = options.pack_duty_cycle
-        if duty_cycle > 0.0 and duty_cycle < 1.0:
-            delay = min(options.pack_max_delay,
-                elapsed * (1.0 / duty_cycle - 1.0))
-            if delay > 0:
-                log.debug('pack: sleeping %.4g second(s)', delay)
-                sleep(delay)
-
-
-    def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
-        """Polls for new transactions.
-
-        conn and cursor must have been created previously by open_for_load().
-        prev_polled_tid is the tid returned at the last poll, or None
-        if this is the first poll.  If ignore_tid is not None, changes
-        committed in that transaction will not be included in the list
-        of changed OIDs.
-
-        Returns (changed_oids, new_polled_tid).
-        """
-        # find out the tid of the most recent transaction.
-        cursor.execute(self._poll_query)
-        new_polled_tid = cursor.fetchone()[0]
-
-        if prev_polled_tid is None:
-            # This is the first time the connection has polled.
-            return None, new_polled_tid
-
-        if new_polled_tid == prev_polled_tid:
-            # No transactions have been committed since prev_polled_tid.
-            return (), new_polled_tid
-
-        if self.keep_history:
-            stmt = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
-        else:
-            stmt = "SELECT 1 FROM object_state WHERE tid <= %(tid)s LIMIT 1"
-        cursor.execute(intern(stmt % self._script_vars),
-            {'tid': prev_polled_tid})
-        rows = cursor.fetchall()
-        if not rows:
-            # Transaction not found; perhaps it has been packed.
-            # The connection cache needs to be cleared.
-            return None, new_polled_tid
-
-        # Get the list of changed OIDs and return it.
-        if ignore_tid is None:
-            stmt = """
-            SELECT zoid
-            FROM current_object
-            WHERE tid > %(tid)s
-            """
-            cursor.execute(intern(stmt % self._script_vars),
-                {'tid': prev_polled_tid})
-        else:
-            stmt = """
-            SELECT zoid
-            FROM current_object
-            WHERE tid > %(tid)s
-                AND tid != %(self_tid)s
-            """
-            cursor.execute(intern(stmt % self._script_vars),
-                {'tid': prev_polled_tid, 'self_tid': ignore_tid})
-        oids = [oid for (oid,) in cursor]
-
-        return oids, new_polled_tid

Added: relstorage/trunk/relstorage/adapters/connmanager.py
===================================================================
--- relstorage/trunk/relstorage/adapters/connmanager.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/connmanager.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,63 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from relstorage.adapters.interfaces import IConnectionManager
+from zope.interface import implements
+
+class AbstractConnectionManager(object):
+    """Abstract base class for connection management.
+
+    Responsible for opening and closing database connections.
+    """
+    implements(IConnectionManager)
+
+    # close_exceptions contains the exception types to ignore
+    # when the adapter attempts to close a database connection.
+    close_exceptions = ()
+
+    def open(self):
+        """Open a database connection and return (conn, cursor)."""
+        raise NotImplementedError
+
+    def close(self, conn, cursor):
+        """Close a connection and cursor, ignoring certain errors.
+        """
+        for obj in (cursor, conn):
+            if obj is not None:
+                try:
+                    obj.close()
+                except self.close_exceptions:
+                    pass
+
+    def open_and_call(self, callback):
+        """Call a function with an open connection and cursor.
+
+        If the function returns, commits the transaction and returns the
+        result returned by the function.
+        If the function raises an exception, aborts the transaction
+        then propagates the exception.
+        """
+        conn, cursor = self.open()
+        try:
+            try:
+                res = callback(conn, cursor)
+            except:
+                conn.rollback()
+                raise
+            else:
+                conn.commit()
+                return res
+        finally:
+            self.close(conn, cursor)
+

Added: relstorage/trunk/relstorage/adapters/dbiter.py
===================================================================
--- relstorage/trunk/relstorage/adapters/dbiter.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/dbiter.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,187 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from relstorage.adapters.interfaces import IDatabaseIterator
+from zope.interface import implements
+
+class DatabaseIterator(object):
+    """Abstract base class for database iteration.
+    """
+
+    def __init__(self, runner):
+        self.runner = runner
+
+    def iter_objects(self, cursor, tid):
+        """Iterate over object states in a transaction.
+
+        Yields (oid, state) for each object state.
+        """
+        stmt = """
+        SELECT zoid, state
+        FROM object_state
+        WHERE tid = %(tid)s
+        ORDER BY zoid
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+        for oid, state in cursor:
+            if hasattr(state, 'read'):
+                # Oracle
+                state = state.read()
+            yield oid, state
+
+
+class HistoryPreservingDatabaseIterator(DatabaseIterator):
+    implements(IDatabaseIterator)
+
+    def _transaction_iterator(self, cursor):
+        """Iterate over a list of transactions returned from the database.
+
+        Each row begins with (tid, username, description, extension)
+        and may have other columns.
+        """
+        for row in cursor:
+            tid, username, description, ext = row[:4]
+            if username is None:
+                username = ''
+            else:
+                username = str(username)
+            if description is None:
+                description = ''
+            else:
+                description = str(description)
+            if ext is None:
+                ext = ''
+            else:
+                ext = str(ext)
+            yield (tid, username, description, ext) + tuple(row[4:])
+
+
+    def iter_transactions(self, cursor):
+        """Iterate over the transaction log, newest first.
+
+        Skips packed transactions.
+        Yields (tid, username, description, extension) for each transaction.
+        """
+        stmt = """
+        SELECT tid, username, description, extension
+        FROM transaction
+        WHERE packed = %(FALSE)s
+            AND tid != 0
+        ORDER BY tid DESC
+        """
+        self.runner.run_script_stmt(cursor, stmt)
+        return self._transaction_iterator(cursor)
+
+
+    def iter_transactions_range(self, cursor, start=None, stop=None):
+        """Iterate over the transactions in the given range, oldest first.
+
+        Includes packed transactions.
+        Yields (tid, username, description, extension, packed)
+        for each transaction.
+        """
+        stmt = """
+        SELECT tid, username, description, extension,
+            CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END
+        FROM transaction
+        WHERE tid >= 0
+        """
+        if start is not None:
+            stmt += " AND tid >= %(min_tid)s"
+        if stop is not None:
+            stmt += " AND tid <= %(max_tid)s"
+        stmt += " ORDER BY tid"
+        self.runner.run_script_stmt(cursor, stmt,
+            {'min_tid': start, 'max_tid': stop})
+        return self._transaction_iterator(cursor)
+
+
+    def iter_object_history(self, cursor, oid):
+        """Iterate over an object's history.
+
+        Raises KeyError if the object does not exist.
+        Yields (tid, username, description, extension, pickle_size)
+        for each modification.
+        """
+        stmt = """
+        SELECT 1 FROM current_object WHERE zoid = %(oid)s
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'oid': oid})
+        if not cursor.fetchall():
+            raise KeyError(oid)
+
+        stmt = """
+        SELECT tid, username, description, extension, %(OCTET_LENGTH)s(state)
+        FROM transaction
+            JOIN object_state USING (tid)
+        WHERE zoid = %(oid)s
+            AND packed = %(FALSE)s
+        ORDER BY tid DESC
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'oid': oid})
+        return self._transaction_iterator(cursor)
+
+
+class HistoryFreeDatabaseIterator(DatabaseIterator):
+    implements(IDatabaseIterator)
+
+    def iter_transactions(self, cursor):
+        """Iterate over the transaction log, newest first.
+
+        Skips packed transactions.
+        Yields (tid, username, description, extension) for each transaction.
+        """
+        stmt = """
+        SELECT DISTINCT tid
+        FROM object_state
+        ORDER BY tid DESC
+        """
+        self.runner.run_script_stmt(cursor, stmt)
+        return ((tid, '', '', '') for (tid,) in cursor)
+
+    def iter_transactions_range(self, cursor, start=None, stop=None):
+        """Iterate over the transactions in the given range, oldest first.
+
+        Includes packed transactions.
+        Yields (tid, username, description, extension, packed)
+        for each transaction.
+        """
+        stmt = """
+        SELECT DISTINCT tid
+        FROM object_state
+        WHERE tid > 0
+        """
+        if start is not None:
+            stmt += " AND tid >= %(min_tid)s"
+        if stop is not None:
+            stmt += " AND tid <= %(max_tid)s"
+        stmt += " ORDER BY tid"
+        self.runner.run_script_stmt(cursor, stmt,
+            {'min_tid': start, 'max_tid': stop})
+        return ((tid, '', '', '', True) for (tid,) in cursor)
+
+    def iter_object_history(self, cursor, oid):
+        """Iterate over an object's history.
+
+        Yields nothing (no KeyError is raised) if the object does not exist.
+        Yields (tid, username, description, extension, pickle_size)
+        for each modification.
+        """
+        stmt = """
+        SELECT tid, %(OCTET_LENGTH)s(state)
+        FROM object_state
+        WHERE zoid = %(oid)s
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'oid': oid})
+        return ((tid, '', '', '', size) for (tid, size) in cursor)

Deleted: relstorage/trunk/relstorage/adapters/historyfree.py
===================================================================
--- relstorage/trunk/relstorage/adapters/historyfree.py	2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/historyfree.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -1,310 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Code common to history-free adapters."""
-
-
-import logging
-import time
-from relstorage.adapters.abstract import AbstractAdapter
-from ZODB.POSException import UndoError
-
-log = logging.getLogger(__name__)
-
-
-class HistoryFreeAdapter(AbstractAdapter):
-    """An abstract adapter that does not retain history.
-
-    Derivatives should have at least the following schema::
-
-        -- All object states in all transactions.
-        CREATE TABLE object_state (
-            zoid        BIGINT NOT NULL PRIMARY KEY,
-            tid         BIGINT NOT NULL CHECK (tid > 0),
-            state       BYTEA
-        );
-
-        -- A list of referenced OIDs from each object_state.
-        -- This table is populated as needed during garbage collection.
-        CREATE TABLE object_ref (
-            zoid        BIGINT NOT NULL,
-            to_zoid     BIGINT NOT NULL,
-            PRIMARY KEY (zoid, to_zoid)
-        );
-
-        -- The object_refs_added table tracks whether object_refs has
-        -- been populated for all states in a given transaction.
-        -- An entry is added only when the work is finished.
-        CREATE TABLE object_refs_added (
-            tid         BIGINT NOT NULL PRIMARY KEY
-        );
-
-        -- Temporary state during garbage collection:
-        -- The list of all objects, a flag signifying whether
-        -- the object should be kept, and a flag signifying whether
-        -- the object's references have been visited.
-        -- The keep_tid field specifies the current revision of the object.
-        CREATE TABLE pack_object (
-            zoid        BIGINT NOT NULL PRIMARY KEY,
-            keep        BOOLEAN NOT NULL DEFAULT FALSE,
-            keep_tid    BIGINT NOT NULL,
-            visited     BOOLEAN NOT NULL DEFAULT FALSE
-        );
-    """
-
-    keep_history = False
-
-    _scripts = {
-        'create_temp_pack_visit': """
-            CREATE TEMPORARY TABLE temp_pack_visit (
-                zoid BIGINT NOT NULL
-            );
-            CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
-            """,
-
-        'pre_pack_follow_child_refs': """
-            UPDATE pack_object SET keep = %(TRUE)s
-            WHERE keep = %(FALSE)s
-                AND zoid IN (
-                    SELECT DISTINCT to_zoid
-                    FROM object_ref
-                        JOIN temp_pack_visit USING (zoid)
-                )
-            """,
-    }
-
-    def iter_transactions(self, cursor):
-        """Iterate over the transaction log, newest first.
-
-        Skips packed transactions.
-        Yields (tid, username, description, extension) for each transaction.
-        """
-        stmt = """
-        SELECT DISTINCT tid
-        FROM object_state
-        ORDER BY tid DESC
-        """
-        self._run_script_stmt(cursor, stmt)
-        return ((tid, '', '', '') for (tid,) in cursor)
-
-    def iter_transactions_range(self, cursor, start=None, stop=None):
-        """Iterate over the transactions in the given range, oldest first.
-
-        Includes packed transactions.
-        Yields (tid, username, description, extension, packed)
-        for each transaction.
-        """
-        stmt = """
-        SELECT DISTINCT tid
-        FROM object_state
-        WHERE tid > 0
-        """
-        if start is not None:
-            stmt += " AND tid >= %(min_tid)s"
-        if stop is not None:
-            stmt += " AND tid <= %(max_tid)s"
-        stmt += " ORDER BY tid"
-        self._run_script_stmt(cursor, stmt,
-            {'min_tid': start, 'max_tid': stop})
-        return ((tid, '', '', '', True) for (tid,) in cursor)
-
-    def iter_object_history(self, cursor, oid):
-        """Iterate over an object's history.
-
-        Raises KeyError if the object does not exist.
-        Yields (tid, username, description, extension, pickle_size)
-        for each modification.
-        """
-        stmt = """
-        SELECT tid, %(OCTET_LENGTH)s(state)
-        FROM object_state
-        WHERE zoid = %(oid)s
-        """
-        self._run_script_stmt(cursor, stmt, {'oid': oid})
-        return ((tid, '', '', '', size) for (tid, size) in cursor)
-
-    def verify_undoable(self, cursor, undo_tid):
-        """Raise UndoError if it is not safe to undo the specified txn."""
-        raise UndoError("Undo is not supported by this storage")
-
-    def undo(self, cursor, undo_tid, self_tid):
-        """Undo a transaction.
-
-        Parameters: "undo_tid", the integer tid of the transaction to undo,
-        and "self_tid", the integer tid of the current transaction.
-
-        Returns the list of OIDs undone.
-        """
-        raise UndoError("Undo is not supported by this storage")
-
-    def choose_pack_transaction(self, pack_point):
-        """Return the transaction before or at the specified pack time.
-
-        Returns None if there is nothing to pack.
-        """
-        return 1
-
-
-    def pre_pack(self, pack_tid, get_references, options):
-        """Decide what the garbage collector should delete.
-
-        pack_tid is ignored.
-
-        get_references is a function that accepts a pickled state and
-        returns a set of OIDs that state refers to.
-
-        options is an instance of relstorage.Options.
-        The options.pack_gc flag indicates whether to run garbage collection.
-        If pack_gc is false, this method does nothing.
-        """
-        if not options.pack_gc:
-            log.warning("pre_pack: garbage collection is disabled on a "
-                "history-free storage, so doing nothing")
-            return
-
-        conn, cursor = self.open_for_pre_pack()
-        try:
-            try:
-                self._pre_pack_main(conn, cursor, get_references)
-            except:
-                log.exception("pre_pack: failed")
-                conn.rollback()
-                raise
-            else:
-                conn.commit()
-                log.info("pre_pack: finished successfully")
-        finally:
-            self.close(conn, cursor)
-
-
-    def _pre_pack_main(self, conn, cursor, get_references):
-        """Determine what to garbage collect.
-        """
-        stmt = self._scripts['create_temp_pack_visit']
-        if stmt:
-            self._run_script(cursor, stmt)
-
-        self.fill_object_refs(conn, cursor, get_references)
-
-        log.info("pre_pack: filling the pack_object table")
-        # Fill the pack_object table with all known OIDs.
-        stmt = """
-        %(TRUNCATE)s pack_object;
-
-        INSERT INTO pack_object (zoid, keep_tid)
-        SELECT zoid, tid
-        FROM object_state;
-
-        -- Keep the root object
-        UPDATE pack_object SET keep = %(TRUE)s
-        WHERE zoid = 0;
-        """
-        self._run_script(cursor, stmt)
-
-        # Set the 'keep' flags in pack_object
-        self._visit_all_references(cursor)
-
-
-    def pack(self, pack_tid, options, sleep=time.sleep, packed_func=None):
-        """Run garbage collection.
-
-        Requires the information provided by _pre_gc.
-        """
-
-        # Read committed mode is sufficient.
-        conn, cursor = self.open()
-        try:
-            try:
-                stmt = """
-                SELECT zoid, keep_tid
-                FROM pack_object
-                WHERE keep = %(FALSE)s
-                """
-                self._run_script_stmt(cursor, stmt)
-                to_remove = list(cursor)
-
-                log.info("pack: will remove %d object(s)", len(to_remove))
-
-                # Hold the commit lock while packing to prevent deadlocks.
-                # Pack in small batches of transactions in order to minimize
-                # the interruption of concurrent write operations.
-                start = time.time()
-                packed_list = []
-                self._hold_commit_lock(cursor)
-
-                for item in to_remove:
-                    oid, tid = item
-                    stmt = """
-                    DELETE FROM object_state
-                    WHERE zoid = %(oid)s
-                        AND tid = %(tid)s
-                    """
-                    self._run_script_stmt(
-                        cursor, stmt, {'oid': oid, 'tid': tid})
-                    packed_list.append(item)
-
-                    if time.time() >= start + options.pack_batch_timeout:
-                        conn.commit()
-                        if packed_func is not None:
-                            for oid, tid in packed_list:
-                                packed_func(oid, tid)
-                        del packed_list[:]
-                        self._release_commit_lock(cursor)
-                        self._pause_pack(sleep, options, start)
-                        self._hold_commit_lock(cursor)
-                        start = time.time()
-
-                if packed_func is not None:
-                    for oid, tid in packed_list:
-                        packed_func(oid, tid)
-                packed_list = None
-
-                self._pack_cleanup(conn, cursor)
-
-            except:
-                log.exception("pack: failed")
-                conn.rollback()
-                raise
-
-            else:
-                log.info("pack: finished successfully")
-                conn.commit()
-
-        finally:
-            self.close(conn, cursor)
-
-
-    def _pack_cleaup(self, conn, cursor):
-        # commit the work done so far
-        conn.commit()
-        self._release_commit_lock(cursor)
-        self._hold_commit_lock(cursor)
-        log.info("pack: cleaning up")
-
-        stmt = """
-        DELETE FROM object_refs_added
-        WHERE tid NOT IN (
-            SELECT DISTINCT tid
-            FROM object_state
-        );
-
-        DELETE FROM object_ref
-        WHERE zoid IN (
-            SELECT zoid
-            FROM pack_object
-            WHERE keep = %(FALSE)s
-        );
-
-        %(TRUNCATE)s pack_object
-        """
-        self._run_script(cursor, stmt)

Deleted: relstorage/trunk/relstorage/adapters/historypreserving.py
===================================================================
--- relstorage/trunk/relstorage/adapters/historypreserving.py	2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/historypreserving.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -1,724 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Code common to history-preserving adapters."""
-
-
-import logging
-import time
-from relstorage.adapters.abstract import AbstractAdapter
-from ZODB.POSException import UndoError
-
-log = logging.getLogger(__name__)
-
-
-class HistoryPreservingAdapter(AbstractAdapter):
-    """An abstract adapter that retains history.
-
-    Derivatives should have at least the following schema::
-
-        -- The list of all transactions in the database
-        CREATE TABLE transaction (
-            tid         BIGINT NOT NULL PRIMARY KEY,
-            packed      BOOLEAN NOT NULL DEFAULT FALSE,
-            empty       BOOLEAN NOT NULL DEFAULT FALSE,
-            username    BYTEA NOT NULL,
-            description BYTEA NOT NULL,
-            extension   BYTEA
-        );
-
-        -- All object states in all transactions.  Note that md5 and state
-        -- can be null to represent object uncreation.
-        CREATE TABLE object_state (
-            zoid        BIGINT NOT NULL,
-            tid         BIGINT NOT NULL REFERENCES transaction
-                        CHECK (tid > 0),
-            PRIMARY KEY (zoid, tid),
-            prev_tid    BIGINT NOT NULL REFERENCES transaction,
-            md5         CHAR(32),
-            state       BYTEA
-        );
-
-        -- Pointers to the current object state
-        CREATE TABLE current_object (
-            zoid        BIGINT NOT NULL PRIMARY KEY,
-            tid         BIGINT NOT NULL,
-            FOREIGN KEY (zoid, tid) REFERENCES object_state
-        );
-
-        -- A list of referenced OIDs from each object_state.
-        -- This table is populated as needed during packing.
-        -- To prevent unnecessary table locking, it does not use
-        -- foreign keys, which is safe because rows in object_state
-        -- are never modified once committed, and rows are removed
-        -- from object_state only by packing.
-        CREATE TABLE object_ref (
-            zoid        BIGINT NOT NULL,
-            tid         BIGINT NOT NULL,
-            to_zoid     BIGINT NOT NULL,
-            PRIMARY KEY (tid, zoid, to_zoid)
-        );
-
-        -- The object_refs_added table tracks whether object_refs has
-        -- been populated for all states in a given transaction.
-        -- An entry is added only when the work is finished.
-        -- To prevent unnecessary table locking, it does not use
-        -- foreign keys, which is safe because object states
-        -- are never added to a transaction once committed, and
-        -- rows are removed from the transaction table only by
-        -- packing.
-        CREATE TABLE object_refs_added (
-            tid         BIGINT NOT NULL PRIMARY KEY
-        );
-
-        -- Temporary state during packing:
-        -- The list of objects to pack.  If keep is false,
-        -- the object and all its revisions will be removed.
-        -- If keep is true, instead of removing the object,
-        -- the pack operation will cut the object's history.
-        -- The keep_tid field specifies the oldest revision
-        -- of the object to keep.
-        -- The visited flag is set when pre_pack is visiting an object's
-        -- references, and remains set.
-        CREATE TABLE pack_object (
-            zoid        BIGINT NOT NULL PRIMARY KEY,
-            keep        BOOLEAN NOT NULL,
-            keep_tid    BIGINT NOT NULL,
-            visited     BOOLEAN NOT NULL DEFAULT FALSE
-        );
-
-        -- Temporary state during packing: the list of object states to pack.
-        CREATE TABLE pack_state (
-            tid         BIGINT NOT NULL,
-            zoid        BIGINT NOT NULL,
-            PRIMARY KEY (tid, zoid)
-        );
-
-        -- Temporary state during packing: the list of transactions that
-        -- have at least one object state to pack.
-        CREATE TABLE pack_state_tid (
-            tid         BIGINT NOT NULL PRIMARY KEY
-        );
-    """
-
-    keep_history = True
-
-    _scripts = {
-        'create_temp_pack_visit': """
-            CREATE TEMPORARY TABLE temp_pack_visit (
-                zoid BIGINT NOT NULL,
-                keep_tid BIGINT
-            );
-            CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
-            """,
-
-        'pre_pack_follow_child_refs': """
-            UPDATE pack_object SET keep = %(TRUE)s
-            WHERE keep = %(FALSE)s
-                AND zoid IN (
-                    SELECT DISTINCT to_zoid
-                    FROM object_ref
-                        JOIN temp_pack_visit USING (zoid)
-                    WHERE object_ref.tid >= temp_pack_visit.keep_tid
-                )
-            """,
-
-        'choose_pack_transaction': """
-            SELECT tid
-            FROM transaction
-            WHERE tid > 0
-                AND tid <= %(tid)s
-                AND packed = FALSE
-            ORDER BY tid DESC
-            LIMIT 1
-            """,
-
-        'create_temp_undo': """
-            CREATE TEMPORARY TABLE temp_undo (
-                zoid BIGINT NOT NULL,
-                prev_tid BIGINT NOT NULL
-            );
-            CREATE UNIQUE INDEX temp_undo_zoid ON temp_undo (zoid)
-            """,
-
-        'reset_temp_undo': "DROP TABLE temp_undo",
-
-        'transaction_has_data': """
-            SELECT tid
-            FROM object_state
-            WHERE tid = %(tid)s
-            LIMIT 1
-            """,
-
-        'pack_current_object': """
-            DELETE FROM current_object
-            WHERE tid = %(tid)s
-                AND zoid in (
-                    SELECT pack_state.zoid
-                    FROM pack_state
-                    WHERE pack_state.tid = %(tid)s
-                )
-            """,
-
-        'pack_object_state': """
-            DELETE FROM object_state
-            WHERE tid = %(tid)s
-                AND zoid in (
-                    SELECT pack_state.zoid
-                    FROM pack_state
-                    WHERE pack_state.tid = %(tid)s
-                )
-            """,
-
-        'pack_object_ref': """
-            DELETE FROM object_refs_added
-            WHERE tid IN (
-                SELECT tid
-                FROM transaction
-                WHERE empty = %(TRUE)s
-                );
-            DELETE FROM object_ref
-            WHERE tid IN (
-                SELECT tid
-                FROM transaction
-                WHERE empty = %(TRUE)s
-                )
-            """,
-    }
-
-    def _transaction_iterator(self, cursor):
-        """Iterate over a list of transactions returned from the database.
-
-        Each row begins with (tid, username, description, extension)
-        and may have other columns.
-        """
-        for row in cursor:
-            tid, username, description, ext = row[:4]
-            if username is None:
-                username = ''
-            else:
-                username = str(username)
-            if description is None:
-                description = ''
-            else:
-                description = str(description)
-            if ext is None:
-                ext = ''
-            else:
-                ext = str(ext)
-            yield (tid, username, description, ext) + tuple(row[4:])
-
-
-    def iter_transactions(self, cursor):
-        """Iterate over the transaction log, newest first.
-
-        Skips packed transactions.
-        Yields (tid, username, description, extension) for each transaction.
-        """
-        stmt = """
-        SELECT tid, username, description, extension
-        FROM transaction
-        WHERE packed = %(FALSE)s
-            AND tid != 0
-        ORDER BY tid DESC
-        """
-        self._run_script_stmt(cursor, stmt)
-        return self._transaction_iterator(cursor)
-
-
-    def iter_transactions_range(self, cursor, start=None, stop=None):
-        """Iterate over the transactions in the given range, oldest first.
-
-        Includes packed transactions.
-        Yields (tid, username, description, extension, packed)
-        for each transaction.
-        """
-        stmt = """
-        SELECT tid, username, description, extension,
-            CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END
-        FROM transaction
-        WHERE tid >= 0
-        """
-        if start is not None:
-            stmt += " AND tid >= %(min_tid)s"
-        if stop is not None:
-            stmt += " AND tid <= %(max_tid)s"
-        stmt += " ORDER BY tid"
-        self._run_script_stmt(cursor, stmt,
-            {'min_tid': start, 'max_tid': stop})
-        return self._transaction_iterator(cursor)
-
-
-    def iter_object_history(self, cursor, oid):
-        """Iterate over an object's history.
-
-        Raises KeyError if the object does not exist.
-        Yields (tid, username, description, extension, pickle_size)
-        for each modification.
-        """
-        stmt = """
-        SELECT 1 FROM current_object WHERE zoid = %(oid)s
-        """
-        self._run_script_stmt(cursor, stmt, {'oid': oid})
-        if not cursor.fetchall():
-            raise KeyError(oid)
-
-        stmt = """
-        SELECT tid, username, description, extension, %(OCTET_LENGTH)s(state)
-        FROM transaction
-            JOIN object_state USING (tid)
-        WHERE zoid = %(oid)s
-            AND packed = %(FALSE)s
-        ORDER BY tid DESC
-        """
-        self._run_script_stmt(cursor, stmt, {'oid': oid})
-        return self._transaction_iterator(cursor)
-
-
-    def verify_undoable(self, cursor, undo_tid):
-        """Raise UndoError if it is not safe to undo the specified txn."""
-        stmt = """
-        SELECT 1 FROM transaction
-        WHERE tid = %(undo_tid)s
-            AND packed = %(FALSE)s
-        """
-        self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
-        if not cursor.fetchall():
-            raise UndoError("Transaction not found or packed")
-
-        # Rule: we can undo an object if the object's state in the
-        # transaction to undo matches the object's current state.
-        # If any object in the transaction does not fit that rule,
-        # refuse to undo.
-        stmt = """
-        SELECT prev_os.zoid, current_object.tid
-        FROM object_state prev_os
-            JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
-            JOIN current_object ON (cur_os.zoid = current_object.zoid
-                AND cur_os.tid = current_object.tid)
-        WHERE prev_os.tid = %(undo_tid)s
-            AND cur_os.md5 != prev_os.md5
-        """
-        self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
-        if cursor.fetchmany():
-            raise UndoError(
-                "Some data were modified by a later transaction")
-
-        # Rule: don't allow the creation of the root object to
-        # be undone.  It's hard to get it back.
-        stmt = """
-        SELECT 1
-        FROM object_state
-        WHERE tid = %(undo_tid)s
-            AND zoid = 0
-            AND prev_tid = 0
-        """
-        self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
-        if cursor.fetchall():
-            raise UndoError("Can't undo the creation of the root object")
-
-
-    def undo(self, cursor, undo_tid, self_tid):
-        """Undo a transaction.
-
-        Parameters: "undo_tid", the integer tid of the transaction to undo,
-        and "self_tid", the integer tid of the current transaction.
-
-        Returns the states copied forward by the undo operation as a
-        list of (oid, old_tid).
-        """
-        stmt = self._scripts['create_temp_undo']
-        if stmt:
-            self._run_script(cursor, stmt)
-
-        stmt = """
-        DELETE FROM temp_undo;
-
-        -- Put into temp_undo the list of objects to be undone and
-        -- the tid of the transaction that has the undone state.
-        INSERT INTO temp_undo (zoid, prev_tid)
-        SELECT zoid, prev_tid
-        FROM object_state
-        WHERE tid = %(undo_tid)s;
-
-        -- Override previous undo operations within this transaction
-        -- by resetting the current_object pointer and deleting
-        -- copied states from object_state.
-        UPDATE current_object
-        SET tid = (
-                SELECT prev_tid
-                FROM object_state
-                WHERE zoid = current_object.zoid
-                    AND tid = %(self_tid)s
-            )
-        WHERE zoid IN (SELECT zoid FROM temp_undo)
-            AND tid = %(self_tid)s;
-
-        DELETE FROM object_state
-        WHERE zoid IN (SELECT zoid FROM temp_undo)
-            AND tid = %(self_tid)s;
-
-        -- Copy old states forward.
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
-            prev.md5, prev.state
-        FROM temp_undo
-            JOIN current_object ON (temp_undo.zoid = current_object.zoid)
-            LEFT JOIN object_state prev
-                ON (prev.zoid = temp_undo.zoid
-                    AND prev.tid = temp_undo.prev_tid);
-
-        -- List the copied states.
-        SELECT zoid, prev_tid FROM temp_undo
-        """
-        self._run_script(cursor, stmt,
-            {'undo_tid': undo_tid, 'self_tid': self_tid})
-        res = list(cursor)
-
-        stmt = self._scripts['reset_temp_undo']
-        if stmt:
-            self._run_script(cursor, stmt)
-
-        return res
-
-
-    def choose_pack_transaction(self, pack_point):
-        """Return the transaction before or at the specified pack time.
-
-        Returns None if there is nothing to pack.
-        """
-        conn, cursor = self.open()
-        try:
-            stmt = self._scripts['choose_pack_transaction']
-            self._run_script(cursor, stmt, {'tid': pack_point})
-            rows = cursor.fetchall()
-            if not rows:
-                # Nothing needs to be packed.
-                return None
-            return rows[0][0]
-        finally:
-            self.close(conn, cursor)
-
-
-    def pre_pack(self, pack_tid, get_references, options):
-        """Decide what to pack.
-
-        tid specifies the most recent transaction to pack.
-
-        get_references is a function that accepts a pickled state and
-        returns a set of OIDs that state refers to.
-
-        options is an instance of relstorage.Options.
-        The options.pack_gc flag indicates whether to run garbage collection.
-        If pack_gc is false, at least one revision of every object is kept,
-        even if nothing refers to it.  Packing with pack_gc disabled can be
-        much faster.
-        """
-        conn, cursor = self.open_for_pre_pack()
-        try:
-            try:
-                if options.pack_gc:
-                    log.info("pre_pack: start with gc enabled")
-                    self._pre_pack_with_gc(
-                        conn, cursor, pack_tid, get_references)
-                else:
-                    log.info("pre_pack: start without gc")
-                    self._pre_pack_without_gc(
-                        conn, cursor, pack_tid)
-                conn.commit()
-
-                log.info("pre_pack: enumerating states to pack")
-                stmt = "%(TRUNCATE)s pack_state"
-                self._run_script_stmt(cursor, stmt)
-                to_remove = 0
-
-                if options.pack_gc:
-                    # Pack objects with the keep flag set to false.
-                    stmt = """
-                    INSERT INTO pack_state (tid, zoid)
-                    SELECT tid, zoid
-                    FROM object_state
-                        JOIN pack_object USING (zoid)
-                    WHERE keep = %(FALSE)s
-                        AND tid > 0
-                        AND tid <= %(pack_tid)s
-                    """
-                    self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-                    to_remove += cursor.rowcount
-
-                # Pack object states with the keep flag set to true.
-                stmt = """
-                INSERT INTO pack_state (tid, zoid)
-                SELECT tid, zoid
-                FROM object_state
-                    JOIN pack_object USING (zoid)
-                WHERE keep = %(TRUE)s
-                    AND tid > 0
-                    AND tid != keep_tid
-                    AND tid <= %(pack_tid)s
-                """
-                self._run_script_stmt(cursor, stmt, {'pack_tid':pack_tid})
-                to_remove += cursor.rowcount
-
-                log.info("pre_pack: enumerating transactions to pack")
-                stmt = "%(TRUNCATE)s pack_state_tid"
-                self._run_script_stmt(cursor, stmt)
-                stmt = """
-                INSERT INTO pack_state_tid (tid)
-                SELECT DISTINCT tid
-                FROM pack_state
-                """
-                cursor.execute(stmt)
-
-                log.info("pre_pack: will remove %d object state(s)",
-                    to_remove)
-
-            except:
-                log.exception("pre_pack: failed")
-                conn.rollback()
-                raise
-            else:
-                log.info("pre_pack: finished successfully")
-                conn.commit()
-        finally:
-            self.close(conn, cursor)
-
-
-    def _pre_pack_without_gc(self, conn, cursor, pack_tid):
-        """Determine what to pack, without garbage collection.
-
-        With garbage collection disabled, there is no need to follow
-        object references.
-        """
-        # Fill the pack_object table with OIDs, but configure them
-        # all to be kept by setting keep to true.
-        log.debug("pre_pack: populating pack_object")
-        stmt = """
-        %(TRUNCATE)s pack_object;
-
-        INSERT INTO pack_object (zoid, keep, keep_tid)
-        SELECT zoid, %(TRUE)s, MAX(tid)
-        FROM object_state
-        WHERE tid > 0 AND tid <= %(pack_tid)s
-        GROUP BY zoid
-        """
-        self._run_script(cursor, stmt, {'pack_tid': pack_tid})
-
-
-    def _pre_pack_with_gc(self, conn, cursor, pack_tid, get_references):
-        """Determine what to pack, with garbage collection.
-        """
-        stmt = self._scripts['create_temp_pack_visit']
-        if stmt:
-            self._run_script(cursor, stmt)
-
-        self.fill_object_refs(conn, cursor, get_references)
-
-        log.info("pre_pack: filling the pack_object table")
-        # Fill the pack_object table with OIDs that either will be
-        # removed (if nothing references the OID) or whose history will
-        # be cut.
-        stmt = """
-        %(TRUNCATE)s pack_object;
-
-        INSERT INTO pack_object (zoid, keep, keep_tid)
-        SELECT zoid, %(FALSE)s, MAX(tid)
-        FROM object_state
-        WHERE tid > 0 AND tid <= %(pack_tid)s
-        GROUP BY zoid;
-
-        -- If the root object is in pack_object, keep it.
-        UPDATE pack_object SET keep = %(TRUE)s
-        WHERE zoid = 0;
-
-        -- Keep objects that have been revised since pack_tid.
-        UPDATE pack_object SET keep = %(TRUE)s
-        WHERE zoid IN (
-            SELECT zoid
-            FROM current_object
-            WHERE tid > %(pack_tid)s
-        );
-
-        -- Keep objects that are still referenced by object states in
-        -- transactions that will not be packed.
-        -- Use temp_pack_visit for temporary state; otherwise MySQL 5 chokes.
-        INSERT INTO temp_pack_visit (zoid)
-        SELECT DISTINCT to_zoid
-        FROM object_ref
-        WHERE tid > %(pack_tid)s;
-
-        UPDATE pack_object SET keep = %(TRUE)s
-        WHERE zoid IN (
-            SELECT zoid
-            FROM temp_pack_visit
-        );
-
-        %(TRUNCATE)s temp_pack_visit;
-        """
-        self._run_script(cursor, stmt, {'pack_tid': pack_tid})
-
-        # Set the 'keep' flags in pack_object
-        self._visit_all_references(cursor)
-
-
-    def pack(self, pack_tid, options, sleep=time.sleep, packed_func=None):
-        """Pack.  Requires the information provided by pre_pack."""
-
-        # Read committed mode is sufficient.
-        conn, cursor = self.open()
-        try:
-            try:
-                stmt = """
-                SELECT transaction.tid,
-                    CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END,
-                    CASE WHEN pack_state_tid.tid IS NOT NULL THEN 1 ELSE 0 END
-                FROM transaction
-                    LEFT JOIN pack_state_tid ON (
-                        transaction.tid = pack_state_tid.tid)
-                WHERE transaction.tid > 0
-                    AND transaction.tid <= %(pack_tid)s
-                    AND (packed = %(FALSE)s OR pack_state_tid.tid IS NOT NULL)
-                """
-                self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-                tid_rows = list(cursor)
-                tid_rows.sort()  # oldest first
-
-                log.info("pack: will pack %d transaction(s)", len(tid_rows))
-
-                stmt = self._scripts['create_temp_pack_visit']
-                if stmt:
-                    self._run_script(cursor, stmt)
-
-                # Hold the commit lock while packing to prevent deadlocks.
-                # Pack in small batches of transactions in order to minimize
-                # the interruption of concurrent write operations.
-                start = time.time()
-                packed_list = []
-                self._hold_commit_lock(cursor)
-                for tid, packed, has_removable in tid_rows:
-                    self._pack_transaction(
-                        cursor, pack_tid, tid, packed, has_removable,
-                        packed_list)
-                    if time.time() >= start + options.pack_batch_timeout:
-                        conn.commit()
-                        if packed_func is not None:
-                            for oid, tid in packed_list:
-                                packed_func(oid, tid)
-                        del packed_list[:]
-                        self._release_commit_lock(cursor)
-                        self._pause_pack(sleep, options, start)
-                        self._hold_commit_lock(cursor)
-                        start = time.time()
-                if packed_func is not None:
-                    for oid, tid in packed_list:
-                        packed_func(oid, tid)
-                packed_list = None
-
-                self._pack_cleanup(conn, cursor)
-
-            except:
-                log.exception("pack: failed")
-                conn.rollback()
-                raise
-
-            else:
-                log.info("pack: finished successfully")
-                conn.commit()
-
-        finally:
-            self.close(conn, cursor)
-
-
-    def _pack_transaction(self, cursor, pack_tid, tid, packed,
-            has_removable, packed_list):
-        """Pack one transaction.  Requires populated pack tables."""
-        log.debug("pack: transaction %d: packing", tid)
-        removed_objects = 0
-        removed_states = 0
-
-        if has_removable:
-            stmt = self._scripts['pack_current_object']
-            self._run_script_stmt(cursor, stmt, {'tid': tid})
-            removed_objects = cursor.rowcount
-
-            stmt = self._scripts['pack_object_state']
-            self._run_script_stmt(cursor, stmt, {'tid': tid})
-            removed_states = cursor.rowcount
-
-            # Terminate prev_tid chains
-            stmt = """
-            UPDATE object_state SET prev_tid = 0
-            WHERE prev_tid = %(tid)s
-                AND tid <= %(pack_tid)s
-            """
-            self._run_script_stmt(cursor, stmt,
-                {'pack_tid': pack_tid, 'tid': tid})
-
-            stmt = """
-            SELECT pack_state.zoid
-            FROM pack_state
-            WHERE pack_state.tid = %(tid)s
-            """
-            self._run_script_stmt(cursor, stmt, {'tid': tid})
-            for (oid,) in cursor:
-                packed_list.append((oid, tid))
-
-        # Find out whether the transaction is empty
-        stmt = self._scripts['transaction_has_data']
-        self._run_script_stmt(cursor, stmt, {'tid': tid})
-        empty = not list(cursor)
-
-        # mark the transaction packed and possibly empty
-        if empty:
-            clause = 'empty = %(TRUE)s'
-            state = 'empty'
-        else:
-            clause = 'empty = %(FALSE)s'
-            state = 'not empty'
-        stmt = "UPDATE transaction SET packed = %(TRUE)s, " + clause
-        stmt += " WHERE tid = %(tid)s"
-        self._run_script_stmt(cursor, stmt, {'tid': tid})
-
-        log.debug(
-            "pack: transaction %d (%s): removed %d object(s) and %d state(s)",
-            tid, state, removed_objects, removed_states)
-
-
-    def _pack_cleanup(self, conn, cursor):
-        """Remove unneeded table rows after packing"""
-        # commit the work done so far
-        conn.commit()
-        self._release_commit_lock(cursor)
-        self._hold_commit_lock(cursor)
-        log.info("pack: cleaning up")
-
-        log.debug("pack: removing unused object references")
-        stmt = self._scripts['pack_object_ref']
-        self._run_script(cursor, stmt)
-
-        log.debug("pack: removing empty packed transactions")
-        stmt = """
-        DELETE FROM transaction
-        WHERE packed = %(TRUE)s
-            AND empty = %(TRUE)s
-        """
-        self._run_script_stmt(cursor, stmt)
-
-        # perform cleanup that does not require the commit lock
-        conn.commit()
-        self._release_commit_lock(cursor)
-
-        log.debug("pack: clearing temporary pack state")
-        for _table in ('pack_object', 'pack_state', 'pack_state_tid'):
-            stmt = '%(TRUNCATE)s ' + _table
-            self._run_script_stmt(cursor, stmt)

Added: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/interfaces.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,251 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Interfaces provided by RelStorage database adapters"""
+
+from zope.interface import Attribute
+from zope.interface import Interface
+
+class IConnectionManager(Interface):
+
+    def open():
+        """Open a database connection and return (conn, cursor)."""
+
+    def close(conn, cursor):
+        """Close a connection and cursor, ignoring certain errors.
+        """
+
+    def open_and_call(callback):
+        """Call a function with an open connection and cursor.
+
+        If the function returns, commits the transaction and returns the
+        result returned by the function.
+        If the function raises an exception, aborts the transaction
+        then propagates the exception.
+        """
+
+
+class IDatabaseIterator(Interface):
+
+    def iter_objects(cursor, tid):
+        """Iterate over object states in a transaction.
+
+        Yields (oid, prev_tid, state) for each object state.
+        """
+
+    def iter_transactions(cursor):
+        """Iterate over the transaction log, newest first.
+
+        Skips packed transactions.
+        Yields (tid, username, description, extension) for each transaction.
+        """
+
+    def iter_transactions_range(cursor, start=None, stop=None):
+        """Iterate over the transactions in the given range, oldest first.
+
+        Includes packed transactions.
+        Yields (tid, username, description, extension, packed)
+        for each transaction.
+        """
+
+    def iter_object_history(cursor, oid):
+        """Iterate over an object's history.
+
+        Raises KeyError if the object does not exist.
+        Yields (tid, username, description, extension, pickle_size)
+        for each modification.
+        """
+
+
+class ILocker(Interface):
+
+    def hold_commit_lock(cursor, ensure_current=False):
+        """Acquire the commit lock.
+
+        If ensure_current is True, other tables may be locked as well, to
+        ensure the most current data is available.
+
+        May raise StorageError if the lock can not be acquired before
+        some timeout.
+        """
+
+    def release_commit_lock(cursor):
+        """Release the commit lock"""
+
+    def hold_pack_lock(cursor):
+        """Try to acquire the pack lock.
+
+        Raise StorageError if packing or undo is already in progress.
+        """
+
+    def release_pack_lock(cursor):
+        """Release the pack lock."""
+
+
+class IPackUndo(Interface):
+
+    def verify_undoable(cursor, undo_tid):
+        """Raise UndoError if it is not safe to undo the specified txn.
+        """
+
+    def undo(cursor, undo_tid, self_tid):
+        """Undo a transaction.
+
+        Parameters: "undo_tid", the integer tid of the transaction to undo,
+        and "self_tid", the integer tid of the current transaction.
+
+        Returns the states copied forward by the undo operation as a
+        list of (oid, old_tid).
+
+        May raise UndoError.
+        """
+
+    def open_for_pre_pack():
+        """Open a connection to be used for the pre-pack phase.
+
+        Returns (conn, cursor).
+        """
+
+    def fill_object_refs(conn, cursor, get_references):
+        """Update the object_refs table by analyzing new transactions.
+        """
+
+    def choose_pack_transaction(pack_point):
+        """Return the transaction before or at the specified pack time.
+
+        Returns None if there is nothing to pack.
+        """
+
+    def pre_pack(pack_tid, get_references, options):
+        """Decide what to pack.
+
+        pack_tid specifies the most recent transaction to pack.
+
+        get_references is a function that accepts a pickled state and
+        returns a set of OIDs that state refers to.
+
+        options is an instance of relstorage.Options.
+        In particular, the options.pack_gc flag indicates whether
+        to run garbage collection.
+        """
+
+    def pack(pack_tid, options, sleep=None, packed_func=None):
+        """Pack.  Requires the information provided by pre_pack.
+
+        packed_func, if provided, will be called for every object
+        packed, just after it is removed.  The function must accept
+        two parameters, oid and tid (64 bit integers).
+
+        The sleep function defaults to time.sleep(). It can be
+        overridden to do something else instead of sleep during pauses
+        configured by the duty cycle.
+        """
+
+
+class IPoller(Interface):
+
+    def poll_invalidations(conn, cursor, prev_polled_tid, ignore_tid):
+        """Polls for new transactions.
+
+        conn and cursor must have been created previously by open_for_load().
+        prev_polled_tid is the tid returned at the last poll, or None
+        if this is the first poll.  If ignore_tid is not None, changes
+        committed in that transaction will not be included in the list
+        of changed OIDs.
+
+        Returns (changed_oids, new_polled_tid).
+        """
+
+
+class ISchemaInstaller(Interface):
+
+    def create(cursor):
+        """Create the database tables, sequences, etc."""
+
+    def prepare():
+        """Create the database schema if it does not already exist."""
+
+    def zap_all():
+        """Clear all data out of the database."""
+
+    def drop_all():
+        """Drop all tables and sequences."""
+
+
+class IScriptRunner(Interface):
+
+    script_vars = Attribute(
+        """A mapping providing replacements for parts of scripts.
+
+        Used for making scripts compatible with databases using
+        different parameter styles.
+        """)
+
+    def run_script_stmt(cursor, generic_stmt, generic_params=()):
+        """Execute a statement from a script with the given parameters.
+
+        generic_params should be either an empty tuple (no parameters) or
+        a map.
+
+        The input statement is generic and will be transformed
+        into a database-specific statement.
+        """
+
+    def run_script(cursor, script, params=()):
+        """Execute a series of statements in the database.
+
+        params should be either an empty tuple (no parameters) or
+        a map.
+
+        The statements are transformed by run_script_stmt
+        before execution.
+        """
+
+    def run_many(cursor, stmt, items):
+        """Execute a statement repeatedly.  Items should be a list of tuples.
+
+        stmt should use '%s' parameter format (not %(name)s).
+        """
+
+
+class ITransactionControl(Interface):
+
+    def commit_phase1(conn, cursor, tid):
+        """Begin a commit.  Returns the transaction name.
+
+        The transaction name must not be None.
+
+        This method should guarantee that commit_phase2() will succeed,
+        meaning that if commit_phase2() would raise any error, the error
+        should be raised in commit_phase1() instead.
+        """
+
+    def commit_phase2(conn, cursor, txn):
+        """Final transaction commit.
+
+        txn is the name returned by commit_phase1.
+        """
+
+    def abort(conn, cursor, txn=None):
+        """Abort the commit.  If txn is not None, phase 1 is also aborted."""
+
+    def get_tid_and_time(cursor):
+        """Returns the most recent tid and the current database time.
+
+        The database time is the number of seconds since the epoch.
+        """
+
+    def add_transaction(cursor, tid, username, description, extension,
+            packed=False):
+        """Add a transaction."""
+

Added: relstorage/trunk/relstorage/adapters/loadstore.py
===================================================================
--- relstorage/trunk/relstorage/adapters/loadstore.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/loadstore.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,867 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Object load/store implementations.
+"""
+
+from base64 import decodestring
+from base64 import encodestring
+from ZODB.POSException import StorageError
+import time
+
+try:
+    from hashlib import md5
+except ImportError:
+    from md5 import new as md5
+
+
+def compute_md5sum(data):
+    if data is not None:
+        return md5(data).hexdigest()
+    else:
+        # George Bailey object (creation was undone, so there is no state)
+        return None
+
+
+class HistoryPreservingPostgreSQLLoadStore(object):
+
+    def __init__(self, connmanager, disconnected_exceptions):
+        self.connmanager = connmanager
+        self.disconnected_exceptions = disconnected_exceptions
+
+    def open_for_load(self):
+        """Open and initialize a connection for loading objects.
+
+        Returns (conn, cursor).
+        """
+        conn, cursor = self.connmanager.open(
+            self.connmanager.isolation_serializable)
+        stmt = """
+        PREPARE get_latest_tid AS
+        SELECT tid
+        FROM transaction
+        ORDER BY tid DESC
+        LIMIT 1
+        """
+        cursor.execute(stmt)
+        return conn, cursor
+
+    def restart_load(self, cursor):
+        """Reinitialize a connection for loading objects."""
+        try:
+            cursor.connection.rollback()
+        except self.disconnected_exceptions, e:
+            raise StorageError(e)
+
+    def get_current_tid(self, cursor, oid):
+        """Returns the current integer tid for an object.
+
+        oid is an integer.  Returns None if object does not exist.
+        """
+        cursor.execute("""
+        SELECT tid
+        FROM current_object
+        WHERE zoid = %s
+        """, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        return None
+
+    def load_current(self, cursor, oid):
+        """Returns the current pickle and integer tid for an object.
+
+        oid is an integer.  Returns (None, None) if object does not exist.
+        """
+        cursor.execute("""
+        SELECT encode(state, 'base64'), tid
+        FROM current_object
+            JOIN object_state USING(zoid, tid)
+        WHERE zoid = %s
+        """, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            state64, tid = cursor.fetchone()
+            if state64 is not None:
+                state = decodestring(state64)
+            else:
+                # This object's creation has been undone
+                state = None
+            return state, tid
+        else:
+            return None, None
+
+    def load_revision(self, cursor, oid, tid):
+        """Returns the pickle for an object on a particular transaction.
+
+        Returns None if no such state exists.
+        """
+        cursor.execute("""
+        SELECT encode(state, 'base64')
+        FROM object_state
+        WHERE zoid = %s
+            AND tid = %s
+        """, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            (state64,) = cursor.fetchone()
+            if state64 is not None:
+                return decodestring(state64)
+        return None
+
+    def exists(self, cursor, oid):
+        """Returns a true value if the given object exists."""
+        cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
+        return cursor.rowcount
+
+    def load_before(self, cursor, oid, tid):
+        """Returns the pickle and tid of an object before transaction tid.
+
+        Returns (None, None) if no earlier state exists.
+        """
+        cursor.execute("""
+        SELECT encode(state, 'base64'), tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid < %s
+        ORDER BY tid DESC
+        LIMIT 1
+        """, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            state64, tid = cursor.fetchone()
+            if state64 is not None:
+                state = decodestring(state64)
+            else:
+                # The object's creation has been undone
+                state = None
+            return state, tid
+        else:
+            return None, None
+
+    def get_object_tid_after(self, cursor, oid, tid):
+        """Returns the tid of the next change after an object revision.
+
+        Returns None if no later state exists.
+        """
+        stmt = """
+        SELECT tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid > %s
+        ORDER BY tid
+        LIMIT 1
+        """
+        cursor.execute(stmt, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        else:
+            return None
+
+    def _make_temp_table(self, cursor):
+        """Create the temporary table for storing objects"""
+        stmt = """
+        CREATE TEMPORARY TABLE temp_store (
+            zoid        BIGINT NOT NULL,
+            prev_tid    BIGINT NOT NULL,
+            md5         CHAR(32),
+            state       BYTEA
+        ) ON COMMIT DROP;
+        CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+        """
+        cursor.execute(stmt)
+
+    def open_for_store(self):
+        """Open and initialize a connection for storing objects.
+
+        Returns (conn, cursor).
+        """
+        conn, cursor = self.connmanager.open()
+        try:
+            self._make_temp_table(cursor)
+            return conn, cursor
+        except:
+            self.connmanager.close(conn, cursor)
+            raise
+
+    def restart_store(self, cursor):
+        """Reuse a store connection."""
+        try:
+            cursor.connection.rollback()
+            self._make_temp_table(cursor)
+        except self.disconnected_exceptions, e:
+            raise StorageError(e)
+
+    def store_temp(self, cursor, oid, prev_tid, data):
+        """Store an object in the temporary table."""
+        md5sum = compute_md5sum(data)
+        stmt = """
+        DELETE FROM temp_store WHERE zoid = %s;
+        INSERT INTO temp_store (zoid, prev_tid, md5, state)
+        VALUES (%s, %s, %s, decode(%s, 'base64'))
+        """
+        cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
+
+    def replace_temp(self, cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table."""
+        md5sum = compute_md5sum(data)
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = %s,
+            md5 = %s,
+            state = decode(%s, 'base64')
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
+
+    def restore(self, cursor, oid, tid, data):
+        """Store an object directly, without conflict detection.
+
+        Used for copying transactions into this database.
+        """
+        md5sum = compute_md5sum(data)
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        VALUES (%s, %s,
+            COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
+            %s, decode(%s, 'base64'))
+        """
+        if data is not None:
+            data = encodestring(data)
+        cursor.execute(stmt, (oid, tid, oid, md5sum, data))
+
+    def detect_conflict(self, cursor):
+        """Find one conflict in the data about to be committed.
+
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
+        stmt = """
+        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+            encode(temp_store.state, 'base64')
+        FROM temp_store
+            JOIN current_object ON (temp_store.zoid = current_object.zoid)
+        WHERE temp_store.prev_tid != current_object.tid
+        LIMIT 1
+        """
+        cursor.execute(stmt)
+        if cursor.rowcount:
+            oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
+            return oid, prev_tid, attempted_prev_tid, decodestring(data)
+        return None
+
+    def move_from_temp(self, cursor, tid):
+        """Move the temporarily stored objects to permanent storage.
+
+        Returns the list of oids stored.
+        """
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        SELECT zoid, %s, prev_tid, md5, state
+        FROM temp_store
+        """
+        cursor.execute(stmt, (tid,))
+
+        stmt = """
+        SELECT zoid FROM temp_store
+        """
+        cursor.execute(stmt)
+        return [oid for (oid,) in cursor]
+
+    def update_current(self, cursor, tid):
+        """Update the current object pointers.
+
+        tid is the integer tid of the transaction being committed.
+        """
+        cursor.execute("""
+        -- Insert objects created in this transaction into current_object.
+        INSERT INTO current_object (zoid, tid)
+        SELECT zoid, tid FROM object_state
+        WHERE tid = %(tid)s
+            AND prev_tid = 0;
+
+        -- Change existing objects.  To avoid deadlocks,
+        -- update in OID order.
+        UPDATE current_object SET tid = %(tid)s
+        WHERE zoid IN (
+            SELECT zoid FROM object_state
+            WHERE tid = %(tid)s
+                AND prev_tid != 0
+            ORDER BY zoid
+        )
+        """, {'tid': tid})
+
+    def set_min_oid(self, cursor, oid):
+        """Ensure the next OID is at least the given OID."""
+        cursor.execute("""
+        SELECT CASE WHEN %s > nextval('zoid_seq')
+            THEN setval('zoid_seq', %s)
+            ELSE 0
+            END
+        """, (oid, oid))
+
+    def new_oid(self, cursor):
+        """Return a new, unused OID."""
+        stmt = "SELECT NEXTVAL('zoid_seq')"
+        cursor.execute(stmt)
+        return cursor.fetchone()[0]
+
+
+class HistoryPreservingMySQLLoadStore(object):
+
+    def __init__(self, connmanager, disconnected_exceptions, Binary):
+        self.connmanager = connmanager
+        self.disconnected_exceptions = disconnected_exceptions
+        self.Binary = Binary
+
+    def open_for_load(self):
+        """Open and initialize a connection for loading objects.
+
+        Returns (conn, cursor).
+        """
+        return self.connmanager.open(
+            self.connmanager.isolation_repeatable_read)
+
+    def restart_load(self, cursor):
+        """Reinitialize a connection for loading objects."""
+        try:
+            cursor.connection.rollback()
+        except self.disconnected_exceptions, e:
+            raise StorageError(e)
+
+    def get_current_tid(self, cursor, oid):
+        """Returns the current integer tid for an object.
+
+        oid is an integer.  Returns None if object does not exist.
+        """
+        cursor.execute("""
+        SELECT tid
+        FROM current_object
+        WHERE zoid = %s
+        """, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        return None
+
+    def load_current(self, cursor, oid):
+        """Returns the current pickle and integer tid for an object.
+
+        oid is an integer.  Returns (None, None) if object does not exist.
+        """
+        cursor.execute("""
+        SELECT state, tid
+        FROM current_object
+            JOIN object_state USING(zoid, tid)
+        WHERE zoid = %s
+        """, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()
+        else:
+            return None, None
+
+    def load_revision(self, cursor, oid, tid):
+        """Returns the pickle for an object on a particular transaction.
+
+        Returns None if no such state exists.
+        """
+        cursor.execute("""
+        SELECT state
+        FROM object_state
+        WHERE zoid = %s
+            AND tid = %s
+        """, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            (state,) = cursor.fetchone()
+            return state
+        return None
+
+    def exists(self, cursor, oid):
+        """Returns a true value if the given object exists."""
+        cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
+        return cursor.rowcount
+
+    def load_before(self, cursor, oid, tid):
+        """Returns the pickle and tid of an object before transaction tid.
+
+        Returns (None, None) if no earlier state exists.
+        """
+        cursor.execute("""
+        SELECT state, tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid < %s
+        ORDER BY tid DESC
+        LIMIT 1
+        """, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()
+        else:
+            return None, None
+
+    def get_object_tid_after(self, cursor, oid, tid):
+        """Returns the tid of the next change after an object revision.
+
+        Returns None if no later state exists.
+        """
+        stmt = """
+        SELECT tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid > %s
+        ORDER BY tid
+        LIMIT 1
+        """
+        cursor.execute(stmt, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        else:
+            return None
+
+    def _make_temp_table(self, cursor):
+        """Create the temporary table for storing objects"""
+        stmt = """
+        CREATE TEMPORARY TABLE temp_store (
+            zoid        BIGINT NOT NULL PRIMARY KEY,
+            prev_tid    BIGINT NOT NULL,
+            md5         CHAR(32),
+            state       LONGBLOB
+        ) ENGINE MyISAM
+        """
+        cursor.execute(stmt)
+
+    def open_for_store(self):
+        """Open and initialize a connection for storing objects.
+
+        Returns (conn, cursor).
+        """
+        conn, cursor = self.connmanager.open()
+        try:
+            self._make_temp_table(cursor)
+            return conn, cursor
+        except:
+            self.connmanager.close(conn, cursor)
+            raise
+
+    def _restart_temp_table(self, cursor):
+        """Restart the temporary table for storing objects"""
+        stmt = """
+        DROP TEMPORARY TABLE IF EXISTS temp_store
+        """
+        cursor.execute(stmt)
+        self._make_temp_table(cursor)
+
+    def restart_store(self, cursor):
+        """Reuse a store connection."""
+        try:
+            cursor.connection.rollback()
+            self._restart_temp_table(cursor)
+        except self.disconnected_exceptions, e:
+            raise StorageError(e)
+
+    def store_temp(self, cursor, oid, prev_tid, data):
+        """Store an object in the temporary table."""
+        md5sum = compute_md5sum(data)
+        stmt = """
+        REPLACE INTO temp_store (zoid, prev_tid, md5, state)
+        VALUES (%s, %s, %s, %s)
+        """
+        cursor.execute(stmt, (oid, prev_tid, md5sum, self.Binary(data)))
+
+    def replace_temp(self, cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table."""
+        md5sum = compute_md5sum(data)
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = %s,
+            md5 = %s,
+            state = %s
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (prev_tid, md5sum, self.Binary(data), oid))
+
+    def restore(self, cursor, oid, tid, data):
+        """Store an object directly, without conflict detection.
+
+        Used for copying transactions into this database.
+        """
+        md5sum = compute_md5sum(data)
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        VALUES (%s, %s,
+            COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
+            %s, %s)
+        """
+        if data is not None:
+            data = self.Binary(data)
+        cursor.execute(stmt, (oid, tid, oid, md5sum, data))
+
+    def detect_conflict(self, cursor):
+        """Find one conflict in the data about to be committed.
+
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
+        # Lock in share mode to ensure the data being read is up to date.
+        stmt = """
+        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+            temp_store.state
+        FROM temp_store
+            JOIN current_object ON (temp_store.zoid = current_object.zoid)
+        WHERE temp_store.prev_tid != current_object.tid
+        LIMIT 1
+        LOCK IN SHARE MODE
+        """
+        cursor.execute(stmt)
+        if cursor.rowcount:
+            return cursor.fetchone()
+        return None
+
+    def move_from_temp(self, cursor, tid):
+        """Move the temporarily stored objects to permanent storage.
+
+        Returns the list of oids stored.
+        """
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        SELECT zoid, %s, prev_tid, md5, state
+        FROM temp_store
+        """
+        cursor.execute(stmt, (tid,))
+
+        stmt = """
+        SELECT zoid FROM temp_store
+        """
+        cursor.execute(stmt)
+        return [oid for (oid,) in cursor]
+
+    def update_current(self, cursor, tid):
+        """Update the current object pointers.
+
+        tid is the integer tid of the transaction being committed.
+        """
+        cursor.execute("""
+        REPLACE INTO current_object (zoid, tid)
+        SELECT zoid, tid FROM object_state
+        WHERE tid = %s
+        """, (tid,))
+
+    def set_min_oid(self, cursor, oid):
+        """Ensure the next OID is at least the given OID."""
+        cursor.execute("REPLACE INTO new_oid VALUES(%s)", (oid,))
+
+    def new_oid(self, cursor):
+        """Return a new, unused OID."""
+        stmt = "INSERT INTO new_oid VALUES ()"
+        cursor.execute(stmt)
+        oid = cursor.connection.insert_id()
+        if oid % 100 == 0:
+            # Clean out previously generated OIDs.
+            stmt = "DELETE FROM new_oid WHERE zoid < %s"
+            cursor.execute(stmt, (oid,))
+        return oid
+
+
+class HistoryPreservingOracleLoadStore(object):
+
+    def __init__(self, connmanager, runner, disconnected_exceptions,
+            Binary, inputsize_BLOB, inputsize_BINARY, twophase):
+        self.connmanager = connmanager
+        self.runner = runner
+        self.disconnected_exceptions = disconnected_exceptions
+        self.Binary = Binary
+        self.inputsize_BLOB = inputsize_BLOB
+        self.inputsize_BINARY = inputsize_BINARY
+        self.twophase = twophase
+
+    def open_for_load(self):
+        """Open and initialize a connection for loading objects.
+
+        Returns (conn, cursor).
+        """
+        return self.connmanager.open(self.connmanager.isolation_read_only)
+
+    def restart_load(self, cursor):
+        """Reinitialize a connection for loading objects."""
+        try:
+            cursor.connection.rollback()
+            cursor.execute("SET TRANSACTION READ ONLY")
+        except self.disconnected_exceptions, e:
+            raise StorageError(e)
+
+    def get_current_tid(self, cursor, oid):
+        """Returns the current integer tid for an object.
+
+        oid is an integer.  Returns None if object does not exist.
+        """
+        cursor.execute("""
+        SELECT tid
+        FROM current_object
+        WHERE zoid = :1
+        """, (oid,))
+        for (tid,) in cursor:
+            return tid
+        return None
+
+    def load_current(self, cursor, oid):
+        """Returns the current pickle and integer tid for an object.
+
+        oid is an integer.  Returns (None, None) if object does not exist.
+        """
+        stmt = """
+        SELECT state, tid
+        FROM current_object
+            JOIN object_state USING(zoid, tid)
+        WHERE zoid = :1
+        """
+        return self.runner.run_lob_stmt(
+            cursor, stmt, (oid,), default=(None, None))
+
+    def load_revision(self, cursor, oid, tid):
+        """Returns the pickle for an object on a particular transaction.
+
+        Returns None if no such state exists.
+        """
+        stmt = """
+        SELECT state
+        FROM object_state
+        WHERE zoid = :1
+            AND tid = :2
+        """
+        (state,) = self.runner.run_lob_stmt(
+            cursor, stmt, (oid, tid), default=(None,))
+        return state
+
+    def exists(self, cursor, oid):
+        """Returns a true value if the given object exists."""
+        cursor.execute("SELECT 1 FROM current_object WHERE zoid = :1", (oid,))
+        return len(list(cursor))
+
+    def load_before(self, cursor, oid, tid):
+        """Returns the pickle and tid of an object before transaction tid.
+
+        Returns (None, None) if no earlier state exists.
+        """
+        stmt = """
+        SELECT state, tid
+        FROM object_state
+        WHERE zoid = :oid
+            AND tid = (
+                SELECT MAX(tid)
+                FROM object_state
+                WHERE zoid = :oid
+                    AND tid < :tid
+            )
+        """
+        return self.runner.run_lob_stmt(
+            cursor, stmt, {'oid': oid, 'tid': tid}, default=(None, None))
+
+    def get_object_tid_after(self, cursor, oid, tid):
+        """Returns the tid of the next change after an object revision.
+
+        Returns None if no later state exists.
+        """
+        stmt = """
+        SELECT MIN(tid)
+        FROM object_state
+        WHERE zoid = :1
+            AND tid > :2
+        """
+        cursor.execute(stmt, (oid, tid))
+        rows = cursor.fetchall()
+        if rows:
+            assert len(rows) == 1
+            return rows[0][0]
+        else:
+            return None
+
+    def _set_xid(self, cursor):
+        """Set up a distributed transaction"""
+        stmt = """
+        SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
+        """
+        cursor.execute(stmt)
+        xid = str(cursor.fetchone()[0])
+        cursor.connection.begin(0, xid, '0')
+
+    def open_for_store(self):
+        """Open and initialize a connection for storing objects.
+
+        Returns (conn, cursor).  The connection is closed if XID setup fails.
+        """
+        if self.twophase:
+            conn, cursor = self.connmanager.open(
+                transaction_mode=None, twophase=True)
+            try:
+                self._set_xid(cursor)
+            except:
+                self.connmanager.close(conn, cursor)
+                raise
+        else:
+            conn, cursor = self.connmanager.open()
+        return conn, cursor
+
+    def restart_store(self, cursor):
+        """Reuse a store connection."""
+        try:
+            cursor.connection.rollback()
+            if self.twophase:
+                self._set_xid(cursor)
+        except self.disconnected_exceptions, e:
+            raise StorageError(e)
+
+    def store_temp(self, cursor, oid, prev_tid, data):
+        """Store an object in the temporary table."""
+        md5sum = compute_md5sum(data)
+        cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
+        if len(data) <= 2000:
+            # Send data inline for speed.  Oracle docs say maximum size
+            # of a RAW is 2000 bytes.  inputsize_BINARY corresponds with RAW.
+            cursor.setinputsizes(rawdata=self.inputsize_BINARY)
+            stmt = """
+            INSERT INTO temp_store (zoid, prev_tid, md5, state)
+            VALUES (:oid, :prev_tid, :md5sum, :rawdata)
+            """
+            cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+                md5sum=md5sum, rawdata=data)
+        else:
+            # Send data as a BLOB
+            cursor.setinputsizes(blobdata=self.inputsize_BLOB)
+            stmt = """
+            INSERT INTO temp_store (zoid, prev_tid, md5, state)
+            VALUES (:oid, :prev_tid, :md5sum, :blobdata)
+            """
+            cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+                md5sum=md5sum, blobdata=data)
+
+    def replace_temp(self, cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table."""
+        md5sum = compute_md5sum(data)
+        cursor.setinputsizes(data=self.inputsize_BLOB)
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = :prev_tid,
+            md5 = :md5sum,
+            state = :data
+        WHERE zoid = :oid
+        """
+        cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+            md5sum=md5sum, data=self.Binary(data))
+
+    def restore(self, cursor, oid, tid, data):
+        """Store an object directly, without conflict detection.
+
+        Used for copying transactions into this database.
+        """
+        md5sum = compute_md5sum(data)
+        cursor.setinputsizes(data=self.inputsize_BLOB)
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        VALUES (:oid, :tid,
+            COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
+            :md5sum, :data)
+        """
+        if data is not None:
+            data = self.Binary(data)
+        cursor.execute(stmt, oid=oid, tid=tid, md5sum=md5sum, data=data)
+
+    def detect_conflict(self, cursor):
+        """Find one conflict in the data about to be committed.
+
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
+        stmt = """
+        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+            temp_store.state
+        FROM temp_store
+            JOIN current_object ON (temp_store.zoid = current_object.zoid)
+        WHERE temp_store.prev_tid != current_object.tid
+        """
+        return self.runner.run_lob_stmt(cursor, stmt)
+
+    def move_from_temp(self, cursor, tid):
+        """Move the temporarily stored objects to permanent storage.
+
+        Returns the list of oids stored.
+        """
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        SELECT zoid, :tid, prev_tid, md5, state
+        FROM temp_store
+        """
+        cursor.execute(stmt, tid=tid)
+
+        stmt = """
+        SELECT zoid FROM temp_store
+        """
+        cursor.execute(stmt)
+        return [oid for (oid,) in cursor]
+
+    def update_current(self, cursor, tid):
+        """Update the current object pointers.
+
+        tid is the integer tid of the transaction being committed.
+        """
+        # Insert objects created in this transaction into current_object.
+        stmt = """
+        INSERT INTO current_object (zoid, tid)
+        SELECT zoid, tid FROM object_state
+        WHERE tid = :1
+            AND prev_tid = 0
+        """
+        cursor.execute(stmt, (tid,))
+
+        # Change existing objects.
+        stmt = """
+        UPDATE current_object SET tid = :1
+        WHERE zoid IN (
+            SELECT zoid FROM object_state
+            WHERE tid = :1
+                AND prev_tid != 0
+        )
+        """
+        cursor.execute(stmt, (tid,))
+
+    def set_min_oid(self, cursor, oid):
+        """Ensure the next OID is at least the given OID."""
+        next_oid = self.new_oid(cursor)
+        if next_oid < oid:
+            # Oracle provides no way to modify the sequence value
+            # except through alter sequence or drop/create sequence,
+            # but either statement kills the current transaction.
+            # Therefore, open a temporary connection to make the
+            # alteration.
+            conn2, cursor2 = self.connmanager.open()
+            try:
+                # Change the sequence by altering the increment.
+                # (this is safer than dropping and re-creating the sequence)
+                diff = oid - next_oid
+                cursor2.execute(
+                    "ALTER SEQUENCE zoid_seq INCREMENT BY %d" % diff)
+                cursor2.execute("SELECT zoid_seq.nextval FROM DUAL")
+                cursor2.execute("ALTER SEQUENCE zoid_seq INCREMENT BY 1")
+                conn2.commit()
+            finally:
+                self.connmanager.close(conn2, cursor2)
+
+    def new_oid(self, cursor):
+        """Return a new, unused OID."""
+        stmt = "SELECT zoid_seq.nextval FROM DUAL"
+        cursor.execute(stmt)
+        return cursor.fetchone()[0]

Added: relstorage/trunk/relstorage/adapters/locker.py
===================================================================
--- relstorage/trunk/relstorage/adapters/locker.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/locker.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,157 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Locker implementations.
+"""
+
+from relstorage.adapters.interfaces import ILocker
+from ZODB.POSException import StorageError
+from zope.interface import implements
+import re
+
+commit_lock_timeout = 30
+
+
+class Locker(object):
+
+    def __init__(self, database_errors):
+        self.database_errors = database_errors
+
+
+class PostgreSQLLocker(Locker):
+    implements(ILocker)
+
+    def hold_commit_lock(self, cursor, ensure_current=False):
+        if ensure_current:
+            # Hold commit_lock to prevent concurrent commits
+            # (for as short a time as possible).
+            # Lock transaction and current_object in share mode to ensure
+            # conflict detection has the most current data.
+            cursor.execute("""
+            LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+            LOCK TABLE transaction IN SHARE MODE;
+            LOCK TABLE current_object IN SHARE MODE
+            """)
+        else:
+            cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
+    def release_commit_lock(self, cursor):
+        # no action needed
+        pass
+
+    def _pg_version(self, cursor):
+        """Return the (major, minor) version of PostgreSQL"""
+        cursor.execute("SELECT version()")
+        v = cursor.fetchone()[0]
+        m = re.search(r"([0-9]+)[.]([0-9]+)", v)
+        if m is None:
+            raise AssertionError("Unable to detect PostgreSQL version: " + v)
+        else:
+            return int(m.group(1)), int(m.group(2))
+
+    def _pg_has_advisory_locks(self, cursor):
+        """Return true if this version of PostgreSQL supports advisory locks"""
+        return self._pg_version(cursor) >= (8, 2)
+
+    def create_pack_lock(self, cursor):
+        if not self._pg_has_advisory_locks(cursor):
+            cursor.execute("CREATE TABLE pack_lock ()")
+
+    def hold_pack_lock(self, cursor):
+        """Try to acquire the pack lock.
+
+        Raise an exception if packing or undo is already in progress.
+        """
+        if self._pg_has_advisory_locks(cursor):
+            cursor.execute("SELECT pg_try_advisory_lock(1)")
+            locked = cursor.fetchone()[0]
+            if not locked:
+                raise StorageError('A pack or undo operation is in progress')
+        else:
+            # backward compatibility for PostgreSQL < 8.2 (no advisory locks)
+            try:
+                cursor.execute("LOCK pack_lock IN EXCLUSIVE MODE NOWAIT")
+            except self.database_errors:  # psycopg2.DatabaseError:
+                raise StorageError('A pack or undo operation is in progress')
+
+    def release_pack_lock(self, cursor):
+        """Release the pack lock."""
+        if self._pg_has_advisory_locks(cursor):
+            cursor.execute("SELECT pg_advisory_unlock(1)")
+        # else no action needed since the lock will be released at txn commit
+
+
+class MySQLLocker(Locker):
+    implements(ILocker)
+
+    def hold_commit_lock(self, cursor, ensure_current=False):
+        cursor.execute("SELECT GET_LOCK(CONCAT(DATABASE(), '.commit'), %s)",
+            (commit_lock_timeout,))
+        locked = cursor.fetchone()[0]
+        if not locked:
+            raise StorageError("Unable to acquire commit lock")
+
+    def release_commit_lock(self, cursor):
+        cursor.execute("SELECT RELEASE_LOCK(CONCAT(DATABASE(), '.commit'))")
+
+    def hold_pack_lock(self, cursor):
+        """Try to acquire the pack lock.
+
+        Raise an exception if packing or undo is already in progress.
+        """
+        stmt = "SELECT GET_LOCK(CONCAT(DATABASE(), '.pack'), 0)"
+        cursor.execute(stmt)
+        res = cursor.fetchone()[0]
+        if not res:
+            raise StorageError('A pack or undo operation is in progress')
+
+    def release_pack_lock(self, cursor):
+        """Release the pack lock."""
+        stmt = "SELECT RELEASE_LOCK(CONCAT(DATABASE(), '.pack'))"
+        cursor.execute(stmt)
+
+
+class OracleLocker(Locker):
+    implements(ILocker)
+
+    def hold_commit_lock(self, cursor, ensure_current=False):
+        # Hold commit_lock to prevent concurrent commits
+        # (for as short a time as possible).
+        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+        if ensure_current:
+            # Lock transaction and current_object in share mode to ensure
+            # conflict detection has the most current data.
+            cursor.execute("LOCK TABLE transaction IN SHARE MODE")
+            cursor.execute("LOCK TABLE current_object IN SHARE MODE")
+
+    def release_commit_lock(self, cursor):
+        # no action needed
+        pass
+
+    def hold_pack_lock(self, cursor):
+        """Try to acquire the pack lock.
+
+        Raise an exception if packing or undo is already in progress.
+        """
+        stmt = """
+        LOCK TABLE pack_lock IN EXCLUSIVE MODE NOWAIT
+        """
+        try:
+            cursor.execute(stmt)
+        except self.database_errors:  # cx_Oracle.DatabaseError:
+            raise StorageError('A pack or undo operation is in progress')
+
+    def release_pack_lock(self, cursor):
+        """Release the pack lock."""
+        # No action needed
+        pass

Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py	2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/mysql.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -50,232 +50,133 @@
 
 import logging
 import MySQLdb
-import time
-from ZODB.POSException import StorageError
 
-from relstorage.adapters.historypreserving import HistoryPreservingAdapter
+from relstorage.adapters.connmanager import AbstractConnectionManager
+from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
+from relstorage.adapters.loadstore import HistoryPreservingMySQLLoadStore
+from relstorage.adapters.locker import MySQLLocker
+from relstorage.adapters.packundo import HistoryPreservingPackUndo
+from relstorage.adapters.poller import Poller
+from relstorage.adapters.schema import HistoryPreservingMySQLSchema
+from relstorage.adapters.scriptrunner import ScriptRunner
+from relstorage.adapters.stats import MySQLStats
+from relstorage.adapters.txncontrol import MySQLTransactionControl
 
-log = logging.getLogger("relstorage.adapters.mysql")
+log = logging.getLogger(__name__)
 
-commit_lock_timeout = 30
-
 # disconnected_exceptions contains the exception types that might be
 # raised when the connection to the database has been broken.
 disconnected_exceptions = (MySQLdb.OperationalError, MySQLdb.InterfaceError)
 
-# close_exceptions contains the exception types to ignore
-# when the adapter attempts to close a database connection.
-close_exceptions = disconnected_exceptions + (MySQLdb.ProgrammingError,)
 
-
-class MySQLAdapter(HistoryPreservingAdapter):
+class MySQLAdapter(object):
     """MySQL adapter for RelStorage."""
 
-    _scripts = HistoryPreservingAdapter._scripts.copy()
-    # Work around a MySQL performance bug by avoiding an expensive subquery.
-    # See: http://mail.zope.org/pipermail/zodb-dev/2008-May/011880.html
-    #      http://bugs.mysql.com/bug.php?id=28257
-    _scripts.update({
-        'create_temp_pack_visit': """
-            CREATE TEMPORARY TABLE temp_pack_visit (
-                zoid BIGINT NOT NULL,
-                keep_tid BIGINT
-            );
-            CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid);
-            CREATE TEMPORARY TABLE temp_pack_child (
-                zoid BIGINT NOT NULL
-            );
-            CREATE UNIQUE INDEX temp_pack_child_zoid ON temp_pack_child (zoid);
-            """,
+    keep_history = True
 
-        # Note: UPDATE must be the last statement in the script
-        # because it returns a value.
-        'pre_pack_follow_child_refs': """
-            %(TRUNCATE)s temp_pack_child;
-
-            INSERT INTO temp_pack_child
-            SELECT DISTINCT to_zoid
-            FROM object_ref
-                JOIN temp_pack_visit USING (zoid)
-            WHERE object_ref.tid >= temp_pack_visit.keep_tid;
-
-            -- MySQL-specific syntax for table join in update
-            UPDATE pack_object, temp_pack_child SET keep = %(TRUE)s
-            WHERE keep = %(FALSE)s
-                AND pack_object.zoid = temp_pack_child.zoid;
-            """,
-
-        # MySQL optimizes deletion far better when using a join syntax.
-        'pack_current_object': """
-            DELETE FROM current_object
-            USING current_object
-                JOIN pack_state USING (zoid, tid)
-            WHERE current_object.tid = %(tid)s
-            """,
-
-        'pack_object_state': """
-            DELETE FROM object_state
-            USING object_state
-                JOIN pack_state USING (zoid, tid)
-            WHERE object_state.tid = %(tid)s
-            """,
-
-        'pack_object_ref': """
-            DELETE FROM object_refs_added
-            USING object_refs_added
-                JOIN transaction USING (tid)
-            WHERE transaction.empty = true;
-
-            DELETE FROM object_ref
-            USING object_ref
-                JOIN transaction USING (tid)
-            WHERE transaction.empty = true
-            """,
-        })
-
     def __init__(self, **params):
-        self._params = params.copy()
+        self.connmanager = MySQLdbConnectionManager(params)
+        self.runner = ScriptRunner()
+        self.locker = MySQLLocker((MySQLdb.DatabaseError,))
+        self.schema = HistoryPreservingMySQLSchema(
+            connmanager=self.connmanager,
+            runner=self.runner,
+            )
+        self.loadstore = HistoryPreservingMySQLLoadStore(
+            connmanager=self.connmanager,
+            disconnected_exceptions=disconnected_exceptions,
+            Binary=MySQLdb.Binary,
+            )
+        self.txncontrol = MySQLTransactionControl(
+            Binary=MySQLdb.Binary,
+            )
+        self.poller = Poller(
+            poll_query="SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1",
+            keep_history=True,
+            runner=self.runner,
+            )
+        self.packundo = HistoryPreservingPackUndo(
+            connmanager=self.connmanager,
+            runner=self.runner,
+            locker=self.locker,
+            )
+        self.dbiter = HistoryPreservingDatabaseIterator(
+            runner=self.runner,
+            )
+        self.stats = MySQLStats(
+            connmanager=self.connmanager,
+            )
 
-    def create_schema(self, cursor):
-        """Create the database tables."""
-        stmt = """
-        -- The list of all transactions in the database
-        CREATE TABLE transaction (
-            tid         BIGINT NOT NULL PRIMARY KEY,
-            packed      BOOLEAN NOT NULL DEFAULT FALSE,
-            empty       BOOLEAN NOT NULL DEFAULT FALSE,
-            username    BLOB NOT NULL,
-            description BLOB NOT NULL,
-            extension   BLOB
-        ) ENGINE = InnoDB;
+        self.open = self.connmanager.open
+        self.close = self.connmanager.close
 
-        -- Create a special transaction to represent object creation.  This
-        -- row is often referenced by object_state.prev_tid, but never by
-        -- object_state.tid.
-        INSERT INTO transaction (tid, username, description)
-            VALUES (0, 'system', 'special transaction for object creation');
+        self.hold_commit_lock = self.locker.hold_commit_lock
+        self.release_commit_lock = self.locker.release_commit_lock
+        self.hold_pack_lock = self.locker.hold_pack_lock
+        self.release_pack_lock = self.locker.release_pack_lock
 
-        -- All OIDs allocated in the database.  Note that this table
-        -- is purposely non-transactional.
-        CREATE TABLE new_oid (
-            zoid        BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT
-        ) ENGINE = MyISAM;
+        self.create_schema = self.schema.create
+        self.prepare_schema = self.schema.prepare
+        self.zap_all = self.schema.zap_all
+        self.drop_all = self.schema.drop_all
 
-        -- All object states in all transactions.  Note that md5 and state
-        -- can be null to represent object uncreation.
-        CREATE TABLE object_state (
-            zoid        BIGINT NOT NULL,
-            tid         BIGINT NOT NULL REFERENCES transaction,
-            PRIMARY KEY (zoid, tid),
-            prev_tid    BIGINT NOT NULL REFERENCES transaction,
-            md5         CHAR(32) CHARACTER SET ascii,
-            state       LONGBLOB,
-            CHECK (tid > 0)
-        ) ENGINE = InnoDB;
-        CREATE INDEX object_state_tid ON object_state (tid);
-        CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+        self.open_for_load = self.loadstore.open_for_load
+        self.restart_load = self.loadstore.restart_load
+        self.get_current_tid = self.loadstore.get_current_tid
+        self.load_current = self.loadstore.load_current
+        self.load_revision = self.loadstore.load_revision
+        self.exists = self.loadstore.exists
+        self.load_before = self.loadstore.load_before
+        self.get_object_tid_after = self.loadstore.get_object_tid_after
 
-        -- Pointers to the current object state
-        CREATE TABLE current_object (
-            zoid        BIGINT NOT NULL PRIMARY KEY,
-            tid         BIGINT NOT NULL,
-            FOREIGN KEY (zoid, tid) REFERENCES object_state (zoid, tid)
-        ) ENGINE = InnoDB;
-        CREATE INDEX current_object_tid ON current_object (tid);
+        self.open_for_store = self.loadstore.open_for_store
+        self.restart_store = self.loadstore.restart_store
+        self.store_temp = self.loadstore.store_temp
+        self.replace_temp = self.loadstore.replace_temp
+        self.restore = self.loadstore.restore
+        self.detect_conflict = self.loadstore.detect_conflict
+        self.move_from_temp = self.loadstore.move_from_temp
+        self.update_current = self.loadstore.update_current
+        self.set_min_oid = self.loadstore.set_min_oid
+        self.new_oid = self.loadstore.new_oid
 
-        -- A list of referenced OIDs from each object_state.
-        -- This table is populated as needed during packing.
-        -- To prevent unnecessary table locking, it does not use
-        -- foreign keys, which is safe because rows in object_state
-        -- are never modified once committed, and rows are removed
-        -- from object_state only by packing.
-        CREATE TABLE object_ref (
-            zoid        BIGINT NOT NULL,
-            tid         BIGINT NOT NULL,
-            to_zoid     BIGINT NOT NULL,
-            PRIMARY KEY (tid, zoid, to_zoid)
-        ) ENGINE = MyISAM;
+        self.get_tid_and_time = self.txncontrol.get_tid_and_time
+        self.add_transaction = self.txncontrol.add_transaction
+        self.commit_phase1 = self.txncontrol.commit_phase1
+        self.commit_phase2 = self.txncontrol.commit_phase2
+        self.abort = self.txncontrol.abort
 
-        -- The object_refs_added table tracks whether object_refs has
-        -- been populated for all states in a given transaction.
-        -- An entry is added only when the work is finished.
-        -- To prevent unnecessary table locking, it does not use
-        -- foreign keys, which is safe because object states
-        -- are never added to a transaction once committed, and
-        -- rows are removed from the transaction table only by
-        -- packing.
-        CREATE TABLE object_refs_added (
-            tid         BIGINT NOT NULL PRIMARY KEY
-        ) ENGINE = MyISAM;
+        self.poll_invalidations = self.poller.poll_invalidations
 
-        -- Temporary state during packing:
-        -- The list of objects to pack.  If keep is false,
-        -- the object and all its revisions will be removed.
-        -- If keep is true, instead of removing the object,
-        -- the pack operation will cut the object's history.
-        -- The keep_tid field specifies the oldest revision
-        -- of the object to keep.
-        -- The visited flag is set when pre_pack is visiting an object's
-        -- references, and remains set.
-        CREATE TABLE pack_object (
-            zoid        BIGINT NOT NULL PRIMARY KEY,
-            keep        BOOLEAN NOT NULL,
-            keep_tid    BIGINT NOT NULL,
-            visited     BOOLEAN NOT NULL DEFAULT FALSE
-        ) ENGINE = MyISAM;
-        CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+        self.fill_object_refs = self.packundo.fill_object_refs
+        self.open_for_pre_pack = self.packundo.open_for_pre_pack
+        self.choose_pack_transaction = self.packundo.choose_pack_transaction
+        self.pre_pack = self.packundo.pre_pack
+        self.pack = self.packundo.pack
+        self.verify_undoable = self.packundo.verify_undoable
+        self.undo = self.packundo.undo
 
-        -- Temporary state during packing: the list of object states to pack.
-        CREATE TABLE pack_state (
-            tid         BIGINT NOT NULL,
-            zoid        BIGINT NOT NULL,
-            PRIMARY KEY (tid, zoid)
-        ) ENGINE = MyISAM;
+        self.iter_objects = self.dbiter.iter_objects
+        self.iter_transactions = self.dbiter.iter_transactions
+        self.iter_transactions_range = self.dbiter.iter_transactions_range
+        self.iter_object_history = self.dbiter.iter_object_history
 
-        -- Temporary state during packing: the list of transactions that
-        -- have at least one object state to pack.
-        CREATE TABLE pack_state_tid (
-            tid         BIGINT NOT NULL PRIMARY KEY
-        ) ENGINE = MyISAM;
-        """
-        self._run_script(cursor, stmt)
+        self.get_object_count = self.stats.get_object_count
+        self.get_db_size = self.stats.get_db_size
 
 
-    def prepare_schema(self):
-        """Create the database schema if it does not already exist."""
-        def callback(conn, cursor):
-            cursor.execute("SHOW TABLES LIKE 'object_state'")
-            if not cursor.rowcount:
-                self.create_schema(cursor)
-        self._open_and_call(callback)
+class MySQLdbConnectionManager(AbstractConnectionManager):
 
-    def zap_all(self):
-        """Clear all data out of the database."""
-        def callback(conn, cursor):
-            stmt = """
-            DELETE FROM object_refs_added;
-            DELETE FROM object_ref;
-            DELETE FROM current_object;
-            DELETE FROM object_state;
-            TRUNCATE new_oid;
-            DELETE FROM transaction;
-            -- Create a transaction to represent object creation.
-            INSERT INTO transaction (tid, username, description) VALUES
-                (0, 'system', 'special transaction for object creation');
-            """
-            self._run_script(cursor, stmt)
-        self._open_and_call(callback)
+    isolation_read_committed = "ISOLATION LEVEL READ COMMITTED"
+    isolation_repeatable_read = "ISOLATION LEVEL REPEATABLE READ"
 
-    def drop_all(self):
-        """Drop all tables and sequences."""
-        def callback(conn, cursor):
-            for tablename in ('pack_state_tid', 'pack_state',
-                    'pack_object', 'object_refs_added', 'object_ref',
-                    'current_object', 'object_state', 'new_oid',
-                    'transaction'):
-                cursor.execute("DROP TABLE IF EXISTS %s" % tablename)
-        self._open_and_call(callback)
+    # close_exceptions contains the exception types to ignore
+    # when the adapter attempts to close a database connection.
+    close_exceptions = disconnected_exceptions + (MySQLdb.ProgrammingError,)
 
+    def __init__(self, params):
+        self._params = params.copy()
+
     def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED"):
         """Open a database connection and return (conn, cursor)."""
         try:
@@ -291,396 +192,3 @@
             log.warning("Unable to connect: %s", e)
             raise
 
-    def close(self, conn, cursor):
-        """Close a connection and cursor, ignoring certain errors.
-        """
-        for obj in (cursor, conn):
-            if obj is not None:
-                try:
-                    obj.close()
-                except close_exceptions:
-                    pass
-
-    def open_for_load(self):
-        """Open and initialize a connection for loading objects.
-
-        Returns (conn, cursor).
-        """
-        return self.open("ISOLATION LEVEL REPEATABLE READ")
-
-    def restart_load(self, cursor):
-        """Reinitialize a connection for loading objects."""
-        try:
-            cursor.connection.rollback()
-        except disconnected_exceptions, e:
-            raise StorageError(e)
-
-    def get_object_count(self):
-        """Returns the number of objects in the database"""
-        # do later
-        return 0
-
-    def get_db_size(self):
-        """Returns the approximate size of the database in bytes"""
-        conn, cursor = self.open()
-        try:
-            cursor.execute("SHOW TABLE STATUS")
-            description = [i[0] for i in cursor.description]
-            rows = list(cursor)
-        finally:
-            self.close(conn, cursor)
-        data_column = description.index('Data_length')
-        index_column = description.index('Index_length')
-        return sum([row[data_column] + row[index_column] for row in rows], 0)
-
-    def get_current_tid(self, cursor, oid):
-        """Returns the current integer tid for an object.
-
-        oid is an integer.  Returns None if object does not exist.
-        """
-        cursor.execute("""
-        SELECT tid
-        FROM current_object
-        WHERE zoid = %s
-        """, (oid,))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()[0]
-        return None
-
-    def load_current(self, cursor, oid):
-        """Returns the current pickle and integer tid for an object.
-
-        oid is an integer.  Returns (None, None) if object does not exist.
-        """
-        cursor.execute("""
-        SELECT state, tid
-        FROM current_object
-            JOIN object_state USING(zoid, tid)
-        WHERE zoid = %s
-        """, (oid,))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()
-        else:
-            return None, None
-
-    def load_revision(self, cursor, oid, tid):
-        """Returns the pickle for an object on a particular transaction.
-
-        Returns None if no such state exists.
-        """
-        cursor.execute("""
-        SELECT state
-        FROM object_state
-        WHERE zoid = %s
-            AND tid = %s
-        """, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            (state,) = cursor.fetchone()
-            return state
-        return None
-
-    def exists(self, cursor, oid):
-        """Returns a true value if the given object exists."""
-        cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
-        return cursor.rowcount
-
-    def load_before(self, cursor, oid, tid):
-        """Returns the pickle and tid of an object before transaction tid.
-
-        Returns (None, None) if no earlier state exists.
-        """
-        cursor.execute("""
-        SELECT state, tid
-        FROM object_state
-        WHERE zoid = %s
-            AND tid < %s
-        ORDER BY tid DESC
-        LIMIT 1
-        """, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()
-        else:
-            return None, None
-
-    def get_object_tid_after(self, cursor, oid, tid):
-        """Returns the tid of the next change after an object revision.
-
-        Returns None if no later state exists.
-        """
-        stmt = """
-        SELECT tid
-        FROM object_state
-        WHERE zoid = %s
-            AND tid > %s
-        ORDER BY tid
-        LIMIT 1
-        """
-        cursor.execute(stmt, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()[0]
-        else:
-            return None
-
-    def _make_temp_table(self, cursor):
-        """Create the temporary table for storing objects"""
-        stmt = """
-        CREATE TEMPORARY TABLE temp_store (
-            zoid        BIGINT NOT NULL PRIMARY KEY,
-            prev_tid    BIGINT NOT NULL,
-            md5         CHAR(32),
-            state       LONGBLOB
-        ) ENGINE MyISAM
-        """
-        cursor.execute(stmt)
-
-    def open_for_store(self):
-        """Open and initialize a connection for storing objects.
-
-        Returns (conn, cursor).
-        """
-        conn, cursor = self.open()
-        try:
-            self._make_temp_table(cursor)
-            return conn, cursor
-        except:
-            self.close(conn, cursor)
-            raise
-
-    def _restart_temp_table(self, cursor):
-        """Restart the temporary table for storing objects"""
-        stmt = """
-        DROP TEMPORARY TABLE IF EXISTS temp_store
-        """
-        cursor.execute(stmt)
-        self._make_temp_table(cursor)
-
-    def restart_store(self, cursor):
-        """Reuse a store connection."""
-        try:
-            cursor.connection.rollback()
-            self._restart_temp_table(cursor)
-        except disconnected_exceptions, e:
-            raise StorageError(e)
-
-    def store_temp(self, cursor, oid, prev_tid, data):
-        """Store an object in the temporary table."""
-        md5sum = self.md5sum(data)
-        stmt = """
-        REPLACE INTO temp_store (zoid, prev_tid, md5, state)
-        VALUES (%s, %s, %s, %s)
-        """
-        cursor.execute(stmt, (oid, prev_tid, md5sum, MySQLdb.Binary(data)))
-
-    def replace_temp(self, cursor, oid, prev_tid, data):
-        """Replace an object in the temporary table."""
-        md5sum = self.md5sum(data)
-        stmt = """
-        UPDATE temp_store SET
-            prev_tid = %s,
-            md5 = %s,
-            state = %s
-        WHERE zoid = %s
-        """
-        cursor.execute(stmt, (prev_tid, md5sum, MySQLdb.Binary(data), oid))
-
-    def restore(self, cursor, oid, tid, data):
-        """Store an object directly, without conflict detection.
-
-        Used for copying transactions into this database.
-        """
-        md5sum = self.md5sum(data)
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        VALUES (%s, %s,
-            COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
-            %s, %s)
-        """
-        if data is not None:
-            data = MySQLdb.Binary(data)
-        cursor.execute(stmt, (oid, tid, oid, md5sum, data))
-
-    def start_commit(self, cursor):
-        """Prepare to commit."""
-        self._hold_commit_lock(cursor)
-
-    def get_tid_and_time(self, cursor):
-        """Returns the most recent tid and the current database time.
-
-        The database time is the number of seconds since the epoch.
-        """
-        # Lock in share mode to ensure the data being read is up to date.
-        cursor.execute("""
-        SELECT tid, UNIX_TIMESTAMP()
-        FROM transaction
-        ORDER BY tid DESC
-        LIMIT 1
-        LOCK IN SHARE MODE
-        """)
-        assert cursor.rowcount == 1
-        tid, timestamp = cursor.fetchone()
-        # MySQL does not provide timestamps with more than one second
-        # precision.  To provide more precision, if the system time is
-        # within one minute of the MySQL time, use the system time instead.
-        now = time.time()
-        if abs(now - timestamp) <= 60.0:
-            timestamp = now
-        return tid, timestamp
-
-    def add_transaction(self, cursor, tid, username, description, extension,
-            packed=False):
-        """Add a transaction."""
-        stmt = """
-        INSERT INTO transaction
-            (tid, packed, username, description, extension)
-        VALUES (%s, %s, %s, %s, %s)
-        """
-        cursor.execute(stmt, (
-            tid, packed, MySQLdb.Binary(username),
-            MySQLdb.Binary(description), MySQLdb.Binary(extension)))
-
-    def detect_conflict(self, cursor):
-        """Find one conflict in the data about to be committed.
-
-        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
-        attempted_data).  If there is no conflict, returns None.
-        """
-        # Lock in share mode to ensure the data being read is up to date.
-        stmt = """
-        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
-            temp_store.state
-        FROM temp_store
-            JOIN current_object ON (temp_store.zoid = current_object.zoid)
-        WHERE temp_store.prev_tid != current_object.tid
-        LIMIT 1
-        LOCK IN SHARE MODE
-        """
-        cursor.execute(stmt)
-        if cursor.rowcount:
-            return cursor.fetchone()
-        return None
-
-    def move_from_temp(self, cursor, tid):
-        """Moved the temporarily stored objects to permanent storage.
-
-        Returns the list of oids stored.
-        """
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        SELECT zoid, %s, prev_tid, md5, state
-        FROM temp_store
-        """
-        cursor.execute(stmt, (tid,))
-
-        stmt = """
-        SELECT zoid FROM temp_store
-        """
-        cursor.execute(stmt)
-        return [oid for (oid,) in cursor]
-
-    def update_current(self, cursor, tid):
-        """Update the current object pointers.
-
-        tid is the integer tid of the transaction being committed.
-        """
-        cursor.execute("""
-        REPLACE INTO current_object (zoid, tid)
-        SELECT zoid, tid FROM object_state
-        WHERE tid = %s
-        """, (tid,))
-
-    def set_min_oid(self, cursor, oid):
-        """Ensure the next OID is at least the given OID."""
-        cursor.execute("REPLACE INTO new_oid VALUES(%s)", (oid,))
-
-    def commit_phase1(self, cursor, tid):
-        """Begin a commit.  Returns the transaction name.
-
-        This method should guarantee that commit_phase2() will succeed,
-        meaning that if commit_phase2() would raise any error, the error
-        should be raised in commit_phase1() instead.
-        """
-        return '-'
-
-    def commit_phase2(self, cursor, txn):
-        """Final transaction commit."""
-        cursor.connection.commit()
-        self._release_commit_lock(cursor)
-
-    def abort(self, cursor, txn=None):
-        """Abort the commit.  If txn is not None, phase 1 is also aborted."""
-        cursor.connection.rollback()
-        self._release_commit_lock(cursor)
-
-    def new_oid(self, cursor):
-        """Return a new, unused OID."""
-        stmt = "INSERT INTO new_oid VALUES ()"
-        cursor.execute(stmt)
-        oid = cursor.connection.insert_id()
-        if oid % 100 == 0:
-            # Clean out previously generated OIDs.
-            stmt = "DELETE FROM new_oid WHERE zoid < %s"
-            cursor.execute(stmt, (oid,))
-        return oid
-
-
-    def hold_pack_lock(self, cursor):
-        """Try to acquire the pack lock.
-
-        Raise an exception if packing or undo is already in progress.
-        """
-        stmt = "SELECT GET_LOCK(CONCAT(DATABASE(), '.pack'), 0)"
-        cursor.execute(stmt)
-        res = cursor.fetchone()[0]
-        if not res:
-            raise StorageError('A pack or undo operation is in progress')
-
-
-    def release_pack_lock(self, cursor):
-        """Release the pack lock."""
-        stmt = "SELECT RELEASE_LOCK(CONCAT(DATABASE(), '.pack'))"
-        cursor.execute(stmt)
-
-
-    def open_for_pre_pack(self):
-        """Open a connection to be used for the pre-pack phase.
-        Returns (conn, cursor).
-
-        This overrides a method.
-        """
-        conn, cursor = self.open(transaction_mode=None)
-        try:
-            # This phase of packing works best with transactions
-            # disabled.  It changes no user-facing data.
-            conn.autocommit(True)
-            return conn, cursor
-        except:
-            self.close(conn, cursor)
-            raise
-
-
-    def _hold_commit_lock(self, cursor):
-        """Hold the commit lock.
-
-        This overrides a method.
-        """
-        cursor.execute("SELECT GET_LOCK(CONCAT(DATABASE(), '.commit'), %s)",
-            (commit_lock_timeout,))
-        locked = cursor.fetchone()[0]
-        if not locked:
-            raise StorageError("Unable to acquire commit lock")
-
-
-    def _release_commit_lock(self, cursor):
-        """Release the commit lock.
-
-        This overrides a method.
-        """
-        cursor.execute("SELECT RELEASE_LOCK(CONCAT(DATABASE(), '.commit'))")
-
-
-    _poll_query = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -14,81 +14,32 @@
 """Oracle adapter for RelStorage."""
 
 import logging
-import re
-import time
-
 import cx_Oracle
-from ZODB.POSException import StorageError
 
-from relstorage.adapters.historypreserving import HistoryPreservingAdapter
+from relstorage.adapters.connmanager import AbstractConnectionManager
+from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
+from relstorage.adapters.loadstore import HistoryPreservingOracleLoadStore
+from relstorage.adapters.locker import OracleLocker
+from relstorage.adapters.packundo import OracleHistoryPreservingPackUndo
+from relstorage.adapters.poller import Poller
+from relstorage.adapters.schema import HistoryPreservingOracleSchema
+from relstorage.adapters.scriptrunner import OracleScriptRunner
+from relstorage.adapters.stats import OracleStats
+from relstorage.adapters.txncontrol import OracleTransactionControl
 
-log = logging.getLogger("relstorage.adapters.oracle")
+log = logging.getLogger(__name__)
 
 # disconnected_exceptions contains the exception types that might be
 # raised when the connection to the database has been broken.
 disconnected_exceptions = (cx_Oracle.OperationalError,
     cx_Oracle.InterfaceError, cx_Oracle.DatabaseError)
 
-def lob_handler(cursor, name, defaultType, size, precision, scale):
-    """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
 
-    Note that if a BLOB in the result is too large, Oracle generates an
-    error indicating truncation.  The execute_lob_stmt() method works
-    around this.
-    """
-    if defaultType == cx_Oracle.BLOB:
-        # Default size for BLOB is 4, we want the whole blob inline.
-        # Typical chunk size is 8132, we choose a multiple - 32528
-        return cursor.var(cx_Oracle.LONG_BINARY, 32528, cursor.arraysize)
-
-def read_lob(value):
-    """Handle an Oracle LOB by returning its byte stream.
-
-    Returns other objects unchanged.
-    """
-    if isinstance(value, cx_Oracle.LOB):
-        return value.read()
-    return value
-
-
-class OracleAdapter(HistoryPreservingAdapter):
+class OracleAdapter(object):
     """Oracle adapter for RelStorage."""
 
-    _script_vars = {
-        'TRUE':         "'Y'",
-        'FALSE':        "'N'",
-        'OCTET_LENGTH': 'LENGTH',
-        'TRUNCATE':     'TRUNCATE TABLE',
-        'oid':          ':oid',
-        'tid':          ':tid',
-        'pack_tid':     ':pack_tid',
-        'undo_tid':     ':undo_tid',
-        'self_tid':     ':self_tid',
-        'min_tid':      ':min_tid',
-        'max_tid':      ':max_tid',
-    }
+    keep_history = True
 
-    _scripts = HistoryPreservingAdapter._scripts.copy()
-    _scripts.update({
-        'choose_pack_transaction': """
-            SELECT MAX(tid)
-            FROM transaction
-            WHERE tid > 0
-                AND tid <= %(tid)s
-                AND packed = 'N'
-            """,
-
-        'create_temp_pack_visit': None,
-        'create_temp_undo': None,
-        'reset_temp_undo': "DELETE FROM temp_undo",
-
-        'transaction_has_data': """
-            SELECT DISTINCT tid
-            FROM object_state
-            WHERE tid = %(tid)s
-            """,
-    })
-
     def __init__(self, user, password, dsn, twophase=False, arraysize=64,
             use_inline_lobs=None):
         """Create an Oracle adapter.
@@ -107,322 +58,139 @@
         queries.  It depends on features in cx_Oracle 5.  The default is None,
         telling the adapter to auto-detect the presence of cx_Oracle 5.
         """
-        self._params = (user, password, dsn)
-        self._twophase = bool(twophase)
-        self._arraysize = arraysize
+        params = (user, password, dsn)
         if use_inline_lobs is None:
             use_inline_lobs = (cx_Oracle.version >= '5.0')
-        self._use_inline_lobs = bool(use_inline_lobs)
 
-    def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
-        """Execute a statement from a script with the given parameters.
+        self.connmanager = CXOracleConnectionManager(params, arraysize)
+        self.runner = CXOracleScriptRunner(bool(use_inline_lobs))
+        self.locker = OracleLocker((cx_Oracle.DatabaseError,))
+        self.schema = HistoryPreservingOracleSchema(
+            connmanager=self.connmanager,
+            runner=self.runner,
+            )
+        self.loadstore = HistoryPreservingOracleLoadStore(
+            connmanager=self.connmanager,
+            runner=self.runner,
+            disconnected_exceptions=disconnected_exceptions,
+            Binary=cx_Oracle.Binary,
+            inputsize_BLOB=cx_Oracle.BLOB,
+            inputsize_BINARY=cx_Oracle.BINARY,
+            twophase=bool(twophase),
+            )
+        self.txncontrol = OracleTransactionControl(
+            Binary=cx_Oracle.Binary,
+            )
+        self.poller = Poller(
+            poll_query="SELECT MAX(tid) FROM transaction",
+            keep_history=True,
+            runner=self.runner,
+            )
+        self.packundo = OracleHistoryPreservingPackUndo(
+            connmanager=self.connmanager,
+            runner=self.runner,
+            locker=self.locker,
+            )
+        self.dbiter = HistoryPreservingDatabaseIterator(
+            runner=self.runner,
+            )
+        self.stats = OracleStats(
+            connmanager=self.connmanager,
+            )
 
-        params should be either an empty tuple (no parameters) or
-        a map.
+        self.open = self.connmanager.open
+        self.close = self.connmanager.close
 
-        This overrides a method.
-        """
-        if generic_params:
-            # Oracle raises ORA-01036 if the parameter map contains extra keys,
-            # so filter out any unused parameters.
-            tracker = TrackingMap(self._script_vars)
-            stmt = generic_stmt % tracker
-            used = tracker.used
-            params = {}
-            for k, v in generic_params.iteritems():
-                if k in used:
-                    params[k] = v
-        else:
-            stmt = generic_stmt % self._script_vars
-            params = ()
+        self.hold_commit_lock = self.locker.hold_commit_lock
+        self.release_commit_lock = self.locker.release_commit_lock
+        self.hold_pack_lock = self.locker.hold_pack_lock
+        self.release_pack_lock = self.locker.release_pack_lock
 
-        try:
-            cursor.execute(stmt, params)
-        except:
-            log.warning("script statement failed: %r; parameters: %r",
-                stmt, params)
-            raise
+        self.create_schema = self.schema.create
+        self.prepare_schema = self.schema.prepare
+        self.zap_all = self.schema.zap_all
+        self.drop_all = self.schema.drop_all
 
-    def _run_many(self, cursor, stmt, items):
-        """Execute a statement repeatedly.  Items should be a list of tuples.
+        self.open_for_load = self.loadstore.open_for_load
+        self.restart_load = self.loadstore.restart_load
+        self.get_current_tid = self.loadstore.get_current_tid
+        self.load_current = self.loadstore.load_current
+        self.load_revision = self.loadstore.load_revision
+        self.exists = self.loadstore.exists
+        self.load_before = self.loadstore.load_before
+        self.get_object_tid_after = self.loadstore.get_object_tid_after
 
-        stmt should use '%s' parameter format.  Overrides a method.
-        """
-        # replace '%s' with ':n'
-        matches = []
-        def replace(match):
-            matches.append(None)
-            return ':%d' % len(matches)
-        stmt = re.sub('%s', replace, stmt)
+        self.open_for_store = self.loadstore.open_for_store
+        self.restart_store = self.loadstore.restart_store
+        self.store_temp = self.loadstore.store_temp
+        self.replace_temp = self.loadstore.replace_temp
+        self.restore = self.loadstore.restore
+        self.detect_conflict = self.loadstore.detect_conflict
+        self.move_from_temp = self.loadstore.move_from_temp
+        self.update_current = self.loadstore.update_current
+        self.set_min_oid = self.loadstore.set_min_oid
+        self.new_oid = self.loadstore.new_oid
 
-        cursor.executemany(stmt, items)
+        self.get_tid_and_time = self.txncontrol.get_tid_and_time
+        self.add_transaction = self.txncontrol.add_transaction
+        self.commit_phase1 = self.txncontrol.commit_phase1
+        self.commit_phase2 = self.txncontrol.commit_phase2
+        self.abort = self.txncontrol.abort
 
-    def create_schema(self, cursor):
-        """Create the database tables."""
-        stmt = """
-        CREATE TABLE commit_lock (dummy CHAR);
+        self.poll_invalidations = self.poller.poll_invalidations
 
-        -- The list of all transactions in the database
-        CREATE TABLE transaction (
-            tid         NUMBER(20) NOT NULL PRIMARY KEY,
-            packed      CHAR DEFAULT 'N' CHECK (packed IN ('N', 'Y')),
-            empty       CHAR DEFAULT 'N' CHECK (empty IN ('N', 'Y')),
-            username    RAW(500),
-            description RAW(2000),
-            extension   RAW(2000)
-        );
+        self.fill_object_refs = self.packundo.fill_object_refs
+        self.open_for_pre_pack = self.packundo.open_for_pre_pack
+        self.choose_pack_transaction = self.packundo.choose_pack_transaction
+        self.pre_pack = self.packundo.pre_pack
+        self.pack = self.packundo.pack
+        self.verify_undoable = self.packundo.verify_undoable
+        self.undo = self.packundo.undo
 
-        -- Create a special transaction to represent object creation.  This
-        -- row is often referenced by object_state.prev_tid, but never by
-        -- object_state.tid.
-        INSERT INTO transaction (tid, username, description)
-            VALUES (0,
-            UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
-            UTL_I18N.STRING_TO_RAW(
-                'special transaction for object creation', 'US7ASCII'));
+        self.iter_objects = self.dbiter.iter_objects
+        self.iter_transactions = self.dbiter.iter_transactions
+        self.iter_transactions_range = self.dbiter.iter_transactions_range
+        self.iter_object_history = self.dbiter.iter_object_history
 
-        CREATE SEQUENCE zoid_seq;
+        self.get_object_count = self.stats.get_object_count
+        self.get_db_size = self.stats.get_db_size
 
-        -- All object states in all transactions.
-        -- md5 and state can be null to represent object uncreation.
-        CREATE TABLE object_state (
-            zoid        NUMBER(20) NOT NULL,
-            tid         NUMBER(20) NOT NULL REFERENCES transaction
-                        CHECK (tid > 0),
-            PRIMARY KEY (zoid, tid),
-            prev_tid    NUMBER(20) NOT NULL REFERENCES transaction,
-            md5         CHAR(32),
-            state       BLOB
-        );
-        CREATE INDEX object_state_tid ON object_state (tid);
-        CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
 
-        -- Pointers to the current object state
-        CREATE TABLE current_object (
-            zoid        NUMBER(20) NOT NULL PRIMARY KEY,
-            tid         NUMBER(20) NOT NULL,
-            FOREIGN KEY (zoid, tid) REFERENCES object_state
-        );
-        CREATE INDEX current_object_tid ON current_object (tid);
+class CXOracleScriptRunner(OracleScriptRunner):
 
-        -- States that will soon be stored
-        CREATE GLOBAL TEMPORARY TABLE temp_store (
-            zoid        NUMBER(20) NOT NULL PRIMARY KEY,
-            prev_tid    NUMBER(20) NOT NULL,
-            md5         CHAR(32),
-            state       BLOB
-        ) ON COMMIT DELETE ROWS;
+    def __init__(self, use_inline_lobs):
+        self.use_inline_lobs = use_inline_lobs
 
-        -- During packing, an exclusive lock is held on pack_lock.
-        CREATE TABLE pack_lock (dummy CHAR);
+    def _outputtypehandler(self,
+            cursor, name, defaultType, size, precision, scale):
+        """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
 
-        -- A list of referenced OIDs from each object_state.
-        -- This table is populated as needed during packing.
-        -- To prevent unnecessary table locking, it does not use
-        -- foreign keys, which is safe because rows in object_state
-        -- are never modified once committed, and rows are removed
-        -- from object_state only by packing.
-        CREATE TABLE object_ref (
-            zoid        NUMBER(20) NOT NULL,
-            tid         NUMBER(20) NOT NULL,
-            to_zoid     NUMBER(20) NOT NULL,
-            PRIMARY KEY (tid, zoid, to_zoid)
-        );
-
-        -- The object_refs_added table tracks whether object_refs has
-        -- been populated for all states in a given transaction.
-        -- An entry is added only when the work is finished.
-        -- To prevent unnecessary table locking, it does not use
-        -- foreign keys, which is safe because object states
-        -- are never added to a transaction once committed, and
-        -- rows are removed from the transaction table only by
-        -- packing.
-        CREATE TABLE object_refs_added (
-            tid         NUMBER(20) NOT NULL PRIMARY KEY
-        );
-
-        -- Temporary state during packing:
-        -- The list of objects to pack.  If keep is 'N',
-        -- the object and all its revisions will be removed.
-        -- If keep is 'Y', instead of removing the object,
-        -- the pack operation will cut the object's history.
-        -- The keep_tid field specifies the oldest revision
-        -- of the object to keep.
-        -- The visited flag is set when pre_pack is visiting an object's
-        -- references, and remains set.
-        CREATE TABLE pack_object (
-            zoid        NUMBER(20) NOT NULL PRIMARY KEY,
-            keep        CHAR NOT NULL CHECK (keep IN ('N', 'Y')),
-            keep_tid    NUMBER(20) NOT NULL,
-            visited     CHAR DEFAULT 'N' NOT NULL CHECK (visited IN ('N', 'Y'))
-        );
-        CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
-
-        -- Temporary state during packing: the list of object states to pack.
-        CREATE TABLE pack_state (
-            tid         NUMBER(20) NOT NULL,
-            zoid        NUMBER(20) NOT NULL,
-            PRIMARY KEY (tid, zoid)
-        );
-
-        -- Temporary state during packing: the list of transactions that
-        -- have at least one object state to pack.
-        CREATE TABLE pack_state_tid (
-            tid         NUMBER(20) NOT NULL PRIMARY KEY
-        );
-
-        -- Temporary state during packing: a list of objects
-        -- whose references need to be examined.
-        CREATE GLOBAL TEMPORARY TABLE temp_pack_visit (
-            zoid        NUMBER(20) NOT NULL PRIMARY KEY,
-            keep_tid    NUMBER(20)
-        );
-
-        -- Temporary state during undo: a list of objects
-        -- to be undone and the tid of the undone state.
-        CREATE GLOBAL TEMPORARY TABLE temp_undo (
-            zoid        NUMBER(20) NOT NULL PRIMARY KEY,
-            prev_tid    NUMBER(20) NOT NULL
-        );
+        Note that if a BLOB in the result is too large, Oracle generates an
+        error indicating truncation.  The run_lob_stmt() method works
+        around this.
         """
-        self._run_script(cursor, stmt)
-        # Let Oracle catch up with the new data definitions by sleeping.
-        # This reduces the likelihood of spurious ORA-01466 errors.
-	time.sleep(5)
+        if defaultType == cx_Oracle.BLOB:
+            # Default size for BLOB is 4, we want the whole blob inline.
+            # Typical chunk size is 8132, we choose a multiple - 32528
+            return cursor.var(cx_Oracle.LONG_BINARY, 32528, cursor.arraysize)
 
+    def _read_lob(self, value):
+        """Handle an Oracle LOB by returning its byte stream.
 
-    def prepare_schema(self):
-        """Create the database schema if it does not already exist."""
-        def callback(conn, cursor):
-            cursor.execute("""
-            SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
-            """)
-            if not cursor.fetchall():
-                self.create_schema(cursor)
-        self._open_and_call(callback)
-
-    def zap_all(self):
-        """Clear all data out of the database."""
-        def callback(conn, cursor):
-            stmt = """
-            DELETE FROM object_refs_added;
-            DELETE FROM object_ref;
-            DELETE FROM current_object;
-            DELETE FROM object_state;
-            DELETE FROM transaction;
-            -- Create a transaction to represent object creation.
-            INSERT INTO transaction (tid, username, description) VALUES
-                (0, UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
-                UTL_I18N.STRING_TO_RAW(
-                'special transaction for object creation', 'US7ASCII'));
-            DROP SEQUENCE zoid_seq;
-            CREATE SEQUENCE zoid_seq;
-            """
-            self._run_script(cursor, stmt)
-        self._open_and_call(callback)
-
-    def drop_all(self):
-        """Drop all tables and sequences."""
-        def callback(conn, cursor):
-            for tablename in ('pack_state_tid', 'pack_state',
-                    'pack_object', 'object_refs_added', 'object_ref',
-                    'current_object', 'object_state', 'transaction',
-                    'commit_lock', 'pack_lock',
-                    'temp_store', 'temp_undo', 'temp_pack_visit'):
-                cursor.execute("DROP TABLE %s" % tablename)
-            cursor.execute("DROP SEQUENCE zoid_seq")
-        self._open_and_call(callback)
-
-    def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
-            twophase=False):
-        """Open a database connection and return (conn, cursor)."""
-        try:
-            kw = {'twophase': twophase}  #, 'threaded': True}
-            conn = cx_Oracle.connect(*self._params, **kw)
-            cursor = conn.cursor()
-            cursor.arraysize = self._arraysize
-            if transaction_mode:
-                cursor.execute("SET TRANSACTION %s" % transaction_mode)
-            return conn, cursor
-
-        except cx_Oracle.OperationalError, e:
-            log.warning("Unable to connect: %s", e)
-            raise
-
-    def close(self, conn, cursor):
-        """Close both a cursor and connection, ignoring certain errors."""
-        for obj in (cursor, conn):
-            if obj is not None:
-                try:
-                    obj.close()
-                except disconnected_exceptions:
-                    pass
-
-    def open_for_load(self):
-        """Open and initialize a connection for loading objects.
-
-        Returns (conn, cursor).
+        Returns other objects unchanged.
         """
-        return self.open('READ ONLY')
+        if isinstance(value, cx_Oracle.LOB):
+            return value.read()
+        return value
 
-    def restart_load(self, cursor):
-        """Reinitialize a connection for loading objects."""
-        try:
-            cursor.connection.rollback()
-            cursor.execute("SET TRANSACTION READ ONLY")
-        except disconnected_exceptions, e:
-            raise StorageError(e)
-
-    def get_object_count(self):
-        """Returns the number of objects in the database"""
-        # The tests expect an exact number, but the code below generates
-        # an estimate, so this is disabled for now.
-        if True:
-            return 0
-        else:
-            conn, cursor = self.open('READ ONLY')
-            try:
-                cursor.execute("""
-                SELECT NUM_ROWS
-                FROM USER_TABLES
-                WHERE TABLE_NAME = 'CURRENT_OBJECT'
-                """)
-                res = cursor.fetchone()[0]
-                if res is None:
-                    res = 0
-                else:
-                    res = int(res)
-                return res
-            finally:
-                self.close(conn, cursor)
-
-    def get_db_size(self):
-        """Returns the approximate size of the database in bytes"""
-        # May not be possible without access to the dba_* objects
-        return 0
-
-    def get_current_tid(self, cursor, oid):
-        """Returns the current integer tid for an object.
-
-        oid is an integer.  Returns None if object does not exist.
-        """
-        cursor.execute("""
-        SELECT tid
-        FROM current_object
-        WHERE zoid = :1
-        """, (oid,))
-        for (tid,) in cursor:
-            return tid
-        return None
-
-    def execute_lob_stmt(self, cursor, stmt, args=(), default=None):
+    def run_lob_stmt(self, cursor, stmt, args=(), default=None):
         """Execute a statement and return one row with all LOBs inline.
 
         Returns the value of the default parameter if the result was empty.
         """
-        if self._use_inline_lobs:
+        if self.use_inline_lobs:
             try:
-                cursor.outputtypehandler = lob_handler
+                cursor.outputtypehandler = self._outputtypehandler
                 try:
                     cursor.execute(stmt, args)
                     for row in cursor:
@@ -441,357 +209,37 @@
                 # with different output type parameters.
                 cursor.execute(stmt + ' ', args)
                 for row in cursor:
-                    return tuple(map(read_lob, row))
+                    return tuple(map(self._read_lob, row))
         else:
             cursor.execute(stmt, args)
             for row in cursor:
-                return tuple(map(read_lob, row))
+                return tuple(map(self._read_lob, row))
         return default
 
-    def load_current(self, cursor, oid):
-        """Returns the current pickle and integer tid for an object.
 
-        oid is an integer.  Returns (None, None) if object does not exist.
-        """
-        stmt = """
-        SELECT state, tid
-        FROM current_object
-            JOIN object_state USING(zoid, tid)
-        WHERE zoid = :1
-        """
-        return self.execute_lob_stmt(
-            cursor, stmt, (oid,), default=(None, None))
+class CXOracleConnectionManager(AbstractConnectionManager):
 
-    def load_revision(self, cursor, oid, tid):
-        """Returns the pickle for an object on a particular transaction.
+    isolation_read_committed = "ISOLATION LEVEL READ COMMITTED"
+    isolation_read_only = "READ ONLY"
 
-        Returns None if no such state exists.
-        """
-        stmt = """
-        SELECT state
-        FROM object_state
-        WHERE zoid = :1
-            AND tid = :2
-        """
-        (state,) = self.execute_lob_stmt(
-            cursor, stmt, (oid, tid), default=(None,))
-        return state
+    close_exceptions = disconnected_exceptions
 
-    def exists(self, cursor, oid):
-        """Returns a true value if the given object exists."""
-        cursor.execute("SELECT 1 FROM current_object WHERE zoid = :1", (oid,))
-        return len(list(cursor))
+    def __init__(self, params, arraysize):
+        self._params = params
+        self._arraysize = arraysize
 
-    def load_before(self, cursor, oid, tid):
-        """Returns the pickle and tid of an object before transaction tid.
-
-        Returns (None, None) if no earlier state exists.
-        """
-        stmt = """
-        SELECT state, tid
-        FROM object_state
-        WHERE zoid = :oid
-            AND tid = (
-                SELECT MAX(tid)
-                FROM object_state
-                WHERE zoid = :oid
-                    AND tid < :tid
-            )
-        """
-        return self.execute_lob_stmt(cursor, stmt, {'oid': oid, 'tid': tid},
-             default=(None, None))
-
-    def get_object_tid_after(self, cursor, oid, tid):
-        """Returns the tid of the next change after an object revision.
-
-        Returns None if no later state exists.
-        """
-        stmt = """
-        SELECT MIN(tid)
-        FROM object_state
-        WHERE zoid = :1
-            AND tid > :2
-        """
-        cursor.execute(stmt, (oid, tid))
-        rows = cursor.fetchall()
-        if rows:
-            assert len(rows) == 1
-            return rows[0][0]
-        else:
-            return None
-
-    def _set_xid(self, cursor):
-        """Set up a distributed transaction"""
-        stmt = """
-        SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
-        """
-        cursor.execute(stmt)
-        xid = str(cursor.fetchone()[0])
-        cursor.connection.begin(0, xid, '0')
-
-    def open_for_store(self):
-        """Open and initialize a connection for storing objects.
-
-        Returns (conn, cursor).
-        """
-        if self._twophase:
-            conn, cursor = self.open(transaction_mode=None, twophase=True)
-            try:
-                self._set_xid(cursor)
-            except:
-                self.close(conn, cursor)
-                raise
-        else:
-            conn, cursor = self.open()
-        return conn, cursor
-
-    def restart_store(self, cursor):
-        """Reuse a store connection."""
+    def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
+            twophase=False):
+        """Open a database connection and return (conn, cursor)."""
         try:
-            cursor.connection.rollback()
-            if self._twophase:
-                self._set_xid(cursor)
-        except disconnected_exceptions, e:
-            raise StorageError(e)
+            kw = {'twophase': twophase}  #, 'threaded': True}
+            conn = cx_Oracle.connect(*self._params, **kw)
+            cursor = conn.cursor()
+            cursor.arraysize = self._arraysize
+            if transaction_mode:
+                cursor.execute("SET TRANSACTION %s" % transaction_mode)
+            return conn, cursor
 
-    def store_temp(self, cursor, oid, prev_tid, data):
-        """Store an object in the temporary table."""
-        md5sum = self.md5sum(data)
-        cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
-        if len(data) <= 2000:
-            # Send data inline for speed.  Oracle docs say maximum size
-            # of a RAW is 2000 bytes.  cx_Oracle.BINARY corresponds with RAW.
-            cursor.setinputsizes(rawdata=cx_Oracle.BINARY)
-            stmt = """
-            INSERT INTO temp_store (zoid, prev_tid, md5, state)
-            VALUES (:oid, :prev_tid, :md5sum, :rawdata)
-            """
-            cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
-                md5sum=md5sum, rawdata=data)
-        else:
-            # Send data as a BLOB
-            cursor.setinputsizes(blobdata=cx_Oracle.BLOB)
-            stmt = """
-            INSERT INTO temp_store (zoid, prev_tid, md5, state)
-            VALUES (:oid, :prev_tid, :md5sum, :blobdata)
-            """
-            cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
-                md5sum=md5sum, blobdata=data)
-
-    def replace_temp(self, cursor, oid, prev_tid, data):
-        """Replace an object in the temporary table."""
-        md5sum = self.md5sum(data)
-        cursor.setinputsizes(data=cx_Oracle.BLOB)
-        stmt = """
-        UPDATE temp_store SET
-            prev_tid = :prev_tid,
-            md5 = :md5sum,
-            state = :data
-        WHERE zoid = :oid
-        """
-        cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
-            md5sum=md5sum, data=cx_Oracle.Binary(data))
-
-    def restore(self, cursor, oid, tid, data):
-        """Store an object directly, without conflict detection.
-
-        Used for copying transactions into this database.
-        """
-        md5sum = self.md5sum(data)
-        cursor.setinputsizes(data=cx_Oracle.BLOB)
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        VALUES (:oid, :tid,
-            COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
-            :md5sum, :data)
-        """
-        if data is not None:
-            data = cx_Oracle.Binary(data)
-        cursor.execute(stmt, oid=oid, tid=tid, md5sum=md5sum, data=data)
-
-    def start_commit(self, cursor):
-        """Prepare to commit."""
-        # Hold commit_lock to prevent concurrent commits
-        # (for as short a time as possible).
-        # Lock transaction and current_object in share mode to ensure
-        # conflict detection has the most current data.
-        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
-        cursor.execute("LOCK TABLE transaction IN SHARE MODE")
-        cursor.execute("LOCK TABLE current_object IN SHARE MODE")
-
-    def _parse_dsinterval(self, s):
-        """Convert an Oracle dsinterval (as a string) to a float."""
-        mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
-        if not mo:
-            raise ValueError(s)
-        day, hour, min, sec = [float(v) for v in mo.groups()]
-        return day * 86400 + hour * 3600 + min * 60 + sec
-
-    def get_tid_and_time(self, cursor):
-        """Returns the most recent tid and the current database time.
-
-        The database time is the number of seconds since the epoch.
-        """
-        cursor.execute("""
-        SELECT MAX(tid), TO_CHAR(TO_DSINTERVAL(SYSTIMESTAMP - TO_TIMESTAMP_TZ(
-            '1970-01-01 00:00:00 +00:00','YYYY-MM-DD HH24:MI:SS TZH:TZM')))
-        FROM transaction
-        """)
-        tid, now = cursor.fetchone()
-        return tid, self._parse_dsinterval(now)
-
-    def add_transaction(self, cursor, tid, username, description, extension,
-            packed=False):
-        """Add a transaction."""
-        stmt = """
-        INSERT INTO transaction
-            (tid, packed, username, description, extension)
-        VALUES (:1, :2, :3, :4, :5)
-        """
-        max_desc_len = 2000
-        if len(description) > max_desc_len:
-            log.warning('Trimming description of transaction %s '
-                'to %d characters', tid, max_desc_len)
-            description = description[:max_desc_len]
-        cursor.execute(stmt, (
-            tid, packed and 'Y' or 'N', cx_Oracle.Binary(username),
-            cx_Oracle.Binary(description), cx_Oracle.Binary(extension)))
-
-    def detect_conflict(self, cursor):
-        """Find one conflict in the data about to be committed.
-
-        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
-        attempted_data).  If there is no conflict, returns None.
-        """
-        stmt = """
-        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
-            temp_store.state
-        FROM temp_store
-            JOIN current_object ON (temp_store.zoid = current_object.zoid)
-        WHERE temp_store.prev_tid != current_object.tid
-        """
-        return self.execute_lob_stmt(cursor, stmt)
-
-    def move_from_temp(self, cursor, tid):
-        """Move the temporarily stored objects to permanent storage.
-
-        Returns the list of oids stored.
-        """
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        SELECT zoid, :tid, prev_tid, md5, state
-        FROM temp_store
-        """
-        cursor.execute(stmt, tid=tid)
-
-        stmt = """
-        SELECT zoid FROM temp_store
-        """
-        cursor.execute(stmt)
-        return [oid for (oid,) in cursor]
-
-    def update_current(self, cursor, tid):
-        """Update the current object pointers.
-
-        tid is the integer tid of the transaction being committed.
-        """
-        # Insert objects created in this transaction into current_object.
-        stmt = """
-        INSERT INTO current_object (zoid, tid)
-        SELECT zoid, tid FROM object_state
-        WHERE tid = :1
-            AND prev_tid = 0
-        """
-        cursor.execute(stmt, (tid,))
-
-        # Change existing objects.
-        stmt = """
-        UPDATE current_object SET tid = :1
-        WHERE zoid IN (
-            SELECT zoid FROM object_state
-            WHERE tid = :1
-                AND prev_tid != 0
-        )
-        """
-        cursor.execute(stmt, (tid,))
-
-    def set_min_oid(self, cursor, oid):
-        """Ensure the next OID is at least the given OID."""
-        next_oid = self.new_oid(cursor)
-        if next_oid < oid:
-            # Oracle provides no way modify the sequence value
-            # except through alter sequence or drop/create sequence,
-            # but either statement kills the current transaction.
-            # Therefore, open a temporary connection to make the
-            # alteration.
-            conn2, cursor2 = self.open()
-            try:
-                # Change the sequence by altering the increment.
-                # (this is safer than dropping and re-creating the sequence)
-                diff = oid - next_oid
-                cursor2.execute(
-                    "ALTER SEQUENCE zoid_seq INCREMENT BY %d" % diff)
-                cursor2.execute("SELECT zoid_seq.nextval FROM DUAL")
-                cursor2.execute("ALTER SEQUENCE zoid_seq INCREMENT BY 1")
-                conn2.commit()
-            finally:
-                self.close(conn2, cursor2)
-
-    def commit_phase1(self, cursor, tid):
-        """Begin a commit.  Returns the transaction name.
-
-        This method should guarantee that commit_phase2() will succeed,
-        meaning that if commit_phase2() would raise any error, the error
-        should be raised in commit_phase1() instead.
-        """
-        if self._twophase:
-            cursor.connection.prepare()
-        return '-'
-
-    def commit_phase2(self, cursor, txn):
-        """Final transaction commit."""
-        cursor.connection.commit()
-
-    def abort(self, cursor, txn=None):
-        """Abort the commit.  If txn is not None, phase 1 is also aborted."""
-        cursor.connection.rollback()
-
-
-    def new_oid(self, cursor):
-        """Return a new, unused OID."""
-        stmt = "SELECT zoid_seq.nextval FROM DUAL"
-        cursor.execute(stmt)
-        return cursor.fetchone()[0]
-
-
-    def hold_pack_lock(self, cursor):
-        """Try to acquire the pack lock.
-
-        Raise an exception if packing or undo is already in progress.
-        """
-        stmt = """
-        LOCK TABLE pack_lock IN EXCLUSIVE MODE NOWAIT
-        """
-        try:
-            cursor.execute(stmt)
-        except cx_Oracle.DatabaseError:
-            raise StorageError('A pack or undo operation is in progress')
-
-    def release_pack_lock(self, cursor):
-        """Release the pack lock."""
-        # No action needed
-        pass
-
-    _poll_query = "SELECT MAX(tid) FROM transaction"
-
-
-class TrackingMap:
-    """Provides values for keys while tracking which keys are accessed."""
-
-    def __init__(self, source):
-        self.source = source
-        self.used = set()
-
-    def __getitem__(self, key):
-        self.used.add(key)
-        return self.source[key]
+        except cx_Oracle.OperationalError, e:
+            log.warning("Unable to connect: %s", e)
+            raise

Added: relstorage/trunk/relstorage/adapters/packundo.py
===================================================================
--- relstorage/trunk/relstorage/adapters/packundo.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/packundo.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,1053 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Pack/Undo implementations.
+"""
+
+from relstorage.adapters.interfaces import IPackUndo
+from ZODB.POSException import UndoError
+from zope.interface import implements
+import logging
+import time
+
+log = logging.getLogger(__name__)
+
+
+class PackUndo(object):
+    """Abstract base class for pack/undo"""
+
+    verify_sane_database = False
+
+    def __init__(self, connmanager, runner, locker):
+        self.connmanager = connmanager
+        self.runner = runner
+        self.locker = locker
+
+    def fill_object_refs(self, conn, cursor, get_references):
+        """Update the object_refs table by analyzing new transactions."""
+        if self.keep_history:
+            stmt = """
+            SELECT transaction.tid
+            FROM transaction
+                LEFT JOIN object_refs_added
+                    ON (transaction.tid = object_refs_added.tid)
+            WHERE object_refs_added.tid IS NULL
+            ORDER BY transaction.tid
+            """
+        else:
+            stmt = """
+            SELECT transaction.tid
+            FROM (SELECT DISTINCT tid FROM object_state) AS transaction
+                LEFT JOIN object_refs_added
+                    ON (transaction.tid = object_refs_added.tid)
+            WHERE object_refs_added.tid IS NULL
+            ORDER BY transaction.tid
+            """
+
+        self.runner.run_script_stmt(cursor, stmt)
+        tids = [tid for (tid,) in cursor]
+        if tids:
+            added = 0
+            log.info("discovering references from objects in %d "
+                "transaction(s)" % len(tids))
+            for tid in tids:
+                added += self._add_refs_for_tid(cursor, tid, get_references)
+                if added >= 10000:
+                    # save the work done so far
+                    conn.commit()
+                    added = 0
+            if added:
+                conn.commit()
+
+    def _add_refs_for_tid(self, cursor, tid, get_references):
+        Fill object_ref with the references from all states in a transaction.
+
+        Returns the number of references added.
+        """
+        log.debug("pre_pack: transaction %d: computing references ", tid)
+        from_count = 0
+
+        stmt = """
+        SELECT zoid, state
+        FROM object_state
+        WHERE tid = %(tid)s
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+
+        add_rows = []  # [(from_oid, tid, to_oid)]
+        for from_oid, state in cursor:
+            if hasattr(state, 'read'):
+                # Oracle
+                state = state.read()
+            if state:
+                from_count += 1
+                try:
+                    to_oids = get_references(str(state))
+                except:
+                    log.error("pre_pack: can't unpickle "
+                        "object %d in transaction %d; state length = %d" % (
+                        from_oid, tid, len(state)))
+                    raise
+                if self.keep_history:
+                    for to_oid in to_oids:
+                        add_rows.append((from_oid, tid, to_oid))
+                else:
+                    for to_oid in to_oids:
+                        add_rows.append((from_oid, to_oid))
+
+        if self.keep_history:
+            stmt = """
+            INSERT INTO object_ref (zoid, tid, to_zoid)
+            VALUES (%s, %s, %s)
+            """
+            self.runner.run_many(cursor, stmt, add_rows)
+
+        else:
+            stmt = """
+            DELETE FROM object_ref
+            WHERE zoid in (
+                SELECT zoid
+                FROM object_state
+                WHERE tid = %(tid)s
+                )
+            """
+            self.runner.run_script(cursor, stmt, {'tid': tid})
+
+            stmt = """
+            INSERT INTO object_ref (zoid, to_zoid)
+            VALUES (%s, %s)
+            """
+            self.runner.run_many(cursor, stmt, add_rows)
+
+        # The references have been computed for this transaction.
+        stmt = """
+        INSERT INTO object_refs_added (tid)
+        VALUES (%(tid)s)
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+
+        to_count = len(add_rows)
+        log.debug("pre_pack: transaction %d: has %d reference(s) "
+            "from %d object(s)", tid, to_count, from_count)
+        return to_count
+
+
+    def _visit_all_references(self, cursor):
+        """Visit all references in pack_object and set the keep flags.
+        """
+        # Each of the objects to be kept might refer to other objects.
+        # Mark the referenced objects to be kept as well. Do this
+        # repeatedly until all references have been satisfied.
+        pass_num = 1
+        while True:
+            log.info("pre_pack: following references, pass %d", pass_num)
+
+            # Make a list of all parent objects that still need to be
+            # visited. Then set pack_object.visited for all pack_object
+            # rows with keep = true.
+            stmt = """
+            %(TRUNCATE)s temp_pack_visit;
+
+            INSERT INTO temp_pack_visit (zoid, keep_tid)
+            SELECT zoid, keep_tid
+            FROM pack_object
+            WHERE keep = %(TRUE)s
+                AND visited = %(FALSE)s;
+
+            UPDATE pack_object SET visited = %(TRUE)s
+            WHERE keep = %(TRUE)s
+                AND visited = %(FALSE)s
+            """
+            self.runner.run_script(cursor, stmt)
+            visit_count = cursor.rowcount
+
+            if self.verify_sane_database:
+                # Verify the update actually worked.
+                # MySQL 5.1.23 fails this test; 5.1.24 passes.
+                stmt = """
+                SELECT 1
+                FROM pack_object
+                WHERE keep = %(TRUE)s AND visited = %(FALSE)s
+                """
+                self.runner.run_script_stmt(cursor, stmt)
+                if list(cursor):
+                    raise AssertionError(
+                        "database failed to update pack_object")
+
+            log.debug("pre_pack: checking references from %d object(s)",
+                visit_count)
+
+            # Visit the children of all parent objects that were
+            # just visited.
+            stmt = self._script_pre_pack_follow_child_refs
+            self.runner.run_script(cursor, stmt)
+            found_count = cursor.rowcount
+
+            log.debug("pre_pack: found %d more referenced object(s) in "
+                "pass %d", found_count, pass_num)
+            if not found_count:
+                # No new references detected.
+                break
+            else:
+                pass_num += 1
+
+    def _pause_pack(self, sleep, options, start):
+        """Pause packing to allow concurrent commits."""
+        if sleep is None:
+            sleep = time.sleep
+        elapsed = time.time() - start
+        if elapsed == 0.0:
+            # Compensate for low timer resolution by
+            # assuming that at least 10 ms elapsed.
+            elapsed = 0.01
+        duty_cycle = options.pack_duty_cycle
+        if duty_cycle > 0.0 and duty_cycle < 1.0:
+            delay = min(options.pack_max_delay,
+                elapsed * (1.0 / duty_cycle - 1.0))
+            if delay > 0:
+                log.debug('pack: sleeping %.4g second(s)', delay)
+                sleep(delay)
+
+    def open_for_pre_pack(self):
+        """Open a connection to be used for the pre-pack phase.
+        Returns (conn, cursor).
+
+        Subclasses may override this.
+        """
+        return self.connmanager.open()
+
+
+class HistoryPreservingPackUndo(PackUndo):
+    implements(IPackUndo)
+
+    keep_history = True
+
+    _script_create_temp_pack_visit = """
+        CREATE TEMPORARY TABLE temp_pack_visit (
+            zoid BIGINT NOT NULL,
+            keep_tid BIGINT
+        );
+        CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
+        """
+
+    _script_pre_pack_follow_child_refs = """
+        UPDATE pack_object SET keep = %(TRUE)s
+        WHERE keep = %(FALSE)s
+            AND zoid IN (
+                SELECT DISTINCT to_zoid
+                FROM object_ref
+                    JOIN temp_pack_visit USING (zoid)
+                WHERE object_ref.tid >= temp_pack_visit.keep_tid
+            )
+        """
+
+    _script_choose_pack_transaction = """
+        SELECT tid
+        FROM transaction
+        WHERE tid > 0
+            AND tid <= %(tid)s
+            AND packed = FALSE
+        ORDER BY tid DESC
+        LIMIT 1
+        """
+
+    _script_create_temp_undo = """
+        CREATE TEMPORARY TABLE temp_undo (
+            zoid BIGINT NOT NULL,
+            prev_tid BIGINT NOT NULL
+        );
+        CREATE UNIQUE INDEX temp_undo_zoid ON temp_undo (zoid)
+        """
+
+    _script_reset_temp_undo = "DROP TABLE temp_undo"
+
+    _script_transaction_has_data = """
+        SELECT tid
+        FROM object_state
+        WHERE tid = %(tid)s
+        LIMIT 1
+        """
+
+    _script_pack_current_object = """
+        DELETE FROM current_object
+        WHERE tid = %(tid)s
+            AND zoid in (
+                SELECT pack_state.zoid
+                FROM pack_state
+                WHERE pack_state.tid = %(tid)s
+            )
+        """
+
+    _script_pack_object_state = """
+        DELETE FROM object_state
+        WHERE tid = %(tid)s
+            AND zoid in (
+                SELECT pack_state.zoid
+                FROM pack_state
+                WHERE pack_state.tid = %(tid)s
+            )
+        """
+
+    _script_pack_object_ref = """
+        DELETE FROM object_refs_added
+        WHERE tid IN (
+            SELECT tid
+            FROM transaction
+            WHERE empty = %(TRUE)s
+            );
+        DELETE FROM object_ref
+        WHERE tid IN (
+            SELECT tid
+            FROM transaction
+            WHERE empty = %(TRUE)s
+            )
+        """
+
+    def verify_undoable(self, cursor, undo_tid):
+        """Raise UndoError if it is not safe to undo the specified txn."""
+        stmt = """
+        SELECT 1 FROM transaction
+        WHERE tid = %(undo_tid)s
+            AND packed = %(FALSE)s
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+        if not cursor.fetchall():
+            raise UndoError("Transaction not found or packed")
+
+        # Rule: we can undo an object if the object's state in the
+        # transaction to undo matches the object's current state.
+        # If any object in the transaction does not fit that rule,
+        # refuse to undo.
+        stmt = """
+        SELECT prev_os.zoid, current_object.tid
+        FROM object_state prev_os
+            JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
+            JOIN current_object ON (cur_os.zoid = current_object.zoid
+                AND cur_os.tid = current_object.tid)
+        WHERE prev_os.tid = %(undo_tid)s
+            AND cur_os.md5 != prev_os.md5
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+        if cursor.fetchmany():
+            raise UndoError(
+                "Some data were modified by a later transaction")
+
+        # Rule: don't allow the creation of the root object to
+        # be undone.  It's hard to get it back.
+        stmt = """
+        SELECT 1
+        FROM object_state
+        WHERE tid = %(undo_tid)s
+            AND zoid = 0
+            AND prev_tid = 0
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+        if cursor.fetchall():
+            raise UndoError("Can't undo the creation of the root object")
+
+
+    def undo(self, cursor, undo_tid, self_tid):
+        """Undo a transaction.
+
+        Parameters: "undo_tid", the integer tid of the transaction to undo,
+        and "self_tid", the integer tid of the current transaction.
+
+        Returns the states copied forward by the undo operation as a
+        list of (oid, old_tid).
+        """
+        stmt = self._script_create_temp_undo
+        if stmt:
+            self.runner.run_script(cursor, stmt)
+
+        stmt = """
+        DELETE FROM temp_undo;
+
+        -- Put into temp_undo the list of objects to be undone and
+        -- the tid of the transaction that has the undone state.
+        INSERT INTO temp_undo (zoid, prev_tid)
+        SELECT zoid, prev_tid
+        FROM object_state
+        WHERE tid = %(undo_tid)s;
+
+        -- Override previous undo operations within this transaction
+        -- by resetting the current_object pointer and deleting
+        -- copied states from object_state.
+        UPDATE current_object
+        SET tid = (
+                SELECT prev_tid
+                FROM object_state
+                WHERE zoid = current_object.zoid
+                    AND tid = %(self_tid)s
+            )
+        WHERE zoid IN (SELECT zoid FROM temp_undo)
+            AND tid = %(self_tid)s;
+
+        DELETE FROM object_state
+        WHERE zoid IN (SELECT zoid FROM temp_undo)
+            AND tid = %(self_tid)s;
+
+        -- Copy old states forward.
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
+            prev.md5, prev.state
+        FROM temp_undo
+            JOIN current_object ON (temp_undo.zoid = current_object.zoid)
+            LEFT JOIN object_state prev
+                ON (prev.zoid = temp_undo.zoid
+                    AND prev.tid = temp_undo.prev_tid);
+
+        -- List the copied states.
+        SELECT zoid, prev_tid FROM temp_undo
+        """
+        self.runner.run_script(cursor, stmt,
+            {'undo_tid': undo_tid, 'self_tid': self_tid})
+        res = list(cursor)
+
+        stmt = self._script_reset_temp_undo
+        if stmt:
+            self.runner.run_script(cursor, stmt)
+
+        return res
+
+    def choose_pack_transaction(self, pack_point):
+        """Return the transaction before or at the specified pack time.
+
+        Returns None if there is nothing to pack.
+        """
+        conn, cursor = self.connmanager.open()
+        try:
+            stmt = self._script_choose_pack_transaction
+            self.runner.run_script(cursor, stmt, {'tid': pack_point})
+            rows = cursor.fetchall()
+            if not rows:
+                # Nothing needs to be packed.
+                return None
+            return rows[0][0]
+        finally:
+            self.connmanager.close(conn, cursor)
+
+
+    def pre_pack(self, pack_tid, get_references, options):
+        """Decide what to pack.
+
+        pack_tid specifies the most recent transaction to pack.
+
+        get_references is a function that accepts a pickled state and
+        returns a set of OIDs that state refers to.
+
+        options is an instance of relstorage.Options.
+        The options.pack_gc flag indicates whether to run garbage collection.
+        If pack_gc is false, at least one revision of every object is kept,
+        even if nothing refers to it.  Packing with pack_gc disabled can be
+        much faster.
+        """
+        conn, cursor = self.open_for_pre_pack()
+        try:
+            try:
+                if options.pack_gc:
+                    log.info("pre_pack: start with gc enabled")
+                    self._pre_pack_with_gc(
+                        conn, cursor, pack_tid, get_references)
+                else:
+                    log.info("pre_pack: start without gc")
+                    self._pre_pack_without_gc(
+                        conn, cursor, pack_tid)
+                conn.commit()
+
+                log.info("pre_pack: enumerating states to pack")
+                stmt = "%(TRUNCATE)s pack_state"
+                self.runner.run_script_stmt(cursor, stmt)
+                to_remove = 0
+
+                if options.pack_gc:
+                    # Pack objects with the keep flag set to false.
+                    stmt = """
+                    INSERT INTO pack_state (tid, zoid)
+                    SELECT tid, zoid
+                    FROM object_state
+                        JOIN pack_object USING (zoid)
+                    WHERE keep = %(FALSE)s
+                        AND tid > 0
+                        AND tid <= %(pack_tid)s
+                    """
+                    self.runner.run_script_stmt(
+                        cursor, stmt, {'pack_tid': pack_tid})
+                    to_remove += cursor.rowcount
+
+                # Pack object states with the keep flag set to true.
+                stmt = """
+                INSERT INTO pack_state (tid, zoid)
+                SELECT tid, zoid
+                FROM object_state
+                    JOIN pack_object USING (zoid)
+                WHERE keep = %(TRUE)s
+                    AND tid > 0
+                    AND tid != keep_tid
+                    AND tid <= %(pack_tid)s
+                """
+                self.runner.run_script_stmt(
+                    cursor, stmt, {'pack_tid':pack_tid})
+                to_remove += cursor.rowcount
+
+                log.info("pre_pack: enumerating transactions to pack")
+                stmt = "%(TRUNCATE)s pack_state_tid"
+                self.runner.run_script_stmt(cursor, stmt)
+                stmt = """
+                INSERT INTO pack_state_tid (tid)
+                SELECT DISTINCT tid
+                FROM pack_state
+                """
+                cursor.execute(stmt)
+
+                log.info("pre_pack: will remove %d object state(s)",
+                    to_remove)
+
+            except:
+                log.exception("pre_pack: failed")
+                conn.rollback()
+                raise
+            else:
+                log.info("pre_pack: finished successfully")
+                conn.commit()
+        finally:
+            self.connmanager.close(conn, cursor)
+
+
+    def _pre_pack_without_gc(self, conn, cursor, pack_tid):
+        """Determine what to pack, without garbage collection.
+
+        With garbage collection disabled, there is no need to follow
+        object references: every object that has a state at or before
+        pack_tid is listed in pack_object with keep set to true.
+
+        conn is not used by this method; all statements run on cursor.
+        pack_tid is the inclusive upper bound on the tids considered.
+        """
+        # Fill the pack_object table with OIDs, but configure them
+        # all to be kept by setting keep to true.  keep_tid is the
+        # newest tid of each object that is <= pack_tid (tid 0 is
+        # excluded by the "tid > 0" condition).
+        log.debug("pre_pack: populating pack_object")
+        stmt = """
+        %(TRUNCATE)s pack_object;
+
+        INSERT INTO pack_object (zoid, keep, keep_tid)
+        SELECT zoid, %(TRUE)s, MAX(tid)
+        FROM object_state
+        WHERE tid > 0 AND tid <= %(pack_tid)s
+        GROUP BY zoid
+        """
+        # run_script (rather than run_script_stmt) is used because the
+        # text above holds more than one statement.
+        self.runner.run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+
+    def _pre_pack_with_gc(self, conn, cursor, pack_tid, get_references):
+        """Determine what to pack, with garbage collection.
+
+        Populates pack_object with one row per object that has a state
+        at or before pack_tid.  Rows start with keep set to false and
+        are switched to true for the root object (zoid 0), for objects
+        revised after pack_tid, and for objects referenced by states
+        newer than pack_tid.  _visit_all_references then finishes
+        setting the keep flags.
+
+        get_references is passed through to fill_object_refs.
+        """
+        # Subclasses may set _script_create_temp_pack_visit to None
+        # (or an empty string) to skip creating the temporary table.
+        stmt = self._script_create_temp_pack_visit
+        if stmt:
+            self.runner.run_script(cursor, stmt)
+
+        # NOTE(review): fill_object_refs is defined outside this
+        # section; presumably it populates object_ref, which the
+        # script below reads -- confirm.
+        self.fill_object_refs(conn, cursor, get_references)
+
+        log.info("pre_pack: filling the pack_object table")
+        # Fill the pack_object table with OIDs that either will be
+        # removed (if nothing references the OID) or whose history will
+        # be cut.
+        stmt = """
+        %(TRUNCATE)s pack_object;
+
+        INSERT INTO pack_object (zoid, keep, keep_tid)
+        SELECT zoid, %(FALSE)s, MAX(tid)
+        FROM object_state
+        WHERE tid > 0 AND tid <= %(pack_tid)s
+        GROUP BY zoid;
+
+        -- If the root object is in pack_object, keep it.
+        UPDATE pack_object SET keep = %(TRUE)s
+        WHERE zoid = 0;
+
+        -- Keep objects that have been revised since pack_tid.
+        UPDATE pack_object SET keep = %(TRUE)s
+        WHERE zoid IN (
+            SELECT zoid
+            FROM current_object
+            WHERE tid > %(pack_tid)s
+        );
+
+        -- Keep objects that are still referenced by object states in
+        -- transactions that will not be packed.
+        -- Use temp_pack_visit for temporary state; otherwise MySQL 5 chokes.
+        INSERT INTO temp_pack_visit (zoid)
+        SELECT DISTINCT to_zoid
+        FROM object_ref
+        WHERE tid > %(pack_tid)s;
+
+        UPDATE pack_object SET keep = %(TRUE)s
+        WHERE zoid IN (
+            SELECT zoid
+            FROM temp_pack_visit
+        );
+
+        %(TRUNCATE)s temp_pack_visit;
+        """
+        self.runner.run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+        # Set the 'keep' flags in pack_object
+        self._visit_all_references(cursor)
+
+
+    def pack(self, pack_tid, options, sleep=None, packed_func=None):
+        """Pack.  Requires the information provided by pre_pack.
+
+        pack_tid is the inclusive upper bound on the tids to pack.
+        options supplies pack_batch_timeout, the number of seconds of
+        work after which the current batch is committed and the commit
+        lock is briefly released.  sleep and options are handed to
+        _pause_pack between batches.  If packed_func is not None, it
+        is called as packed_func(oid, tid) for every object state
+        recorded in pack_state for the transactions that were packed.
+
+        Any exception is logged, the connection is rolled back, and the
+        exception is re-raised.  The connection is always closed.
+        """
+
+        # Read committed mode is sufficient.
+        conn, cursor = self.connmanager.open()
+        try:
+            try:
+                # Select every transaction at or before pack_tid that
+                # is either not yet marked packed or has states listed
+                # for removal.  Each row is
+                # (tid, already-packed flag, has-removable-states flag).
+                stmt = """
+                SELECT transaction.tid,
+                    CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END,
+                    CASE WHEN pack_state_tid.tid IS NOT NULL THEN 1 ELSE 0 END
+                FROM transaction
+                    LEFT JOIN pack_state_tid ON (
+                        transaction.tid = pack_state_tid.tid)
+                WHERE transaction.tid > 0
+                    AND transaction.tid <= %(pack_tid)s
+                    AND (packed = %(FALSE)s OR pack_state_tid.tid IS NOT NULL)
+                """
+                self.runner.run_script_stmt(
+                    cursor, stmt, {'pack_tid': pack_tid})
+                tid_rows = list(cursor)
+                tid_rows.sort()  # oldest first
+
+                log.info("pack: will pack %d transaction(s)", len(tid_rows))
+
+                # Subclasses may disable this script by setting it to
+                # None.
+                stmt = self._script_create_temp_pack_visit
+                if stmt:
+                    self.runner.run_script(cursor, stmt)
+
+                # Hold the commit lock while packing to prevent deadlocks.
+                # Pack in small batches of transactions in order to minimize
+                # the interruption of concurrent write operations.
+                start = time.time()
+                packed_list = []
+                self.locker.hold_commit_lock(cursor)
+                for tid, packed, has_removable in tid_rows:
+                    self._pack_transaction(
+                        cursor, pack_tid, tid, packed, has_removable,
+                        packed_list)
+                    if time.time() >= start + options.pack_batch_timeout:
+                        # The batch has used up its time slice: commit
+                        # it, report what was packed, then give other
+                        # writers a chance before taking the lock again.
+                        conn.commit()
+                        if packed_func is not None:
+                            # Note: this inner loop rebinds tid; the
+                            # outer loop assigns it again on its next
+                            # iteration.
+                            for oid, tid in packed_list:
+                                packed_func(oid, tid)
+                        del packed_list[:]
+                        self.locker.release_commit_lock(cursor)
+                        self._pause_pack(sleep, options, start)
+                        self.locker.hold_commit_lock(cursor)
+                        start = time.time()
+                # Report the final, partial batch.  These callbacks run
+                # before _pack_cleanup issues the commit for that batch.
+                if packed_func is not None:
+                    for oid, tid in packed_list:
+                        packed_func(oid, tid)
+                packed_list = None
+
+                self._pack_cleanup(conn, cursor)
+
+            except:
+                log.exception("pack: failed")
+                conn.rollback()
+                raise
+
+            else:
+                log.info("pack: finished successfully")
+                conn.commit()
+
+        finally:
+            self.connmanager.close(conn, cursor)
+
+
+    def _pack_transaction(self, cursor, pack_tid, tid, packed,
+            has_removable, packed_list):
+        """Pack a single transaction using the populated pack tables.
+
+        When has_removable is true, the object states listed in
+        pack_state for this tid are deleted, prev_tid pointers to this
+        tid are reset to 0, and an (oid, tid) pair is appended to
+        packed_list for each pack_state row of this tid.  In every
+        case the transaction row is then marked packed and flagged as
+        empty or not empty.
+        """
+        run_stmt = self.runner.run_script_stmt
+        log.debug("pack: transaction %d: packing", tid)
+        removed_objects = removed_states = 0
+
+        if has_removable:
+            # Delete current_object rows first, then the states.
+            run_stmt(cursor, self._script_pack_current_object, {'tid': tid})
+            removed_objects = cursor.rowcount
+
+            run_stmt(cursor, self._script_pack_object_state, {'tid': tid})
+            removed_states = cursor.rowcount
+
+            # Cut the prev_tid chains that pointed at this transaction.
+            terminate_chains = """
+            UPDATE object_state SET prev_tid = 0
+            WHERE prev_tid = %(tid)s
+                AND tid <= %(pack_tid)s
+            """
+            run_stmt(cursor, terminate_chains,
+                {'pack_tid': pack_tid, 'tid': tid})
+
+            # Record which objects had states listed for this tid.
+            list_packed = """
+            SELECT pack_state.zoid
+            FROM pack_state
+            WHERE pack_state.tid = %(tid)s
+            """
+            run_stmt(cursor, list_packed, {'tid': tid})
+            packed_list.extend([(oid, tid) for (oid,) in cursor])
+
+        # Does the transaction still hold any data?
+        run_stmt(cursor, self._script_transaction_has_data, {'tid': tid})
+        remaining = list(cursor)
+
+        # Flag the transaction as packed, and as empty when nothing is left.
+        if remaining:
+            state = 'not empty'
+            clause = 'empty = %(FALSE)s'
+        else:
+            state = 'empty'
+            clause = 'empty = %(TRUE)s'
+        update = ''.join([
+            "UPDATE transaction SET packed = %(TRUE)s, ",
+            clause,
+            " WHERE tid = %(tid)s",
+            ])
+        run_stmt(cursor, update, {'tid': tid})
+
+        log.debug(
+            "pack: transaction %d (%s): removed %d object(s) and %d state(s)",
+            tid, state, removed_objects, removed_states)
+
+
+    def _pack_cleanup(self, conn, cursor):
+        """Remove unneeded table rows after packing
+
+        Commits the packing work, then (under the commit lock) runs
+        _script_pack_object_ref and deletes transactions that are both
+        packed and empty.  After a second commit and release of the
+        lock, the pack_object, pack_state and pack_state_tid tables
+        are truncated.
+        """
+        # commit the work done so far
+        conn.commit()
+        # Release and immediately re-acquire the commit lock.
+        # NOTE(review): presumably this lets waiting writers in between
+        # the two phases -- confirm against the locker implementation.
+        self.locker.release_commit_lock(cursor)
+        self.locker.hold_commit_lock(cursor)
+        log.info("pack: cleaning up")
+
+        log.debug("pack: removing unused object references")
+        stmt = self._script_pack_object_ref
+        self.runner.run_script(cursor, stmt)
+
+        log.debug("pack: removing empty packed transactions")
+        stmt = """
+        DELETE FROM transaction
+        WHERE packed = %(TRUE)s
+            AND empty = %(TRUE)s
+        """
+        self.runner.run_script_stmt(cursor, stmt)
+
+        # perform cleanup that does not require the commit lock
+        conn.commit()
+        self.locker.release_commit_lock(cursor)
+
+        log.debug("pack: clearing temporary pack state")
+        # These truncations are not committed here; the caller is
+        # expected to commit (pack() does so on success).
+        for _table in ('pack_object', 'pack_state', 'pack_state_tid'):
+            stmt = '%(TRUNCATE)s ' + _table
+            self.runner.run_script_stmt(cursor, stmt)
+
+
+class MySQLHistoryPreservingPackUndo(HistoryPreservingPackUndo):
+    """History-preserving pack/undo support with MySQL-specific SQL.
+
+    Overrides several SQL scripts of the base class with forms that use
+    MySQL's join syntax for UPDATE and DELETE, and opens the pre-pack
+    connection in autocommit mode.
+    """
+
+    # Work around a MySQL performance bug by avoiding an expensive subquery.
+    # See: http://mail.zope.org/pipermail/zodb-dev/2008-May/011880.html
+    #      http://bugs.mysql.com/bug.php?id=28257
+    # In addition to temp_pack_visit, this script creates
+    # temp_pack_child, which _script_pre_pack_follow_child_refs uses.
+    _script_create_temp_pack_visit = """
+        CREATE TEMPORARY TABLE temp_pack_visit (
+            zoid BIGINT NOT NULL,
+            keep_tid BIGINT
+        );
+        CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid);
+        CREATE TEMPORARY TABLE temp_pack_child (
+            zoid BIGINT NOT NULL
+        );
+        CREATE UNIQUE INDEX temp_pack_child_zoid ON temp_pack_child (zoid);
+        """
+
+    # Collects into temp_pack_child the objects referenced by the
+    # objects in temp_pack_visit (considering only references at or
+    # after each visited object's keep_tid), then marks those children
+    # as kept in pack_object.
+    # Note: UPDATE must be the last statement in the script
+    # because it returns a value.
+    _script_pre_pack_follow_child_refs = """
+        %(TRUNCATE)s temp_pack_child;
+
+        INSERT INTO temp_pack_child
+        SELECT DISTINCT to_zoid
+        FROM object_ref
+            JOIN temp_pack_visit USING (zoid)
+        WHERE object_ref.tid >= temp_pack_visit.keep_tid;
+
+        -- MySQL-specific syntax for table join in update
+        UPDATE pack_object, temp_pack_child SET keep = %(TRUE)s
+        WHERE keep = %(FALSE)s
+            AND pack_object.zoid = temp_pack_child.zoid;
+        """
+
+    # MySQL optimizes deletion far better when using a join syntax.
+    # Deletes the current_object rows of transaction %(tid)s that are
+    # listed in pack_state.
+    _script_pack_current_object = """
+        DELETE FROM current_object
+        USING current_object
+            JOIN pack_state USING (zoid, tid)
+        WHERE current_object.tid = %(tid)s
+        """
+
+    # Deletes the object_state rows of transaction %(tid)s that are
+    # listed in pack_state.
+    _script_pack_object_state = """
+        DELETE FROM object_state
+        USING object_state
+            JOIN pack_state USING (zoid, tid)
+        WHERE object_state.tid = %(tid)s
+        """
+
+    # Removes reference bookkeeping (object_refs_added and object_ref)
+    # for transactions flagged as empty.
+    _script_pack_object_ref = """
+        DELETE FROM object_refs_added
+        USING object_refs_added
+            JOIN transaction USING (tid)
+        WHERE transaction.empty = true;
+
+        DELETE FROM object_ref
+        USING object_ref
+            JOIN transaction USING (tid)
+        WHERE transaction.empty = true
+        """
+
+    def open_for_pre_pack(self):
+        """Open a connection to be used for the pre-pack phase.
+        Returns (conn, cursor).
+
+        This overrides a method.  The connection is opened with
+        transaction_mode=None and switched to autocommit; if enabling
+        autocommit fails, the connection is closed and the error is
+        re-raised.
+        """
+        conn, cursor = self.connmanager.open(transaction_mode=None)
+        try:
+            # This phase of packing works best with transactions
+            # disabled.  It changes no user-facing data.
+            conn.autocommit(True)
+            return conn, cursor
+        except:
+            # Do not leak the connection when setup fails.
+            self.connmanager.close(conn, cursor)
+            raise
+
+
+class OracleHistoryPreservingPackUndo(HistoryPreservingPackUndo):
+    """History-preserving pack/undo support with Oracle-specific SQL."""
+
+    # Selects the newest unpacked transaction at or before %(tid)s.
+    # The packed flag is compared with the character 'N' here rather
+    # than with the %(FALSE)s placeholder.
+    _script_choose_pack_transaction = """
+        SELECT MAX(tid)
+        FROM transaction
+        WHERE tid > 0
+            AND tid <= %(tid)s
+            AND packed = 'N'
+        """
+
+    # None makes the "if stmt:" guards in the pack code skip the
+    # creation of temp_pack_visit.
+    # NOTE(review): presumably the Oracle schema provides the
+    # temporary tables itself -- confirm against the schema module.
+    _script_create_temp_pack_visit = None
+    _script_create_temp_undo = None
+    _script_reset_temp_undo = "DELETE FROM temp_undo"
+
+    # Returns a row if any object state exists for transaction %(tid)s;
+    # the caller only checks whether the result is empty.
+    _script_transaction_has_data = """
+        SELECT DISTINCT tid
+        FROM object_state
+        WHERE tid = %(tid)s
+        """
+
+
+class HistoryFreePackUndo(PackUndo):
+    """Pack support for history-free storages.
+
+    Undo is not supported here: verify_undoable and undo always raise
+    UndoError.  Packing consists solely of garbage collection: pre_pack
+    decides which objects to keep and pack deletes the others.
+    """
+    implements(IPackUndo)
+
+    keep_history = False
+
+    # Creates the temporary table used while following references.
+    _script_create_temp_pack_visit = """
+        CREATE TEMPORARY TABLE temp_pack_visit (
+            zoid BIGINT NOT NULL,
+            keep_tid BIGINT
+        );
+        CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
+        """
+
+    # Marks as kept every object referenced (via object_ref) by an
+    # object listed in temp_pack_visit.
+    _script_pre_pack_follow_child_refs = """
+        UPDATE pack_object SET keep = %(TRUE)s
+        WHERE keep = %(FALSE)s
+            AND zoid IN (
+                SELECT DISTINCT to_zoid
+                FROM object_ref
+                    JOIN temp_pack_visit USING (zoid)
+            )
+        """
+
+    def verify_undoable(self, cursor, undo_tid):
+        """Raise UndoError if it is not safe to undo the specified txn.
+
+        A history-free storage never supports undo, so this always
+        raises UndoError.
+        """
+        raise UndoError("Undo is not supported by this storage")
+
+    def undo(self, cursor, undo_tid, self_tid):
+        """Undo a transaction.
+
+        Parameters: "undo_tid", the integer tid of the transaction to undo,
+        and "self_tid", the integer tid of the current transaction.
+
+        A history-free storage never supports undo, so this always
+        raises UndoError instead of returning a list of OIDs.
+        """
+        raise UndoError("Undo is not supported by this storage")
+
+    def choose_pack_transaction(self, pack_point):
+        """Return the tid to pass to pre_pack and pack.
+
+        pack_point is ignored.  This implementation always returns the
+        constant 1 (never None), because pre_pack and pack in this
+        class do not use the tid.
+        """
+        return 1
+
+
+    def pre_pack(self, pack_tid, get_references, options):
+        """Decide what the garbage collector should delete.
+
+        pack_tid is ignored.
+
+        get_references is a function that accepts a pickled state and
+        returns a set of OIDs that state refers to.
+
+        options is an instance of relstorage.Options.
+        The options.pack_gc flag indicates whether to run garbage collection.
+        If pack_gc is false, this method does nothing.
+
+        Any exception is logged, the connection is rolled back, and the
+        exception is re-raised.  The connection is always closed.
+        """
+        if not options.pack_gc:
+            log.warning("pre_pack: garbage collection is disabled on a "
+                "history-free storage, so doing nothing")
+            return
+
+        conn, cursor = self.open_for_pre_pack()
+        try:
+            try:
+                self._pre_pack_main(conn, cursor, get_references)
+            except:
+                log.exception("pre_pack: failed")
+                conn.rollback()
+                raise
+            else:
+                conn.commit()
+                log.info("pre_pack: finished successfully")
+        finally:
+            self.connmanager.close(conn, cursor)
+
+
+    def _pre_pack_main(self, conn, cursor, get_references):
+        """Determine what to garbage collect.
+
+        Lists every row of object_state in pack_object, marks the root
+        object (zoid 0) as kept, and then lets _visit_all_references
+        set the remaining keep flags.
+        """
+        # Subclasses may disable this script by setting it to None.
+        stmt = self._script_create_temp_pack_visit
+        if stmt:
+            self.runner.run_script(cursor, stmt)
+
+        self.fill_object_refs(conn, cursor, get_references)
+
+        log.info("pre_pack: filling the pack_object table")
+        # Fill the pack_object table with all known OIDs.
+        stmt = """
+        %(TRUNCATE)s pack_object;
+
+        INSERT INTO pack_object (zoid, keep_tid)
+        SELECT zoid, tid
+        FROM object_state;
+
+        -- Keep the root object
+        UPDATE pack_object SET keep = %(TRUE)s
+        WHERE zoid = 0;
+        """
+        self.runner.run_script(cursor, stmt)
+
+        # Set the 'keep' flags in pack_object
+        self._visit_all_references(cursor)
+
+
+    def pack(self, pack_tid, options, sleep=None, packed_func=None):
+        """Run garbage collection.
+
+        Requires the information provided by pre_pack.
+
+        pack_tid is not used.  options supplies pack_batch_timeout,
+        the number of seconds of work after which the current batch is
+        committed and the commit lock is briefly released.  sleep and
+        options are handed to _pause_pack between batches.  If
+        packed_func is not None, it is called as packed_func(oid, tid)
+        for every object removed.
+
+        Any exception is logged, the connection is rolled back, and the
+        exception is re-raised.  The connection is always closed.
+        """
+
+        # Read committed mode is sufficient.
+        conn, cursor = self.connmanager.open()
+        try:
+            try:
+                # Everything pre_pack left with keep = false is garbage.
+                stmt = """
+                SELECT zoid, keep_tid
+                FROM pack_object
+                WHERE keep = %(FALSE)s
+                """
+                self.runner.run_script_stmt(cursor, stmt)
+                to_remove = list(cursor)
+
+                log.info("pack: will remove %d object(s)", len(to_remove))
+
+                # Hold the commit lock while packing to prevent deadlocks.
+                # Pack in small batches of objects in order to minimize
+                # the interruption of concurrent write operations.
+                start = time.time()
+                packed_list = []
+                self.locker.hold_commit_lock(cursor)
+
+                for item in to_remove:
+                    oid, tid = item
+                    stmt = """
+                    DELETE FROM object_state
+                    WHERE zoid = %(oid)s
+                        AND tid = %(tid)s
+                    """
+                    self.runner.run_script_stmt(
+                        cursor, stmt, {'oid': oid, 'tid': tid})
+                    packed_list.append(item)
+
+                    if time.time() >= start + options.pack_batch_timeout:
+                        # The batch has used up its time slice: commit
+                        # it, report what was removed, then give other
+                        # writers a chance before taking the lock again.
+                        conn.commit()
+                        if packed_func is not None:
+                            for oid, tid in packed_list:
+                                packed_func(oid, tid)
+                        del packed_list[:]
+                        self.locker.release_commit_lock(cursor)
+                        self._pause_pack(sleep, options, start)
+                        self.locker.hold_commit_lock(cursor)
+                        start = time.time()
+
+                # Report the final, partial batch.
+                if packed_func is not None:
+                    for oid, tid in packed_list:
+                        packed_func(oid, tid)
+                packed_list = None
+
+                self._pack_cleanup(conn, cursor)
+
+            except:
+                log.exception("pack: failed")
+                conn.rollback()
+                raise
+
+            else:
+                log.info("pack: finished successfully")
+                conn.commit()
+
+        finally:
+            self.connmanager.close(conn, cursor)
+
+
+    def _pack_cleanup(self, conn, cursor):
+        """Remove unneeded table rows after garbage collection.
+
+        This method was previously misspelled "_pack_cleaup", so the
+        call to self._pack_cleanup() in pack() never reached it.
+
+        Commits the deletions made by pack(), then (under the commit
+        lock) removes reference bookkeeping for states and objects that
+        no longer exist and truncates pack_object.  The caller commits
+        this cleanup work.
+        """
+        # commit the work done so far
+        conn.commit()
+        self.locker.release_commit_lock(cursor)
+        self.locker.hold_commit_lock(cursor)
+        log.info("pack: cleaning up")
+
+        stmt = """
+        DELETE FROM object_refs_added
+        WHERE tid NOT IN (
+            SELECT DISTINCT tid
+            FROM object_state
+        );
+
+        DELETE FROM object_ref
+        WHERE zoid IN (
+            SELECT zoid
+            FROM pack_object
+            WHERE keep = %(FALSE)s
+        );
+
+        %(TRUNCATE)s pack_object
+        """
+        self.runner.run_script(cursor, stmt)

Added: relstorage/trunk/relstorage/adapters/poller.py
===================================================================
--- relstorage/trunk/relstorage/adapters/poller.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/poller.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,83 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from relstorage.adapters.interfaces import IPoller
+from zope.interface import implements
+
+class Poller:
+    """Polls the database for transactions committed by other connections"""
+    implements(IPoller)
+
+    def __init__(self, poll_query, keep_history, runner):
+        self.runner = runner
+        self.keep_history = keep_history
+        self.poll_query = poll_query
+
+    def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+        """Look for transactions committed since the previous poll.
+
+        conn and cursor must have been created previously by
+        open_for_load().  prev_polled_tid is the tid returned by the
+        last poll, or None on the first poll.  When ignore_tid is not
+        None, objects changed by that transaction are left out of the
+        result.
+
+        Returns (changed_oids, new_polled_tid).  changed_oids is None
+        on the first poll or when prev_polled_tid can no longer be
+        found, an empty tuple when nothing has been committed, and
+        otherwise a list of OIDs.
+        """
+        # Ask the database for the tid of the newest transaction.
+        cursor.execute(self.poll_query)
+        latest_tid = cursor.fetchone()[0]
+
+        # First poll on this connection: there is no baseline yet.
+        if prev_polled_tid is None:
+            return None, latest_tid
+
+        # Nothing has been committed since the previous poll.
+        if latest_tid == prev_polled_tid:
+            return (), latest_tid
+
+        # Check that the previously polled transaction still exists.
+        if self.keep_history:
+            check = "SELECT 1 FROM transaction WHERE tid = %(tid)s"
+        else:
+            check = "SELECT 1 FROM object_state WHERE tid <= %(tid)s LIMIT 1"
+        cursor.execute(intern(check % self.runner.script_vars),
+            {'tid': prev_polled_tid})
+        if not cursor.fetchall():
+            # Not found; perhaps it has been packed.  Returning None
+            # tells the caller to clear the connection cache.
+            return None, latest_tid
+
+        # Collect the OIDs changed after the previously polled tid.
+        params = {'tid': prev_polled_tid}
+        if ignore_tid is None:
+            query = """
+            SELECT zoid
+            FROM current_object
+            WHERE tid > %(tid)s
+            """
+        else:
+            query = """
+            SELECT zoid
+            FROM current_object
+            WHERE tid > %(tid)s
+                AND tid != %(self_tid)s
+            """
+            params['self_tid'] = ignore_tid
+        cursor.execute(intern(query % self.runner.script_vars), params)
+
+        changed = []
+        for (oid,) in cursor:
+            changed.append(oid)
+        return changed, latest_tid
+

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -13,178 +13,133 @@
 ##############################################################################
 """PostgreSQL adapter for RelStorage."""
 
-from base64 import decodestring, encodestring
 import logging
-import psycopg2, psycopg2.extensions
-import re
-from ZODB.POSException import StorageError
+import psycopg2
+import psycopg2.extensions
 
-from relstorage.adapters.historypreserving import HistoryPreservingAdapter
+from relstorage.adapters.connmanager import AbstractConnectionManager
+from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
+from relstorage.adapters.loadstore import HistoryPreservingPostgreSQLLoadStore
+from relstorage.adapters.locker import PostgreSQLLocker
+from relstorage.adapters.packundo import HistoryPreservingPackUndo
+from relstorage.adapters.poller import Poller
+from relstorage.adapters.schema import HistoryPreservingPostgreSQLSchema
+from relstorage.adapters.scriptrunner import ScriptRunner
+from relstorage.adapters.stats import PostgreSQLStats
+from relstorage.adapters.txncontrol import PostgreSQLTransactionControl
 
-log = logging.getLogger("relstorage.adapters.postgresql")
+log = logging.getLogger(__name__)
 
 # disconnected_exceptions contains the exception types that might be
 # raised when the connection to the database has been broken.
 disconnected_exceptions = (psycopg2.OperationalError, psycopg2.InterfaceError)
 
 
-class PostgreSQLAdapter(HistoryPreservingAdapter):
+class PostgreSQLAdapter(object):
     """PostgreSQL adapter for RelStorage."""
 
+    keep_history = True
+
     def __init__(self, dsn=''):
-        self._dsn = dsn
+        self.connmanager = Psycopg2ConnectionManager(dsn)
+        self.runner = ScriptRunner()
+        self.locker = PostgreSQLLocker((psycopg2.DatabaseError,))
+        self.schema = HistoryPreservingPostgreSQLSchema(
+            locker=self.locker,
+            connmanager=self.connmanager,
+            )
+        self.loadstore = HistoryPreservingPostgreSQLLoadStore(
+            connmanager=self.connmanager,
+            disconnected_exceptions=disconnected_exceptions,
+            )
+        self.txncontrol = PostgreSQLTransactionControl()
+        self.poller = Poller(
+            poll_query="EXECUTE get_latest_tid",
+            keep_history=True,
+            runner=self.runner,
+            )
+        self.packundo = HistoryPreservingPackUndo(
+            connmanager=self.connmanager,
+            runner=self.runner,
+            locker=self.locker,
+            )
+        self.dbiter = HistoryPreservingDatabaseIterator(
+            runner=self.runner,
+            )
+        self.stats = PostgreSQLStats(
+            connmanager=self.connmanager,
+            )
 
-    def create_schema(self, cursor):
-        """Create the database tables."""
-        stmt = """
-        CREATE TABLE commit_lock ();
+        self.open = self.connmanager.open
+        self.close = self.connmanager.close
 
-        -- The list of all transactions in the database
-        CREATE TABLE transaction (
-            tid         BIGINT NOT NULL PRIMARY KEY,
-            packed      BOOLEAN NOT NULL DEFAULT FALSE,
-            empty       BOOLEAN NOT NULL DEFAULT FALSE,
-            username    BYTEA NOT NULL,
-            description BYTEA NOT NULL,
-            extension   BYTEA
-        );
+        self.hold_commit_lock = self.locker.hold_commit_lock
+        self.release_commit_lock = self.locker.release_commit_lock
+        self.hold_pack_lock = self.locker.hold_pack_lock
+        self.release_pack_lock = self.locker.release_pack_lock
 
-        -- Create a special transaction to represent object creation.  This
-        -- row is often referenced by object_state.prev_tid, but never by
-        -- object_state.tid.
-        INSERT INTO transaction (tid, username, description)
-            VALUES (0, 'system', 'special transaction for object creation');
+        self.create_schema = self.schema.create
+        self.prepare_schema = self.schema.prepare
+        self.zap_all = self.schema.zap_all
+        self.drop_all = self.schema.drop_all
 
-        CREATE SEQUENCE zoid_seq;
+        self.open_for_load = self.loadstore.open_for_load
+        self.restart_load = self.loadstore.restart_load
+        self.get_current_tid = self.loadstore.get_current_tid
+        self.load_current = self.loadstore.load_current
+        self.load_revision = self.loadstore.load_revision
+        self.exists = self.loadstore.exists
+        self.load_before = self.loadstore.load_before
+        self.get_object_tid_after = self.loadstore.get_object_tid_after
 
-        -- All object states in all transactions.  Note that md5 and state
-        -- can be null to represent object uncreation.
-        CREATE TABLE object_state (
-            zoid        BIGINT NOT NULL,
-            tid         BIGINT NOT NULL REFERENCES transaction
-                        CHECK (tid > 0),
-            PRIMARY KEY (zoid, tid),
-            prev_tid    BIGINT NOT NULL REFERENCES transaction,
-            md5         CHAR(32),
-            state       BYTEA
-        );
-        CREATE INDEX object_state_tid ON object_state (tid);
-        CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+        self.open_for_store = self.loadstore.open_for_store
+        self.restart_store = self.loadstore.restart_store
+        self.store_temp = self.loadstore.store_temp
+        self.replace_temp = self.loadstore.replace_temp
+        self.restore = self.loadstore.restore
+        self.detect_conflict = self.loadstore.detect_conflict
+        self.move_from_temp = self.loadstore.move_from_temp
+        self.update_current = self.loadstore.update_current
+        self.set_min_oid = self.loadstore.set_min_oid
+        self.new_oid = self.loadstore.new_oid
 
-        -- Pointers to the current object state
-        CREATE TABLE current_object (
-            zoid        BIGINT NOT NULL PRIMARY KEY,
-            tid         BIGINT NOT NULL,
-            FOREIGN KEY (zoid, tid) REFERENCES object_state
-        );
-        CREATE INDEX current_object_tid ON current_object (tid);
+        self.get_tid_and_time = self.txncontrol.get_tid_and_time
+        self.add_transaction = self.txncontrol.add_transaction
+        self.commit_phase1 = self.txncontrol.commit_phase1
+        self.commit_phase2 = self.txncontrol.commit_phase2
+        self.abort = self.txncontrol.abort
 
-        -- A list of referenced OIDs from each object_state.
-        -- This table is populated as needed during packing.
-        -- To prevent unnecessary table locking, it does not use
-        -- foreign keys, which is safe because rows in object_state
-        -- are never modified once committed, and rows are removed
-        -- from object_state only by packing.
-        CREATE TABLE object_ref (
-            zoid        BIGINT NOT NULL,
-            tid         BIGINT NOT NULL,
-            to_zoid     BIGINT NOT NULL,
-            PRIMARY KEY (tid, zoid, to_zoid)
-        );
+        self.poll_invalidations = self.poller.poll_invalidations
 
-        -- The object_refs_added table tracks whether object_refs has
-        -- been populated for all states in a given transaction.
-        -- An entry is added only when the work is finished.
-        -- To prevent unnecessary table locking, it does not use
-        -- foreign keys, which is safe because object states
-        -- are never added to a transaction once committed, and
-        -- rows are removed from the transaction table only by
-        -- packing.
-        CREATE TABLE object_refs_added (
-            tid         BIGINT NOT NULL PRIMARY KEY
-        );
+        self.fill_object_refs = self.packundo.fill_object_refs
+        self.open_for_pre_pack = self.packundo.open_for_pre_pack
+        self.choose_pack_transaction = self.packundo.choose_pack_transaction
+        self.pre_pack = self.packundo.pre_pack
+        self.pack = self.packundo.pack
+        self.verify_undoable = self.packundo.verify_undoable
+        self.undo = self.packundo.undo
 
-        -- Temporary state during packing:
-        -- The list of objects to pack.  If keep is false,
-        -- the object and all its revisions will be removed.
-        -- If keep is true, instead of removing the object,
-        -- the pack operation will cut the object's history.
-        -- The keep_tid field specifies the oldest revision
-        -- of the object to keep.
-        -- The visited flag is set when pre_pack is visiting an object's
-        -- references, and remains set.
-        CREATE TABLE pack_object (
-            zoid        BIGINT NOT NULL PRIMARY KEY,
-            keep        BOOLEAN NOT NULL,
-            keep_tid    BIGINT NOT NULL,
-            visited     BOOLEAN NOT NULL DEFAULT FALSE
-        );
-        CREATE INDEX pack_object_keep_false ON pack_object (zoid)
-            WHERE keep = false;
-        CREATE INDEX pack_object_keep_true ON pack_object (visited)
-            WHERE keep = true;
+        self.iter_objects = self.dbiter.iter_objects
+        self.iter_transactions = self.dbiter.iter_transactions
+        self.iter_transactions_range = self.dbiter.iter_transactions_range
+        self.iter_object_history = self.dbiter.iter_object_history
 
-        -- Temporary state during packing: the list of object states to pack.
-        CREATE TABLE pack_state (
-            tid         BIGINT NOT NULL,
-            zoid        BIGINT NOT NULL,
-            PRIMARY KEY (tid, zoid)
-        );
+        self.get_object_count = self.stats.get_object_count
+        self.get_db_size = self.stats.get_db_size
 
-        -- Temporary state during packing: the list of transactions that
-        -- have at least one object state to pack.
-        CREATE TABLE pack_state_tid (
-            tid         BIGINT NOT NULL PRIMARY KEY
-        );
-        """
-        cursor.execute(stmt)
 
-        if not self._pg_has_advisory_locks(cursor):
-            cursor.execute("CREATE TABLE pack_lock ()")
+class Psycopg2ConnectionManager(AbstractConnectionManager):
 
+    isolation_read_committed = (
+        psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+    isolation_serializable = (
+        psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
 
-    def prepare_schema(self):
-        """Create the database schema if it does not already exist."""
-        def callback(conn, cursor):
-            cursor.execute("""
-            SELECT tablename
-            FROM pg_tables
-            WHERE tablename = 'object_state'
-            """)
-            if not cursor.rowcount:
-                self.create_schema(cursor)
-        self._open_and_call(callback)
+    close_exceptions = disconnected_exceptions
 
-    def zap_all(self):
-        """Clear all data out of the database."""
-        def callback(conn, cursor):
-            cursor.execute("""
-            DELETE FROM object_refs_added;
-            DELETE FROM object_ref;
-            DELETE FROM current_object;
-            DELETE FROM object_state;
-            DELETE FROM transaction;
-            -- Create a special transaction to represent object creation.
-            INSERT INTO transaction (tid, username, description) VALUES
-                (0, 'system', 'special transaction for object creation');
-            ALTER SEQUENCE zoid_seq RESTART WITH 1;
-            """)
-        self._open_and_call(callback)
+    def __init__(self, dsn):
+        self._dsn = dsn
 
-    def drop_all(self):
-        """Drop all tables and sequences."""
-        def callback(conn, cursor):
-            cursor.execute("SELECT tablename FROM pg_tables")
-            existent = set([name for (name,) in cursor])
-            for tablename in ('pack_state_tid', 'pack_state',
-                    'pack_object', 'object_refs_added', 'object_ref',
-                    'current_object', 'object_state', 'transaction',
-                    'commit_lock', 'pack_lock'):
-                if tablename in existent:
-                    cursor.execute("DROP TABLE %s" % tablename)
-            cursor.execute("DROP SEQUENCE zoid_seq")
-        self._open_and_call(callback)
-
     def open(self,
             isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
         """Open a database connection and return (conn, cursor)."""
@@ -198,397 +153,3 @@
             raise
         return conn, cursor
 
-    def close(self, conn, cursor):
-        """Close a connection and cursor, ignoring certain errors.
-        """
-        for obj in (cursor, conn):
-            if obj is not None:
-                try:
-                    obj.close()
-                except disconnected_exceptions:
-                    pass
-
-    def _pg_version(self, cursor):
-        """Return the (major, minor) version of PostgreSQL"""
-        cursor.execute("SELECT version()")
-        v = cursor.fetchone()[0]
-        m = re.search(r"([0-9]+)[.]([0-9]+)", v)
-        if m is None:
-            raise AssertionError("Unable to detect PostgreSQL version: " + v)
-        else:
-            return int(m.group(1)), int(m.group(2))
-
-    def _pg_has_advisory_locks(self, cursor):
-        """Return true if this version of PostgreSQL supports advisory locks"""
-        return self._pg_version(cursor) >= (8, 2)
-
-    def open_for_load(self):
-        """Open and initialize a connection for loading objects.
-
-        Returns (conn, cursor).
-        """
-        conn, cursor = self.open(
-            psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
-        stmt = """
-        PREPARE get_latest_tid AS
-        SELECT tid
-        FROM transaction
-        ORDER BY tid DESC
-        LIMIT 1
-        """
-        cursor.execute(stmt)
-        return conn, cursor
-
-    def restart_load(self, cursor):
-        """Reinitialize a connection for loading objects."""
-        try:
-            cursor.connection.rollback()
-        except disconnected_exceptions, e:
-            raise StorageError(e)
-
-    def get_object_count(self):
-        """Returns the number of objects in the database"""
-        # do later
-        return 0
-
-    def get_db_size(self):
-        """Returns the approximate size of the database in bytes"""
-        def callback(conn, cursor):
-            cursor.execute("SELECT pg_database_size(current_database())")
-            return cursor.fetchone()[0]
-        return self._open_and_call(callback)
-
-    def get_current_tid(self, cursor, oid):
-        """Returns the current integer tid for an object.
-
-        oid is an integer.  Returns None if object does not exist.
-        """
-        cursor.execute("""
-        SELECT tid
-        FROM current_object
-        WHERE zoid = %s
-        """, (oid,))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()[0]
-        return None
-
-    def load_current(self, cursor, oid):
-        """Returns the current pickle and integer tid for an object.
-
-        oid is an integer.  Returns (None, None) if object does not exist.
-        """
-        cursor.execute("""
-        SELECT encode(state, 'base64'), tid
-        FROM current_object
-            JOIN object_state USING(zoid, tid)
-        WHERE zoid = %s
-        """, (oid,))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            state64, tid = cursor.fetchone()
-            if state64 is not None:
-                state = decodestring(state64)
-            else:
-                # This object's creation has been undone
-                state = None
-            return state, tid
-        else:
-            return None, None
-
-    def load_revision(self, cursor, oid, tid):
-        """Returns the pickle for an object on a particular transaction.
-
-        Returns None if no such state exists.
-        """
-        cursor.execute("""
-        SELECT encode(state, 'base64')
-        FROM object_state
-        WHERE zoid = %s
-            AND tid = %s
-        """, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            (state64,) = cursor.fetchone()
-            if state64 is not None:
-                return decodestring(state64)
-        return None
-
-    def exists(self, cursor, oid):
-        """Returns a true value if the given object exists."""
-        cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
-        return cursor.rowcount
-
-    def load_before(self, cursor, oid, tid):
-        """Returns the pickle and tid of an object before transaction tid.
-
-        Returns (None, None) if no earlier state exists.
-        """
-        cursor.execute("""
-        SELECT encode(state, 'base64'), tid
-        FROM object_state
-        WHERE zoid = %s
-            AND tid < %s
-        ORDER BY tid DESC
-        LIMIT 1
-        """, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            state64, tid = cursor.fetchone()
-            if state64 is not None:
-                state = decodestring(state64)
-            else:
-                # The object's creation has been undone
-                state = None
-            return state, tid
-        else:
-            return None, None
-
-    def get_object_tid_after(self, cursor, oid, tid):
-        """Returns the tid of the next change after an object revision.
-
-        Returns None if no later state exists.
-        """
-        stmt = """
-        SELECT tid
-        FROM object_state
-        WHERE zoid = %s
-            AND tid > %s
-        ORDER BY tid
-        LIMIT 1
-        """
-        cursor.execute(stmt, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()[0]
-        else:
-            return None
-
-    def _make_temp_table(self, cursor):
-        """Create the temporary table for storing objects"""
-        stmt = """
-        CREATE TEMPORARY TABLE temp_store (
-            zoid        BIGINT NOT NULL,
-            prev_tid    BIGINT NOT NULL,
-            md5         CHAR(32),
-            state       BYTEA
-        ) ON COMMIT DROP;
-        CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
-        """
-        cursor.execute(stmt)
-
-    def open_for_store(self):
-        """Open and initialize a connection for storing objects.
-
-        Returns (conn, cursor).
-        """
-        conn, cursor = self.open()
-        try:
-            self._make_temp_table(cursor)
-            return conn, cursor
-        except:
-            self.close(conn, cursor)
-            raise
-
-    def restart_store(self, cursor):
-        """Reuse a store connection."""
-        try:
-            cursor.connection.rollback()
-            self._make_temp_table(cursor)
-        except disconnected_exceptions, e:
-            raise StorageError(e)
-
-    def store_temp(self, cursor, oid, prev_tid, data):
-        """Store an object in the temporary table."""
-        md5sum = self.md5sum(data)
-        stmt = """
-        DELETE FROM temp_store WHERE zoid = %s;
-        INSERT INTO temp_store (zoid, prev_tid, md5, state)
-        VALUES (%s, %s, %s, decode(%s, 'base64'))
-        """
-        cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
-
-    def replace_temp(self, cursor, oid, prev_tid, data):
-        """Replace an object in the temporary table."""
-        md5sum = self.md5sum(data)
-        stmt = """
-        UPDATE temp_store SET
-            prev_tid = %s,
-            md5 = %s,
-            state = decode(%s, 'base64')
-        WHERE zoid = %s
-        """
-        cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
-
-    def restore(self, cursor, oid, tid, data):
-        """Store an object directly, without conflict detection.
-
-        Used for copying transactions into this database.
-        """
-        md5sum = self.md5sum(data)
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        VALUES (%s, %s,
-            COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
-            %s, decode(%s, 'base64'))
-        """
-        if data is not None:
-            data = encodestring(data)
-        cursor.execute(stmt, (oid, tid, oid, md5sum, data))
-
-    def start_commit(self, cursor):
-        """Prepare to commit."""
-        # Hold commit_lock to prevent concurrent commits
-        # (for as short a time as possible).
-        # Lock transaction and current_object in share mode to ensure
-        # conflict detection has the most current data.
-        cursor.execute("""
-        LOCK TABLE commit_lock IN EXCLUSIVE MODE;
-        LOCK TABLE transaction IN SHARE MODE;
-        LOCK TABLE current_object IN SHARE MODE
-        """)
-
-    def get_tid_and_time(self, cursor):
-        """Returns the most recent tid and the current database time.
-
-        The database time is the number of seconds since the epoch.
-        """
-        cursor.execute("""
-        SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
-        FROM transaction
-        ORDER BY tid DESC
-        LIMIT 1
-        """)
-        assert cursor.rowcount == 1
-        return cursor.fetchone()
-
-    def add_transaction(self, cursor, tid, username, description, extension,
-            packed=False):
-        """Add a transaction."""
-        stmt = """
-        INSERT INTO transaction
-            (tid, packed, username, description, extension)
-        VALUES (%s, %s,
-            decode(%s, 'base64'), decode(%s, 'base64'), decode(%s, 'base64'))
-        """
-        cursor.execute(stmt, (tid, packed,
-            encodestring(username), encodestring(description),
-            encodestring(extension)))
-
-    def detect_conflict(self, cursor):
-        """Find one conflict in the data about to be committed.
-
-        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
-        attempted_data).  If there is no conflict, returns None.
-        """
-        stmt = """
-        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
-            encode(temp_store.state, 'base64')
-        FROM temp_store
-            JOIN current_object ON (temp_store.zoid = current_object.zoid)
-        WHERE temp_store.prev_tid != current_object.tid
-        LIMIT 1
-        """
-        cursor.execute(stmt)
-        if cursor.rowcount:
-            oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
-            return oid, prev_tid, attempted_prev_tid, decodestring(data)
-        return None
-
-    def move_from_temp(self, cursor, tid):
-        """Moved the temporarily stored objects to permanent storage.
-
-        Returns the list of oids stored.
-        """
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        SELECT zoid, %s, prev_tid, md5, state
-        FROM temp_store
-        """
-        cursor.execute(stmt, (tid,))
-
-        stmt = """
-        SELECT zoid FROM temp_store
-        """
-        cursor.execute(stmt)
-        return [oid for (oid,) in cursor]
-
-    def update_current(self, cursor, tid):
-        """Update the current object pointers.
-
-        tid is the integer tid of the transaction being committed.
-        """
-        cursor.execute("""
-        -- Insert objects created in this transaction into current_object.
-        INSERT INTO current_object (zoid, tid)
-        SELECT zoid, tid FROM object_state
-        WHERE tid = %(tid)s
-            AND prev_tid = 0;
-
-        -- Change existing objects.  To avoid deadlocks,
-        -- update in OID order.
-        UPDATE current_object SET tid = %(tid)s
-        WHERE zoid IN (
-            SELECT zoid FROM object_state
-            WHERE tid = %(tid)s
-                AND prev_tid != 0
-            ORDER BY zoid
-        )
-        """, {'tid': tid})
-
-    def set_min_oid(self, cursor, oid):
-        """Ensure the next OID is at least the given OID."""
-        cursor.execute("""
-        SELECT CASE WHEN %s > nextval('zoid_seq')
-            THEN setval('zoid_seq', %s)
-            ELSE 0
-            END
-        """, (oid, oid))
-
-    def commit_phase1(self, cursor, tid):
-        """Begin a commit.  Returns the transaction name.
-
-        This method should guarantee that commit_phase2() will succeed,
-        meaning that if commit_phase2() would raise any error, the error
-        should be raised in commit_phase1() instead.
-        """
-        return '-'
-
-    def commit_phase2(self, cursor, txn):
-        """Final transaction commit."""
-        cursor.connection.commit()
-
-    def abort(self, cursor, txn=None):
-        """Abort the commit.  If txn is not None, phase 1 is also aborted."""
-        cursor.connection.rollback()
-
-    def new_oid(self, cursor):
-        """Return a new, unused OID."""
-        stmt = "SELECT NEXTVAL('zoid_seq')"
-        cursor.execute(stmt)
-        return cursor.fetchone()[0]
-
-    def hold_pack_lock(self, cursor):
-        """Try to acquire the pack lock.
-
-        Raise an exception if packing or undo is already in progress.
-        """
-        if self._pg_has_advisory_locks(cursor):
-            cursor.execute("SELECT pg_try_advisory_lock(1)")
-            locked = cursor.fetchone()[0]
-            if not locked:
-                raise StorageError('A pack or undo operation is in progress')
-        else:
-            # b/w compat
-            try:
-                cursor.execute("LOCK pack_lock IN EXCLUSIVE MODE NOWAIT")
-            except psycopg2.DatabaseError:
-                raise StorageError('A pack or undo operation is in progress')
-
-    def release_pack_lock(self, cursor):
-        """Release the pack lock."""
-        if self._pg_has_advisory_locks(cursor):
-            cursor.execute("SELECT pg_advisory_unlock(1)")
-        # else no action needed since the lock will be released at txn commit
-
-    _poll_query = "EXECUTE get_latest_tid"

Added: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/schema.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,506 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Database schema installers
+"""
+from relstorage.adapters.interfaces import ISchemaInstaller
+from zope.interface import implements
+import time
+
+class HistoryPreservingPostgreSQLSchema(object):
+    implements(ISchemaInstaller)
+
+    script = """
+    CREATE TABLE commit_lock ();
+
+    -- The list of all transactions in the database
+    CREATE TABLE transaction (
+        tid         BIGINT NOT NULL PRIMARY KEY,
+        packed      BOOLEAN NOT NULL DEFAULT FALSE,
+        empty       BOOLEAN NOT NULL DEFAULT FALSE,
+        username    BYTEA NOT NULL,
+        description BYTEA NOT NULL,
+        extension   BYTEA
+    );
+
+    -- Create a special transaction to represent object creation.  This
+    -- row is often referenced by object_state.prev_tid, but never by
+    -- object_state.tid.
+    INSERT INTO transaction (tid, username, description)
+        VALUES (0, 'system', 'special transaction for object creation');
+
+    CREATE SEQUENCE zoid_seq;
+
+    -- All object states in all transactions.  Note that md5 and state
+    -- can be null to represent object uncreation.
+    CREATE TABLE object_state (
+        zoid        BIGINT NOT NULL,
+        tid         BIGINT NOT NULL REFERENCES transaction
+                    CHECK (tid > 0),
+        PRIMARY KEY (zoid, tid),
+        prev_tid    BIGINT NOT NULL REFERENCES transaction,
+        md5         CHAR(32),
+        state       BYTEA
+    );
+    CREATE INDEX object_state_tid ON object_state (tid);
+    CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+
+    -- Pointers to the current object state
+    CREATE TABLE current_object (
+        zoid        BIGINT NOT NULL PRIMARY KEY,
+        tid         BIGINT NOT NULL,
+        FOREIGN KEY (zoid, tid) REFERENCES object_state
+    );
+    CREATE INDEX current_object_tid ON current_object (tid);
+
+    -- A list of referenced OIDs from each object_state.
+    -- This table is populated as needed during packing.
+    -- To prevent unnecessary table locking, it does not use
+    -- foreign keys, which is safe because rows in object_state
+    -- are never modified once committed, and rows are removed
+    -- from object_state only by packing.
+    CREATE TABLE object_ref (
+        zoid        BIGINT NOT NULL,
+        tid         BIGINT NOT NULL,
+        to_zoid     BIGINT NOT NULL,
+        PRIMARY KEY (tid, zoid, to_zoid)
+    );
+
+    -- The object_refs_added table tracks whether object_refs has
+    -- been populated for all states in a given transaction.
+    -- An entry is added only when the work is finished.
+    -- To prevent unnecessary table locking, it does not use
+    -- foreign keys, which is safe because object states
+    -- are never added to a transaction once committed, and
+    -- rows are removed from the transaction table only by
+    -- packing.
+    CREATE TABLE object_refs_added (
+        tid         BIGINT NOT NULL PRIMARY KEY
+    );
+
+    -- Temporary state during packing:
+    -- The list of objects to pack.  If keep is false,
+    -- the object and all its revisions will be removed.
+    -- If keep is true, instead of removing the object,
+    -- the pack operation will cut the object's history.
+    -- The keep_tid field specifies the oldest revision
+    -- of the object to keep.
+    -- The visited flag is set when pre_pack is visiting an object's
+    -- references, and remains set.
+    CREATE TABLE pack_object (
+        zoid        BIGINT NOT NULL PRIMARY KEY,
+        keep        BOOLEAN NOT NULL,
+        keep_tid    BIGINT NOT NULL,
+        visited     BOOLEAN NOT NULL DEFAULT FALSE
+    );
+    CREATE INDEX pack_object_keep_false ON pack_object (zoid)
+        WHERE keep = false;
+    CREATE INDEX pack_object_keep_true ON pack_object (visited)
+        WHERE keep = true;
+
+    -- Temporary state during packing: the list of object states to pack.
+    CREATE TABLE pack_state (
+        tid         BIGINT NOT NULL,
+        zoid        BIGINT NOT NULL,
+        PRIMARY KEY (tid, zoid)
+    );
+
+    -- Temporary state during packing: the list of transactions that
+    -- have at least one object state to pack.
+    CREATE TABLE pack_state_tid (
+        tid         BIGINT NOT NULL PRIMARY KEY
+    );
+    """
+
+    def __init__(self, locker, connmanager):
+        self.locker = locker  # create() calls locker.create_pack_lock(cursor)
+        self.connmanager = connmanager  # its open_and_call() runs the callbacks
+
+    def create(self, cursor):
+        """Run the schema script, then let the locker create the pack lock."""
+        cursor.execute(self.script)  # the whole script goes in one execute()
+        self.locker.create_pack_lock(cursor)  # script defines no pack_lock table
+
+    def prepare(self):
+        """Create the schema unless an object_state table already exists."""
+        def callback(conn, cursor):  # conn is accepted but not used here
+            cursor.execute("""
+            SELECT tablename
+            FROM pg_tables
+            WHERE tablename = 'object_state'
+            """)
+            if not cursor.rowcount:  # no matching row: the schema is missing
+                self.create(cursor)
+        self.connmanager.open_and_call(callback)  # presumably calls callback(conn, cursor)
+
+    def zap_all(self):
+        """Delete object and transaction rows, re-add tid 0, reset zoid_seq."""
+        def callback(conn, cursor):  # conn is accepted but not used here
+            cursor.execute("""
+            DELETE FROM object_refs_added;
+            DELETE FROM object_ref;
+            DELETE FROM current_object;
+            DELETE FROM object_state;
+            DELETE FROM transaction;
+            -- Create a special transaction to represent object creation.
+            INSERT INTO transaction (tid, username, description) VALUES
+                (0, 'system', 'special transaction for object creation');
+            ALTER SEQUENCE zoid_seq RESTART WITH 1;
+            """)
+        self.connmanager.open_and_call(callback)
+
+    def drop_all(self):
+        """Drop each schema table that exists, then drop zoid_seq."""
+        def callback(conn, cursor):  # conn is accepted but not used here
+            cursor.execute("SELECT tablename FROM pg_tables")
+            existent = set([name for (name,) in cursor])  # names listed in pg_tables
+            for tablename in ('pack_state_tid', 'pack_state',
+                    'pack_object', 'object_refs_added', 'object_ref',
+                    'current_object', 'object_state', 'transaction',
+                    'commit_lock', 'pack_lock'):
+                if tablename in existent:  # skip tables that are absent
+                    cursor.execute("DROP TABLE %s" % tablename)  # fixed names only
+            cursor.execute("DROP SEQUENCE zoid_seq")  # no existence check here
+        self.connmanager.open_and_call(callback)
+
+
+class HistoryPreservingMySQLSchema(object):
+    implements(ISchemaInstaller)
+
+    script = """
+    -- The list of all transactions in the database
+    CREATE TABLE transaction (
+        tid         BIGINT NOT NULL PRIMARY KEY,
+        packed      BOOLEAN NOT NULL DEFAULT FALSE,
+        empty       BOOLEAN NOT NULL DEFAULT FALSE,
+        username    BLOB NOT NULL,
+        description BLOB NOT NULL,
+        extension   BLOB
+    ) ENGINE = InnoDB;
+
+    -- Create a special transaction to represent object creation.  This
+    -- row is often referenced by object_state.prev_tid, but never by
+    -- object_state.tid.
+    INSERT INTO transaction (tid, username, description)
+        VALUES (0, 'system', 'special transaction for object creation');
+
+    -- All OIDs allocated in the database.  Note that this table
+    -- is purposely non-transactional.
+    CREATE TABLE new_oid (
+        zoid        BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT
+    ) ENGINE = MyISAM;
+
+    -- All object states in all transactions.  Note that md5 and state
+    -- can be null to represent object uncreation.
+    CREATE TABLE object_state (
+        zoid        BIGINT NOT NULL,
+        tid         BIGINT NOT NULL REFERENCES transaction,
+        PRIMARY KEY (zoid, tid),
+        prev_tid    BIGINT NOT NULL REFERENCES transaction,
+        md5         CHAR(32) CHARACTER SET ascii,
+        state       LONGBLOB,
+        CHECK (tid > 0)
+    ) ENGINE = InnoDB;
+    CREATE INDEX object_state_tid ON object_state (tid);
+    CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+
+    -- Pointers to the current object state
+    CREATE TABLE current_object (
+        zoid        BIGINT NOT NULL PRIMARY KEY,
+        tid         BIGINT NOT NULL,
+        FOREIGN KEY (zoid, tid) REFERENCES object_state (zoid, tid)
+    ) ENGINE = InnoDB;
+    CREATE INDEX current_object_tid ON current_object (tid);
+
+    -- A list of referenced OIDs from each object_state.
+    -- This table is populated as needed during packing.
+    -- To prevent unnecessary table locking, it does not use
+    -- foreign keys, which is safe because rows in object_state
+    -- are never modified once committed, and rows are removed
+    -- from object_state only by packing.
+    CREATE TABLE object_ref (
+        zoid        BIGINT NOT NULL,
+        tid         BIGINT NOT NULL,
+        to_zoid     BIGINT NOT NULL,
+        PRIMARY KEY (tid, zoid, to_zoid)
+    ) ENGINE = MyISAM;
+
+    -- The object_refs_added table tracks whether object_refs has
+    -- been populated for all states in a given transaction.
+    -- An entry is added only when the work is finished.
+    -- To prevent unnecessary table locking, it does not use
+    -- foreign keys, which is safe because object states
+    -- are never added to a transaction once committed, and
+    -- rows are removed from the transaction table only by
+    -- packing.
+    CREATE TABLE object_refs_added (
+        tid         BIGINT NOT NULL PRIMARY KEY
+    ) ENGINE = MyISAM;
+
+    -- Temporary state during packing:
+    -- The list of objects to pack.  If keep is false,
+    -- the object and all its revisions will be removed.
+    -- If keep is true, instead of removing the object,
+    -- the pack operation will cut the object's history.
+    -- The keep_tid field specifies the oldest revision
+    -- of the object to keep.
+    -- The visited flag is set when pre_pack is visiting an object's
+    -- references, and remains set.
+    CREATE TABLE pack_object (
+        zoid        BIGINT NOT NULL PRIMARY KEY,
+        keep        BOOLEAN NOT NULL,
+        keep_tid    BIGINT NOT NULL,
+        visited     BOOLEAN NOT NULL DEFAULT FALSE
+    ) ENGINE = MyISAM;
+    CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+
+    -- Temporary state during packing: the list of object states to pack.
+    CREATE TABLE pack_state (
+        tid         BIGINT NOT NULL,
+        zoid        BIGINT NOT NULL,
+        PRIMARY KEY (tid, zoid)
+    ) ENGINE = MyISAM;
+
+    -- Temporary state during packing: the list of transactions that
+    -- have at least one object state to pack.
+    CREATE TABLE pack_state_tid (
+        tid         BIGINT NOT NULL PRIMARY KEY
+    ) ENGINE = MyISAM;
+    """
+
+    def __init__(self, connmanager, runner):
+        self.connmanager = connmanager  # its open_and_call() runs the callbacks
+        self.runner = runner  # create() and zap_all() call runner.run_script()
+
+    def create(self, cursor):
+        """Create the tables by passing the schema script to the runner."""
+        self.runner.run_script(cursor, self.script)
+
+    def prepare(self):
+        """Create the schema unless SHOW TABLES finds object_state."""
+        def callback(conn, cursor):  # conn is accepted but not used here
+            cursor.execute("SHOW TABLES LIKE 'object_state'")
+            if not cursor.rowcount:  # no matching row: the schema is missing
+                self.create(cursor)
+        self.connmanager.open_and_call(callback)  # presumably calls callback(conn, cursor)
+
+    def zap_all(self):
+        """Delete object and transaction rows, empty new_oid, re-add tid 0."""
+        def callback(conn, cursor):  # conn is accepted but not used here
+            stmt = """
+            DELETE FROM object_refs_added;
+            DELETE FROM object_ref;
+            DELETE FROM current_object;
+            DELETE FROM object_state;
+            TRUNCATE new_oid;
+            DELETE FROM transaction;
+            -- Create a transaction to represent object creation.
+            INSERT INTO transaction (tid, username, description) VALUES
+                (0, 'system', 'special transaction for object creation');
+            """
+            self.runner.run_script(cursor, stmt)  # script goes through the runner
+        self.connmanager.open_and_call(callback)
+
+    def drop_all(self):
+        """Drop every table named below; missing tables are ignored."""
+        def callback(conn, cursor):  # conn is accepted but not used here
+            for tablename in ('pack_state_tid', 'pack_state',
+                    'pack_object', 'object_refs_added', 'object_ref',
+                    'current_object', 'object_state', 'new_oid',
+                    'transaction'):
+                cursor.execute("DROP TABLE IF EXISTS %s" % tablename)  # fixed names only
+        self.connmanager.open_and_call(callback)
+
+
+class HistoryPreservingOracleSchema(object):
+    implements(ISchemaInstaller)
+
+    script = """
+    CREATE TABLE commit_lock (dummy CHAR);
+
+    -- The list of all transactions in the database
+    CREATE TABLE transaction (
+        tid         NUMBER(20) NOT NULL PRIMARY KEY,
+        packed      CHAR DEFAULT 'N' CHECK (packed IN ('N', 'Y')),
+        empty       CHAR DEFAULT 'N' CHECK (empty IN ('N', 'Y')),
+        username    RAW(500),
+        description RAW(2000),
+        extension   RAW(2000)
+    );
+
+    -- Create a special transaction to represent object creation.  This
+    -- row is often referenced by object_state.prev_tid, but never by
+    -- object_state.tid.
+    INSERT INTO transaction (tid, username, description)
+        VALUES (0,
+        UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
+        UTL_I18N.STRING_TO_RAW(
+            'special transaction for object creation', 'US7ASCII'));
+
+    CREATE SEQUENCE zoid_seq;
+
+    -- All object states in all transactions.
+    -- md5 and state can be null to represent object uncreation.
+    CREATE TABLE object_state (
+        zoid        NUMBER(20) NOT NULL,
+        tid         NUMBER(20) NOT NULL REFERENCES transaction
+                    CHECK (tid > 0),
+        PRIMARY KEY (zoid, tid),
+        prev_tid    NUMBER(20) NOT NULL REFERENCES transaction,
+        md5         CHAR(32),
+        state       BLOB
+    );
+    CREATE INDEX object_state_tid ON object_state (tid);
+    CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+
+    -- Pointers to the current object state
+    CREATE TABLE current_object (
+        zoid        NUMBER(20) NOT NULL PRIMARY KEY,
+        tid         NUMBER(20) NOT NULL,
+        FOREIGN KEY (zoid, tid) REFERENCES object_state
+    );
+    CREATE INDEX current_object_tid ON current_object (tid);
+
+    -- States that will soon be stored
+    CREATE GLOBAL TEMPORARY TABLE temp_store (
+        zoid        NUMBER(20) NOT NULL PRIMARY KEY,
+        prev_tid    NUMBER(20) NOT NULL,
+        md5         CHAR(32),
+        state       BLOB
+    ) ON COMMIT DELETE ROWS;
+
+    -- During packing, an exclusive lock is held on pack_lock.
+    CREATE TABLE pack_lock (dummy CHAR);
+
+    -- A list of referenced OIDs from each object_state.
+    -- This table is populated as needed during packing.
+    -- To prevent unnecessary table locking, it does not use
+    -- foreign keys, which is safe because rows in object_state
+    -- are never modified once committed, and rows are removed
+    -- from object_state only by packing.
+    CREATE TABLE object_ref (
+        zoid        NUMBER(20) NOT NULL,
+        tid         NUMBER(20) NOT NULL,
+        to_zoid     NUMBER(20) NOT NULL,
+        PRIMARY KEY (tid, zoid, to_zoid)
+    );
+
+    -- The object_refs_added table tracks whether object_refs has
+    -- been populated for all states in a given transaction.
+    -- An entry is added only when the work is finished.
+    -- To prevent unnecessary table locking, it does not use
+    -- foreign keys, which is safe because object states
+    -- are never added to a transaction once committed, and
+    -- rows are removed from the transaction table only by
+    -- packing.
+    CREATE TABLE object_refs_added (
+        tid         NUMBER(20) NOT NULL PRIMARY KEY
+    );
+
+    -- Temporary state during packing:
+    -- The list of objects to pack.  If keep is 'N',
+    -- the object and all its revisions will be removed.
+    -- If keep is 'Y', instead of removing the object,
+    -- the pack operation will cut the object's history.
+    -- The keep_tid field specifies the oldest revision
+    -- of the object to keep.
+    -- The visited flag is set when pre_pack is visiting an object's
+    -- references, and remains set.
+    CREATE TABLE pack_object (
+        zoid        NUMBER(20) NOT NULL PRIMARY KEY,
+        keep        CHAR NOT NULL CHECK (keep IN ('N', 'Y')),
+        keep_tid    NUMBER(20) NOT NULL,
+        visited     CHAR DEFAULT 'N' NOT NULL CHECK (visited IN ('N', 'Y'))
+    );
+    CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+
+    -- Temporary state during packing: the list of object states to pack.
+    CREATE TABLE pack_state (
+        tid         NUMBER(20) NOT NULL,
+        zoid        NUMBER(20) NOT NULL,
+        PRIMARY KEY (tid, zoid)
+    );
+
+    -- Temporary state during packing: the list of transactions that
+    -- have at least one object state to pack.
+    CREATE TABLE pack_state_tid (
+        tid         NUMBER(20) NOT NULL PRIMARY KEY
+    );
+
+    -- Temporary state during packing: a list of objects
+    -- whose references need to be examined.
+    CREATE GLOBAL TEMPORARY TABLE temp_pack_visit (
+        zoid        NUMBER(20) NOT NULL PRIMARY KEY,
+        keep_tid    NUMBER(20)
+    );
+
+    -- Temporary state during undo: a list of objects
+    -- to be undone and the tid of the undone state.
+    CREATE GLOBAL TEMPORARY TABLE temp_undo (
+        zoid        NUMBER(20) NOT NULL PRIMARY KEY,
+        prev_tid    NUMBER(20) NOT NULL
+    );
+    """
+
+    def __init__(self, connmanager, runner):
+        self.connmanager = connmanager  # its open_and_call() runs the callbacks
+        self.runner = runner  # create() and zap_all() call runner.run_script()
+
+    def create(self, cursor):
+        """Run the schema script through the runner, then wait 5 seconds."""
+        self.runner.run_script(cursor, self.script)
+        # Let Oracle catch up with the new data definitions by sleeping.
+        # This reduces the likelihood of spurious ORA-01466 errors.
+        time.sleep(5)
+
+    def prepare(self):
+        """Create the schema unless USER_TABLES lists OBJECT_STATE."""
+        def callback(conn, cursor):  # conn is accepted but not used here
+            cursor.execute("""
+            SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
+            """)
+            if not cursor.fetchall():  # empty result: the schema is missing
+                self.create(cursor)
+        self.connmanager.open_and_call(callback)  # presumably calls callback(conn, cursor)
+
+    def zap_all(self):
+        """Delete object and transaction rows, re-add tid 0, recreate zoid_seq."""
+        def callback(conn, cursor):  # conn is accepted but not used here
+            stmt = """
+            DELETE FROM object_refs_added;
+            DELETE FROM object_ref;
+            DELETE FROM current_object;
+            DELETE FROM object_state;
+            DELETE FROM transaction;
+            -- Create a transaction to represent object creation.
+            INSERT INTO transaction (tid, username, description) VALUES
+                (0, UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
+                UTL_I18N.STRING_TO_RAW(
+                'special transaction for object creation', 'US7ASCII'));
+            DROP SEQUENCE zoid_seq;
+            CREATE SEQUENCE zoid_seq;
+            """
+            self.runner.run_script(cursor, stmt)  # script goes through the runner
+        self.connmanager.open_and_call(callback)
+
+    def drop_all(self):
+        """Drop every schema table and zoid_seq without existence checks."""
+        def callback(conn, cursor):  # conn is accepted but not used here
+            for tablename in ('pack_state_tid', 'pack_state',
+                    'pack_object', 'object_refs_added', 'object_ref',
+                    'current_object', 'object_state', 'transaction',
+                    'commit_lock', 'pack_lock',
+                    'temp_store', 'temp_undo', 'temp_pack_visit'):
+                cursor.execute("DROP TABLE %s" % tablename)  # fixed names only
+            cursor.execute("DROP SEQUENCE zoid_seq")
+        self.connmanager.open_and_call(callback)

Added: relstorage/trunk/relstorage/adapters/scriptrunner.py
===================================================================
--- relstorage/trunk/relstorage/adapters/scriptrunner.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/scriptrunner.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,161 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from relstorage.adapters.interfaces import IScriptRunner
+from zope.interface import implements
+import logging
+import re
+
+# Module-level logger; the script runners report failed statements through it.
+log = logging.getLogger(__name__)
+
+
+class ScriptRunner(object):
+    """Translate generic SQL statements and execute them on a cursor.
+
+    Generic statements contain ``%(NAME)s`` placeholders which are
+    filled in from ``script_vars`` before the statement is executed.
+    """
+    implements(IScriptRunner)
+
+    # Replacements for the placeholders found in generic scripts.
+    # The values below suit PostgreSQL and MySQL; Oracle needs others.
+    script_vars = {
+        'TRUE':         'TRUE',
+        'FALSE':        'FALSE',
+        'OCTET_LENGTH': 'OCTET_LENGTH',
+        'TRUNCATE':     'TRUNCATE',
+        'oid':          '%(oid)s',
+        'tid':          '%(tid)s',
+        'pack_tid':     '%(pack_tid)s',
+        'undo_tid':     '%(undo_tid)s',
+        'self_tid':     '%(self_tid)s',
+        'min_tid':      '%(min_tid)s',
+        'max_tid':      '%(max_tid)s',
+    }
+
+    def run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+        """Translate one generic statement and execute it.
+
+        generic_stmt is interpolated with script_vars to obtain the
+        database-specific statement.  generic_params is handed to
+        cursor.execute() unchanged; it should be either an empty tuple
+        (no parameters) or a map.
+
+        If execution fails, the statement and its parameters are logged
+        as a warning and the original exception is re-raised.
+        """
+        specific_stmt = generic_stmt % self.script_vars
+        try:
+            cursor.execute(specific_stmt, generic_params)
+        except:
+            # Record what was being executed, then let the error propagate.
+            log.warning("script statement failed: %r; parameters: %r",
+                specific_stmt, generic_params)
+            raise
+
+    def run_script(self, cursor, script, params=()):
+        """Split a script into statements and execute each in turn.
+
+        Every line is stripped; blank lines and lines starting with
+        '--' are dropped.  A line ending with ';' terminates the current
+        statement (the ';' itself is removed).  Any lines left over
+        after the last ';' are executed as a final statement.
+
+        params should be either an empty tuple (no parameters) or
+        a map; the same params are passed along with every statement.
+        Each statement goes through run_script_stmt.
+        """
+        pending = []
+        for raw_line in script.split('\n'):
+            text = raw_line.strip()
+            if text and not text.startswith('--'):
+                if text.endswith(';'):
+                    # Statement complete: drop the terminator and run it.
+                    pending.append(text[:-1])
+                    self.run_script_stmt(cursor, '\n'.join(pending), params)
+                    pending = []
+                else:
+                    pending.append(text)
+        if pending:
+            # The script did not end with ';'; run what is left.
+            self.run_script_stmt(cursor, '\n'.join(pending), params)
+
+    def run_many(self, cursor, stmt, items):
+        """Execute one statement once per item via cursor.executemany().
+
+        Items should be a list of tuples and stmt should use the '%s'
+        parameter format.
+        """
+        cursor.executemany(stmt, items)
+
+
+class OracleScriptRunner(ScriptRunner):
+    """ScriptRunner variant with Oracle replacements and bind syntax.
+
+    Named parameters are written as ``:name`` and positional
+    parameters as ``:1``, ``:2``, ...
+    """
+
+    # Oracle-specific replacements for the generic script placeholders.
+    script_vars = {
+        'TRUE':         "'Y'",
+        'FALSE':        "'N'",
+        'OCTET_LENGTH': 'LENGTH',
+        'TRUNCATE':     'TRUNCATE TABLE',
+        'oid':          ':oid',
+        'tid':          ':tid',
+        'pack_tid':     ':pack_tid',
+        'undo_tid':     ':undo_tid',
+        'self_tid':     ':self_tid',
+        'min_tid':      ':min_tid',
+        'max_tid':      ':max_tid',
+    }
+
+    def run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+        """Translate one generic statement and execute it.
+
+        generic_params should be either an empty tuple (no parameters)
+        or a map.  When a map is given, only the entries whose names the
+        statement actually references are passed to cursor.execute().
+
+        If execution fails, the statement and the parameters that were
+        sent are logged as a warning and the exception is re-raised.
+        """
+        if not generic_params:
+            stmt = generic_stmt % self.script_vars
+            params = ()
+        else:
+            # Oracle raises ORA-01036 if the parameter map contains extra
+            # keys, so note which placeholders the statement uses and
+            # send only the matching parameters.
+            tracker = TrackingMap(self.script_vars)
+            stmt = generic_stmt % tracker
+            params = dict(
+                (name, value)
+                for name, value in generic_params.iteritems()
+                if name in tracker.used)
+
+        try:
+            cursor.execute(stmt, params)
+        except:
+            # Record what was being executed, then let the error propagate.
+            log.warning("script statement failed: %r; parameters: %r",
+                stmt, params)
+            raise
+
+    def run_many(self, cursor, stmt, items):
+        """Execute one statement once per item via cursor.executemany().
+
+        Items should be a list of tuples and stmt should use the '%s'
+        parameter format; each '%s' is rewritten to ':1', ':2', ... in
+        order of appearance before execution.
+        """
+        seen = [0]
+
+        def numbered(match):
+            # Number the placeholders from 1, left to right.
+            seen[0] += 1
+            return ':%d' % seen[0]
+
+        oracle_stmt = intern(re.sub('%s', numbered, stmt))
+        cursor.executemany(oracle_stmt, items)
+
+
+class TrackingMap(object):
+    """Provides values for keys while tracking which keys are accessed.
+
+    Wraps ``source`` (anything supporting ``source[key]``) and records
+    every key requested through ``[]`` in the ``used`` set.  It can be
+    used as the right-hand side of ``%`` string formatting to find out
+    which ``%(name)s`` placeholders a template references.
+    """
+
+    def __init__(self, source):
+        # source: the underlying mapping that supplies the values.
+        self.source = source
+        # used: keys that have been looked up so far.
+        self.used = set()
+
+    def __getitem__(self, key):
+        """Record key as used and return source[key].
+
+        The key is recorded before the lookup, so it ends up in ``used``
+        even if the source raises for it.
+        """
+        self.used.add(key)
+        return self.source[key]
+

Added: relstorage/trunk/relstorage/adapters/stats.py
===================================================================
--- relstorage/trunk/relstorage/adapters/stats.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/stats.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,93 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Stats implementations
+"""
+
+class PostgreSQLStats(object):
+    """Database statistics for PostgreSQL."""
+
+    def __init__(self, connmanager):
+        # connmanager supplies the connection used by get_db_size().
+        self.connmanager = connmanager
+
+    def get_object_count(self):
+        """Returns the number of objects in the database.
+
+        Not implemented yet: always returns 0.
+        """
+        return 0
+
+    def get_db_size(self):
+        """Returns the approximate size of the database in bytes.
+
+        The value is whatever pg_database_size() reports for the
+        current database.
+        """
+        def query_size(conn, cursor):
+            cursor.execute("SELECT pg_database_size(current_database())")
+            row = cursor.fetchone()
+            return row[0]
+        return self.connmanager.open_and_call(query_size)
+
+
+class MySQLStats(object):
+    """Database statistics for MySQL."""
+
+    def __init__(self, connmanager):
+        # connmanager supplies the connection used by get_db_size().
+        self.connmanager = connmanager
+
+    def get_object_count(self):
+        """Returns the number of objects in the database.
+
+        Not implemented yet: always returns 0.
+        """
+        return 0
+
+    def get_db_size(self):
+        """Returns the approximate size of the database in bytes.
+
+        The size is the sum of Data_length and Index_length over every
+        row reported by SHOW TABLE STATUS.  Rows for which MySQL reports
+        NULL sizes (as it does for views) contribute nothing.
+
+        Raises ValueError if the result has no Data_length or
+        Index_length column.
+        """
+        conn, cursor = self.connmanager.open()
+        try:
+            cursor.execute("SHOW TABLE STATUS")
+            description = [i[0] for i in cursor.description]
+            rows = list(cursor)
+        finally:
+            # Always hand the connection back, even if the query failed.
+            self.connmanager.close(conn, cursor)
+        data_column = description.index('Data_length')
+        index_column = description.index('Index_length')
+        # "or 0" turns a NULL (None) size into zero instead of letting
+        # the addition fail with a TypeError.
+        return sum([(row[data_column] or 0) + (row[index_column] or 0)
+                    for row in rows], 0)
+
+
+class OracleStats(object):
+    """Database statistics for Oracle."""
+
+    def __init__(self, connmanager):
+        # connmanager would supply the connection for the object count.
+        self.connmanager = connmanager
+
+    def get_object_count(self):
+        """Returns the number of objects in the database.
+
+        Currently always returns 0: the query below only yields an
+        estimate, while the tests expect an exact number, so the
+        estimate is switched off for now.
+        """
+        estimate_enabled = False
+        if not estimate_enabled:
+            return 0
+
+        conn, cursor = self.connmanager.open(
+            self.connmanager.isolation_read_only)
+        try:
+            stmt = """
+                SELECT NUM_ROWS
+                FROM USER_TABLES
+                WHERE TABLE_NAME = 'CURRENT_OBJECT'
+                """
+            cursor.execute(stmt)
+            num_rows = cursor.fetchone()[0]
+            if num_rows is None:
+                return 0
+            return int(num_rows)
+        finally:
+            self.connmanager.close(conn, cursor)
+
+    def get_db_size(self):
+        """Returns the approximate size of the database in bytes.
+
+        Always returns 0; measuring the size may not be possible
+        without access to the dba_* objects.
+        """
+        return 0
+

Added: relstorage/trunk/relstorage/adapters/txncontrol.py
===================================================================
--- relstorage/trunk/relstorage/adapters/txncontrol.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/txncontrol.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -0,0 +1,182 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""TransactionControl implementations"""
+
+from base64 import encodestring
+from relstorage.adapters.interfaces import ITransactionControl
+from zope.interface import implements
+import logging
+import re
+import time
+
+# Module-level logger; used below to warn when a description is trimmed.
+log = logging.getLogger(__name__)
+
+
+class TransactionControl(object):
+    """Abstract base class providing the default commit/abort steps.
+
+    The defaults use a plain single-phase commit on the connection.
+    """
+
+    def commit_phase1(self, conn, cursor, tid):
+        """Begin a commit and return the transaction name.
+
+        The returned name must not be None.  This default does no
+        database work and returns '-'.
+
+        Implementations should guarantee that commit_phase2() will
+        succeed: any error commit_phase2() would raise should be raised
+        here instead.
+        """
+        txn_name = '-'
+        return txn_name
+
+    def commit_phase2(self, conn, cursor, txn):
+        """Finish the commit by committing the connection.
+
+        txn is the name that commit_phase1() returned.
+        """
+        conn.commit()
+
+    def abort(self, conn, cursor, txn=None):
+        """Abort the commit by rolling back the connection.
+
+        If txn is not None, phase 1 is aborted as well.
+        """
+        conn.rollback()
+
+
+class PostgreSQLTransactionControl(TransactionControl):
+    """Transaction control for PostgreSQL."""
+    implements(ITransactionControl)
+
+    def get_tid_and_time(self, cursor):
+        """Returns the most recent tid and the current database time.
+
+        The database time is the number of seconds since the epoch.
+        The result is the row as returned by cursor.fetchone().
+        """
+        stmt = """
+        SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
+        FROM transaction
+        ORDER BY tid DESC
+        LIMIT 1
+        """
+        cursor.execute(stmt)
+        # Exactly one row is expected.
+        assert cursor.rowcount == 1
+        return cursor.fetchone()
+
+    def add_transaction(self, cursor, tid, username, description, extension,
+            packed=False):
+        """Insert a row describing transaction tid.
+
+        username, description and extension are base64-encoded here and
+        decoded again by the database inside the INSERT statement.
+        """
+        stmt = """
+        INSERT INTO transaction
+            (tid, packed, username, description, extension)
+        VALUES (%s, %s,
+            decode(%s, 'base64'), decode(%s, 'base64'), decode(%s, 'base64'))
+        """
+        encoded = [
+            encodestring(value)
+            for value in (username, description, extension)]
+        cursor.execute(stmt, (tid, packed) + tuple(encoded))
+
+
+class MySQLTransactionControl(TransactionControl):
+    """Transaction control for MySQL."""
+    implements(ITransactionControl)
+
+    def __init__(self, Binary):
+        # Binary: callable used to wrap the binary column values before
+        # they are passed to cursor.execute().
+        self.Binary = Binary
+
+    def get_tid_and_time(self, cursor):
+        """Returns the most recent tid and the current database time.
+
+        The database time is the number of seconds since the epoch.
+        """
+        # Lock in share mode to ensure the data being read is up to date.
+        stmt = """
+        SELECT tid, UNIX_TIMESTAMP()
+        FROM transaction
+        ORDER BY tid DESC
+        LIMIT 1
+        LOCK IN SHARE MODE
+        """
+        cursor.execute(stmt)
+        # Exactly one row is expected.
+        assert cursor.rowcount == 1
+        tid, db_time = cursor.fetchone()
+        # MySQL does not provide timestamps with more than one second
+        # precision.  To provide more precision, prefer the system time
+        # whenever it is within one minute of the MySQL time.
+        system_time = time.time()
+        if abs(system_time - db_time) <= 60.0:
+            return tid, system_time
+        return tid, db_time
+
+    def add_transaction(self, cursor, tid, username, description, extension,
+            packed=False):
+        """Insert a row describing transaction tid.
+
+        username, description and extension are wrapped with
+        self.Binary before being passed as parameters.
+        """
+        stmt = """
+        INSERT INTO transaction
+            (tid, packed, username, description, extension)
+        VALUES (%s, %s, %s, %s, %s)
+        """
+        wrap = self.Binary
+        params = (
+            tid, packed, wrap(username), wrap(description), wrap(extension))
+        cursor.execute(stmt, params)
+
+
+class OracleTransactionControl(TransactionControl):
+    """Transaction control for Oracle, with optional two-phase commit."""
+    implements(ITransactionControl)
+
+    def __init__(self, Binary, twophase=False):
+        # Binary: callable used to wrap the binary column values before
+        # they are passed to cursor.execute().
+        self.Binary = Binary
+        # twophase: when true, commit_phase1() calls conn.prepare().
+        self.twophase = twophase
+
+    def commit_phase1(self, conn, cursor, tid):
+        """Begin a commit.  Returns the transaction name.
+
+        The transaction name must not be None.
+
+        This method should guarantee that commit_phase2() will succeed,
+        meaning that if commit_phase2() would raise any error, the error
+        should be raised in commit_phase1() instead.
+
+        When two-phase commit is enabled, conn.prepare() is called
+        here.  The returned name is always '-'.
+        """
+        if self.twophase:
+            conn.prepare()
+        return '-'
+
+    def _parse_dsinterval(self, s):
+        """Convert an Oracle dsinterval (as a string) to a float.
+
+        s must start with '<sign><days> <hours>:<minutes>:<seconds>',
+        for example '+14510 21:12:58.123'.  The result is the interval
+        length in seconds.  The leading sign applies to the whole
+        interval, not just to the day field, so '-0 01:00:00' is
+        -3600.0.
+
+        Raises ValueError if s does not match that layout.
+        """
+        mo = re.match(r'([+-])(\d+) (\d+):(\d+):([0-9.]+)', s)
+        if not mo:
+            raise ValueError(s)
+        sign = mo.group(1)
+        days, hours, minutes, seconds = [
+            float(v) for v in mo.groups()[1:]]
+        total = days * 86400 + hours * 3600 + minutes * 60 + seconds
+        if sign == '-':
+            # Negate the magnitude as a whole; negating only the day
+            # field would give the wrong value for negative intervals.
+            total = -total
+        return total
+
+    def get_tid_and_time(self, cursor):
+        """Returns the most recent tid and the current database time.
+
+        The database time is the number of seconds since the epoch.
+        It is computed by the database as the interval between
+        SYSTIMESTAMP and 1970-01-01 00:00:00 +00:00, fetched as a
+        string and converted to a float here.
+        """
+        cursor.execute("""
+        SELECT MAX(tid), TO_CHAR(TO_DSINTERVAL(SYSTIMESTAMP - TO_TIMESTAMP_TZ(
+            '1970-01-01 00:00:00 +00:00','YYYY-MM-DD HH24:MI:SS TZH:TZM')))
+        FROM transaction
+        """)
+        tid, now = cursor.fetchone()
+        return tid, self._parse_dsinterval(now)
+
+    def add_transaction(self, cursor, tid, username, description, extension,
+            packed=False):
+        """Add a transaction.
+
+        Descriptions longer than 2000 characters are trimmed to that
+        length and a warning is logged.  packed is stored as 'Y' or
+        'N'; username, description and extension are wrapped with
+        self.Binary.
+        """
+        stmt = """
+        INSERT INTO transaction
+            (tid, packed, username, description, extension)
+        VALUES (:1, :2, :3, :4, :5)
+        """
+        # NOTE(review): the limit presumably matches the size of the
+        # description column -- confirm against the schema.
+        max_desc_len = 2000
+        if len(description) > max_desc_len:
+            log.warning('Trimming description of transaction %s '
+                'to %d characters', tid, max_desc_len)
+            description = description[:max_desc_len]
+        cursor.execute(stmt, (
+            tid, packed and 'Y' or 'N', self.Binary(username),
+            self.Binary(description), self.Binary(extension)))
+

Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2009-09-23 18:16:28 UTC (rev 104463)
+++ relstorage/trunk/relstorage/relstorage.py	2009-09-23 21:12:58 UTC (rev 104464)
@@ -554,7 +554,7 @@
                 # get the commit lock and add the transaction now
                 cursor = self._store_cursor
                 packed = (status == 'p')
-                adapter.start_commit(cursor)
+                adapter.hold_commit_lock(cursor, ensure_current=True)
                 tid_int = u64(tid)
                 try:
                     adapter.add_transaction(
@@ -581,7 +581,7 @@
 
         adapter = self._adapter
         cursor = self._store_cursor
-        adapter.start_commit(cursor)
+        adapter.hold_commit_lock(cursor, ensure_current=True)
         user, desc, ext = self._ude
 
         # Choose a transaction ID.
@@ -669,6 +669,7 @@
 
         cursor = self._store_cursor
         assert cursor is not None
+        conn = self._store_conn
 
         if self._max_stored_oid > self._max_new_oid:
             self._adapter.set_min_oid(cursor, self._max_stored_oid + 1)
@@ -678,7 +679,8 @@
 
         serials = self._finish_store()
         self._adapter.update_current(cursor, tid_int)
-        self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
+        self._prepared_txn = self._adapter.commit_phase1(
+            conn, cursor, tid_int)
 
         if self._txn_blobs:
             # We now have a transaction ID, so rename all the blobs
@@ -718,7 +720,9 @@
             self._rollback_load_connection()
             txn = self._prepared_txn
             assert txn is not None
-            self._adapter.commit_phase2(self._store_cursor, txn)
+            self._adapter.commit_phase2(
+                self._store_conn, self._store_cursor, txn)
+            self._adapter.release_commit_lock(self._store_cursor)
             cache = self._cache_client
             if cache is not None:
                 if cache.incr('commit_count') is None:
@@ -740,7 +744,9 @@
         try:
             self._rollback_load_connection()
             if self._store_cursor is not None:
-                self._adapter.abort(self._store_cursor, self._prepared_txn)
+                self._adapter.abort(
+                    self._store_conn, self._store_cursor, self._prepared_txn)
+                self._adapter.release_commit_lock(self._store_cursor)
             if self._txn_blobs:
                 for oid, filename in self._txn_blobs.iteritems():
                     if os.path.exists(filename):



More information about the checkins mailing list