[Checkins] SVN: relstorage/trunk/relstorage/ Leave store
connections open after commit. This makes Oracle much faster
and probably also helps PostgreSQL.
Shane Hathaway
shane at hathawaymix.org
Tue Feb 5 04:30:30 EST 2008
Log message for revision 83524:
Leave store connections open after commit. This makes Oracle much faster and probably also helps PostgreSQL.
Also corrected a theoretical source of corruption discovered by Dieter Maurer:
at commit, we now call 'LOCK TABLE current_object IN SHARE MODE' before detecting conflicts, so that conflict detection has the most current data.
Changed:
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/postgresql.py
U relstorage/trunk/relstorage/relstorage.py
U relstorage/trunk/relstorage/tests/reltestbase.py
-=-
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2008-02-05 07:08:27 UTC (rev 83523)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2008-02-05 09:30:29 UTC (rev 83524)
@@ -353,6 +353,15 @@
else:
return None
+ def _set_xid(self, cursor):
+ """Set up a distributed transaction"""
+ stmt = """
+ SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
+ """
+ cursor.execute(stmt)
+ xid = str(cursor.fetchone()[0])
+ cursor.connection.begin(0, xid, '0')
+
def open_for_store(self):
"""Open and initialize a connection for storing objects.
@@ -361,12 +370,7 @@
if self._twophase:
conn, cursor = self.open(transaction_mode=None, twophase=True)
try:
- stmt = """
- SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
- """
- cursor.execute(stmt)
- xid = str(cursor.fetchone()[0])
- conn.begin(0, xid, '0')
+ self._set_xid(cursor)
except:
self.close(conn, cursor)
raise
@@ -374,6 +378,15 @@
conn, cursor = self.open()
return conn, cursor
+ def restart_store(self, cursor):
+ """Reuse a store connection."""
+ try:
+ cursor.connection.rollback()
+ if self._twophase:
+ self._set_xid(cursor)
+ except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
+ raise StorageError("database disconnected")
+
def store_temp(self, cursor, oid, prev_tid, md5sum, data):
"""Store an object in the temporary table."""
cursor.setinputsizes(data=cx_Oracle.BLOB)
@@ -399,13 +412,17 @@
def start_commit(self, cursor):
"""Prepare to commit."""
+ # Hold commit_lock to prevent concurrent commits
+ # (for as short a time as possible).
+ # Lock current_object in share mode to ensure conflict
+ # detection has the most current data.
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+ cursor.execute("LOCK TABLE current_object IN SHARE MODE")
cursor.execute("SAVEPOINT start_commit")
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
def restart_commit(self, cursor):
"""Rollback the attempt to commit and start over."""
cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
def _parse_dsinterval(self, s):
"""Convert an Oracle dsinterval (as a string) to a float."""
@@ -542,7 +559,7 @@
FROM transaction
WHERE packed = 'N'
AND tid != 0
- ORDER BY tid desc
+ ORDER BY tid DESC
"""
cursor.execute(stmt)
return iter(cursor)
@@ -568,7 +585,7 @@
JOIN object_state USING (tid)
WHERE zoid = :1
AND packed = 'N'
- ORDER BY tid desc
+ ORDER BY tid DESC
"""
cursor.execute(stmt, (oid,))
return iter(cursor)
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2008-02-05 07:08:27 UTC (rev 83523)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2008-02-05 09:30:29 UTC (rev 83524)
@@ -291,7 +291,7 @@
FROM object_state
WHERE zoid = %s
AND tid < %s
- ORDER BY tid desc
+ ORDER BY tid DESC
LIMIT 1
""", (oid, tid))
if cursor.rowcount:
@@ -326,6 +326,32 @@
else:
return None
+ def _make_temp_table(self, cursor):
+ """Create the temporary table for storing objects"""
+ if self._twophase:
+ # PostgreSQL does not allow two phase transactions
+ # to use temporary tables. :-(
+ stmt = """
+ CREATE TABLE temp_store (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state BYTEA
+ );
+ CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+ """
+ else:
+ stmt = """
+ CREATE TEMPORARY TABLE temp_store (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state BYTEA
+ ) ON COMMIT DROP;
+ CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+ """
+ cursor.execute(stmt)
+
def open_for_store(self):
"""Open and initialize a connection for storing objects.
@@ -333,36 +359,25 @@
"""
conn, cursor = self.open()
try:
- if self._twophase:
- # PostgreSQL does not allow two phase transactions
- # to use temporary tables. :-(
- stmt = """
- CREATE TABLE temp_store (
- zoid BIGINT NOT NULL,
- prev_tid BIGINT NOT NULL,
- md5 CHAR(32),
- state BYTEA
- );
- CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
- """
- else:
- stmt = """
- CREATE TEMPORARY TABLE temp_store (
- zoid BIGINT NOT NULL,
- prev_tid BIGINT NOT NULL,
- md5 CHAR(32),
- state BYTEA
- ) ON COMMIT DROP;
- CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
- """
- cursor.execute(stmt)
+ self._make_temp_table(cursor)
return conn, cursor
except:
self.close(conn, cursor)
raise
+ def restart_store(self, cursor):
+ """Reuse a store connection."""
+ try:
+ cursor.connection.rollback()
+ if self._twophase:
+ cursor.connection.set_isolation_level(
+ psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+ self._make_temp_table(cursor)
+ except (psycopg2.OperationalError, psycopg2.InterfaceError):
+ raise StorageError("database disconnected")
+
def store_temp(self, cursor, oid, prev_tid, md5sum, data):
- """Store an object."""
+ """Store an object in the temporary table."""
stmt = """
INSERT INTO temp_store (zoid, prev_tid, md5, state)
VALUES (%s, %s, %s, decode(%s, 'base64'))
@@ -382,13 +397,19 @@
def start_commit(self, cursor):
"""Prepare to commit."""
+ # Hold commit_lock to prevent concurrent commits
+ # (for as short a time as possible).
+ # Lock current_object in share mode to ensure conflict
+ # detection has the most current data.
+ cursor.execute("""
+ LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+ LOCK TABLE current_object IN SHARE MODE
+ """)
cursor.execute("SAVEPOINT start_commit")
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
def restart_commit(self, cursor):
- """Rollback the attempt to commit and start over."""
+ """Rollback the attempt to commit and start again."""
cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
def get_tid_and_time(self, cursor):
"""Returns the most recent tid and the current database time.
@@ -533,7 +554,7 @@
FROM transaction
WHERE packed = FALSE
AND tid != 0
- ORDER BY tid desc
+ ORDER BY tid DESC
"""
cursor.execute(stmt)
return iter(cursor)
@@ -559,7 +580,7 @@
JOIN object_state USING (tid)
WHERE zoid = %s
AND packed = FALSE
- ORDER BY tid desc
+ ORDER BY tid DESC
"""
cursor.execute(stmt, (oid,))
return iter(cursor)
@@ -692,7 +713,7 @@
FROM transaction
WHERE tid > 0 AND tid <= %s
AND packed = FALSE
- ORDER BY tid desc
+ ORDER BY tid DESC
LIMIT 1
"""
cursor.execute(stmt, (pack_point,))
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2008-02-05 07:08:27 UTC (rev 83523)
+++ relstorage/trunk/relstorage/relstorage.py 2008-02-05 09:30:29 UTC (rev 83524)
@@ -51,12 +51,13 @@
if create:
self._adapter.prepare_schema()
- # load_conn and load_cursor are usually open
+ # load_conn and load_cursor are open most of the time.
self._load_conn = None
self._load_cursor = None
self._load_started = False
self._open_load_connection()
- # store_conn and store_cursor are open only during commit
+ # store_conn and store_cursor are open during commit,
+ # but not necessarily open at other times.
self._store_conn = None
self._store_cursor = None
@@ -289,8 +290,18 @@
self._tstatus = status
adapter = self._adapter
- conn, cursor = adapter.open_for_store()
- self._store_conn, self._store_cursor = conn, cursor
+ cursor = self._store_cursor
+ if cursor is not None:
+ # Store cursor is still open, so try to use it again.
+ try:
+ adapter.restart_store(cursor)
+ except POSException.StorageError:
+ cursor = None
+ log.exception("Store connection failed; retrying")
+ self._drop_store_connection()
+ if cursor is None:
+ conn, cursor = adapter.open_for_store()
+ self._store_conn, self._store_cursor = conn, cursor
if tid is not None:
# get the commit lock and add the transaction now
@@ -371,7 +382,7 @@
cursor = self._store_cursor
adapter = self._adapter
- # List conflicting changes.
+ # Detect conflicting changes.
# Try to resolve the conflicts.
resolved = set() # a set of OIDs
while True:
@@ -386,16 +397,18 @@
rdata = self.tryToResolveConflict(oid, prev_tid, serial, data)
if rdata is None:
+ # unresolvable; kill the whole transaction
raise POSException.ConflictError(
oid=oid, serials=(prev_tid, serial), data=data)
else:
+ # resolved
data = rdata
md5sum = md5.new(data).hexdigest()
self._adapter.replace_temp(
cursor, oid_int, prev_tid_int, md5sum, data)
resolved.add(oid)
- # Move the data
+ # Move the new states into the permanent table
tid_int = u64(self._tid)
serials = []
oid_ints = adapter.move_from_temp(cursor, tid_int)
@@ -460,7 +473,6 @@
txn = self._prepared_txn
assert txn is not None
self._adapter.commit_phase2(self._store_cursor, txn)
- self._drop_store_connection()
self._prepared_txn = None
self._ltid = self._tid
self._tid = None
@@ -471,7 +483,6 @@
if self._store_cursor is not None:
self._adapter.abort(self._store_cursor, self._prepared_txn)
self._prepared_txn = None
- self._drop_store_connection()
self._tid = None
def lastTransaction(self):
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2008-02-05 07:08:27 UTC (rev 83523)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2008-02-05 09:30:29 UTC (rev 83524)
@@ -206,3 +206,16 @@
got, serialno = self._storage.load(oid, '')
self.assertEqual(len(got), len(data))
self.assertEqual(got, data)
+
+ def checkMultipleStores(self):
+ # Verify a connection can commit multiple transactions
+ db = DB(self._storage)
+ try:
+ c1 = db.open()
+ r1 = c1.root()
+ r1['alpha'] = 1
+ transaction.commit()
+ r1['alpha'] = 2
+ transaction.commit()
+ finally:
+ db.close()
More information about the Checkins mailing list