[Checkins] SVN: relstorage/trunk/ Various performance enhancements:
Shane Hathaway
shane at hathawaymix.org
Wed Jan 30 01:55:52 EST 2008
Log message for revision 83311:
Various performance enhancements:
* Made strict two phase commit optional
* After verifying it's safe, switched from serializable to read committed isolation mode in a lot of places
* Buffer new objects in a table rather than a file
* In PostgreSQL, poll using a prepared statement instead of listen/notify
Changed:
U relstorage/trunk/CHANGELOG.txt
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/postgresql.py
D relstorage/trunk/relstorage/autotemp.py
U relstorage/trunk/relstorage/relstorage.py
U relstorage/trunk/relstorage/tests/reltestbase.py
-=-
Modified: relstorage/trunk/CHANGELOG.txt
===================================================================
--- relstorage/trunk/CHANGELOG.txt 2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/CHANGELOG.txt 2008-01-30 06:55:51 UTC (rev 83311)
@@ -11,7 +11,30 @@
- Moved to svn.zope.org and switched to ZPL 2.1 (required for projects
on svn.zope.org.)
+- Made two-phase commit optional in both Oracle and PostgreSQL. They
+ both use commit_lock in such a way that the commit is not likely to
+ fail in the second phase.
+- Switched most database transaction isolation levels from serializable
+ to read committed. It turns out that commit_lock already provides
+ the serializability guarantees we need, so it is safe to take advantage
+ of the potential speed gains. The one major exception is the load
+ connection, which requires an unchanging view of the database.
+
+- Stored objects are now buffered in a database table rather than a file.
+
+- Stopped using the LISTEN and NOTIFY statements in PostgreSQL since
+ they are not reliable enough.
+
+- Started using a prepared statement in PostgreSQL for getting the
+ newest transaction ID quickly.
+
+- Removed the code in the Oracle adapter for retrying connection attempts.
+ (It is better to just reconfigure Oracle.)
+
+
+
+
PGStorage 0.4
- Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2008-01-30 06:55:51 UTC (rev 83311)
@@ -14,12 +14,9 @@
"""Oracle adapter for RelStorage."""
import logging
-import os
import re
-import socket
import thread
import time
-import traceback
import cx_Oracle
from ZODB.POSException import ConflictError, StorageError, UndoError
@@ -29,11 +26,10 @@
class OracleAdapter(object):
"""Oracle adapter for RelStorage."""
- def __init__(self, user, password, dsn,
- arraysize=64, max_connect_attempts=3):
+ def __init__(self, user, password, dsn, twophase=False, arraysize=64):
self._params = (user, password, dsn)
+ self._twophase = twophase
self._arraysize = arraysize
- self._max_connect_attempts = max_connect_attempts
def create_schema(self, cursor):
"""Create the database tables."""
@@ -77,6 +73,14 @@
FOREIGN KEY (zoid, tid) REFERENCES object_state
);
+ -- States that will soon be stored
+ CREATE GLOBAL TEMPORARY TABLE temp_store (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ prev_tid NUMBER(20) NOT NULL,
+ md5 CHAR(32),
+ state BLOB
+ ) ON COMMIT DELETE ROWS;
+
-- During packing, an exclusive lock is held on pack_lock.
CREATE TABLE pack_lock (dummy CHAR);
@@ -202,25 +206,12 @@
self.close(conn, cursor)
- def open(self, transaction_mode="ISOLATION LEVEL SERIALIZABLE",
+ def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
twophase=False):
"""Open a database connection and return (conn, cursor)."""
try:
- attempt = 1
- while True:
- try:
- kw = {'twophase': twophase} #, 'threaded': True}
- conn = cx_Oracle.connect(*self._params, **kw)
- break
- except cx_Oracle.DatabaseError, e:
- log.warning("Connection failed: %s", e)
- attempt += 1
- if attempt > self._max_connect_attempts:
- raise
- else:
- # Pause, then try to connect again
- time.sleep(2**(attempt - 1))
-
+ kw = {'twophase': twophase} #, 'threaded': True}
+ conn = cx_Oracle.connect(*self._params, **kw)
cursor = conn.cursor()
cursor.arraysize = self._arraysize
if transaction_mode:
@@ -228,7 +219,7 @@
return conn, cursor
except cx_Oracle.OperationalError:
- log.debug("Unable to connect in %s", repr(self))
+ log.error("Unable to connect to DSN %s", self._params[2])
raise
def close(self, conn, cursor):
@@ -362,49 +353,60 @@
else:
return None
- def get_object_tids(self, cursor, oids):
- """Returns a map containing the current tid for each oid in a list.
-
- OIDs that do not exist are not included.
- """
- # query in chunks to avoid running into a maximum query length
- chunk_size = 512
- res = {}
- for i in xrange(0, len(oids), chunk_size):
- chunk = oids[i : i + chunk_size]
- oid_str = ','.join(str(oid) for oid in chunk)
- stmt = """
- SELECT zoid, tid FROM current_object WHERE zoid IN (%s)
- """ % oid_str
- cursor.execute(stmt)
- res.update(dict(iter(cursor)))
- return res
-
- def open_for_commit(self, cursor=None):
+ def open_for_store(self):
"""Open and initialize a connection for storing objects.
Returns (conn, cursor).
"""
- if cursor is None:
+ if self._twophase:
conn, cursor = self.open(transaction_mode=None, twophase=True)
+ try:
+ stmt = """
+ SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
+ """
+ cursor.execute(stmt)
+ xid = str(cursor.fetchone()[0])
+ conn.begin(0, xid, '0')
+ except:
+ self.close(conn, cursor)
+ raise
else:
- conn = cursor.connection
- # Use an xid that is unique to this Oracle connection, assuming
- # the hostname is unique within the Oracle cluster. The xid
- # can have up to 64 bytes.
- xid = '%s-%d-%d' % (socket.gethostname(), os.getpid(), id(conn))
- conn.begin(0, xid, '0')
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+ conn, cursor = self.open()
return conn, cursor
- def restart_commit(self, cursor):
- """Rollback the commit and start over.
+ def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+ """Store an object in the temporary table."""
+ cursor.setinputsizes(data=cx_Oracle.BLOB)
+ stmt = """
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (:oid, :prev_tid, :md5sum, :data)
+ """
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, data=cx_Oracle.Binary(data))
- The cursor must be the type created by open_for_commit().
+ def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+ """Replace an object in the temporary table."""
+ cursor.setinputsizes(data=cx_Oracle.BLOB)
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = :prev_tid,
+ md5 = :md5sum,
+ state = :data
+ WHERE zoid = :oid
"""
- cursor.connection.rollback()
- self.open_for_commit(cursor)
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, data=cx_Oracle.Binary(data))
+ def start_commit(self, cursor):
+ """Prepare to commit."""
+ cursor.execute("SAVEPOINT start_commit")
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
+ def restart_commit(self, cursor):
+ """Rollback the attempt to commit and start over."""
+ cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
def _parse_dsinterval(self, s):
"""Convert an Oracle dsinterval (as a string) to a float."""
mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
@@ -443,55 +445,67 @@
except cx_Oracle.IntegrityError:
raise ConflictError
- def store(self, cursor, oid, tid, prev_tid, md5sum, data):
- """Store an object. May raise ConflictError."""
- try:
- cursor.setinputsizes(data=cx_Oracle.BLOB)
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (:oid, :tid, :prev_tid, :md5sum, :data)
- """
- cursor.execute(stmt, oid=oid, tid=tid, prev_tid=prev_tid,
- md5sum=md5sum, data=cx_Oracle.Binary(data))
- except cx_Oracle.ProgrammingError, e:
- # This can happen if another thread is currently packing
- # and prev_tid refers to a transaction being packed.
- if 'concurrent' in e.args[0]:
- raise ConflictError(e)
- else:
- raise
+ def detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ temp_store.state
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ """
+ cursor.execute(stmt)
+ for oid, prev_tid, attempted_prev_tid, data in cursor:
+ return oid, prev_tid, attempted_prev_tid, data.read()
+ return None
+
+ def move_from_temp(self, cursor, tid):
+ """Move the temporarily stored objects to permanent storage.
+
+ Returns the list of oids stored.
+ """
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT zoid, :tid, prev_tid, md5, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, tid=tid)
+
+ stmt = """
+ SELECT zoid FROM temp_store
+ """
+ cursor.execute(stmt)
+ return [oid for (oid,) in cursor]
+
def update_current(self, cursor, tid):
"""Update the current object pointers.
tid is the integer tid of the transaction being committed.
"""
- try:
- # Insert objects created in this transaction into current_object.
- stmt = """
- INSERT INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
+ # Insert objects created in this transaction into current_object.
+ stmt = """
+ INSERT INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = :1
+ AND prev_tid = 0
+ """
+ cursor.execute(stmt, (tid,))
+
+ # Change existing objects.
+ stmt = """
+ UPDATE current_object SET tid = :1
+ WHERE zoid IN (
+ SELECT zoid FROM object_state
WHERE tid = :1
- AND prev_tid = 0
- """
- cursor.execute(stmt, (tid,))
+ AND prev_tid != 0
+ )
+ """
+ cursor.execute(stmt, (tid,))
- # Change existing objects.
- stmt = """
- UPDATE current_object SET tid = :1
- WHERE zoid IN (
- SELECT zoid FROM object_state
- WHERE tid = :1
- AND prev_tid != 0
- )
- """
- cursor.execute(stmt, (tid,))
- except cx_Oracle.ProgrammingError, e:
- if 'concurrent' in e.args[0]:
- raise ConflictError(e)
- else:
- raise
-
def commit_phase1(self, cursor, tid):
"""Begin a commit. Returns the transaction name.
@@ -499,14 +513,9 @@
meaning that if commit_phase2() would raise any error, the error
should be raised in commit_phase1() instead.
"""
- try:
+ if self._twophase:
cursor.connection.prepare()
- return '-' # the transaction name is not important
- except cx_Oracle.ProgrammingError, e:
- if 'concurrent' in e.args[0]:
- raise ConflictError(e)
- else:
- raise
+ return '-'
def commit_phase2(self, cursor, txn):
"""Final transaction commit."""
@@ -866,7 +875,7 @@
"""Pack. Requires populated pack tables."""
# Read committed mode is sufficient.
- conn, cursor = self.open('ISOLATION LEVEL READ COMMITTED')
+ conn, cursor = self.open()
try:
try:
@@ -967,8 +976,7 @@
# find out the tid of the most recent transaction.
stmt = "SELECT MAX(tid) FROM transaction"
cursor.execute(stmt)
- rows = cursor.fetchall()
- new_polled_tid = rows[0][0]
+ new_polled_tid = list(cursor)[0][0]
if prev_polled_tid is None:
# This is the first time the connection has polled.
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2008-01-30 06:55:51 UTC (rev 83311)
@@ -32,8 +32,9 @@
class PostgreSQLAdapter(object):
"""PostgreSQL adapter for RelStorage."""
- def __init__(self, dsn=''):
+ def __init__(self, dsn='', twophase=False):
self._dsn = dsn
+ self._twophase = twophase
def create_schema(self, cursor):
"""Create the database tables."""
@@ -155,8 +156,11 @@
try:
try:
cursor.execute("""
- TRUNCATE object_refs_added, object_ref, current_object,
- object_state, transaction;
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ DELETE FROM transaction;
-- Create a special transaction to represent object creation.
INSERT INTO transaction (tid, username, description)
VALUES (0, '', '');
@@ -171,7 +175,8 @@
self.close(conn, cursor)
- def open(self, isolation=psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE):
+ def open(self,
+ isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
"""Open a database connection and return (conn, cursor)."""
try:
conn = psycopg2.connect(self._dsn)
@@ -199,8 +204,16 @@
Returns (conn, cursor).
"""
- conn, cursor = self.open()
- cursor.execute("LISTEN invalidate")
+ conn, cursor = self.open(
+ psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ stmt = """
+ PREPARE get_latest_tid AS
+ SELECT tid
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ cursor.execute(stmt)
return conn, cursor
def restart_load(self, cursor):
@@ -217,10 +230,8 @@
"""Returns the approximate size of the database in bytes"""
conn, cursor = self.open()
try:
- cursor.execute("SELECT SUM(relpages) FROM pg_class")
- # A relative page on postgres is 8K
- relpages = cursor.fetchone()[0]
- return relpages * 8 * 1024
+ cursor.execute("SELECT pg_database_size(current_database())")
+ return cursor.fetchone()[0]
finally:
self.close(conn, cursor)
@@ -315,52 +326,70 @@
else:
return None
- def get_object_tids(self, cursor, oids):
- """Returns a map containing the current tid for each oid in a list.
+ def open_for_store(self):
+ """Open and initialize a connection for storing objects.
- OIDs that do not exist are not included.
+ Returns (conn, cursor).
"""
- # query in chunks to avoid running into a maximum query length
- chunk_size = 512
- res = {}
- for i in xrange(0, len(oids), chunk_size):
- chunk = oids[i : i + chunk_size]
- oid_str = ','.join(str(oid) for oid in chunk)
- stmt = """
- SELECT zoid, tid FROM current_object WHERE zoid IN (%s)
- """ % oid_str
+ conn, cursor = self.open()
+ try:
+ if self._twophase:
+ # PostgreSQL does not allow two phase transactions
+ # to use temporary tables. :-(
+ stmt = """
+ CREATE TABLE temp_store (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state BYTEA
+ );
+ CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+ """
+ else:
+ stmt = """
+ CREATE TEMPORARY TABLE temp_store (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state BYTEA
+ ) ON COMMIT DROP;
+ CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+ """
cursor.execute(stmt)
- res.update(dict(iter(cursor)))
- return res
+ return conn, cursor
+ except:
+ self.close(conn, cursor)
+ raise
- def open_for_commit(self):
- """Open and initialize a connection for storing objects.
+ def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+ """Store an object."""
+ stmt = """
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (%s, %s, %s, decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (oid, prev_tid, md5sum, encodestring(data)))
- Returns (conn, cursor).
+ def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+ """Replace an object in the temporary table."""
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = %s,
+ md5 = %s,
+ state = decode(%s, 'base64')
+ WHERE zoid = %s
"""
- # psycopg2 doesn't support prepared transactions, so
- # tell psycopg2 to use the autocommit isolation level, but covertly
- # switch to the serializable isolation level.
- conn, cursor = self.open(
- isolation=psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
- )
- cursor.execute("""
- BEGIN ISOLATION LEVEL SERIALIZABLE;
- LOCK commit_lock IN EXCLUSIVE MODE
- """)
- return conn, cursor
+ cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
+ def start_commit(self, cursor):
+ """Prepare to commit."""
+ cursor.execute("SAVEPOINT start_commit")
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
def restart_commit(self, cursor):
- """Rollback the commit and start over.
+ """Rollback the attempt to commit and start over."""
+ cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
- The cursor must be the type created by open_for_commit().
- """
- cursor.execute("""
- ROLLBACK;
- BEGIN ISOLATION LEVEL SERIALIZABLE;
- LOCK commit_lock IN EXCLUSIVE MODE
- """)
-
def get_tid_and_time(self, cursor):
"""Returns the most recent tid and the current database time.
@@ -391,52 +420,67 @@
except psycopg2.IntegrityError, e:
raise ConflictError(e)
- def store(self, cursor, oid, tid, prev_tid, md5sum, data):
- """Store an object. May raise ConflictError."""
+ def detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ encode(temp_store.state, 'base64')
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ LIMIT 1
+ """
+ cursor.execute(stmt)
+ if cursor.rowcount:
+ oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
+ return oid, prev_tid, attempted_prev_tid, decodestring(data)
+ return None
+
+ def move_from_temp(self, cursor, tid):
+ """Move the temporarily stored objects to permanent storage.
+
+ Returns the list of oids stored.
+ """
+ stmt = """
INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (%s, %s, %s, %s, decode(%s, 'base64'))
+ SELECT zoid, %s, prev_tid, md5, state
+ FROM temp_store
"""
- try:
- cursor.execute(stmt, (
- oid, tid, prev_tid, md5sum, encodestring(data)))
- except psycopg2.ProgrammingError, e:
- # This can happen if another thread is currently packing
- # and prev_tid refers to a transaction being packed.
- if 'concurrent update' in e.args[0]:
- raise ConflictError(e)
- else:
- raise
+ cursor.execute(stmt, (tid,))
+ stmt = """
+ SELECT zoid FROM temp_store
+ """
+ cursor.execute(stmt)
+ return [oid for (oid,) in cursor]
+
def update_current(self, cursor, tid):
"""Update the current object pointers.
tid is the integer tid of the transaction being committed.
"""
- try:
- cursor.execute("""
- -- Insert objects created in this transaction into current_object.
- INSERT INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
+ cursor.execute("""
+ -- Insert objects created in this transaction into current_object.
+ INSERT INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = %(tid)s
+ AND prev_tid = 0;
+
+ -- Change existing objects. To avoid deadlocks,
+ -- update in OID order.
+ UPDATE current_object SET tid = %(tid)s
+ WHERE zoid IN (
+ SELECT zoid FROM object_state
WHERE tid = %(tid)s
- AND prev_tid = 0;
+ AND prev_tid != 0
+ ORDER BY zoid
+ )
+ """, {'tid': tid})
- -- Change existing objects. To avoid deadlocks,
- -- update in OID order.
- UPDATE current_object SET tid = %(tid)s
- WHERE zoid IN (
- SELECT zoid FROM object_state
- WHERE tid = %(tid)s
- AND prev_tid != 0
- ORDER BY zoid
- )
- """, {'tid': tid})
- except psycopg2.ProgrammingError, e:
- if 'concurrent update' in e.args[0]:
- raise ConflictError(e)
- else:
- raise
-
def commit_phase1(self, cursor, tid):
"""Begin a commit. Returns the transaction name.
@@ -444,27 +488,33 @@
meaning that if commit_phase2() would raise any error, the error
should be raised in commit_phase1() instead.
"""
- try:
+ if self._twophase:
txn = 'T%d' % tid
- stmt = "NOTIFY invalidate; PREPARE TRANSACTION %s"
+ stmt = """
+ DROP TABLE temp_store;
+ PREPARE TRANSACTION %s
+ """
cursor.execute(stmt, (txn,))
+ cursor.connection.set_isolation_level(
+ psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
return txn
- except psycopg2.ProgrammingError, e:
- if 'concurrent update' in e.args[0]:
- raise ConflictError(e)
- else:
- raise
+ else:
+ return '-'
def commit_phase2(self, cursor, txn):
"""Final transaction commit."""
- cursor.execute('COMMIT PREPARED %s', (txn,))
+ if self._twophase:
+ cursor.execute('COMMIT PREPARED %s', (txn,))
+ else:
+ cursor.connection.commit()
def abort(self, cursor, txn=None):
"""Abort the commit. If txn is not None, phase 1 is also aborted."""
- if txn is not None:
- cursor.execute('ROLLBACK PREPARED %s', (txn,))
+ if self._twophase:
+ if txn is not None:
+ cursor.execute('ROLLBACK PREPARED %s', (txn,))
else:
- cursor.execute('ROLLBACK')
+ cursor.connection.rollback()
def new_oid(self, cursor):
"""Return a new, unused OID."""
@@ -827,8 +877,7 @@
"""Pack. Requires populated pack tables."""
# Read committed mode is sufficient.
- conn, cursor = self.open(
- isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+ conn, cursor = self.open()
try:
try:
@@ -926,20 +975,8 @@
if the database has disconnected.
"""
try:
- if prev_polled_tid is not None:
- if not cursor.isready():
- # No invalidate notifications arrived.
- return (), prev_polled_tid
- del conn.notifies[:]
-
# find out the tid of the most recent transaction.
- stmt = """
- SELECT tid
- FROM transaction
- ORDER BY tid desc
- LIMIT 1
- """
- cursor.execute(stmt)
+ cursor.execute("EXECUTE get_latest_tid")
# Expect the transaction table to always have at least one row.
assert cursor.rowcount == 1
new_polled_tid = cursor.fetchone()[0]
Deleted: relstorage/trunk/relstorage/autotemp.py
===================================================================
--- relstorage/trunk/relstorage/autotemp.py 2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/relstorage/autotemp.py 2008-01-30 06:55:51 UTC (rev 83311)
@@ -1,43 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""A temporary file that switches from StringIO to TemporaryFile if needed.
-
-This could be a useful addition to Python's tempfile module.
-"""
-
-from cStringIO import StringIO
-import tempfile
-
-class AutoTemporaryFile:
- """Initially a StringIO, but becomes a TemporaryFile if it grows too big"""
- def __init__(self, threshold=10485760):
- self._threshold = threshold
- self._f = f = StringIO()
- # delegate most methods
- for m in ('read', 'readline', 'seek', 'tell', 'close'):
- setattr(self, m, getattr(f, m))
-
- def write(self, data):
- threshold = self._threshold
- if threshold > 0 and self.tell() + len(data) >= threshold:
- # convert to TemporaryFile
- f = tempfile.TemporaryFile()
- f.write(self._f.getvalue())
- f.seek(self.tell())
- self._f = f
- self._threshold = 0
- # delegate all important methods
- for m in ('write', 'read', 'readline', 'seek', 'tell', 'close'):
- setattr(self, m, getattr(f, m))
- self._f.write(data)
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/relstorage/relstorage.py 2008-01-30 06:55:51 UTC (rev 83311)
@@ -28,8 +28,6 @@
from ZODB import ConflictResolution, POSException
from persistent.TimeStamp import TimeStamp
-from autotemp import AutoTemporaryFile
-
log = logging.getLogger("relstorage")
# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
@@ -50,7 +48,10 @@
self._name = name
self._is_read_only = read_only
- # load_conn and load_cursor are always open
+ if create:
+ self._adapter.prepare_schema()
+
+ # load_conn and load_cursor are usually open
self._load_conn = None
self._load_cursor = None
self._load_started = False
@@ -59,21 +60,11 @@
self._store_conn = None
self._store_cursor = None
- if create:
- self._adapter.prepare_schema()
-
BaseStorage.__init__(self, name)
self._tid = None
self._ltid = None
- # _tbuf is a temporary file that contains pickles of data to be
- # committed. _pickler writes pickles to that file. _stored_oids
- # is a list of integer OIDs to be stored.
- self._tbuf = None
- self._pickler = None
- self._stored_oids = None
-
# _prepared_txn is the name of the transaction to commit in the
# second phase.
self._prepared_txn = None
@@ -256,21 +247,24 @@
assert self._prepared_txn is None
md5sum = md5.new(data).hexdigest()
+ adapter = self._adapter
+ cursor = self._store_cursor
+ assert cursor is not None
+ oid_int = u64(oid)
+ if serial:
+ prev_tid_int = u64(serial)
+ else:
+ prev_tid_int = 0
+
self._lock_acquire()
try:
- # buffer the stores
- if self._tbuf is None:
- self._tbuf = AutoTemporaryFile()
- self._pickler = cPickle.Pickler(
- self._tbuf, cPickle.HIGHEST_PROTOCOL)
- self._stored_oids = []
-
- self._pickler.dump((oid, serial, md5sum, data))
- self._stored_oids.append(u64(oid))
+ # save the data in a temporary table
+ adapter.store_temp(cursor, oid_int, prev_tid_int, md5sum, data)
return None
finally:
self._lock_release()
+
def tpc_begin(self, transaction, tid=None, status=' '):
if self._is_read_only:
raise POSException.ReadOnlyError()
@@ -294,11 +288,13 @@
self._ude = user, desc, ext
self._tstatus = status
+ adapter = self._adapter
+ conn, cursor = adapter.open_for_store()
+ self._store_conn, self._store_cursor = conn, cursor
+
if tid is not None:
# get the commit lock and add the transaction now
- adapter = self._adapter
- conn, cursor = adapter.open_for_commit()
- self._store_conn, self._store_cursor = conn, cursor
+ adapter.start_commit(cursor)
tid_int = u64(tid)
try:
adapter.add_transaction(cursor, tid_int, user, desc, ext)
@@ -323,12 +319,13 @@
raise POSException.StorageError("No transaction in progress")
adapter = self._adapter
- conn, cursor = adapter.open_for_commit()
- self._store_conn, self._store_cursor = conn, cursor
+ cursor = self._store_cursor
+ adapter.start_commit(cursor)
user, desc, ext = self._ude
attempt = 1
while True:
+ # get the commit lock
try:
# Choose a transaction ID.
# Base the transaction ID on the database time,
@@ -362,63 +359,53 @@
# It is assumed that self._lock_acquire was called before this
# method was called.
self._prepared_txn = None
- tbuf = self._tbuf
- if tbuf is not None:
- self._tbuf = None
- self._pickler = None
- self._stored_oids = None
- tbuf.close()
- def _send_stored(self):
- """Send the buffered pickles to the database.
+ def _finish_store(self):
+ """Move stored objects from the temporary table to final storage.
Returns a list of (oid, tid) to be received by
Connection._handle_serial().
"""
+ assert self._tid is not None
cursor = self._store_cursor
adapter = self._adapter
- tid_int = u64(self._tid)
- self._pickler = None # Don't allow any more store operations
- self._tbuf.seek(0)
- unpickler = cPickle.Unpickler(self._tbuf)
- serials = [] # [(oid, serial)]
- prev_tids = adapter.get_object_tids(cursor, self._stored_oids)
-
+ # List conflicting changes.
+ # Try to resolve the conflicts.
+ resolved = set() # a set of OIDs
while True:
- try:
- oid, serial, md5sum, data = unpickler.load()
- except EOFError:
+ conflict = adapter.detect_conflict(cursor)
+ if conflict is None:
break
- oid_int = u64(oid)
- if not serial:
- serial = z64
- prev_tid_int = prev_tids.get(oid_int)
- if prev_tid_int is None:
- prev_tid_int = 0
- prev_tid = z64
+ oid_int, prev_tid_int, serial_int, data = conflict
+ oid = p64(oid_int)
+ prev_tid = p64(prev_tid_int)
+ serial = p64(serial_int)
+
+ rdata = self.tryToResolveConflict(oid, prev_tid, serial, data)
+ if rdata is None:
+ raise POSException.ConflictError(
+ oid=oid, serials=(prev_tid, serial), data=data)
else:
- prev_tid = p64(prev_tid_int)
- if prev_tid != serial:
- # conflict detected
- rdata = self.tryToResolveConflict(
- oid, prev_tid, serial, data)
- if rdata is None:
- raise POSException.ConflictError(
- oid=oid, serials=(prev_tid, serial), data=data)
- else:
- data = rdata
- md5sum = md5.new(data).hexdigest()
+ data = rdata
+ md5sum = md5.new(data).hexdigest()
+ self._adapter.replace_temp(
+ cursor, oid_int, prev_tid_int, md5sum, data)
+ resolved.add(oid)
- self._adapter.store(
- cursor, oid_int, tid_int, prev_tid_int, md5sum, data)
-
- if prev_tid and serial != prev_tid:
- serials.append((oid, ConflictResolution.ResolvedSerial))
+ # Move the data
+ tid_int = u64(self._tid)
+ serials = []
+ oid_ints = adapter.move_from_temp(cursor, tid_int)
+ for oid_int in oid_ints:
+ oid = p64(oid_int)
+ if oid in resolved:
+ serial = ConflictResolution.ResolvedSerial
else:
- serials.append((oid, self._tid))
+ serial = self._tid
+ serials.append((oid, serial))
return serials
@@ -440,11 +427,7 @@
cursor = self._store_cursor
assert cursor is not None
- if self._tbuf is not None:
- serials = self._send_stored()
- else:
- serials = []
-
+ serials = self._finish_store()
self._adapter.update_current(cursor, tid_int)
self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
# From the point of view of self._store_cursor,
@@ -700,7 +683,7 @@
propagate_invalidations = False
def __init__(self, parent, zodb_conn):
- # self._conn = conn
+ # self._zodb_conn = zodb_conn
RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
read_only=parent._is_read_only, create=False)
# _prev_polled_tid contains the tid at the previous poll
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2008-01-30 06:55:51 UTC (rev 83311)
@@ -189,6 +189,15 @@
finally:
self._storage = root_storage
+ def check16KObject(self):
+ # Store 16 * 1024 bytes in an object, then retrieve it
+ data = 'a 16 byte string' * 1024
+ oid = self._storage.new_oid()
+ self._dostoreNP(oid, data=data)
+ got, serialno = self._storage.load(oid, '')
+ self.assertEqual(len(got), len(data))
+ self.assertEqual(got, data)
+
def check16MObject(self):
# Store 16 * 1024 * 1024 bytes in an object, then retrieve it
data = 'a 16 byte string' * (1024 * 1024)
More information about the Checkins mailing list