[Checkins] SVN: relstorage/trunk/ Added a MySQL adapter. All tests
pass (tested with MySQL 5.0.54 and MySQLdb 1.2.2).
Shane Hathaway
shane at hathawaymix.org
Wed Feb 6 03:19:59 EST 2008
Log message for revision 83572:
Added a MySQL adapter. All tests pass (tested with MySQL 5.0.54 and MySQLdb 1.2.2).
In fact, the whole test suite finishes in 15 seconds. Wow.
Changed:
U relstorage/trunk/CHANGELOG.txt
A relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/postgresql.py
U relstorage/trunk/relstorage/relstorage.py
U relstorage/trunk/relstorage/tests/speedtest.py
A relstorage/trunk/relstorage/tests/testmysql.py
-=-
Modified: relstorage/trunk/CHANGELOG.txt
===================================================================
--- relstorage/trunk/CHANGELOG.txt 2008-02-06 07:51:40 UTC (rev 83571)
+++ relstorage/trunk/CHANGELOG.txt 2008-02-06 08:19:58 UTC (rev 83572)
@@ -32,9 +32,9 @@
- Removed the code in the Oracle adapter for retrying connection attempts.
(It is better to just reconfigure Oracle.)
+- Added support for MySQL. (Version 5.0 is probably the minimum.)
-
PGStorage 0.4
- Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut
Added: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2008-02-06 08:19:58 UTC (rev 83572)
@@ -0,0 +1,1044 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""MySQL adapter for RelStorage.
+
+Connection parameters supported by MySQLdb:
+
+host
+ string, host to connect
+user
+ string, user to connect as
+passwd
+ string, password to use
+db
+ string, database to use
+port
+ integer, TCP/IP port to connect to
+unix_socket
+ string, location of unix_socket (UNIX-ish only)
+conv
+ mapping, maps MySQL FIELD_TYPE.* to Python functions which convert a
+ string to the appropriate Python type
+connect_timeout
+ number of seconds to wait before the connection attempt fails.
+compress
+ if set, gzip compression is enabled
+named_pipe
+ if set, connect to server via named pipe (Windows only)
+init_command
+ command which is run once the connection is created
+read_default_file
+ see the MySQL documentation for mysql_options()
+read_default_group
+ see the MySQL documentation for mysql_options()
+client_flag
+ client flags from MySQLdb.constants.CLIENT
+load_infile
+ int, non-zero enables LOAD LOCAL INFILE, zero disables
+"""
+
+import logging
+import MySQLdb
+import time
+from ZODB.POSException import ConflictError, StorageError, UndoError
+
+log = logging.getLogger("relstorage.mysql")
+
+commit_lock_timeout = 30
+
+
+class MySQLAdapter(object):
+ """MySQL adapter for RelStorage."""
+
def __init__(self, **params):
    """Remember the connection settings.

    The keyword arguments are stored untouched; ``open()`` later passes
    them to ``MySQLdb.connect()``.
    """
    self._params = params
+
def create_schema(self, cursor):
    """Create the RelStorage tables and indexes.

    The DDL is kept as one script and handed to ``_run_script()``, which
    runs it on ``cursor`` one statement at a time.  The transaction,
    object_state and current_object tables use InnoDB; new_oid,
    object_ref, object_refs_added and pack_object use MyISAM.
    """
    schema_script = """
    -- The list of all transactions in the database
    CREATE TABLE transaction (
        tid         BIGINT NOT NULL PRIMARY KEY,
        packed      BOOLEAN NOT NULL DEFAULT FALSE,
        username    VARCHAR(255) NOT NULL,
        description TEXT NOT NULL,
        extension   BLOB
    ) ENGINE = InnoDB;

    -- Create a special transaction to represent object creation.  This
    -- row is often referenced by object_state.prev_tid, but never by
    -- object_state.tid.
    INSERT INTO transaction (tid, username, description)
        VALUES (0, 'system', 'special transaction for object creation');

    -- All OIDs allocated in the database.  Note that this table
    -- is purposely non-transactional.
    CREATE TABLE new_oid (
        zoid        BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT
    ) ENGINE = MyISAM;

    -- All object states in all transactions.  Note that md5 and state
    -- can be null to represent object uncreation.
    CREATE TABLE object_state (
        zoid        BIGINT NOT NULL,
        tid         BIGINT NOT NULL REFERENCES transaction,
        PRIMARY KEY (zoid, tid),
        prev_tid    BIGINT NOT NULL REFERENCES transaction,
        md5         CHAR(32),
        state       LONGBLOB,
        CHECK (tid > 0)
    ) ENGINE = InnoDB;
    CREATE INDEX object_state_tid ON object_state (tid);

    -- Pointers to the current object state
    CREATE TABLE current_object (
        zoid        BIGINT NOT NULL PRIMARY KEY,
        tid         BIGINT NOT NULL,
        FOREIGN KEY (zoid, tid) REFERENCES object_state (zoid, tid)
    ) ENGINE = InnoDB;

    -- A list of referenced OIDs from each object_state.
    -- This table is populated as needed during packing.
    -- To prevent unnecessary table locking, it does not use
    -- foreign keys, which is safe because rows in object_state
    -- are never modified once committed, and rows are removed
    -- from object_state only by packing.
    CREATE TABLE object_ref (
        zoid        BIGINT NOT NULL,
        tid         BIGINT NOT NULL,
        to_zoid     BIGINT NOT NULL
    ) ENGINE = MyISAM;
    CREATE INDEX object_ref_from ON object_ref (zoid);
    CREATE INDEX object_ref_tid ON object_ref (tid);
    CREATE INDEX object_ref_to ON object_ref (to_zoid);

    -- The object_refs_added table tracks whether object_refs has
    -- been populated for all states in a given transaction.
    -- An entry is added only when the work is finished.
    -- To prevent unnecessary table locking, it does not use
    -- foreign keys, which is safe because object states
    -- are never added to a transaction once committed, and
    -- rows are removed from the transaction table only by
    -- packing.
    CREATE TABLE object_refs_added (
        tid         BIGINT NOT NULL PRIMARY KEY
    ) ENGINE = MyISAM;

    -- Temporary state during packing:
    -- The list of objects to pack.  If keep is 'N',
    -- the object and all its revisions will be removed.
    -- If keep is 'Y', instead of removing the object,
    -- the pack operation will cut the object's history.
    -- If keep is 'Y' then the keep_tid field must also be set.
    -- The keep_tid field specifies which revision to keep within
    -- the list of packable transactions.
    CREATE TABLE pack_object (
        zoid        BIGINT NOT NULL PRIMARY KEY,
        keep        BOOLEAN NOT NULL,
        keep_tid    BIGINT
    ) ENGINE = MyISAM;
    CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
    """
    self._run_script(cursor, schema_script)
+
+
def _run_script(self, cursor, script, params=()):
    """Run a multi-statement SQL script one statement at a time.

    The script is split on line boundaries.  Blank lines and lines that
    start with ``--`` are dropped; a line ending in ``;`` terminates the
    current statement (the semicolon itself is not sent).  Any text left
    after the last semicolon is executed as a final statement.

    ``params`` is passed unchanged to every ``cursor.execute()`` call.
    A failing statement is logged at warning level and the exception is
    re-raised.
    """
    def run_statement(parts):
        sql = '\n'.join(parts)
        try:
            cursor.execute(sql, params)
        except:
            # Deliberately broad: log whatever went wrong, then let the
            # original exception continue unchanged.
            log.warning("script statement failed: %s", sql)
            raise

    pending = []
    for raw_line in script.split('\n'):
        text = raw_line.strip()
        if not text or text.startswith('--'):
            continue
        if not text.endswith(';'):
            pending.append(text)
            continue
        # End of a statement: drop the terminator and run what we have.
        pending.append(text[:-1])
        run_statement(pending)
        pending = []

    if pending:
        run_statement(pending)
+
+
def prepare_schema(self):
    """Create the database schema unless it is already present.

    The presence of the ``object_state`` table is used as the marker.
    The work is committed on success and rolled back on any error; the
    connection is closed either way.
    """
    conn, cursor = self.open()
    try:
        try:
            cursor.execute("SHOW TABLES LIKE 'object_state'")
            schema_missing = not cursor.rowcount
            if schema_missing:
                self.create_schema(cursor)
        except:
            conn.rollback()
            raise
        conn.commit()
    finally:
        self.close(conn, cursor)
+
+
def zap(self):
    """Remove all stored data and re-create the special transaction 0.

    Used by the test suite.  Commits on success, rolls back on error and
    always closes the connection it opened.
    """
    wipe_script = """
    DELETE FROM object_refs_added;
    DELETE FROM object_ref;
    DELETE FROM current_object;
    DELETE FROM object_state;
    TRUNCATE new_oid;
    DELETE FROM transaction;
    -- Create a transaction to represent object creation.
    INSERT INTO transaction (tid, username, description) VALUES
        (0, 'system', 'special transaction for object creation');
    """
    conn, cursor = self.open()
    try:
        try:
            self._run_script(cursor, wipe_script)
        except:
            conn.rollback()
            raise
        conn.commit()
    finally:
        self.close(conn, cursor)
+
+
def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED"):
    """Open a database connection and return (conn, cursor).

    transaction_mode is appended to ``SET SESSION TRANSACTION`` to pick
    the isolation level for the session; pass a false value to leave the
    server default alone.  The statement is issued with autocommit
    switched on, and autocommit is switched off again before returning.

    If anything fails after the connection has been established, the
    connection is closed before the exception propagates, so a failed
    open() does not leak a server connection.  A
    MySQLdb.OperationalError is logged at debug level and re-raised.
    """
    try:
        conn = MySQLdb.connect(**self._params)
        ready = False
        try:
            cursor = conn.cursor()
            cursor.arraysize = 64
            if transaction_mode:
                conn.autocommit(True)
                cursor.execute(
                    "SET SESSION TRANSACTION %s" % transaction_mode)
                conn.autocommit(False)
            ready = True
        finally:
            if not ready:
                # Setup failed half-way: release the connection.  A
                # finally block (rather than except/raise) keeps the
                # original exception intact.
                self.close(conn, None)
        return conn, cursor
    except MySQLdb.OperationalError:
        log.debug("Unable to connect in %s", repr(self))
        raise
+
def close(self, conn, cursor):
    """Close the cursor, then the connection.

    Either argument may be None, in which case it is skipped.
    MySQLdb.InterfaceError and MySQLdb.OperationalError raised while
    closing are ignored.
    """
    for resource in (cursor, conn):
        if resource is None:
            continue
        try:
            resource.close()
        except (MySQLdb.InterfaceError, MySQLdb.OperationalError):
            pass
+
def open_for_load(self):
    """Open a connection suitable for loading objects.

    The session is opened with REPEATABLE READ isolation.
    Returns (conn, cursor).
    """
    load_mode = "ISOLATION LEVEL REPEATABLE READ"
    return self.open(load_mode)
+
def restart_load(self, cursor):
    """Reinitialize a load connection after a rollback.

    Nothing needs to be redone for MySQL, so this is a no-op.
    """
    return None
+
def get_object_count(self):
    """Return the number of objects in the database.

    Not implemented yet: always reports 0.
    """
    object_count = 0  # placeholder until a real count is implemented
    return object_count
+
def get_db_size(self):
    """Return the approximate size of the database in bytes.

    Not implemented yet: always reports 0.
    """
    size_in_bytes = 0  # placeholder until a real measurement is implemented
    return size_in_bytes
+
def load_current(self, cursor, oid):
    """Return (pickle, tid) for the current revision of an object.

    oid is an integer.  Returns (None, None) when the object has no
    row in current_object.
    """
    query = """
    SELECT state, tid
    FROM current_object
        JOIN object_state USING(zoid, tid)
    WHERE zoid = %s
    """
    cursor.execute(query, (oid,))
    if not cursor.rowcount:
        return None, None
    assert cursor.rowcount == 1
    return cursor.fetchone()
+
def load_revision(self, cursor, oid, tid):
    """Return the pickle stored for an object in one transaction.

    Returns None when object_state has no row for (oid, tid).
    """
    query = """
    SELECT state
    FROM object_state
    WHERE zoid = %s
        AND tid = %s
    """
    cursor.execute(query, (oid, tid))
    if not cursor.rowcount:
        return None
    assert cursor.rowcount == 1
    row = cursor.fetchone()
    (state,) = row
    return state
+
def exists(self, cursor, oid):
    """Return a true value if the object has a row in current_object.

    The value returned is the cursor's row count for the lookup.
    """
    lookup = "SELECT 1 FROM current_object WHERE zoid = %s"
    cursor.execute(lookup, (oid,))
    return cursor.rowcount
+
def load_before(self, cursor, oid, tid):
    """Return (pickle, tid) of the newest revision older than tid.

    Returns (None, None) when the object has no earlier state.
    """
    query = """
    SELECT state, tid
    FROM object_state
    WHERE zoid = %s
        AND tid < %s
    ORDER BY tid DESC
    LIMIT 1
    """
    cursor.execute(query, (oid, tid))
    if not cursor.rowcount:
        return None, None
    assert cursor.rowcount == 1
    return cursor.fetchone()
+
def get_object_tid_after(self, cursor, oid, tid):
    """Return the tid of the first revision of an object newer than tid.

    Returns None when no later state exists.
    """
    query = """
    SELECT tid
    FROM object_state
    WHERE zoid = %s
        AND tid > %s
    ORDER BY tid
    LIMIT 1
    """
    cursor.execute(query, (oid, tid))
    if not cursor.rowcount:
        return None
    assert cursor.rowcount == 1
    row = cursor.fetchone()
    return row[0]
+
def _make_temp_table(self, cursor):
    """Create the temp_store temporary table on this connection.

    temp_store holds the objects written during a transaction until
    they are moved into object_state.
    """
    ddl = """
    CREATE TEMPORARY TABLE temp_store (
        zoid        BIGINT NOT NULL PRIMARY KEY,
        prev_tid    BIGINT NOT NULL,
        md5         CHAR(32),
        state       LONGBLOB
    ) ENGINE MyISAM
    """
    cursor.execute(ddl)
+
def open_for_store(self):
    """Open a connection prepared for storing objects.

    A new connection is opened and its temp_store table is created.
    If creating the table fails, the connection is closed before the
    error propagates.  Returns (conn, cursor).
    """
    conn, cursor = self.open()
    try:
        self._make_temp_table(cursor)
    except:
        self.close(conn, cursor)
        raise
    return conn, cursor
+
def restart_store(self, cursor):
    """Make a store connection ready for reuse.

    Rolls back the connection and empties temp_store.  Raises
    StorageError if MySQLdb reports an operational or interface error,
    which is treated as a lost connection.
    """
    try:
        connection = cursor.connection
        connection.rollback()
        cursor.execute("TRUNCATE temp_store")
    except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
        raise StorageError("database disconnected")
+
def store_temp(self, cursor, oid, prev_tid, md5sum, data):
    """Insert one object into the temp_store table.

    data is wrapped in MySQLdb.Binary before being sent.
    """
    insert = """
    INSERT INTO temp_store (zoid, prev_tid, md5, state)
    VALUES (%s, %s, %s, %s)
    """
    row = (oid, prev_tid, md5sum, MySQLdb.Binary(data))
    cursor.execute(insert, row)
+
def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
    """Overwrite the temp_store row of an object already stored.

    data is wrapped in MySQLdb.Binary before being sent.
    """
    update = """
    UPDATE temp_store SET
        prev_tid = %s,
        md5 = %s,
        state = %s
    WHERE zoid = %s
    """
    # Parameter order follows the placeholders: the key comes last.
    values = (prev_tid, md5sum, MySQLdb.Binary(data), oid)
    cursor.execute(update, values)
+
def start_commit(self, cursor):
    """Prepare to commit by taking the commit lock.

    Acquires the named lock 'relstorage.commit', waiting up to
    commit_lock_timeout seconds, so that commits do not run
    concurrently.  Raises StorageError if the lock is not obtained.
    """
    cursor.execute(
        "SELECT GET_LOCK('relstorage.commit', %s)",
        (commit_lock_timeout,))
    (locked,) = cursor.fetchone()[:1]
    if locked:
        return
    raise StorageError("Unable to acquire commit lock")
+
def get_tid_and_time(self, cursor):
    """Return the most recent tid and the current database time.

    The time is expressed in seconds since the epoch.
    """
    # Lock in share mode to ensure the data being read is up to date.
    query = """
    SELECT tid, UNIX_TIMESTAMP()
    FROM transaction
    ORDER BY tid DESC
    LIMIT 1
    LOCK IN SHARE MODE
    """
    cursor.execute(query)
    assert cursor.rowcount == 1
    tid, db_time = cursor.fetchone()

    # UNIX_TIMESTAMP() only has one-second resolution.  When the local
    # clock agrees with the database to within a minute, prefer the
    # local clock for its finer precision.
    local_time = time.time()
    if abs(local_time - db_time) <= 60.0:
        return tid, local_time
    return tid, db_time
+
def add_transaction(self, cursor, tid, username, description, extension):
    """Insert a row into the transaction table.

    extension is wrapped in MySQLdb.Binary before being sent.

    NOTE(review): this method was documented as raising ConflictError
    when the tid has already been used, but no exception is translated
    here; a duplicate tid surfaces as whatever MySQLdb raises.  Confirm
    whether callers rely on ConflictError.
    """
    insert = """
    INSERT INTO transaction
        (tid, username, description, extension)
    VALUES (%s, %s, %s, %s)
    """
    row = (tid, username, description, MySQLdb.Binary(extension))
    cursor.execute(insert, row)
+
def detect_conflict(self, cursor):
    """Look for one conflict in the data about to be committed.

    A conflict is a temp_store row whose prev_tid differs from the tid
    recorded in current_object for the same object.  Returns
    (oid, prev_tid, attempted_prev_tid, attempted_data) for one such
    row, or None when there is none.
    """
    # Lock in share mode to ensure the data being read is up to date.
    query = """
    SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
        temp_store.state
    FROM temp_store
        JOIN current_object ON (temp_store.zoid = current_object.zoid)
    WHERE temp_store.prev_tid != current_object.tid
    LIMIT 1
    LOCK IN SHARE MODE
    """
    cursor.execute(query)
    if not cursor.rowcount:
        return None
    return cursor.fetchone()
+
def move_from_temp(self, cursor, tid):
    """Copy the rows of temp_store into object_state under ``tid``.

    Returns the list of oids that were stored.
    """
    copy_rows = """
    INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
    SELECT zoid, %s, prev_tid, md5, state
    FROM temp_store
    """
    cursor.execute(copy_rows, (tid,))

    list_oids = """
    SELECT zoid FROM temp_store
    """
    cursor.execute(list_oids)
    stored = []
    for (oid,) in cursor:
        stored.append(oid)
    return stored
+
def update_current(self, cursor, tid):
    """Point current_object at the states written in transaction tid.

    tid is the integer tid of the transaction being committed.
    """
    repoint = """
    REPLACE INTO current_object (zoid, tid)
    SELECT zoid, tid FROM object_state
    WHERE tid = %s
    """
    cursor.execute(repoint, (tid,))
+
def commit_phase1(self, cursor, tid):
    """Begin a commit and return the transaction name.

    The contract is that once this method returns, commit_phase2() must
    not fail: any error it could raise should be raised here instead.
    This adapter does no work in phase 1 and returns the placeholder
    name '-'.
    """
    txn_name = '-'
    return txn_name
+
def commit_phase2(self, cursor, txn):
    """Commit the transaction, then release the commit lock.

    txn is accepted for interface compatibility and is not used.
    """
    connection = cursor.connection
    connection.commit()
    release = "SELECT RELEASE_LOCK('relstorage.commit')"
    cursor.execute(release)
+
def abort(self, cursor, txn=None):
    """Roll back the transaction, then release the commit lock.

    txn is accepted for interface compatibility; the same steps are
    performed whether or not it is given.
    """
    connection = cursor.connection
    connection.rollback()
    release = "SELECT RELEASE_LOCK('relstorage.commit')"
    cursor.execute(release)
+
def new_oid(self, cursor):
    """Allocate and return a new, unused OID.

    A row is inserted into new_oid and the generated auto-increment id
    is the new OID.  Whenever the id is a multiple of 100, the rows for
    previously generated OIDs are deleted.
    """
    cursor.execute("INSERT INTO new_oid VALUES ()")
    allocated = cursor.connection.insert_id()
    needs_cleanup = (allocated % 100 == 0)
    if needs_cleanup:
        # Clean out previously generated OIDs.
        cursor.execute("DELETE FROM new_oid WHERE zoid < %s", (allocated,))
    return allocated
+
+
def iter_transactions(self, cursor):
    """Iterate over the transaction log, newest first.

    Packed transactions and the special transaction 0 are left out.
    Yields (tid, username, description, extension) for each row.
    """
    query = """
    SELECT tid, username, description, extension
    FROM transaction
    WHERE packed = FALSE
        AND tid != 0
    ORDER BY tid DESC
    """
    cursor.execute(query)
    rows = iter(cursor)
    return rows
+
+
def iter_object_history(self, cursor, oid):
    """Iterate over the unpacked history of an object, newest first.

    Raises KeyError if the object has no row in current_object.
    Yields (tid, username, description, extension, pickle_size)
    for each modification.
    """
    existence_check = """
    SELECT 1 FROM current_object WHERE zoid = %s
    """
    cursor.execute(existence_check, (oid,))
    if not cursor.rowcount:
        raise KeyError(oid)

    history = """
    SELECT tid, username, description, extension, OCTET_LENGTH(state)
    FROM transaction
        JOIN object_state USING (tid)
    WHERE zoid = %s
        AND packed = FALSE
    ORDER BY tid DESC
    """
    cursor.execute(history, (oid,))
    rows = iter(cursor)
    return rows
+
+
def hold_pack_lock(self, cursor):
    """Try to take the pack lock without waiting.

    Raises StorageError if the named lock 'relstorage.pack' cannot be
    obtained, which means a pack or undo is already in progress.
    """
    cursor.execute("SELECT GET_LOCK('relstorage.pack', 0)")
    acquired = cursor.fetchone()[0]
    if acquired:
        return
    raise StorageError('A pack or undo operation is in progress')
+
def release_pack_lock(self, cursor):
    """Release the named lock 'relstorage.pack'."""
    cursor.execute("SELECT RELEASE_LOCK('relstorage.pack')")
+
+
def verify_undoable(self, cursor, undo_tid):
    """Raise UndoError if it is not safe to undo the given transaction.

    Three checks run in order: the transaction must exist and be
    unpacked, none of its objects may have been changed since, and it
    must not be the transaction that created the root object.
    """
    # Check 1: the transaction exists and has not been packed.
    find_txn = """
    SELECT 1 FROM transaction WHERE tid = %s AND packed = FALSE
    """
    cursor.execute(find_txn, (undo_tid,))
    if not cursor.rowcount:
        raise UndoError("Transaction not found or packed")

    # Check 2: an object can be undone only if its state in the
    # transaction to undo matches its current state.  A single object
    # that breaks this rule blocks the whole undo.
    find_changed = """
    SELECT prev_os.zoid, current_object.tid
    FROM object_state prev_os
        JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
        JOIN current_object ON (cur_os.zoid = current_object.zoid
            AND cur_os.tid = current_object.tid)
    WHERE prev_os.tid = %s
        AND cur_os.md5 != prev_os.md5
    LIMIT 1
    """
    cursor.execute(find_changed, (undo_tid,))
    if cursor.rowcount:
        raise UndoError(
            "Some data were modified by a later transaction")

    # Check 3: never undo the creation of the root object; it is hard
    # to get back.
    find_root_creation = """
    SELECT 1
    FROM object_state
    WHERE tid = %s
        AND zoid = 0
        AND prev_tid = 0
    """
    cursor.execute(find_root_creation, (undo_tid,))
    if cursor.rowcount:
        raise UndoError("Can't undo the creation of the root object")
+
+
def undo(self, cursor, undo_tid, self_tid):
    """Undo a transaction.

    Parameters: "undo_tid", the integer tid of the transaction to undo,
    and "self_tid", the integer tid of the current transaction.

    The states to revert to are staged in a temporary table, written
    into object_state under self_tid, and the temporary table is
    dropped again.

    Returns the list of OIDs undone, i.e. the objects that have a state
    in transaction undo_tid.
    """
    # The temporary table is dropped with DROP TEMPORARY TABLE: unlike a
    # plain DROP TABLE, that form does not end the ongoing transaction,
    # so the undo records stay uncommitted until the caller commits.
    stmt = """
    CREATE TEMPORARY TABLE temp_undo_state (
        zoid BIGINT NOT NULL PRIMARY KEY,
        md5 CHAR(32),
        state LONGBLOB
    );

    -- Put the state to revert to in temp_undo_state.
    -- Some of the states can be null, indicating object uncreation.
    INSERT INTO temp_undo_state
    SELECT undoing.zoid, prev.md5, prev.state
    FROM object_state undoing
        LEFT JOIN object_state prev
        ON (prev.zoid = undoing.zoid
            AND prev.tid = undoing.prev_tid)
    WHERE undoing.tid = %(undo_tid)s;

    -- Update records produced by earlier undo operations
    -- within this transaction.  Change the state, but not
    -- prev_tid, since prev_tid is still correct.
    UPDATE object_state
    JOIN temp_undo_state USING (zoid)
    SET object_state.md5 = temp_undo_state.md5,
        object_state.state = temp_undo_state.state
    WHERE tid = %(self_tid)s;

    -- Add new undo records.
    INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
    SELECT zoid, %(self_tid)s, tid, md5, state
    FROM temp_undo_state
        JOIN current_object USING (zoid)
    WHERE zoid NOT IN (
        SELECT zoid FROM object_state WHERE tid = %(self_tid)s);

    DROP TEMPORARY TABLE temp_undo_state;
    """
    self._run_script(cursor, stmt,
        {'undo_tid': undo_tid, 'self_tid': self_tid})

    # List the changed OIDs.
    stmt = "SELECT zoid FROM object_state WHERE tid = %s"
    cursor.execute(stmt, (undo_tid,))
    return [oid_int for (oid_int,) in cursor]
+
+
def choose_pack_transaction(self, pack_point):
    """Return the newest unpacked tid at or before pack_point.

    Transaction 0 is never chosen.  Returns None if there is nothing
    to pack.  A connection is opened for the lookup and closed again
    before returning.
    """
    query = """
    SELECT tid
    FROM transaction
    WHERE tid > 0 AND tid <= %s
        AND packed = FALSE
    ORDER BY tid DESC
    LIMIT 1
    """
    conn, cursor = self.open()
    try:
        cursor.execute(query, (pack_point,))
        if cursor.rowcount:
            assert cursor.rowcount == 1
            return cursor.fetchone()[0]
        # Nothing needs to be packed.
        return None
    finally:
        self.close(conn, cursor)
+
+
def pre_pack(self, pack_tid, get_references):
    """Decide what to pack.

    pack_tid specifies the most recent transaction to pack.

    get_references is a function that accepts a pickled state and
    returns a set of OIDs that state refers to.

    The analysis runs on its own connection, which is closed when the
    analysis ends, whether it succeeded or not.
    """
    conn, cursor = self.open(transaction_mode=None)
    try:
        # This phase works best with transactions disabled, and it
        # changes no user-facing data, so run it in autocommit mode.
        conn.autocommit(True)
        self._pre_pack_cursor(cursor, pack_tid, get_references)
    finally:
        self.close(conn, cursor)
+
+
def _pre_pack_cursor(self, cursor, pack_tid, get_references):
    """Fill pack_object with the objects affected by a pack to pack_tid.

    Every object with a state at or before pack_tid gets a row.  Rows
    start with keep = false (the object will be removed) and are
    switched to keep = true (only the history is cut) when the object
    is the root, has been revised after pack_tid, or is referenced,
    directly or indirectly, by something that is kept.
    """
    # Fill object_ref with references from object states
    # in transactions that will not be packed.
    self._fill_nonpacked_refs(cursor, pack_tid, get_references)

    # Ensure the temporary pack_object table is clear.
    cursor.execute("TRUNCATE pack_object")

    args = {'pack_tid': pack_tid}

    # Fill the pack_object table with OIDs that either will be
    # removed (if nothing references the OID) or whose history will
    # be cut.
    fill_candidates = """
    INSERT INTO pack_object (zoid, keep)
    SELECT DISTINCT zoid, false
    FROM object_state
    WHERE tid <= %(pack_tid)s
    """
    cursor.execute(fill_candidates, args)

    # If the root object is in pack_object, keep it.
    keep_root = """
    UPDATE pack_object SET keep = true
    WHERE zoid = 0
    """
    cursor.execute(keep_root)

    # Keep objects that have been revised since pack_tid.
    keep_revised = """
    UPDATE pack_object SET keep = true
    WHERE keep = false
        AND zoid IN (
            SELECT zoid
            FROM current_object
            WHERE tid > %(pack_tid)s
        )
    """
    cursor.execute(keep_revised, args)

    # Keep objects that are still referenced by object states in
    # transactions that will not be packed.
    keep_referenced = """
    UPDATE pack_object SET keep = true
    WHERE keep = false
        AND zoid IN (
            SELECT to_zoid
            FROM object_ref
            WHERE tid > %(pack_tid)s
        )
    """
    cursor.execute(keep_referenced, args)

    # Set keep_tid for all pack_object rows with keep = true.
    # This must be done before _fill_pack_object_refs examines
    # references.
    set_keep_tid = """
    UPDATE pack_object SET keep_tid = (
            SELECT tid
            FROM object_state
            WHERE zoid = pack_object.zoid
                AND tid > 0
                AND tid <= %(pack_tid)s
            ORDER BY tid DESC
            LIMIT 1
        )
    WHERE keep = true AND keep_tid IS NULL
    """

    keep_children = """
    UPDATE pack_object SET keep = true
    WHERE keep = false
        AND zoid IN (
            SELECT to_zoid FROM (
                SELECT DISTINCT to_zoid
                FROM object_ref
                    JOIN pack_object parent ON (
                        object_ref.zoid = parent.zoid)
                WHERE parent.keep = true
            ) AS all_references
        )
    """

    # Each of the packable objects to be kept might refer to other
    # objects.  If some of those references include objects currently
    # set to be removed, keep those objects as well.  Repeat until a
    # pass marks nothing new.
    newly_kept = True
    while newly_kept:
        cursor.execute(set_keep_tid, args)
        self._fill_pack_object_refs(cursor, get_references)
        cursor.execute(keep_children)
        newly_kept = bool(cursor.rowcount)
+
+
def _fill_nonpacked_refs(self, cursor, pack_tid, get_references):
    """Fill object_ref for all transactions that will not be packed.

    Only transactions newer than pack_tid that have no entry in
    object_refs_added yet are processed.
    """
    find_pending = """
    SELECT DISTINCT tid
    FROM object_state
    WHERE tid > %s
        AND NOT EXISTS (
            SELECT 1
            FROM object_refs_added
            WHERE tid = object_state.tid
        )
    """
    cursor.execute(find_pending, (pack_tid,))
    # Fetch everything first: the same cursor is reused for each tid.
    pending = cursor.fetchall()
    for row in pending:
        (tid,) = row
        self._add_refs_for_tid(cursor, tid, get_references)
+
+
def _fill_pack_object_refs(self, cursor, get_references):
    """Fill object_ref for all pack_object rows that have keep_tid.

    Transactions already listed in object_refs_added are skipped.
    """
    find_pending = """
    SELECT DISTINCT keep_tid
    FROM pack_object
    WHERE keep_tid IS NOT NULL
        AND NOT EXISTS (
            SELECT 1
            FROM object_refs_added
            WHERE tid = keep_tid
        )
    """
    cursor.execute(find_pending)
    # Fetch everything first: the same cursor is reused for each tid.
    pending = cursor.fetchall()
    for row in pending:
        (tid,) = row
        self._add_refs_for_tid(cursor, tid, get_references)
+
+
def _add_refs_for_tid(self, cursor, tid, get_references):
    """Record in object_ref the references made by one transaction.

    Each non-empty state stored in transaction tid is passed to
    get_references, and one object_ref row is added per referenced
    OID.  The transaction is then listed in object_refs_added so it is
    not processed again.
    """
    select_states = """
    SELECT zoid, state
    FROM object_state
    WHERE tid = %s
    """
    cursor.execute(select_states, (tid,))

    # Rows are (from_oid, tid, to_oid); empty states have no references.
    new_refs = [
        (from_oid, tid, to_oid)
        for from_oid, state in cursor
        if state
        for to_oid in get_references(state)
    ]

    if new_refs:
        insert_refs = """
        INSERT INTO object_ref (zoid, tid, to_zoid)
        VALUES (%s, %s, %s)
        """
        cursor.executemany(insert_refs, new_refs)

    # The references have been computed for this transaction.
    mark_done = """
    INSERT INTO object_refs_added (tid)
    VALUES (%s)
    """
    cursor.execute(mark_done, (tid,))
+
+
def pack(self, pack_tid):
    """Pack the database up to pack_tid.

    Requires the pack tables to have been populated beforehand.  The
    work runs on its own connection while holding the commit lock.  It
    is committed on success and rolled back on error, and the
    connection is always closed at the end.  Raises StorageError if
    the commit lock cannot be obtained.
    """
    # Remove objects that are in pack_object and have keep set to
    # false.  The placeholder is the table name.
    remove_template = """
    DELETE FROM %s
    WHERE zoid IN (
            SELECT zoid
            FROM pack_object
            WHERE keep = false
        )
    """

    # Cut the history of objects in pack_object that have keep set to
    # true.  Both placeholders are the table name.
    cut_template = """
    DELETE FROM %s
    WHERE zoid IN (
            SELECT zoid
            FROM pack_object
            WHERE keep = true
        )
        AND tid < (
            SELECT keep_tid
            FROM pack_object
            WHERE zoid = %s.zoid
        )
    """

    finish_script = """
    -- Terminate prev_tid chains
    UPDATE object_state SET prev_tid = 0
    WHERE tid <= %(tid)s
        AND prev_tid != 0;

    -- For each tid to be removed, delete the corresponding row in
    -- object_refs_added.
    DELETE FROM object_refs_added
    WHERE tid > 0
        AND tid <= %(tid)s
        AND NOT EXISTS (
            SELECT 1
            FROM object_state
            WHERE tid = object_refs_added.tid
        );

    -- Delete transactions no longer used.
    DELETE FROM transaction
    WHERE tid > 0
        AND tid <= %(tid)s
        AND NOT EXISTS (
            SELECT 1
            FROM object_state
            WHERE tid = transaction.tid
        );

    -- Mark the remaining packable transactions as packed
    UPDATE transaction SET packed = true
    WHERE tid > 0
        AND tid <= %(tid)s
        AND packed = false
    """

    # Read committed mode is sufficient.
    conn, cursor = self.open()
    try:
        # Pause concurrent commits.
        cursor.execute("SELECT GET_LOCK('relstorage.commit', %s)",
            (commit_lock_timeout,))
        locked = cursor.fetchone()[0]
        if not locked:
            raise StorageError("Unable to acquire commit lock")

        try:
            for table in ('object_ref', 'current_object', 'object_state'):
                cursor.execute(remove_template % table)
                if table != 'current_object':
                    # current_object has one row per object, so it has
                    # no history to cut.
                    cursor.execute(cut_template % (table, table))

            self._run_script(cursor, finish_script, {'tid': pack_tid})

            # Clean up
            cursor.execute("TRUNCATE pack_object")
        except:
            conn.rollback()
            raise
        conn.commit()
    finally:
        self.close(conn, cursor)
+
+
def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
    """Polls for new transactions.

    conn and cursor must have been created previously by open_for_load().
    prev_polled_tid is the tid returned at the last poll, or None
    if this is the first poll.  If ignore_tid is not None, changes
    committed in that transaction will not be included in the list
    of changed OIDs.

    Returns (changed_oids, new_polled_tid).  changed_oids is None when
    the caller cannot be told precisely what changed (first poll, or
    prev_polled_tid is no longer in the transaction table), an empty
    tuple when nothing was committed since the last poll, and a list
    of OIDs otherwise.  Raises StorageError if the database has
    disconnected.
    """
    try:
        # Find out the tid of the most recent transaction.
        stmt = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"
        cursor.execute(stmt)
        # Expect the transaction table to always have at least one row.
        assert cursor.rowcount == 1
        new_polled_tid = cursor.fetchone()[0]

        if prev_polled_tid is None:
            # This is the first time the connection has polled.
            return None, new_polled_tid

        if new_polled_tid == prev_polled_tid:
            # No transactions have been committed since prev_polled_tid.
            return (), new_polled_tid

        stmt = "SELECT 1 FROM transaction WHERE tid = %s"
        cursor.execute(stmt, (prev_polled_tid,))
        if not cursor.rowcount:
            # Transaction not found; perhaps it has been packed.
            # The connection cache needs to be cleared.
            return None, new_polled_tid

        # Get the list of changed OIDs and return it.
        stmt = """
        SELECT DISTINCT zoid
        FROM object_state
            JOIN transaction USING (tid)
        WHERE tid > %s
        """
        params = [prev_polled_tid]
        if ignore_tid is not None:
            # Bind ignore_tid as a parameter, like every other value,
            # instead of formatting it into the SQL text.
            stmt += " AND tid != %s"
            params.append(ignore_tid)
        cursor.execute(stmt, tuple(params))
        oids = [oid for (oid,) in cursor]

        return oids, new_polled_tid

    except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
        raise StorageError("database disconnected")
+
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2008-02-06 07:51:40 UTC (rev 83571)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2008-02-06 08:19:58 UTC (rev 83572)
@@ -414,16 +414,12 @@
"""Prepare to commit."""
# Hold commit_lock to prevent concurrent commits
# (for as short a time as possible).
- # Lock current_object in share mode to ensure conflict
- # detection has the most current data.
+ # Lock transaction and current_object in share mode to ensure
+ # conflict detection has the most current data.
cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+ cursor.execute("LOCK TABLE transaction IN SHARE MODE")
cursor.execute("LOCK TABLE current_object IN SHARE MODE")
- cursor.execute("SAVEPOINT start_commit")
- def restart_commit(self, cursor):
- """Rollback the attempt to commit and start over."""
- cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
-
def _parse_dsinterval(self, s):
"""Convert an Oracle dsinterval (as a string) to a float."""
mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
@@ -604,7 +600,12 @@
except cx_Oracle.DatabaseError:
raise StorageError('A pack or undo operation is in progress')
+ def release_pack_lock(self, cursor):
+ """Release the pack lock."""
+ # No action needed
+ pass
+
def verify_undoable(self, cursor, undo_tid):
"""Raise UndoError if it is not safe to undo the specified txn."""
stmt = """
@@ -699,27 +700,31 @@
return [oid_int for (oid_int,) in cursor]
- def choose_pack_transaction(self, cursor, pack_point):
+ def choose_pack_transaction(self, pack_point):
"""Return the transaction before or at the specified pack time.
Returns None if there is nothing to pack.
"""
- stmt = """
- SELECT MAX(tid)
- FROM transaction
- WHERE tid > 0
- AND tid <= :1
- AND packed = 'N'
- """
- cursor.execute(stmt, (pack_point,))
- rows = cursor.fetchall()
- if not rows:
- # Nothing needs to be packed.
- return None
- return rows[0][0]
+ conn, cursor = self.open()
+ try:
+ stmt = """
+ SELECT MAX(tid)
+ FROM transaction
+ WHERE tid > 0
+ AND tid <= :1
+ AND packed = 'N'
+ """
+ cursor.execute(stmt, (pack_point,))
+ rows = cursor.fetchall()
+ if not rows:
+ # Nothing needs to be packed.
+ return None
+ return rows[0][0]
+ finally:
+ self.close(conn, cursor)
- def pre_pack(self, cursor, pack_tid, get_references):
+ def pre_pack(self, pack_tid, get_references):
"""Decide what to pack.
tid specifies the most recent transaction to pack.
@@ -727,8 +732,22 @@
get_references is a function that accepts a pickled state and
returns a set of OIDs that state refers to.
"""
- # Fill object_ref with references from object states
- # in transactions that will not be packed.
+ conn, cursor = self.open()
+ try:
+ try:
+ self._pre_pack_cursor(cursor, pack_tid, get_references)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def _pre_pack_cursor(self, cursor, pack_tid, get_references):
+ """pre_pack implementation.
+ """
self._fill_nonpacked_refs(cursor, pack_tid, get_references)
args = {'pack_tid': pack_tid}
@@ -895,6 +914,8 @@
conn, cursor = self.open()
try:
try:
+ # hold the commit lock for a moment to prevent deadlocks.
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
for table in ('object_ref', 'current_object', 'object_state'):
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2008-02-06 07:51:40 UTC (rev 83571)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2008-02-06 08:19:58 UTC (rev 83572)
@@ -162,8 +162,8 @@
DELETE FROM object_state;
DELETE FROM transaction;
-- Create a special transaction to represent object creation.
- INSERT INTO transaction (tid, username, description)
- VALUES (0, '', '');
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, 'system', 'special transaction for object creation');
ALTER SEQUENCE zoid_seq START WITH 1;
""")
except:
@@ -399,18 +399,14 @@
"""Prepare to commit."""
# Hold commit_lock to prevent concurrent commits
# (for as short a time as possible).
- # Lock current_object in share mode to ensure conflict
- # detection has the most current data.
+ # Lock transaction and current_object in share mode to ensure
+ # conflict detection has the most current data.
cursor.execute("""
LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+ LOCK TABLE transaction IN SHARE MODE;
LOCK TABLE current_object IN SHARE MODE
""")
- cursor.execute("SAVEPOINT start_commit")
- def restart_commit(self, cursor):
- """Rollback the attempt to commit and start again."""
- cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
-
def get_tid_and_time(self, cursor):
"""Returns the most recent tid and the current database time.
@@ -599,7 +595,12 @@
except psycopg2.DatabaseError:
raise StorageError('A pack or undo operation is in progress')
+ def release_pack_lock(self, cursor):
+ """Release the pack lock."""
+ # No action needed
+ pass
+
def verify_undoable(self, cursor, undo_tid):
"""Raise UndoError if it is not safe to undo the specified txn."""
stmt = """
@@ -703,28 +704,32 @@
return [oid_int for (oid_int,) in cursor]
- def choose_pack_transaction(self, cursor, pack_point):
+ def choose_pack_transaction(self, pack_point):
"""Return the transaction before or at the specified pack time.
Returns None if there is nothing to pack.
"""
- stmt = """
- SELECT tid
- FROM transaction
- WHERE tid > 0 AND tid <= %s
- AND packed = FALSE
- ORDER BY tid DESC
- LIMIT 1
- """
- cursor.execute(stmt, (pack_point,))
- if not cursor.rowcount:
- # Nothing needs to be packed.
- return None
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
+ conn, cursor = self.open()
+ try:
+ stmt = """
+ SELECT tid
+ FROM transaction
+ WHERE tid > 0 AND tid <= %s
+ AND packed = FALSE
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ cursor.execute(stmt, (pack_point,))
+ if not cursor.rowcount:
+ # Nothing needs to be packed.
+ return None
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ finally:
+ self.close(conn, cursor)
- def pre_pack(self, cursor, pack_tid, get_references):
+ def pre_pack(self, pack_tid, get_references):
"""Decide what to pack.
tid specifies the most recent transaction to pack.
@@ -732,6 +737,22 @@
get_references is a function that accepts a pickled state and
returns a set of OIDs that state refers to.
"""
+ conn, cursor = self.open()
+ try:
+ try:
+ self._pre_pack_cursor(cursor, pack_tid, get_references)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def _pre_pack_cursor(self, cursor, pack_tid, get_references):
+ """pre_pack implementation.
+ """
# Fill object_ref with references from object states
# in transactions that will not be packed.
self._fill_nonpacked_refs(cursor, pack_tid, get_references)
@@ -901,6 +922,8 @@
conn, cursor = self.open()
try:
try:
+ # hold the commit lock for a moment to prevent deadlocks.
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
for table in ('object_ref', 'current_object', 'object_state'):
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2008-02-06 07:51:40 UTC (rev 83571)
+++ relstorage/trunk/relstorage/relstorage.py 2008-02-06 08:19:58 UTC (rev 83572)
@@ -334,38 +334,20 @@
adapter.start_commit(cursor)
user, desc, ext = self._ude
- attempt = 1
- while True:
- # get the commit lock
- try:
- # Choose a transaction ID.
- # Base the transaction ID on the database time,
- # while ensuring that the tid of this transaction
- # is greater than any existing tid.
- last_tid, now = adapter.get_tid_and_time(cursor)
- stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
- stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
- tid = repr(stamp)
+ # Choose a transaction ID.
+ # Base the transaction ID on the database time,
+ # while ensuring that the tid of this transaction
+ # is greater than any existing tid.
+ last_tid, now = adapter.get_tid_and_time(cursor)
+ stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
+ stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
+ tid = repr(stamp)
- tid_int = u64(tid)
- adapter.add_transaction(cursor, tid_int, user, desc, ext)
- self._tid = tid
- break
+ tid_int = u64(tid)
+ adapter.add_transaction(cursor, tid_int, user, desc, ext)
+ self._tid = tid
- except POSException.ConflictError:
- if attempt < 3:
- # A concurrent transaction claimed the tid.
- # Rollback and try again.
- adapter.restart_commit(cursor)
- attempt += 1
- else:
- self._drop_store_connection()
- raise
- except:
- self._drop_store_connection()
- raise
-
def _clear_temp(self):
# It is assumed that self._lock_acquire was called before this
# method was called.
@@ -601,23 +583,31 @@
self._lock_acquire()
try:
- self._prepare_tid()
- self_tid_int = u64(self._tid)
+ adapter = self._adapter
cursor = self._store_cursor
assert cursor is not None
- adapter = self._adapter
- adapter.verify_undoable(cursor, undo_tid_int)
- oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
- oids = [p64(oid_int) for oid_int in oid_ints]
+ adapter.hold_pack_lock(cursor)
+ try:
+ # Note that _prepare_tid acquires the commit lock.
+ # The commit lock must be acquired after the pack lock
+ # because the database adapters also acquire in that
+ # order during packing.
+ self._prepare_tid()
+ adapter.verify_undoable(cursor, undo_tid_int)
- # Update the current object pointers immediately, so that
- # subsequent undo operations within this transaction will see
- # the new current objects.
- adapter.update_current(cursor, self_tid_int)
+ self_tid_int = u64(self._tid)
+ oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
+ oids = [p64(oid_int) for oid_int in oid_ints]
- return self._tid, oids
+ # Update the current object pointers immediately, so that
+ # subsequent undo operations within this transaction will see
+ # the new current objects.
+ adapter.update_current(cursor, self_tid_int)
+ return self._tid, oids
+ finally:
+ adapter.release_pack_lock(cursor)
finally:
self._lock_release()
@@ -638,49 +628,30 @@
return refs
# Use a private connection (lock_conn and lock_cursor) to
- # hold the pack lock, while using a second private connection
- # (conn and cursor) to decide exactly what to pack.
- # Close the second connection. Then, with the lock still held,
- # perform the pack in a third connection opened by the adapter.
- # This structure is designed to maximize the scalability
- # of packing and minimize conflicts with concurrent writes.
- # A consequence of this structure is that the adapter must
- # not choke on transactions that may have been added between
- # pre_pack and pack.
+ # hold the pack lock. Have the adapter open temporary
+ # connections to do the actual work, allowing the adapter
+ # to use special transaction modes for packing.
adapter = self._adapter
lock_conn, lock_cursor = adapter.open()
try:
adapter.hold_pack_lock(lock_cursor)
-
- conn, cursor = adapter.open()
try:
- try:
- # Find the latest commit before or at the pack time.
- tid_int = adapter.choose_pack_transaction(
- cursor, pack_point_int)
- if tid_int is None:
- # Nothing needs to be packed.
- # TODO: log the fact that nothing needs to be packed.
- return
+ # Find the latest commit before or at the pack time.
+ tid_int = adapter.choose_pack_transaction(pack_point_int)
+ if tid_int is None:
+ # Nothing needs to be packed.
+ return
- # In pre_pack, the adapter fills tables with information
- # about what to pack. The adapter should not actually
- # pack anything yet.
- adapter.pre_pack(cursor, tid_int, get_references)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- adapter.close(conn, cursor)
+ # In pre_pack, the adapter fills tables with
+ # information about what to pack. The adapter
+ # should not actually pack anything yet.
+ adapter.pre_pack(tid_int, get_references)
- # Now pack. The adapter makes its own connection just for the
- # pack operation, possibly using a special transaction mode
- # and connection flags.
- adapter.pack(tid_int)
- self._after_pack()
-
+ # Now pack.
+ adapter.pack(tid_int)
+ self._after_pack()
+ finally:
+ adapter.release_pack_lock(lock_cursor)
finally:
lock_conn.rollback()
adapter.close(lock_conn, lock_cursor)
Modified: relstorage/trunk/relstorage/tests/speedtest.py
===================================================================
--- relstorage/trunk/relstorage/tests/speedtest.py 2008-02-06 07:51:40 UTC (rev 83571)
+++ relstorage/trunk/relstorage/tests/speedtest.py 2008-02-06 08:19:58 UTC (rev 83572)
@@ -33,17 +33,19 @@
from ZODB.Connection import Connection
from relstorage.relstorage import RelStorage
+from relstorage.adapters.mysql import MySQLAdapter
from relstorage.adapters.postgresql import PostgreSQLAdapter
from relstorage.adapters.oracle import OracleAdapter
from relstorage.tests.testoracle import getOracleParams
debug = False
txn_count = 10
-object_counts = [10000] # [1, 100, 10000]
+object_counts = [100] # [1, 100, 10000]
concurrency_levels = range(1, 16, 2)
contenders = [
('ZEO + FileStorage', 'zeofs_test'),
('PostgreSQLAdapter', 'postgres_test'),
+ ('MySQLAdapter', 'mysql_test'),
('OracleAdapter', 'oracle_test'),
]
repetitions = 3
@@ -203,7 +205,15 @@
return RelStorage(adapter)
return self.run_tests(make_storage)
+ def mysql_test(self):
+ adapter = MySQLAdapter(db='relstoragetest')
+ adapter.zap()
+ def make_storage():
+ return RelStorage(adapter)
+ return self.run_tests(make_storage)
+
+
def distribute(func, param_iter):
"""Call a function in separate processes concurrently.
Added: relstorage/trunk/relstorage/tests/testmysql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testmysql.py (rev 0)
+++ relstorage/trunk/relstorage/tests/testmysql.py 2008-02-06 08:19:58 UTC (rev 83572)
@@ -0,0 +1,35 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Tests of relstorage.adapters.mysql"""
+
+import logging
+import unittest
+
+from reltestbase import RelStorageTests
+from relstorage.adapters.mysql import MySQLAdapter
+
+
+class MySQLTests(RelStorageTests):
+ def make_adapter(self):
+ return MySQLAdapter(db='relstoragetest')
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(MySQLTests, "check"))
+ return suite
+
+if __name__=='__main__':
+ logging.basicConfig()
+ unittest.main(defaultTest="test_suite")
+
More information about the Checkins mailing list