[Checkins] SVN: relstorage/trunk/relstorage/ More refactoring: Expanded IConnectionManager and added IOIDAllocator
Shane Hathaway
shane at hathawaymix.org
Wed Sep 23 19:14:01 EDT 2009
Log message for revision 104466:
More refactoring: Expanded IConnectionManager and added IOIDAllocator
Changed:
U relstorage/trunk/relstorage/adapters/connmanager.py
U relstorage/trunk/relstorage/adapters/interfaces.py
U relstorage/trunk/relstorage/adapters/loadstore.py
U relstorage/trunk/relstorage/adapters/mysql.py
A relstorage/trunk/relstorage/adapters/oidallocator.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/postgresql.py
U relstorage/trunk/relstorage/relstorage.py
-=-
Modified: relstorage/trunk/relstorage/adapters/connmanager.py
===================================================================
--- relstorage/trunk/relstorage/adapters/connmanager.py 2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/connmanager.py 2009-09-23 23:14:01 UTC (rev 104466)
@@ -14,6 +14,7 @@
from relstorage.adapters.interfaces import IConnectionManager
from zope.interface import implements
+from ZODB.POSException import StorageError
class AbstractConnectionManager(object):
"""Abstract base class for connection management.
@@ -22,13 +23,25 @@
"""
implements(IConnectionManager)
+ # disconnected_exceptions contains the exception types that might be
+ # raised when the connection to the database has been broken.
+ disconnected_exceptions = ()
+
# close_exceptions contains the exception types to ignore
# when the adapter attempts to close a database connection.
close_exceptions = ()
+ # on_store_opened is either None or a callable that
+ # will be called whenever a store cursor is opened or rolled back.
+ on_store_opened = None
+
+ def set_on_store_opened(self, f):
+ """Set the on_store_opened hook"""
+ self.on_store_opened = f
+
def open(self):
"""Open a database connection and return (conn, cursor)."""
- raise NotImplementedError
+ raise NotImplementedError()
def close(self, conn, cursor):
"""Close a connection and cursor, ignoring certain errors.
@@ -61,3 +74,35 @@
finally:
self.close(conn, cursor)
+ def open_for_load(self):
+ raise NotImplementedError()
+
+ def restart_load(self, conn, cursor):
+ """Reinitialize a connection for loading objects."""
+ try:
+ conn.rollback()
+ except self.disconnected_exceptions, e:
+ raise StorageError(e)
+
+ def open_for_store(self):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.open()
+ try:
+ if self.on_store_opened is not None:
+ self.on_store_opened(cursor, restart=False)
+ return conn, cursor
+ except:
+ self.close(conn, cursor)
+ raise
+
+ def restart_store(self, conn, cursor):
+ """Reuse a store connection."""
+ try:
+ conn.rollback()
+ if self.on_store_opened is not None:
+ self.on_store_opened(cursor, restart=True)
+ except self.disconnected_exceptions, e:
+ raise StorageError(e)
Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py 2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/interfaces.py 2009-09-23 23:14:01 UTC (rev 104466)
@@ -34,7 +34,31 @@
then propagates the exception.
"""
+ def open_for_load():
+ """Open a connection for loading objects.
+ Returns (conn, cursor).
+ """
+
+ def restart_load(conn, cursor):
+ """Reinitialize a connection for loading objects.
+
+ Raise StorageError if the database has disconnected.
+ """
+
+ def open_for_store():
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+
+ def restart_store(conn, cursor):
+ """Rollback and reuse a store connection.
+
+ Raise StorageError if the database has disconnected.
+ """
+
+
class IDatabaseIterator(Interface):
def iter_objects(cursor, tid):
@@ -92,6 +116,15 @@
"""Release the pack lock."""
+class IOIDAllocator(Interface):
+
+ def set_min_oid(cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+
+ def new_oid(cursor):
+ """Return a new, unused OID."""
+
+
class IPackUndo(Interface):
def verify_undoable(cursor, undo_tid):
Modified: relstorage/trunk/relstorage/adapters/loadstore.py
===================================================================
--- relstorage/trunk/relstorage/adapters/loadstore.py 2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/loadstore.py 2009-09-23 23:14:01 UTC (rev 104466)
@@ -35,47 +35,19 @@
class HistoryPreservingPostgreSQLLoadStore(object):
- def __init__(self, connmanager, disconnected_exceptions):
- self.connmanager = connmanager
- self.disconnected_exceptions = disconnected_exceptions
-
- def open_for_load(self):
- """Open and initialize a connection for loading objects.
-
- Returns (conn, cursor).
- """
- conn, cursor = self.connmanager.open(
- self.connmanager.isolation_serializable)
- stmt = """
- PREPARE get_latest_tid AS
- SELECT tid
- FROM transaction
- ORDER BY tid DESC
- LIMIT 1
- """
- cursor.execute(stmt)
- return conn, cursor
-
- def restart_load(self, cursor):
- """Reinitialize a connection for loading objects."""
- try:
- cursor.connection.rollback()
- except self.disconnected_exceptions, e:
- raise StorageError(e)
-
def get_current_tid(self, cursor, oid):
"""Returns the current integer tid for an object.
oid is an integer. Returns None if object does not exist.
"""
- cursor.execute("""
+ stmt = """
SELECT tid
FROM current_object
WHERE zoid = %s
- """, (oid,))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
+ """
+ cursor.execute(stmt, (oid,))
+ for (tid,) in cursor:
+ return tid
return None
def load_current(self, cursor, oid):
@@ -121,8 +93,11 @@
def exists(self, cursor, oid):
"""Returns a true value if the given object exists."""
- cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
- return cursor.rowcount
+ stmt = "SELECT 1 FROM current_object WHERE zoid = %s"
+ cursor.execute(stmt, (oid,))
+ for row in cursor:
+ return True
+ return False
def load_before(self, cursor, oid, tid):
"""Returns the pickle and tid of an object before transaction tid.
@@ -169,7 +144,7 @@
else:
return None
- def _make_temp_table(self, cursor):
+ def on_store_opened(self, cursor, restart=False):
"""Create the temporary table for storing objects"""
stmt = """
CREATE TEMPORARY TABLE temp_store (
@@ -182,27 +157,6 @@
"""
cursor.execute(stmt)
- def open_for_store(self):
- """Open and initialize a connection for storing objects.
-
- Returns (conn, cursor).
- """
- conn, cursor = self.connmanager.open()
- try:
- self._make_temp_table(cursor)
- return conn, cursor
- except:
- self.connmanager.close(conn, cursor)
- raise
-
- def restart_store(self, cursor):
- """Reuse a store connection."""
- try:
- cursor.connection.rollback()
- self._make_temp_table(cursor)
- except self.disconnected_exceptions, e:
- raise StorageError(e)
-
def store_temp(self, cursor, oid, prev_tid, data):
"""Store an object in the temporary table."""
md5sum = compute_md5sum(data)
@@ -302,57 +256,25 @@
)
""", {'tid': tid})
- def set_min_oid(self, cursor, oid):
- """Ensure the next OID is at least the given OID."""
- cursor.execute("""
- SELECT CASE WHEN %s > nextval('zoid_seq')
- THEN setval('zoid_seq', %s)
- ELSE 0
- END
- """, (oid, oid))
- def new_oid(self, cursor):
- """Return a new, unused OID."""
- stmt = "SELECT NEXTVAL('zoid_seq')"
- cursor.execute(stmt)
- return cursor.fetchone()[0]
-
-
class HistoryPreservingMySQLLoadStore(object):
- def __init__(self, connmanager, disconnected_exceptions, Binary):
- self.connmanager = connmanager
- self.disconnected_exceptions = disconnected_exceptions
+ def __init__(self, Binary):
self.Binary = Binary
- def open_for_load(self):
- """Open and initialize a connection for loading objects.
-
- Returns (conn, cursor).
- """
- return self.connmanager.open(
- self.connmanager.isolation_repeatable_read)
-
- def restart_load(self, cursor):
- """Reinitialize a connection for loading objects."""
- try:
- cursor.connection.rollback()
- except self.disconnected_exceptions, e:
- raise StorageError(e)
-
def get_current_tid(self, cursor, oid):
"""Returns the current integer tid for an object.
oid is an integer. Returns None if object does not exist.
"""
- cursor.execute("""
+ stmt = """
SELECT tid
FROM current_object
WHERE zoid = %s
- """, (oid,))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
+ """
+ cursor.execute(stmt, (oid,))
+ for (tid,) in cursor:
+ return tid
return None
def load_current(self, cursor, oid):
@@ -391,8 +313,11 @@
def exists(self, cursor, oid):
"""Returns a true value if the given object exists."""
- cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
- return cursor.rowcount
+ stmt = "SELECT 1 FROM current_object WHERE zoid = %s"
+ cursor.execute(stmt, (oid,))
+ for row in cursor:
+ return True
+ return False
def load_before(self, cursor, oid, tid):
"""Returns the pickle and tid of an object before transaction tid.
@@ -433,8 +358,14 @@
else:
return None
- def _make_temp_table(self, cursor):
+ def on_store_opened(self, cursor, restart=False):
"""Create the temporary table for storing objects"""
+ if restart:
+ stmt = """
+ DROP TEMPORARY TABLE IF EXISTS temp_store
+ """
+ cursor.execute(stmt)
+
stmt = """
CREATE TEMPORARY TABLE temp_store (
zoid BIGINT NOT NULL PRIMARY KEY,
@@ -445,35 +376,6 @@
"""
cursor.execute(stmt)
- def open_for_store(self):
- """Open and initialize a connection for storing objects.
-
- Returns (conn, cursor).
- """
- conn, cursor = self.connmanager.open()
- try:
- self._make_temp_table(cursor)
- return conn, cursor
- except:
- self.connmanager.close(conn, cursor)
- raise
-
- def _restart_temp_table(self, cursor):
- """Restart the temporary table for storing objects"""
- stmt = """
- DROP TEMPORARY TABLE IF EXISTS temp_store
- """
- cursor.execute(stmt)
- self._make_temp_table(cursor)
-
- def restart_store(self, cursor):
- """Reuse a store connection."""
- try:
- cursor.connection.rollback()
- self._restart_temp_table(cursor)
- except self.disconnected_exceptions, e:
- raise StorageError(e)
-
def store_temp(self, cursor, oid, prev_tid, data):
"""Store an object in the temporary table."""
md5sum = compute_md5sum(data)
@@ -561,59 +463,26 @@
WHERE tid = %s
""", (tid,))
- def set_min_oid(self, cursor, oid):
- """Ensure the next OID is at least the given OID."""
- cursor.execute("REPLACE INTO new_oid VALUES(%s)", (oid,))
- def new_oid(self, cursor):
- """Return a new, unused OID."""
- stmt = "INSERT INTO new_oid VALUES ()"
- cursor.execute(stmt)
- oid = cursor.connection.insert_id()
- if oid % 100 == 0:
- # Clean out previously generated OIDs.
- stmt = "DELETE FROM new_oid WHERE zoid < %s"
- cursor.execute(stmt, (oid,))
- return oid
-
-
class HistoryPreservingOracleLoadStore(object):
- def __init__(self, connmanager, runner, disconnected_exceptions,
- Binary, inputsize_BLOB, inputsize_BINARY, twophase):
- self.connmanager = connmanager
+ def __init__(self, runner, Binary, inputsize_BLOB, inputsize_BINARY):
self.runner = runner
- self.disconnected_exceptions = disconnected_exceptions
self.Binary = Binary
self.inputsize_BLOB = inputsize_BLOB
self.inputsize_BINARY = inputsize_BINARY
- self.twophase = twophase
- def open_for_load(self):
- """Open and initialize a connection for loading objects.
-
- Returns (conn, cursor).
- """
- return self.connmanager.open(self.connmanager.isolation_read_only)
-
- def restart_load(self, cursor):
- """Reinitialize a connection for loading objects."""
- try:
- cursor.connection.rollback()
- cursor.execute("SET TRANSACTION READ ONLY")
- except self.disconnected_exceptions, e:
- raise StorageError(e)
-
def get_current_tid(self, cursor, oid):
"""Returns the current integer tid for an object.
oid is an integer. Returns None if object does not exist.
"""
- cursor.execute("""
+ stmt = """
SELECT tid
FROM current_object
WHERE zoid = :1
- """, (oid,))
+ """
+ cursor.execute(stmt, (oid,))
for (tid,) in cursor:
return tid
return None
@@ -649,8 +518,11 @@
def exists(self, cursor, oid):
"""Returns a true value if the given object exists."""
- cursor.execute("SELECT 1 FROM current_object WHERE zoid = :1", (oid,))
- return len(list(cursor))
+ stmt = "SELECT 1 FROM current_object WHERE zoid = :1"
+ cursor.execute(stmt, (oid,))
+ for row in cursor:
+ return True
+ return False
def load_before(self, cursor, oid, tid):
"""Returns the pickle and tid of an object before transaction tid.
@@ -690,41 +562,8 @@
else:
return None
- def _set_xid(self, cursor):
- """Set up a distributed transaction"""
- stmt = """
- SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
- """
- cursor.execute(stmt)
- xid = str(cursor.fetchone()[0])
- cursor.connection.begin(0, xid, '0')
+ on_store_opened = None # no store connection initialization needed
- def open_for_store(self):
- """Open and initialize a connection for storing objects.
-
- Returns (conn, cursor).
- """
- if self.twophase:
- conn, cursor = self.connmanager.open(
- transaction_mode=None, twophase=True)
- try:
- self._set_xid(cursor)
- except:
- self.close(conn, cursor)
- raise
- else:
- conn, cursor = self.connmanager.open()
- return conn, cursor
-
- def restart_store(self, cursor):
- """Reuse a store connection."""
- try:
- cursor.connection.rollback()
- if self.twophase:
- self._set_xid(cursor)
- except self.disconnected_exceptions, e:
- raise StorageError(e)
-
def store_temp(self, cursor, oid, prev_tid, data):
"""Store an object in the temporary table."""
md5sum = compute_md5sum(data)
@@ -838,30 +677,3 @@
"""
cursor.execute(stmt, (tid,))
- def set_min_oid(self, cursor, oid):
- """Ensure the next OID is at least the given OID."""
- next_oid = self.new_oid(cursor)
- if next_oid < oid:
- # Oracle provides no way modify the sequence value
- # except through alter sequence or drop/create sequence,
- # but either statement kills the current transaction.
- # Therefore, open a temporary connection to make the
- # alteration.
- conn2, cursor2 = self.connmanager.open()
- try:
- # Change the sequence by altering the increment.
- # (this is safer than dropping and re-creating the sequence)
- diff = oid - next_oid
- cursor2.execute(
- "ALTER SEQUENCE zoid_seq INCREMENT BY %d" % diff)
- cursor2.execute("SELECT zoid_seq.nextval FROM DUAL")
- cursor2.execute("ALTER SEQUENCE zoid_seq INCREMENT BY 1")
- conn2.commit()
- finally:
- self.connmanager.close(conn2, cursor2)
-
- def new_oid(self, cursor):
- """Return a new, unused OID."""
- stmt = "SELECT zoid_seq.nextval FROM DUAL"
- cursor.execute(stmt)
- return cursor.fetchone()[0]
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2009-09-23 23:14:01 UTC (rev 104466)
@@ -55,6 +55,7 @@
from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
from relstorage.adapters.loadstore import HistoryPreservingMySQLLoadStore
from relstorage.adapters.locker import MySQLLocker
+from relstorage.adapters.oidallocator import MySQLOIDAllocator
from relstorage.adapters.packundo import HistoryPreservingPackUndo
from relstorage.adapters.poller import Poller
from relstorage.adapters.schema import HistoryPreservingMySQLSchema
@@ -68,7 +69,11 @@
# raised when the connection to the database has been broken.
disconnected_exceptions = (MySQLdb.OperationalError, MySQLdb.InterfaceError)
+# close_exceptions contains the exception types to ignore
+# when the adapter attempts to close a database connection.
+close_exceptions = disconnected_exceptions + (MySQLdb.ProgrammingError,)
+
class MySQLAdapter(object):
"""MySQL adapter for RelStorage."""
@@ -83,10 +88,10 @@
runner=self.runner,
)
self.loadstore = HistoryPreservingMySQLLoadStore(
- connmanager=self.connmanager,
- disconnected_exceptions=disconnected_exceptions,
Binary=MySQLdb.Binary,
)
+ self.oidallocator = MySQLOIDAllocator()
+ self.connmanager.set_on_store_opened(self.loadstore.on_store_opened)
self.txncontrol = MySQLTransactionControl(
Binary=MySQLdb.Binary,
)
@@ -109,6 +114,10 @@
self.open = self.connmanager.open
self.close = self.connmanager.close
+ self.open_for_load = self.connmanager.open_for_load
+ self.restart_load = self.connmanager.restart_load
+ self.open_for_store = self.connmanager.open_for_store
+ self.restart_store = self.connmanager.restart_store
self.hold_commit_lock = self.locker.hold_commit_lock
self.release_commit_lock = self.locker.release_commit_lock
@@ -120,26 +129,22 @@
self.zap_all = self.schema.zap_all
self.drop_all = self.schema.drop_all
- self.open_for_load = self.loadstore.open_for_load
- self.restart_load = self.loadstore.restart_load
self.get_current_tid = self.loadstore.get_current_tid
self.load_current = self.loadstore.load_current
self.load_revision = self.loadstore.load_revision
self.exists = self.loadstore.exists
self.load_before = self.loadstore.load_before
self.get_object_tid_after = self.loadstore.get_object_tid_after
-
- self.open_for_store = self.loadstore.open_for_store
- self.restart_store = self.loadstore.restart_store
self.store_temp = self.loadstore.store_temp
self.replace_temp = self.loadstore.replace_temp
self.restore = self.loadstore.restore
self.detect_conflict = self.loadstore.detect_conflict
self.move_from_temp = self.loadstore.move_from_temp
self.update_current = self.loadstore.update_current
- self.set_min_oid = self.loadstore.set_min_oid
- self.new_oid = self.loadstore.new_oid
+ self.set_min_oid = self.oidallocator.set_min_oid
+ self.new_oid = self.oidallocator.new_oid
+
self.get_tid_and_time = self.txncontrol.get_tid_and_time
self.add_transaction = self.txncontrol.add_transaction
self.commit_phase1 = self.txncontrol.commit_phase1
@@ -170,9 +175,8 @@
isolation_read_committed = "ISOLATION LEVEL READ COMMITTED"
isolation_repeatable_read = "ISOLATION LEVEL REPEATABLE READ"
- # close_exceptions contains the exception types to ignore
- # when the adapter attempts to close a database connection.
- close_exceptions = disconnected_exceptions + (MySQLdb.ProgrammingError,)
+ disconnected_exceptions = disconnected_exceptions
+ close_exceptions = close_exceptions
def __init__(self, params):
self._params = params.copy()
@@ -192,3 +196,10 @@
log.warning("Unable to connect: %s", e)
raise
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ return self.open(self.isolation_repeatable_read)
+
Added: relstorage/trunk/relstorage/adapters/oidallocator.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oidallocator.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/oidallocator.py 2009-09-23 23:14:01 UTC (rev 104466)
@@ -0,0 +1,90 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""IOIDAllocator implementations"""
+
+from relstorage.adapters.interfaces import IOIDAllocator
+from zope.interface import implements
+
+class PostgreSQLOIDAllocator(object):
+ implements(IOIDAllocator)
+
+ def set_min_oid(self, cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+ cursor.execute("""
+ SELECT CASE WHEN %s > nextval('zoid_seq')
+ THEN setval('zoid_seq', %s)
+ ELSE 0
+ END
+ """, (oid, oid))
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "SELECT NEXTVAL('zoid_seq')"
+ cursor.execute(stmt)
+ return cursor.fetchone()[0]
+
+
+class MySQLOIDAllocator(object):
+ implements(IOIDAllocator)
+
+ def set_min_oid(self, cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+ cursor.execute("REPLACE INTO new_oid VALUES(%s)", (oid,))
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "INSERT INTO new_oid VALUES ()"
+ cursor.execute(stmt)
+ oid = cursor.connection.insert_id()
+ if oid % 100 == 0:
+ # Clean out previously generated OIDs.
+ stmt = "DELETE FROM new_oid WHERE zoid < %s"
+ cursor.execute(stmt, (oid,))
+ return oid
+
+
+class OracleOIDAllocator(object):
+ implements(IOIDAllocator)
+
+ def __init__(self, connmanager):
+ self.connmanager = connmanager
+
+ def set_min_oid(self, cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+ next_oid = self.new_oid(cursor)
+ if next_oid < oid:
+ # Oracle provides no way to modify the sequence value
+ # except through alter sequence or drop/create sequence,
+ # but either statement kills the current transaction.
+ # Therefore, open a temporary connection to make the
+ # alteration.
+ conn2, cursor2 = self.connmanager.open()
+ try:
+ # Change the sequence by altering the increment.
+ # (this is safer than dropping and re-creating the sequence)
+ diff = oid - next_oid
+ cursor2.execute(
+ "ALTER SEQUENCE zoid_seq INCREMENT BY %d" % diff)
+ cursor2.execute("SELECT zoid_seq.nextval FROM DUAL")
+ cursor2.execute("ALTER SEQUENCE zoid_seq INCREMENT BY 1")
+ conn2.commit()
+ finally:
+ self.connmanager.close(conn2, cursor2)
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "SELECT zoid_seq.nextval FROM DUAL"
+ cursor.execute(stmt)
+ return cursor.fetchone()[0]
+
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2009-09-23 23:14:01 UTC (rev 104466)
@@ -15,11 +15,13 @@
import logging
import cx_Oracle
+from ZODB.POSException import StorageError
from relstorage.adapters.connmanager import AbstractConnectionManager
from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
from relstorage.adapters.loadstore import HistoryPreservingOracleLoadStore
from relstorage.adapters.locker import OracleLocker
+from relstorage.adapters.oidallocator import OracleOIDAllocator
from relstorage.adapters.packundo import OracleHistoryPreservingPackUndo
from relstorage.adapters.poller import Poller
from relstorage.adapters.schema import HistoryPreservingOracleSchema
@@ -31,9 +33,15 @@
# disconnected_exceptions contains the exception types that might be
# raised when the connection to the database has been broken.
-disconnected_exceptions = (cx_Oracle.OperationalError,
- cx_Oracle.InterfaceError, cx_Oracle.DatabaseError)
+disconnected_exceptions = (
+ cx_Oracle.OperationalError,
+ cx_Oracle.InterfaceError,
+ cx_Oracle.DatabaseError,
+ )
+# close_exceptions contains the exception types to ignore
+# when the adapter attempts to close a database connection.
+close_exceptions = disconnected_exceptions
class OracleAdapter(object):
"""Oracle adapter for RelStorage."""
@@ -70,14 +78,16 @@
runner=self.runner,
)
self.loadstore = HistoryPreservingOracleLoadStore(
- connmanager=self.connmanager,
runner=self.runner,
- disconnected_exceptions=disconnected_exceptions,
Binary=cx_Oracle.Binary,
inputsize_BLOB=cx_Oracle.BLOB,
inputsize_BINARY=cx_Oracle.BINARY,
twophase=bool(twophase),
)
+ self.oidallocator = OracleOIDAllocator(
+ connmanager=self.connmanager,
+ )
+ self.connmanager.set_on_store_opened(self.loadstore.on_store_opened)
self.txncontrol = OracleTransactionControl(
Binary=cx_Oracle.Binary,
)
@@ -100,6 +110,10 @@
self.open = self.connmanager.open
self.close = self.connmanager.close
+ self.open_for_load = self.connmanager.open_for_load
+ self.restart_load = self.connmanager.restart_load
+ self.open_for_store = self.connmanager.open_for_store
+ self.restart_store = self.connmanager.restart_store
self.hold_commit_lock = self.locker.hold_commit_lock
self.release_commit_lock = self.locker.release_commit_lock
@@ -111,26 +125,22 @@
self.zap_all = self.schema.zap_all
self.drop_all = self.schema.drop_all
- self.open_for_load = self.loadstore.open_for_load
- self.restart_load = self.loadstore.restart_load
self.get_current_tid = self.loadstore.get_current_tid
self.load_current = self.loadstore.load_current
self.load_revision = self.loadstore.load_revision
self.exists = self.loadstore.exists
self.load_before = self.loadstore.load_before
self.get_object_tid_after = self.loadstore.get_object_tid_after
-
- self.open_for_store = self.loadstore.open_for_store
- self.restart_store = self.loadstore.restart_store
self.store_temp = self.loadstore.store_temp
self.replace_temp = self.loadstore.replace_temp
self.restore = self.loadstore.restore
self.detect_conflict = self.loadstore.detect_conflict
self.move_from_temp = self.loadstore.move_from_temp
self.update_current = self.loadstore.update_current
- self.set_min_oid = self.loadstore.set_min_oid
- self.new_oid = self.loadstore.new_oid
+ self.set_min_oid = self.oidallocator.set_min_oid
+ self.new_oid = self.oidallocator.new_oid
+
self.get_tid_and_time = self.txncontrol.get_tid_and_time
self.add_transaction = self.txncontrol.add_transaction
self.commit_phase1 = self.txncontrol.commit_phase1
@@ -222,11 +232,13 @@
isolation_read_committed = "ISOLATION LEVEL READ COMMITTED"
isolation_read_only = "READ ONLY"
- close_exceptions = disconnected_exceptions
+ disconnected_exceptions = disconnected_exceptions
+ close_exceptions = close_exceptions
- def __init__(self, params, arraysize):
+ def __init__(self, params, arraysize, twophase):
self._params = params
self._arraysize = arraysize
+ self._twophase = twophase
def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
twophase=False):
@@ -243,3 +255,58 @@
except cx_Oracle.OperationalError, e:
log.warning("Unable to connect: %s", e)
raise
+
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ return self.open(self.isolation_read_only)
+
+ def restart_load(self, cursor):
+ """Reinitialize a connection for loading objects."""
+ try:
+ cursor.connection.rollback()
+ cursor.execute("SET TRANSACTION READ ONLY")
+ except self.disconnected_exceptions, e:
+ raise StorageError(e)
+
+ def _set_xid(self, conn, cursor):
+ """Set up a distributed transaction"""
+ stmt = """
+ SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
+ """
+ cursor.execute(stmt)
+ xid = str(cursor.fetchone()[0])
+ conn.begin(0, xid, '0')
+
+ def open_for_store(self):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ try:
+ if self._twophase:
+ conn, cursor = self.open(transaction_mode=None, twophase=True)
+ try:
+ self._set_xid(conn, cursor)
+ else:
+ conn, cursor = self.open()
+ if self.on_store_opened is not None:
+ self.on_store_opened(cursor, restart=False)
+ return conn, cursor
+ except:
+ self.close(conn, cursor)
+ raise
+
+ def restart_store(self, conn, cursor):
+ """Reuse a store connection."""
+ try:
+ conn.rollback()
+ if self._twophase:
+ self._set_xid(conn, cursor)
+ if self.on_store_opened is not None:
+ self.on_store_opened(cursor, restart=True)
+ except self.disconnected_exceptions, e:
+ raise StorageError(e)
+
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2009-09-23 23:14:01 UTC (rev 104466)
@@ -21,6 +21,7 @@
from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
from relstorage.adapters.loadstore import HistoryPreservingPostgreSQLLoadStore
from relstorage.adapters.locker import PostgreSQLLocker
+from relstorage.adapters.oidallocator import PostgreSQLOIDAllocator
from relstorage.adapters.packundo import HistoryPreservingPackUndo
from relstorage.adapters.poller import Poller
from relstorage.adapters.schema import HistoryPreservingPostgreSQLSchema
@@ -32,8 +33,14 @@
# disconnected_exceptions contains the exception types that might be
# raised when the connection to the database has been broken.
-disconnected_exceptions = (psycopg2.OperationalError, psycopg2.InterfaceError)
+disconnected_exceptions = (
+ psycopg2.OperationalError,
+ psycopg2.InterfaceError,
+ )
+# close_exceptions contains the exception types to ignore
+# when the adapter attempts to close a database connection.
+close_exceptions = disconnected_exceptions
class PostgreSQLAdapter(object):
"""PostgreSQL adapter for RelStorage."""
@@ -48,10 +55,9 @@
locker=self.locker,
connmanager=self.connmanager,
)
- self.loadstore = HistoryPreservingPostgreSQLLoadStore(
- connmanager=self.connmanager,
- disconnected_exceptions=disconnected_exceptions,
- )
+ self.loadstore = HistoryPreservingPostgreSQLLoadStore()
+ self.oidallocator = PostgreSQLOIDAllocator()
+ self.connmanager.set_on_store_opened(self.loadstore.on_store_opened)
self.txncontrol = PostgreSQLTransactionControl()
self.poller = Poller(
poll_query="EXECUTE get_latest_tid",
@@ -72,6 +78,10 @@
self.open = self.connmanager.open
self.close = self.connmanager.close
+ self.open_for_load = self.connmanager.open_for_load
+ self.restart_load = self.connmanager.restart_load
+ self.open_for_store = self.connmanager.open_for_store
+ self.restart_store = self.connmanager.restart_store
self.hold_commit_lock = self.locker.hold_commit_lock
self.release_commit_lock = self.locker.release_commit_lock
@@ -83,26 +93,22 @@
self.zap_all = self.schema.zap_all
self.drop_all = self.schema.drop_all
- self.open_for_load = self.loadstore.open_for_load
- self.restart_load = self.loadstore.restart_load
self.get_current_tid = self.loadstore.get_current_tid
self.load_current = self.loadstore.load_current
self.load_revision = self.loadstore.load_revision
self.exists = self.loadstore.exists
self.load_before = self.loadstore.load_before
self.get_object_tid_after = self.loadstore.get_object_tid_after
-
- self.open_for_store = self.loadstore.open_for_store
- self.restart_store = self.loadstore.restart_store
self.store_temp = self.loadstore.store_temp
self.replace_temp = self.loadstore.replace_temp
self.restore = self.loadstore.restore
self.detect_conflict = self.loadstore.detect_conflict
self.move_from_temp = self.loadstore.move_from_temp
self.update_current = self.loadstore.update_current
- self.set_min_oid = self.loadstore.set_min_oid
- self.new_oid = self.loadstore.new_oid
+ self.set_min_oid = self.oidallocator.set_min_oid
+ self.new_oid = self.oidallocator.new_oid
+
self.get_tid_and_time = self.txncontrol.get_tid_and_time
self.add_transaction = self.txncontrol.add_transaction
self.commit_phase1 = self.txncontrol.commit_phase1
@@ -135,7 +141,8 @@
isolation_serializable = (
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
- close_exceptions = disconnected_exceptions
+ disconnected_exceptions = disconnected_exceptions
+ close_exceptions = close_exceptions
def __init__(self, dsn):
self._dsn = dsn
@@ -153,3 +160,19 @@
raise
return conn, cursor
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.open(self.isolation_serializable)
+ stmt = """
+ PREPARE get_latest_tid AS
+ SELECT tid
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ cursor.execute(stmt)
+ return conn, cursor
+
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2009-09-23 21:36:09 UTC (rev 104465)
+++ relstorage/trunk/relstorage/relstorage.py 2009-09-23 23:14:01 UTC (rev 104466)
@@ -195,7 +195,8 @@
self._open_load_connection()
else:
try:
- self._adapter.restart_load(self._load_cursor)
+ self._adapter.restart_load(
+ self._load_conn, self._load_cursor)
except POSException.StorageError, e:
log.warning("Reconnecting load_conn: %s", e)
self._drop_load_connection()
@@ -227,7 +228,8 @@
self._open_store_connection()
else:
try:
- self._adapter.restart_store(self._store_cursor)
+ self._adapter.restart_store(
+ self._store_conn, self._store_cursor)
except POSException.StorageError, e:
log.warning("Reconnecting store_conn: %s", e)
self._drop_store_connection()
More information about the checkins
mailing list