[Checkins] SVN: relstorage/trunk/ Added a MySQL adapter. All tests pass (tested with MySQL 5.0.54 and MySQLdb 1.2.2).

Shane Hathaway shane at hathawaymix.org
Wed Feb 6 03:19:59 EST 2008


Log message for revision 83572:
  Added a MySQL adapter.  All tests pass (tested with MySQL 5.0.54 and MySQLdb 1.2.2).
  In fact, the whole test suite finishes in 15 seconds.  Wow.
  

Changed:
  U   relstorage/trunk/CHANGELOG.txt
  A   relstorage/trunk/relstorage/adapters/mysql.py
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  U   relstorage/trunk/relstorage/relstorage.py
  U   relstorage/trunk/relstorage/tests/speedtest.py
  A   relstorage/trunk/relstorage/tests/testmysql.py

-=-
Modified: relstorage/trunk/CHANGELOG.txt
===================================================================
--- relstorage/trunk/CHANGELOG.txt	2008-02-06 07:51:40 UTC (rev 83571)
+++ relstorage/trunk/CHANGELOG.txt	2008-02-06 08:19:58 UTC (rev 83572)
@@ -32,9 +32,9 @@
 - Removed the code in the Oracle adapter for retrying connection attempts.
   (It is better to just reconfigure Oracle.)
 
+- Added support for MySQL.  (Version 5.0 is probably the minimum.)
 
 
-
 PGStorage 0.4
 
 - Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut

Added: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/mysql.py	2008-02-06 08:19:58 UTC (rev 83572)
@@ -0,0 +1,1044 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""MySQL adapter for RelStorage.
+
+Connection parameters supported by MySQLdb:
+
+host
+    string, host to connect
+user
+    string, user to connect as
+passwd
+    string, password to use
+db
+    string, database to use
+port
+    integer, TCP/IP port to connect to
+unix_socket
+    string, location of unix_socket (UNIX-ish only)
+conv
+    mapping, maps MySQL FIELD_TYPE.* to Python functions which convert a
+    string to the appropriate Python type
+connect_timeout
+    number of seconds to wait before the connection attempt fails.
+compress
+    if set, gzip compression is enabled
+named_pipe
+    if set, connect to server via named pipe (Windows only)
+init_command
+    command which is run once the connection is created
+read_default_file
+    see the MySQL documentation for mysql_options()
+read_default_group
+    see the MySQL documentation for mysql_options()
+client_flag
+    client flags from MySQLdb.constants.CLIENT
+load_infile
+    int, non-zero enables LOAD LOCAL INFILE, zero disables
+"""
+
+import logging
+import MySQLdb
+import time
+from ZODB.POSException import ConflictError, StorageError, UndoError
+
+log = logging.getLogger("relstorage.mysql")
+
+commit_lock_timeout = 30
+
+
+class MySQLAdapter(object):
+    """MySQL adapter for RelStorage."""
+
+    def __init__(self, **params):
+        self._params = params
+
+    def create_schema(self, cursor):
+        """Create the database tables."""
+        stmt = """
+        -- The list of all transactions in the database
+        CREATE TABLE transaction (
+            tid         BIGINT NOT NULL PRIMARY KEY,
+            packed      BOOLEAN NOT NULL DEFAULT FALSE,
+            username    VARCHAR(255) NOT NULL,
+            description TEXT NOT NULL,
+            extension   BLOB
+        ) ENGINE = InnoDB;
+
+        -- Create a special transaction to represent object creation.  This
+        -- row is often referenced by object_state.prev_tid, but never by
+        -- object_state.tid.
+        INSERT INTO transaction (tid, username, description)
+            VALUES (0, 'system', 'special transaction for object creation');
+
+        -- All OIDs allocated in the database.  Note that this table
+        -- is purposely non-transactional.
+        CREATE TABLE new_oid (
+            zoid        BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT
+        ) ENGINE = MyISAM;
+
+        -- All object states in all transactions.  Note that md5 and state
+        -- can be null to represent object uncreation.
+        CREATE TABLE object_state (
+            zoid        BIGINT NOT NULL,
+            tid         BIGINT NOT NULL REFERENCES transaction,
+            PRIMARY KEY (zoid, tid),
+            prev_tid    BIGINT NOT NULL REFERENCES transaction,
+            md5         CHAR(32),
+            state       LONGBLOB,
+            CHECK (tid > 0)
+        ) ENGINE = InnoDB;
+        CREATE INDEX object_state_tid ON object_state (tid);
+
+        -- Pointers to the current object state
+        CREATE TABLE current_object (
+            zoid        BIGINT NOT NULL PRIMARY KEY,
+            tid         BIGINT NOT NULL,
+            FOREIGN KEY (zoid, tid) REFERENCES object_state (zoid, tid)
+        ) ENGINE = InnoDB;
+
+        -- A list of referenced OIDs from each object_state.
+        -- This table is populated as needed during packing.
+        -- To prevent unnecessary table locking, it does not use
+        -- foreign keys, which is safe because rows in object_state
+        -- are never modified once committed, and rows are removed
+        -- from object_state only by packing.
+        CREATE TABLE object_ref (
+            zoid        BIGINT NOT NULL,
+            tid         BIGINT NOT NULL,
+            to_zoid     BIGINT NOT NULL
+        ) ENGINE = MyISAM;
+        CREATE INDEX object_ref_from ON object_ref (zoid);
+        CREATE INDEX object_ref_tid ON object_ref (tid);
+        CREATE INDEX object_ref_to ON object_ref (to_zoid);
+
+        -- The object_refs_added table tracks whether object_refs has
+        -- been populated for all states in a given transaction.
+        -- An entry is added only when the work is finished.
+        -- To prevent unnecessary table locking, it does not use
+        -- foreign keys, which is safe because object states
+        -- are never added to a transaction once committed, and
+        -- rows are removed from the transaction table only by
+        -- packing.
+        CREATE TABLE object_refs_added (
+            tid         BIGINT NOT NULL PRIMARY KEY
+        ) ENGINE = MyISAM;
+
+        -- Temporary state during packing:
+        -- The list of objects to pack.  If keep is 'N',
+        -- the object and all its revisions will be removed.
+        -- If keep is 'Y', instead of removing the object,
+        -- the pack operation will cut the object's history.
+        -- If keep is 'Y' then the keep_tid field must also be set.
+        -- The keep_tid field specifies which revision to keep within
+        -- the list of packable transactions.
+        CREATE TABLE pack_object (
+            zoid        BIGINT NOT NULL PRIMARY KEY,
+            keep        BOOLEAN NOT NULL,
+            keep_tid    BIGINT
+        ) ENGINE = MyISAM;
+        CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+        """
+        self._run_script(cursor, stmt)
+
+
+    def _run_script(self, cursor, script, params=()):
+        """Execute a series of statements in the database."""
+        lines = []
+        for line in script.split('\n'):
+            line = line.strip()
+            if not line or line.startswith('--'):
+                continue
+            if line.endswith(';'):
+                line = line[:-1]
+                lines.append(line)
+                stmt = '\n'.join(lines)
+                try:
+                    cursor.execute(stmt, params)
+                except:
+                    log.warning("script statement failed: %s", stmt)
+                    raise
+                lines = []
+            else:
+                lines.append(line)
+        if lines:
+            try:
+                stmt = '\n'.join(lines)
+                cursor.execute(stmt, params)
+            except:
+                log.warning("script statement failed: %s", stmt)
+                raise
+
+
+    def prepare_schema(self):
+        """Create the database schema if it does not already exist."""
+        conn, cursor = self.open()
+        try:
+            try:
+                cursor.execute("SHOW TABLES LIKE 'object_state'")
+                if not cursor.rowcount:
+                    self.create_schema(cursor)
+            except:
+                conn.rollback()
+                raise
+            else:
+                conn.commit()
+        finally:
+            self.close(conn, cursor)
+
+
+    def zap(self):
+        """Clear all data out of the database.
+
+        Used by the test suite.
+        """
+        conn, cursor = self.open()
+        try:
+            try:
+                stmt = """
+                DELETE FROM object_refs_added;
+                DELETE FROM object_ref;
+                DELETE FROM current_object;
+                DELETE FROM object_state;
+                TRUNCATE new_oid;
+                DELETE FROM transaction;
+                -- Create a transaction to represent object creation.
+                INSERT INTO transaction (tid, username, description) VALUES
+                    (0, 'system', 'special transaction for object creation');
+                """
+                self._run_script(cursor, stmt)
+            except:
+                conn.rollback()
+                raise
+            else:
+                conn.commit()
+        finally:
+            self.close(conn, cursor)
+
+
+    def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED"):
+        """Open a database connection and return (conn, cursor)."""
+        try:
+            conn = MySQLdb.connect(**self._params)
+            cursor = conn.cursor()
+            cursor.arraysize = 64
+            if transaction_mode:
+                conn.autocommit(True)
+                cursor.execute("SET SESSION TRANSACTION %s" % transaction_mode)
+                conn.autocommit(False)
+            return conn, cursor
+        except MySQLdb.OperationalError:
+            log.debug("Unable to connect in %s", repr(self))
+            raise
+
+    def close(self, conn, cursor):
+        """Close a connection and cursor, ignoring certain errors.
+        """
+        for obj in (cursor, conn):
+            if obj is not None:
+                try:
+                    obj.close()
+                except (MySQLdb.InterfaceError,
+                        MySQLdb.OperationalError):
+                    pass
+
+    def open_for_load(self):
+        """Open and initialize a connection for loading objects.
+
+        Returns (conn, cursor).
+        """
+        return self.open("ISOLATION LEVEL REPEATABLE READ")
+
+    def restart_load(self, cursor):
+        """After a rollback, reinitialize a connection for loading objects."""
+        # No re-init necessary
+        pass
+
+    def get_object_count(self):
+        """Returns the number of objects in the database"""
+        # do later
+        return 0
+
+    def get_db_size(self):
+        """Returns the approximate size of the database in bytes"""
+        # do later
+        return 0
+
+    def load_current(self, cursor, oid):
+        """Returns the current pickle and integer tid for an object.
+
+        oid is an integer.  Returns (None, None) if object does not exist.
+        """
+        cursor.execute("""
+        SELECT state, tid
+        FROM current_object
+            JOIN object_state USING(zoid, tid)
+        WHERE zoid = %s
+        """, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()
+        else:
+            return None, None
+
+    def load_revision(self, cursor, oid, tid):
+        """Returns the pickle for an object on a particular transaction.
+
+        Returns None if no such state exists.
+        """
+        cursor.execute("""
+        SELECT state
+        FROM object_state
+        WHERE zoid = %s
+            AND tid = %s
+        """, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            (state,) = cursor.fetchone()
+            return state
+        return None
+
+    def exists(self, cursor, oid):
+        """Returns a true value if the given object exists."""
+        cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
+        return cursor.rowcount
+
+    def load_before(self, cursor, oid, tid):
+        """Returns the pickle and tid of an object before transaction tid.
+
+        Returns (None, None) if no earlier state exists.
+        """
+        cursor.execute("""
+        SELECT state, tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid < %s
+        ORDER BY tid DESC
+        LIMIT 1
+        """, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()
+        else:
+            return None, None
+
+    def get_object_tid_after(self, cursor, oid, tid):
+        """Returns the tid of the next change after an object revision.
+
+        Returns None if no later state exists.
+        """
+        stmt = """
+        SELECT tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid > %s
+        ORDER BY tid
+        LIMIT 1
+        """
+        cursor.execute(stmt, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        else:
+            return None
+
+    def _make_temp_table(self, cursor):
+        """Create the temporary table for storing objects"""
+        stmt = """
+        CREATE TEMPORARY TABLE temp_store (
+            zoid        BIGINT NOT NULL PRIMARY KEY,
+            prev_tid    BIGINT NOT NULL,
+            md5         CHAR(32),
+            state       LONGBLOB
+        ) ENGINE MyISAM
+        """
+        cursor.execute(stmt)
+
+    def open_for_store(self):
+        """Open and initialize a connection for storing objects.
+
+        Returns (conn, cursor).
+        """
+        conn, cursor = self.open()
+        try:
+            self._make_temp_table(cursor)
+            return conn, cursor
+        except:
+            self.close(conn, cursor)
+            raise
+
+    def restart_store(self, cursor):
+        """Reuse a store connection."""
+        try:
+            cursor.connection.rollback()
+            cursor.execute("TRUNCATE temp_store")
+        except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
+            raise StorageError("database disconnected")
+
+    def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+        """Store an object in the temporary table."""
+        stmt = """
+        INSERT INTO temp_store (zoid, prev_tid, md5, state)
+        VALUES (%s, %s, %s, %s)
+        """
+        cursor.execute(stmt, (oid, prev_tid, md5sum, MySQLdb.Binary(data)))
+
+    def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+        """Replace an object in the temporary table."""
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = %s,
+            md5 = %s,
+            state = %s
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (prev_tid, md5sum, MySQLdb.Binary(data), oid))
+
+    def start_commit(self, cursor):
+        """Prepare to commit."""
+        # Hold commit_lock to prevent concurrent commits.
+        cursor.execute("SELECT GET_LOCK('relstorage.commit', %s)",
+            (commit_lock_timeout,))
+        locked = cursor.fetchone()[0]
+        if not locked:
+            raise StorageError("Unable to acquire commit lock")
+
+    def get_tid_and_time(self, cursor):
+        """Returns the most recent tid and the current database time.
+
+        The database time is the number of seconds since the epoch.
+        """
+        # Lock in share mode to ensure the data being read is up to date.
+        cursor.execute("""
+        SELECT tid, UNIX_TIMESTAMP()
+        FROM transaction
+        ORDER BY tid DESC
+        LIMIT 1
+        LOCK IN SHARE MODE
+        """)
+        assert cursor.rowcount == 1
+        tid, timestamp = cursor.fetchone()
+        # MySQL does not provide timestamps with more than one second
+        # precision.  To provide more precision, if the system time is
+        # within one minute of the MySQL time, use the system time instead.
+        now = time.time()
+        if abs(now - timestamp) <= 60.0:
+            timestamp = now
+        return tid, timestamp
+
+    def add_transaction(self, cursor, tid, username, description, extension):
+        """Add a transaction.
+
+        Raises ConflictError if the given tid has already been used.
+        """
+        stmt = """
+        INSERT INTO transaction
+            (tid, username, description, extension)
+        VALUES (%s, %s, %s, %s)
+        """
+        cursor.execute(stmt, (
+            tid, username, description, MySQLdb.Binary(extension)))
+
+    def detect_conflict(self, cursor):
+        """Find one conflict in the data about to be committed.
+
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
+        # Lock in share mode to ensure the data being read is up to date.
+        stmt = """
+        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+            temp_store.state
+        FROM temp_store
+            JOIN current_object ON (temp_store.zoid = current_object.zoid)
+        WHERE temp_store.prev_tid != current_object.tid
+        LIMIT 1
+        LOCK IN SHARE MODE
+        """
+        cursor.execute(stmt)
+        if cursor.rowcount:
+            return cursor.fetchone()
+        return None
+
+    def move_from_temp(self, cursor, tid):
+        """Moved the temporarily stored objects to permanent storage.
+
+        Returns the list of oids stored.
+        """
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        SELECT zoid, %s, prev_tid, md5, state
+        FROM temp_store
+        """
+        cursor.execute(stmt, (tid,))
+
+        stmt = """
+        SELECT zoid FROM temp_store
+        """
+        cursor.execute(stmt)
+        return [oid for (oid,) in cursor]
+
+    def update_current(self, cursor, tid):
+        """Update the current object pointers.
+
+        tid is the integer tid of the transaction being committed.
+        """
+        cursor.execute("""
+        REPLACE INTO current_object (zoid, tid)
+        SELECT zoid, tid FROM object_state
+        WHERE tid = %s
+        """, (tid,))
+
+    def commit_phase1(self, cursor, tid):
+        """Begin a commit.  Returns the transaction name.
+
+        This method should guarantee that commit_phase2() will succeed,
+        meaning that if commit_phase2() would raise any error, the error
+        should be raised in commit_phase1() instead.
+        """
+        return '-'
+
+    def commit_phase2(self, cursor, txn):
+        """Final transaction commit."""
+        cursor.connection.commit()
+        cursor.execute("SELECT RELEASE_LOCK('relstorage.commit')")
+
+    def abort(self, cursor, txn=None):
+        """Abort the commit.  If txn is not None, phase 1 is also aborted."""
+        cursor.connection.rollback()
+        cursor.execute("SELECT RELEASE_LOCK('relstorage.commit')")
+
+    def new_oid(self, cursor):
+        """Return a new, unused OID."""
+        stmt = "INSERT INTO new_oid VALUES ()"
+        cursor.execute(stmt)
+        oid = cursor.connection.insert_id()
+        if oid % 100 == 0:
+            # Clean out previously generated OIDs.
+            stmt = "DELETE FROM new_oid WHERE zoid < %s"
+            cursor.execute(stmt, (oid,))
+        return oid
+
+
+    def iter_transactions(self, cursor):
+        """Iterate over the transaction log.
+
+        Yields (tid, username, description, extension) for each transaction.
+        """
+        stmt = """
+        SELECT tid, username, description, extension
+        FROM transaction
+        WHERE packed = FALSE
+            AND tid != 0
+        ORDER BY tid DESC
+        """
+        cursor.execute(stmt)
+        return iter(cursor)
+
+
+    def iter_object_history(self, cursor, oid):
+        """Iterate over an object's history.
+
+        Raises KeyError if the object does not exist.
+        Yields (tid, username, description, extension, pickle_size)
+        for each modification.
+        """
+        stmt = """
+        SELECT 1 FROM current_object WHERE zoid = %s
+        """
+        cursor.execute(stmt, (oid,))
+        if not cursor.rowcount:
+            raise KeyError(oid)
+
+        stmt = """
+        SELECT tid, username, description, extension, OCTET_LENGTH(state)
+        FROM transaction
+            JOIN object_state USING (tid)
+        WHERE zoid = %s
+            AND packed = FALSE
+        ORDER BY tid DESC
+        """
+        cursor.execute(stmt, (oid,))
+        return iter(cursor)
+
+
+    def hold_pack_lock(self, cursor):
+        """Try to acquire the pack lock.
+
+        Raise an exception if packing or undo is already in progress.
+        """
+        stmt = "SELECT GET_LOCK('relstorage.pack', 0)"
+        cursor.execute(stmt)
+        res = cursor.fetchone()[0]
+        if not res:
+            raise StorageError('A pack or undo operation is in progress')
+
+    def release_pack_lock(self, cursor):
+        """Release the pack lock."""
+        stmt = "SELECT RELEASE_LOCK('relstorage.pack')"
+        cursor.execute(stmt)
+
+
+    def verify_undoable(self, cursor, undo_tid):
+        """Raise UndoError if it is not safe to undo the specified txn."""
+        stmt = """
+        SELECT 1 FROM transaction WHERE tid = %s AND packed = FALSE
+        """
+        cursor.execute(stmt, (undo_tid,))
+        if not cursor.rowcount:
+            raise UndoError("Transaction not found or packed")
+
+        # Rule: we can undo an object if the object's state in the
+        # transaction to undo matches the object's current state.
+        # If any object in the transaction does not fit that rule,
+        # refuse to undo.
+        stmt = """
+        SELECT prev_os.zoid, current_object.tid
+        FROM object_state prev_os
+            JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
+            JOIN current_object ON (cur_os.zoid = current_object.zoid
+                AND cur_os.tid = current_object.tid)
+        WHERE prev_os.tid = %s
+            AND cur_os.md5 != prev_os.md5
+        LIMIT 1
+        """
+        cursor.execute(stmt, (undo_tid,))
+        if cursor.rowcount:
+            raise UndoError(
+                "Some data were modified by a later transaction")
+
+        # Rule: don't allow the creation of the root object to
+        # be undone.  It's hard to get it back.
+        stmt = """
+        SELECT 1
+        FROM object_state
+        WHERE tid = %s
+            AND zoid = 0
+            AND prev_tid = 0
+        """
+        cursor.execute(stmt, (undo_tid,))
+        if cursor.rowcount:
+            raise UndoError("Can't undo the creation of the root object")
+
+
+    def undo(self, cursor, undo_tid, self_tid):
+        """Undo a transaction.
+
+        Parameters: "undo_tid", the integer tid of the transaction to undo,
+        and "self_tid", the integer tid of the current transaction.
+
+        Returns the list of OIDs undone.
+        """
+        stmt = """
+        CREATE TEMPORARY TABLE temp_undo_state (
+            zoid BIGINT NOT NULL PRIMARY KEY,
+            md5 CHAR(32),
+            state LONGBLOB
+        );
+
+        -- Put the state to revert to in temp_undo_state.
+        -- Some of the states can be null, indicating object uncreation.
+        INSERT INTO temp_undo_state
+        SELECT undoing.zoid, prev.md5, prev.state
+        FROM object_state undoing
+            LEFT JOIN object_state prev
+                ON (prev.zoid = undoing.zoid
+                    AND prev.tid = undoing.prev_tid)
+        WHERE undoing.tid = %(undo_tid)s;
+
+        -- Update records produced by earlier undo operations
+        -- within this transaction.  Change the state, but not
+        -- prev_tid, since prev_tid is still correct.
+        UPDATE object_state
+        JOIN temp_undo_state USING (zoid)
+        SET object_state.md5 = temp_undo_state.md5,
+            object_state.state = temp_undo_state.state
+        WHERE tid = %(self_tid)s;
+
+        -- Add new undo records.
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        SELECT zoid, %(self_tid)s, tid, md5, state
+        FROM temp_undo_state
+            JOIN current_object USING (zoid)
+        WHERE zoid NOT IN (
+            SELECT zoid FROM object_state WHERE tid = %(self_tid)s);
+
+        DROP TABLE temp_undo_state;
+        """
+        self._run_script(cursor, stmt,
+            {'undo_tid': undo_tid, 'self_tid': self_tid})
+
+        # List the changed OIDs.
+        stmt = "SELECT zoid FROM object_state WHERE tid = %s"
+        cursor.execute(stmt, (undo_tid,))
+        return [oid_int for (oid_int,) in cursor]
+
+
+    def choose_pack_transaction(self, pack_point):
+        """Return the transaction before or at the specified pack time.
+
+        Returns None if there is nothing to pack.
+        """
+        conn, cursor = self.open()
+        try:
+            stmt = """
+            SELECT tid
+            FROM transaction
+            WHERE tid > 0 AND tid <= %s
+                AND packed = FALSE
+            ORDER BY tid DESC
+            LIMIT 1
+            """
+            cursor.execute(stmt, (pack_point,))
+            if not cursor.rowcount:
+                # Nothing needs to be packed.
+                return None
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        finally:
+            self.close(conn, cursor)
+
+
+    def pre_pack(self, pack_tid, get_references):
+        """Decide what to pack.
+
+        pack_tid specifies the most recent transaction to pack.
+
+        get_references is a function that accepts a pickled state and
+        returns a set of OIDs that state refers to.
+        """
+        conn, cursor = self.open(transaction_mode=None)
+        try:
+            # This phase of packing works best with transactions
+            # disabled.  It changes no user-facing data.
+            conn.autocommit(True)
+            self._pre_pack_cursor(cursor, pack_tid, get_references)
+        finally:
+            self.close(conn, cursor)
+
+
+    def _pre_pack_cursor(self, cursor, pack_tid, get_references):
+        """pre_pack implementation.
+        """
+        # Fill object_ref with references from object states
+        # in transactions that will not be packed.
+        self._fill_nonpacked_refs(cursor, pack_tid, get_references)
+
+        # Ensure the temporary pack_object table is clear.
+        cursor.execute("TRUNCATE pack_object")
+
+        args = {'pack_tid': pack_tid}
+
+        # Fill the pack_object table with OIDs that either will be
+        # removed (if nothing references the OID) or whose history will
+        # be cut.
+        stmt = """
+        INSERT INTO pack_object (zoid, keep)
+        SELECT DISTINCT zoid, false
+        FROM object_state
+        WHERE tid <= %(pack_tid)s
+        """
+        cursor.execute(stmt, args)
+
+        # If the root object is in pack_object, keep it.
+        stmt = """
+        UPDATE pack_object SET keep = true
+        WHERE zoid = 0
+        """
+        cursor.execute(stmt)
+
+        # Keep objects that have been revised since pack_tid.
+        stmt = """
+        UPDATE pack_object SET keep = true
+        WHERE keep = false
+            AND zoid IN (
+                SELECT zoid
+                FROM current_object
+                WHERE tid > %(pack_tid)s
+            )
+        """
+        cursor.execute(stmt, args)
+
+        # Keep objects that are still referenced by object states in
+        # transactions that will not be packed.
+        stmt = """
+        UPDATE pack_object SET keep = true
+        WHERE keep = false
+            AND zoid IN (
+                SELECT to_zoid
+                FROM object_ref
+                WHERE tid > %(pack_tid)s
+            )
+        """
+        cursor.execute(stmt, args)
+
+        # Each of the packable objects to be kept might
+        # refer to other objects.  If some of those references
+        # include objects currently set to be removed, keep
+        # those objects as well.  Do this
+        # repeatedly until all references have been satisfied.
+        while True:
+
+            # Set keep_tid for all pack_object rows with keep = 'Y'.
+            # This must be done before _fill_pack_object_refs examines
+            # references.
+            stmt = """
+            UPDATE pack_object SET keep_tid = (
+                    SELECT tid
+                    FROM object_state
+                    WHERE zoid = pack_object.zoid
+                        AND tid > 0
+                        AND tid <= %(pack_tid)s
+                    ORDER BY tid DESC
+                    LIMIT 1
+                )
+            WHERE keep = true AND keep_tid IS NULL
+            """
+            cursor.execute(stmt, args)
+
+            self._fill_pack_object_refs(cursor, get_references)
+
+            stmt = """
+            UPDATE pack_object SET keep = true
+            WHERE keep = false
+                AND zoid IN (
+                    SELECT to_zoid FROM (
+                        SELECT DISTINCT to_zoid
+                        FROM object_ref
+                            JOIN pack_object parent ON (
+                                object_ref.zoid = parent.zoid)
+                        WHERE parent.keep = true
+                    ) AS all_references
+                )
+            """
+            cursor.execute(stmt)
+            if not cursor.rowcount:
+                # No new references detected.
+                break
+
+
+    def _fill_nonpacked_refs(self, cursor, pack_tid, get_references):
+        """Fill object_ref for all transactions that will not be packed."""
+        stmt = """
+        SELECT DISTINCT tid
+        FROM object_state
+        WHERE tid > %s
+            AND NOT EXISTS (
+                SELECT 1
+                FROM object_refs_added
+                WHERE tid = object_state.tid
+            )
+        """
+        cursor.execute(stmt, (pack_tid,))
+        for (tid,) in cursor.fetchall():
+            self._add_refs_for_tid(cursor, tid, get_references)
+
+
+    def _fill_pack_object_refs(self, cursor, get_references):
+        """Fill object_ref for all pack_object rows that have keep_tid."""
+        stmt = """
+        SELECT DISTINCT keep_tid
+        FROM pack_object
+        WHERE keep_tid IS NOT NULL
+            AND NOT EXISTS (
+                SELECT 1
+                FROM object_refs_added
+                WHERE tid = keep_tid
+            )
+        """
+        cursor.execute(stmt)
+        for (tid,) in cursor.fetchall():
+            self._add_refs_for_tid(cursor, tid, get_references)
+
+
+    def _add_refs_for_tid(self, cursor, tid, get_references):
+        """Fill object_ref with all references from states in one transaction.
+
+        Scans every object state committed in the given tid, extracts
+        the OIDs each state refers to, and records them in object_ref.
+        Marks the tid as processed in object_refs_added even when no
+        references are found, so the tid is not scanned again.
+        """
+        stmt = """
+        SELECT zoid, state
+        FROM object_state
+        WHERE tid = %s
+        """
+        cursor.execute(stmt, (tid,))
+
+        to_add = []  # [(from_oid, tid, to_oid)]
+        for from_oid, state in cursor:
+            # Skip null/empty states: there is no pickle to scan for
+            # references.
+            if state:
+                to_oids = get_references(state)
+                for to_oid in to_oids:
+                    to_add.append((from_oid, tid, to_oid))
+
+        if to_add:
+            # Batch the inserts through executemany for efficiency.
+            stmt = """
+            INSERT INTO object_ref (zoid, tid, to_zoid)
+            VALUES (%s, %s, %s)
+            """
+            cursor.executemany(stmt, to_add)
+
+        # The references have been computed for this transaction.
+        stmt = """
+        INSERT INTO object_refs_added (tid)
+        VALUES (%s)
+        """
+        cursor.execute(stmt, (tid,))
+
+
+    def pack(self, pack_tid):
+        """Pack.  Requires populated pack tables.
+
+        pack_tid is the most recent transaction id to pack.  pre_pack
+        must have filled the pack_object table (which objects to keep
+        or remove, and at which keep_tid) before this is called.
+        Commits on success, rolls back on any error.
+        """
+
+        # Read committed mode is sufficient.
+        conn, cursor = self.open()
+        try:
+            # Pause concurrent commits.
+            # GET_LOCK returns 1 on success and 0 on timeout; the lock
+            # is per-connection and is released implicitly when this
+            # connection is closed below.  commit_lock_timeout is
+            # presumably a module-level constant defined earlier in
+            # this file -- TODO confirm.
+            cursor.execute("SELECT GET_LOCK('relstorage.commit', %s)",
+                (commit_lock_timeout,))
+            locked = cursor.fetchone()[0]
+            if not locked:
+                raise StorageError("Unable to acquire commit lock")
+
+            try:
+
+                for table in ('object_ref', 'current_object', 'object_state'):
+
+                    # Remove objects that are in pack_object and have keep
+                    # set to false.
+                    # Note: the %s below is filled in by Python string
+                    # formatting, not by the driver -- table names cannot
+                    # be bound as query parameters.
+                    stmt = """
+                    DELETE FROM %s
+                    WHERE zoid IN (
+                            SELECT zoid
+                            FROM pack_object
+                            WHERE keep = false
+                        )
+                    """ % table
+                    cursor.execute(stmt)
+
+                    if table != 'current_object':
+                        # Cut the history of objects in pack_object that
+                        # have keep set to true.
+                        # current_object is excluded because it holds a
+                        # single current row per object -- there is no
+                        # history to cut.
+                        stmt = """
+                        DELETE FROM %s
+                        WHERE zoid IN (
+                                SELECT zoid
+                                FROM pack_object
+                                WHERE keep = true
+                            )
+                            AND tid < (
+                                SELECT keep_tid
+                                FROM pack_object
+                                WHERE zoid = %s.zoid
+                            )
+                        """ % (table, table)
+                        cursor.execute(stmt)
+
+                # Run the remaining cleanup as a multi-statement script;
+                # _run_script (defined elsewhere in this class) is
+                # presumably responsible for splitting the statements and
+                # binding %(tid)s -- TODO confirm.
+                stmt = """
+                -- Terminate prev_tid chains
+                UPDATE object_state SET prev_tid = 0
+                WHERE tid <= %(tid)s
+                    AND prev_tid != 0;
+
+                -- For each tid to be removed, delete the corresponding row in
+                -- object_refs_added.
+                DELETE FROM object_refs_added
+                WHERE tid > 0
+                    AND tid <= %(tid)s
+                    AND NOT EXISTS (
+                        SELECT 1
+                        FROM object_state
+                        WHERE tid = object_refs_added.tid
+                    );
+
+                -- Delete transactions no longer used.
+                DELETE FROM transaction
+                WHERE tid > 0
+                    AND tid <= %(tid)s
+                    AND NOT EXISTS (
+                        SELECT 1
+                        FROM object_state
+                        WHERE tid = transaction.tid
+                    );
+
+                -- Mark the remaining packable transactions as packed
+                UPDATE transaction SET packed = true
+                WHERE tid > 0
+                    AND tid <= %(tid)s
+                    AND packed = false
+                """
+                self._run_script(cursor, stmt, {'tid': pack_tid})
+
+                # Clean up
+                cursor.execute("TRUNCATE pack_object")
+
+            except:
+                conn.rollback()
+                raise
+
+            else:
+                conn.commit()
+
+        finally:
+            self.close(conn, cursor)
+
+
+    def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+        """Polls for new transactions.
+
+        conn and cursor must have been created previously by open_for_load().
+        prev_polled_tid is the tid returned at the last poll, or None
+        if this is the first poll.  If ignore_tid is not None, changes
+        committed in that transaction will not be included in the list
+        of changed OIDs.
+
+        Returns (changed_oids, new_polled_tid).  Raises StorageError
+        if the database has disconnected.  A changed_oids of None means
+        the caller's cache must be cleared entirely.
+        """
+        try:
+            # find out the tid of the most recent transaction.
+            stmt = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"
+            cursor.execute(stmt)
+            # Expect the transaction table to always have at least one row.
+            assert cursor.rowcount == 1
+            new_polled_tid = cursor.fetchone()[0]
+
+            if prev_polled_tid is None:
+                # This is the first time the connection has polled.
+                # None tells the caller to clear its cache.
+                return None, new_polled_tid
+
+            if new_polled_tid == prev_polled_tid:
+                # No transactions have been committed since prev_polled_tid.
+                return (), new_polled_tid
+
+            stmt = "SELECT 1 FROM transaction WHERE tid = %s"
+            cursor.execute(stmt, (prev_polled_tid,))
+            if not cursor.rowcount:
+                # Transaction not found; perhaps it has been packed.
+                # The connection cache needs to be cleared.
+                return None, new_polled_tid
+
+            # Get the list of changed OIDs and return it.
+            stmt = """
+            SELECT DISTINCT zoid
+            FROM object_state
+                JOIN transaction USING (tid)
+            WHERE tid > %s
+            """
+            if ignore_tid is not None:
+                # NOTE(review): ignore_tid is interpolated directly into
+                # the SQL; this is safe only because it is an integer --
+                # consider a bound parameter instead.
+                stmt += " AND tid != %d" % ignore_tid
+            cursor.execute(stmt, (prev_polled_tid,))
+            oids = [oid for (oid,) in cursor]
+
+            return oids, new_polled_tid
+
+        except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
+            # Driver-level connection errors indicate a disconnect;
+            # report them uniformly as a StorageError.
+            raise StorageError("database disconnected")
+

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2008-02-06 07:51:40 UTC (rev 83571)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2008-02-06 08:19:58 UTC (rev 83572)
@@ -414,16 +414,12 @@
         """Prepare to commit."""
         # Hold commit_lock to prevent concurrent commits
         # (for as short a time as possible).
-        # Lock current_object in share mode to ensure conflict
-        # detection has the most current data.
+        # Lock transaction and current_object in share mode to ensure
+        # conflict detection has the most current data.
         cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+        cursor.execute("LOCK TABLE transaction IN SHARE MODE")
         cursor.execute("LOCK TABLE current_object IN SHARE MODE")
-        cursor.execute("SAVEPOINT start_commit")
 
-    def restart_commit(self, cursor):
-        """Rollback the attempt to commit and start over."""
-        cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
-
     def _parse_dsinterval(self, s):
         """Convert an Oracle dsinterval (as a string) to a float."""
         mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
@@ -604,7 +600,12 @@
         except cx_Oracle.DatabaseError:
             raise StorageError('A pack or undo operation is in progress')
 
+    def release_pack_lock(self, cursor):
+        """Release the pack lock.
+
+        No explicit action is needed here: the lock acquired by
+        hold_pack_lock is presumably tied to the transaction and
+        released on commit or rollback -- TODO confirm.
+        """
+        # No action needed
+        pass
 
+
     def verify_undoable(self, cursor, undo_tid):
         """Raise UndoError if it is not safe to undo the specified txn."""
         stmt = """
@@ -699,27 +700,31 @@
         return [oid_int for (oid_int,) in cursor]
 
 
-    def choose_pack_transaction(self, cursor, pack_point):
+    def choose_pack_transaction(self, pack_point):
         """Return the transaction before or at the specified pack time.
 
         Returns None if there is nothing to pack.
         """
-        stmt = """
-        SELECT MAX(tid)
-        FROM transaction
-        WHERE tid > 0
-            AND tid <= :1
-            AND packed = 'N'
-        """
-        cursor.execute(stmt, (pack_point,))
-        rows = cursor.fetchall()
-        if not rows:
-            # Nothing needs to be packed.
-            return None
-        return rows[0][0]
+        conn, cursor = self.open()
+        try:
+            stmt = """
+            SELECT MAX(tid)
+            FROM transaction
+            WHERE tid > 0
+                AND tid <= :1
+                AND packed = 'N'
+            """
+            cursor.execute(stmt, (pack_point,))
+            rows = cursor.fetchall()
+            if not rows:
+                # Nothing needs to be packed.
+                return None
+            return rows[0][0]
+        finally:
+            self.close(conn, cursor)
 
 
-    def pre_pack(self, cursor, pack_tid, get_references):
+    def pre_pack(self, pack_tid, get_references):
         """Decide what to pack.
 
         tid specifies the most recent transaction to pack.
@@ -727,8 +732,22 @@
         get_references is a function that accepts a pickled state and
         returns a set of OIDs that state refers to.
         """
-        # Fill object_ref with references from object states
-        # in transactions that will not be packed.
+        conn, cursor = self.open()
+        try:
+            try:
+                self._pre_pack_cursor(cursor, pack_tid, get_references)
+            except:
+                conn.rollback()
+                raise
+            else:
+                conn.commit()
+        finally:
+            self.close(conn, cursor)
+
+
+    def _pre_pack_cursor(self, cursor, pack_tid, get_references):
+        """pre_pack implementation.
+        """
         self._fill_nonpacked_refs(cursor, pack_tid, get_references)
 
         args = {'pack_tid': pack_tid}
@@ -895,6 +914,8 @@
         conn, cursor = self.open()
         try:
             try:
+                # hold the commit lock for a moment to prevent deadlocks.
+                cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
 
                 for table in ('object_ref', 'current_object', 'object_state'):
 

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2008-02-06 07:51:40 UTC (rev 83571)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2008-02-06 08:19:58 UTC (rev 83572)
@@ -162,8 +162,8 @@
                 DELETE FROM object_state;
                 DELETE FROM transaction;
                 -- Create a special transaction to represent object creation.
-                INSERT INTO transaction (tid, username, description)
-                    VALUES (0, '', '');
+                INSERT INTO transaction (tid, username, description) VALUES
+                    (0, 'system', 'special transaction for object creation');
                 ALTER SEQUENCE zoid_seq START WITH 1;
                 """)
             except:
@@ -399,18 +399,14 @@
         """Prepare to commit."""
         # Hold commit_lock to prevent concurrent commits
         # (for as short a time as possible).
-        # Lock current_object in share mode to ensure conflict
-        # detection has the most current data.
+        # Lock transaction and current_object in share mode to ensure
+        # conflict detection has the most current data.
         cursor.execute("""
         LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+        LOCK TABLE transaction IN SHARE MODE;
         LOCK TABLE current_object IN SHARE MODE
         """)
-        cursor.execute("SAVEPOINT start_commit")
 
-    def restart_commit(self, cursor):
-        """Rollback the attempt to commit and start again."""
-        cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
-
     def get_tid_and_time(self, cursor):
         """Returns the most recent tid and the current database time.
 
@@ -599,7 +595,12 @@
         except psycopg2.DatabaseError:
             raise StorageError('A pack or undo operation is in progress')
 
+    def release_pack_lock(self, cursor):
+        """Release the pack lock.
+
+        No explicit action is needed here: the lock acquired by
+        hold_pack_lock is presumably tied to the transaction and
+        released on commit or rollback -- TODO confirm.
+        """
+        # No action needed
+        pass
 
+
     def verify_undoable(self, cursor, undo_tid):
         """Raise UndoError if it is not safe to undo the specified txn."""
         stmt = """
@@ -703,28 +704,32 @@
         return [oid_int for (oid_int,) in cursor]
 
 
-    def choose_pack_transaction(self, cursor, pack_point):
+    def choose_pack_transaction(self, pack_point):
         """Return the transaction before or at the specified pack time.
 
         Returns None if there is nothing to pack.
         """
-        stmt = """
-        SELECT tid
-        FROM transaction
-        WHERE tid > 0 AND tid <= %s
-            AND packed = FALSE
-        ORDER BY tid DESC
-        LIMIT 1
-        """
-        cursor.execute(stmt, (pack_point,))
-        if not cursor.rowcount:
-            # Nothing needs to be packed.
-            return None
-        assert cursor.rowcount == 1
-        return cursor.fetchone()[0]
+        conn, cursor = self.open()
+        try:
+            stmt = """
+            SELECT tid
+            FROM transaction
+            WHERE tid > 0 AND tid <= %s
+                AND packed = FALSE
+            ORDER BY tid DESC
+            LIMIT 1
+            """
+            cursor.execute(stmt, (pack_point,))
+            if not cursor.rowcount:
+                # Nothing needs to be packed.
+                return None
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        finally:
+            self.close(conn, cursor)
 
 
-    def pre_pack(self, cursor, pack_tid, get_references):
+    def pre_pack(self, pack_tid, get_references):
         """Decide what to pack.
 
         tid specifies the most recent transaction to pack.
@@ -732,6 +737,22 @@
         get_references is a function that accepts a pickled state and
         returns a set of OIDs that state refers to.
         """
+        conn, cursor = self.open()
+        try:
+            try:
+                self._pre_pack_cursor(cursor, pack_tid, get_references)
+            except:
+                conn.rollback()
+                raise
+            else:
+                conn.commit()
+        finally:
+            self.close(conn, cursor)
+
+
+    def _pre_pack_cursor(self, cursor, pack_tid, get_references):
+        """pre_pack implementation.
+        """
         # Fill object_ref with references from object states
         # in transactions that will not be packed.
         self._fill_nonpacked_refs(cursor, pack_tid, get_references)
@@ -901,6 +922,8 @@
         conn, cursor = self.open()
         try:
             try:
+                # hold the commit lock for a moment to prevent deadlocks.
+                cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
 
                 for table in ('object_ref', 'current_object', 'object_state'):
 

Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2008-02-06 07:51:40 UTC (rev 83571)
+++ relstorage/trunk/relstorage/relstorage.py	2008-02-06 08:19:58 UTC (rev 83572)
@@ -334,38 +334,20 @@
         adapter.start_commit(cursor)
         user, desc, ext = self._ude
 
-        attempt = 1
-        while True:
-            # get the commit lock
-            try:
-                # Choose a transaction ID.
-                # Base the transaction ID on the database time,
-                # while ensuring that the tid of this transaction
-                # is greater than any existing tid.
-                last_tid, now = adapter.get_tid_and_time(cursor)
-                stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
-                stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
-                tid = repr(stamp)
+        # Choose a transaction ID.
+        # Base the transaction ID on the database time,
+        # while ensuring that the tid of this transaction
+        # is greater than any existing tid.
+        last_tid, now = adapter.get_tid_and_time(cursor)
+        stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
+        stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
+        tid = repr(stamp)
 
-                tid_int = u64(tid)
-                adapter.add_transaction(cursor, tid_int, user, desc, ext)
-                self._tid = tid
-                break
+        tid_int = u64(tid)
+        adapter.add_transaction(cursor, tid_int, user, desc, ext)
+        self._tid = tid
 
-            except POSException.ConflictError:
-                if attempt < 3:
-                    # A concurrent transaction claimed the tid.
-                    # Rollback and try again.
-                    adapter.restart_commit(cursor)
-                    attempt += 1
-                else:
-                    self._drop_store_connection()
-                    raise
-            except:
-                self._drop_store_connection()
-                raise
 
-
     def _clear_temp(self):
         # It is assumed that self._lock_acquire was called before this
         # method was called.
@@ -601,23 +583,31 @@
 
         self._lock_acquire()
         try:
-            self._prepare_tid()
-            self_tid_int = u64(self._tid)
+            adapter = self._adapter
             cursor = self._store_cursor
             assert cursor is not None
-            adapter = self._adapter
 
-            adapter.verify_undoable(cursor, undo_tid_int)
-            oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
-            oids = [p64(oid_int) for oid_int in oid_ints]
+            adapter.hold_pack_lock(cursor)
+            try:
+                # Note that _prepare_tid acquires the commit lock.
+                # The commit lock must be acquired after the pack lock
+                # because the database adapters also acquire in that
+                # order during packing.
+                self._prepare_tid()
+                adapter.verify_undoable(cursor, undo_tid_int)
 
-            # Update the current object pointers immediately, so that
-            # subsequent undo operations within this transaction will see
-            # the new current objects.
-            adapter.update_current(cursor, self_tid_int)
+                self_tid_int = u64(self._tid)
+                oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
+                oids = [p64(oid_int) for oid_int in oid_ints]
 
-            return self._tid, oids
+                # Update the current object pointers immediately, so that
+                # subsequent undo operations within this transaction will see
+                # the new current objects.
+                adapter.update_current(cursor, self_tid_int)
 
+                return self._tid, oids
+            finally:
+                adapter.release_pack_lock(cursor)
         finally:
             self._lock_release()
 
@@ -638,49 +628,30 @@
             return refs
 
         # Use a private connection (lock_conn and lock_cursor) to
-        # hold the pack lock, while using a second private connection
-        # (conn and cursor) to decide exactly what to pack.
-        # Close the second connection.  Then, with the lock still held,
-        # perform the pack in a third connection opened by the adapter.
-        # This structure is designed to maximize the scalability
-        # of packing and minimize conflicts with concurrent writes.
-        # A consequence of this structure is that the adapter must
-        # not choke on transactions that may have been added between
-        # pre_pack and pack.
+        # hold the pack lock.  Have the adapter open temporary
+        # connections to do the actual work, allowing the adapter
+        # to use special transaction modes for packing.
         adapter = self._adapter
         lock_conn, lock_cursor = adapter.open()
         try:
             adapter.hold_pack_lock(lock_cursor)
-
-            conn, cursor = adapter.open()
             try:
-                try:
-                    # Find the latest commit before or at the pack time.
-                    tid_int = adapter.choose_pack_transaction(
-                        cursor, pack_point_int)
-                    if tid_int is None:
-                        # Nothing needs to be packed.
-                        # TODO: log the fact that nothing needs to be packed.
-                        return
+                # Find the latest commit before or at the pack time.
+                tid_int = adapter.choose_pack_transaction(pack_point_int)
+                if tid_int is None:
+                    # Nothing needs to be packed.
+                    return
 
-                    # In pre_pack, the adapter fills tables with information
-                    # about what to pack.  The adapter should not actually
-                    # pack anything yet.
-                    adapter.pre_pack(cursor, tid_int, get_references)
-                except:
-                    conn.rollback()
-                    raise
-                else:
-                    conn.commit()
-            finally:
-                adapter.close(conn, cursor)
+                # In pre_pack, the adapter fills tables with
+                # information about what to pack.  The adapter
+                # should not actually pack anything yet.
+                adapter.pre_pack(tid_int, get_references)
 
-            # Now pack.  The adapter makes its own connection just for the
-            # pack operation, possibly using a special transaction mode
-            # and connection flags.
-            adapter.pack(tid_int)
-            self._after_pack()
-
+                # Now pack.
+                adapter.pack(tid_int)
+                self._after_pack()
+            finally:
+                adapter.release_pack_lock(lock_cursor)
         finally:
             lock_conn.rollback()
             adapter.close(lock_conn, lock_cursor)

Modified: relstorage/trunk/relstorage/tests/speedtest.py
===================================================================
--- relstorage/trunk/relstorage/tests/speedtest.py	2008-02-06 07:51:40 UTC (rev 83571)
+++ relstorage/trunk/relstorage/tests/speedtest.py	2008-02-06 08:19:58 UTC (rev 83572)
@@ -33,17 +33,19 @@
 from ZODB.Connection import Connection
 
 from relstorage.relstorage import RelStorage
+from relstorage.adapters.mysql import MySQLAdapter
 from relstorage.adapters.postgresql import PostgreSQLAdapter
 from relstorage.adapters.oracle import OracleAdapter
 from relstorage.tests.testoracle import getOracleParams
 
 debug = False
 txn_count = 10
-object_counts = [10000]  # [1, 100, 10000]
+object_counts = [100]  # [1, 100, 10000]
 concurrency_levels = range(1, 16, 2)
 contenders = [
     ('ZEO + FileStorage', 'zeofs_test'),
     ('PostgreSQLAdapter', 'postgres_test'),
+    ('MySQLAdapter', 'mysql_test'),
     ('OracleAdapter', 'oracle_test'),
     ]
 repetitions = 3
@@ -203,7 +205,15 @@
             return RelStorage(adapter)
         return self.run_tests(make_storage)
 
+    def mysql_test(self):
+        """Run the speed tests against a MySQL-backed RelStorage."""
+        adapter = MySQLAdapter(db='relstoragetest')
+        # Clear any existing data from the test database before timing.
+        adapter.zap()
+        def make_storage():
+            return RelStorage(adapter)
+        return self.run_tests(make_storage)
 
+
+
 def distribute(func, param_iter):
     """Call a function in separate processes concurrently.
 

Added: relstorage/trunk/relstorage/tests/testmysql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testmysql.py	                        (rev 0)
+++ relstorage/trunk/relstorage/tests/testmysql.py	2008-02-06 08:19:58 UTC (rev 83572)
@@ -0,0 +1,35 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Tests of relstorage.adapters.mysql"""
+
+import logging
+import unittest
+
+from reltestbase import RelStorageTests
+from relstorage.adapters.mysql import MySQLAdapter
+
+
+class MySQLTests(RelStorageTests):
+    """Run the shared RelStorage test suite against the MySQL adapter."""
+    def make_adapter(self):
+        # The 'relstoragetest' database must exist and be reachable with
+        # the default MySQLdb connection settings.
+        return MySQLAdapter(db='relstoragetest')
+
+def test_suite():
+    """Build the suite; only test methods named check* are collected."""
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(MySQLTests, "check"))
+    return suite
+
+if __name__=='__main__':
+    logging.basicConfig()
+    unittest.main(defaultTest="test_suite")
+



More information about the Checkins mailing list