[Checkins] SVN: relstorage/ Integrated inline BLOB retrieval for the Oracle adapter.
Shane Hathaway
shane at hathawaymix.org
Mon Dec 15 17:53:54 EST 2008
Log message for revision 94090:
Integrated inline BLOB retrieval for the Oracle adapter.
Decided to use normal methods instead of conditional method
definition, enabling the creator of OracleAdapter instances
to decide whether or not to use this optimization.
Changed:
U relstorage/branches/1.1/CHANGES.txt
U relstorage/branches/1.1/relstorage/adapters/oracle.py
U relstorage/branches/1.1-jarn/relstorage/adapters/oracle.py
U relstorage/branches/1.1-jarn/setup.py
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/relstorage/adapters/oracle.py
-=-
Modified: relstorage/branches/1.1/CHANGES.txt
===================================================================
--- relstorage/branches/1.1/CHANGES.txt 2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/branches/1.1/CHANGES.txt 2008-12-15 22:53:54 UTC (rev 94090)
@@ -17,7 +17,10 @@
- Implemented the database size query in MySQL, based on a patch from
Kazuhiko Shiozaki. Thanks!
+- Optimized Oracle object retrieval by causing BLOBs to be sent inline
+ when possible, based on a patch by Helge Tesdal.
+
RelStorage 1.1c1
- Added optional memcache integration. This is useful when the connection
Modified: relstorage/branches/1.1/relstorage/adapters/oracle.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/oracle.py 2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/branches/1.1/relstorage/adapters/oracle.py 2008-12-15 22:53:54 UTC (rev 94090)
@@ -15,8 +15,6 @@
import logging
import re
-import thread
-import time
import cx_Oracle
from ZODB.POSException import StorageError
@@ -24,7 +22,28 @@
log = logging.getLogger("relstorage.adapters.oracle")
+def lob_handler(cursor, name, defaultType, size, precision, scale):
+ """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
+ Note that if a BLOB in the result is too large, Oracle generates an
+ error indicating truncation. The execute_lob_stmt() method works
+ around this.
+ """
+ if defaultType == cx_Oracle.BLOB:
+ # Default size for BLOB is 4, we want the whole blob inline.
+ # Typical chunk size is 8132, we choose a multiple - 32528
+ return cursor.var(cx_Oracle.LONG_BINARY, 32528, cursor.arraysize)
+
+def read_lob(value):
+ """Handle an Oracle LOB by returning its byte stream.
+
+ Returns other objects unchanged.
+ """
+ if isinstance(value, cx_Oracle.LOB):
+ return value.read()
+ return value
+
+
class OracleAdapter(Adapter):
"""Oracle adapter for RelStorage."""
@@ -73,12 +92,31 @@
Adapter._scripts['prepack_follow_child_refs'],
}
- def __init__(self, user, password, dsn, twophase=False, arraysize=64):
+ def __init__(self, user, password, dsn, twophase=False, arraysize=64,
+ use_inline_lobs=None):
+ """Create an Oracle adapter.
+
+ The user, password, and dsn parameters are provided to cx_Oracle
+ at connection time.
+
+ If twophase is true, all commits go through an Oracle-level two-phase
+ commit process. This is disabled by default. Even when this option
+ is disabled, the ZODB two-phase commit is still in effect.
+
+ arraysize sets the number of rows to buffer in cx_Oracle. The default
+ is 64.
+
+ use_inline_lobs enables Oracle to send BLOBs inline in response to
+ queries. It depends on features in cx_Oracle 5. The default is None,
+ telling the adapter to auto-detect the presence of cx_Oracle 5.
+ """
self._params = (user, password, dsn)
- self._twophase = twophase
+ self._twophase = bool(twophase)
self._arraysize = arraysize
+ if use_inline_lobs is None:
+ use_inline_lobs = (cx_Oracle.version >= '5.0')
+ self._use_inline_lobs = bool(use_inline_lobs)
-
def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
"""Execute a statement from a script with the given parameters.
@@ -365,39 +403,67 @@
return tid
return None
+ def execute_lob_stmt(self, cursor, stmt, args=(), default=None):
+ """Execute a statement and return one row with all LOBs inline.
+
+ Returns the value of the default parameter if the result was empty.
+ """
+ if self._use_inline_lobs:
+ try:
+ cursor.outputtypehandler = lob_handler
+ try:
+ cursor.execute(stmt, args)
+ for row in cursor:
+ return row
+ finally:
+ del cursor.outputtypehandler
+ except cx_Oracle.DatabaseError, e:
+ # ORA-01406: fetched column value was truncated
+ error, = e
+ if ((isinstance(error, str) and not error.endswith(' 1406'))
+ or error.code != 1406):
+ raise
+ # Execute the query, but alter it slightly without
+ # changing its meaning, so that the query cache
+ # will see it as a statement that has to be compiled
+ # with different output type parameters.
+ cursor.execute(stmt + ' ', args)
+ for row in cursor:
+ return tuple(map(read_lob, row))
+ else:
+ cursor.execute(stmt, args)
+ for row in cursor:
+ return tuple(map(read_lob, row))
+ return default
+
def load_current(self, cursor, oid):
"""Returns the current pickle and integer tid for an object.
oid is an integer. Returns (None, None) if object does not exist.
"""
- cursor.execute("""
+ stmt = """
SELECT state, tid
FROM current_object
JOIN object_state USING(zoid, tid)
WHERE zoid = :1
- """, (oid,))
- for state, tid in cursor:
- if state is not None:
- state = state.read()
- # else this object's creation has been undone
- return state, tid
- return None, None
+ """
+ return self.execute_lob_stmt(
+ cursor, stmt, (oid,), default=(None, None))
def load_revision(self, cursor, oid, tid):
"""Returns the pickle for an object on a particular transaction.
Returns None if no such state exists.
"""
- cursor.execute("""
+ stmt = """
SELECT state
FROM object_state
WHERE zoid = :1
AND tid = :2
- """, (oid, tid))
- for (state,) in cursor:
- if state is not None:
- return state.read()
- return None
+ """
+ (state,) = self.execute_lob_stmt(
+ cursor, stmt, (oid, tid), default=(None,))
+ return state
def exists(self, cursor, oid):
"""Returns a true value if the given object exists."""
@@ -420,13 +486,8 @@
AND tid < :tid
)
"""
- cursor.execute(stmt, {'oid': oid, 'tid': tid})
- for state, tid in cursor:
- if state is not None:
- state = state.read()
- # else this object's creation has been undone
- return state, tid
- return None, None
+ return self.execute_lob_stmt(cursor, stmt, {'oid': oid, 'tid': tid},
+ default=(None, None))
def get_object_tid_after(self, cursor, oid, tid):
"""Returns the tid of the next change after an object revision.
@@ -577,13 +638,10 @@
JOIN current_object ON (temp_store.zoid = current_object.zoid)
WHERE temp_store.prev_tid != current_object.tid
"""
- cursor.execute(stmt)
- for oid, prev_tid, attempted_prev_tid, data in cursor:
- return oid, prev_tid, attempted_prev_tid, data.read()
- return None
+ return self.execute_lob_stmt(cursor, stmt)
def move_from_temp(self, cursor, tid):
- """Moved the temporarily stored objects to permanent storage.
+ """Move the temporarily stored objects to permanent storage.
Returns the list of oids stored.
"""
Modified: relstorage/branches/1.1-jarn/relstorage/adapters/oracle.py
===================================================================
--- relstorage/branches/1.1-jarn/relstorage/adapters/oracle.py 2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/branches/1.1-jarn/relstorage/adapters/oracle.py 2008-12-15 22:53:54 UTC (rev 94090)
@@ -15,18 +15,26 @@
import logging
import re
-import thread
-import time
import cx_Oracle
from ZODB.POSException import StorageError
from common import Adapter
log = logging.getLogger("relstorage.adapters.oracle")
-use_inline_lobs = (cx_Oracle.version >= '5.0')
+def lob_handler(cursor, name, defaultType, size, precision, scale):
+ """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
-def inline(value):
+ Note that if a BLOB in the result is too large, Oracle generates an
+ error indicating truncation. The execute_lob_stmt() method works
+ around this.
+ """
+ if defaultType == cx_Oracle.BLOB:
+ # Default size for BLOB is 4, we want the whole blob inline.
+ # Typical chunk size is 8132, we choose a multiple - 32528
+ return cursor.var(cx_Oracle.LONG_BINARY, 32528, cursor.arraysize)
+
+def read_lob(value):
"""Handle an Oracle LOB by returning its byte stream.
Returns other objects unchanged.
@@ -36,55 +44,6 @@
return value
-if use_inline_lobs:
- def execute_lob_stmt(cursor, stmt, args=(), default=None):
- """Execute a statement and return one row with all LOBs inline.
-
- Returns the value of the default parameter if the result was empty.
- """
- try:
- cursor.execute(stmt, args)
- for row in cursor:
- return row
- except cx_Oracle.DatabaseError, e:
- # ORA-01406: fetched column value was truncated
- if not e.args[0].endswith(' 1406'):
- raise
- del cursor.outputtypehandler
- try:
- # Execute the query, but alter it slightly without
- # changing its meaning, so that the query cache
- # will see it as a statement that has to be compiled
- # with different output type parameters.
- cursor.execute(stmt + ' ', args)
- finally:
- cursor.outputtypehandler = lob_handler
- for row in cursor:
- return tuple(map(inline, row))
- return default
-else:
- def execute_lob_stmt(cursor, stmt, args=(), default=None):
- """Execute a statement and return one row with all LOBs inline.
-
- Returns the value of the default parameter if the result was empty.
- """
- cursor.execute(stmt, args)
- for row in cursor:
- return tuple(map(inline, row))
- return default
-
-
-def lob_handler(cursor, name, defaultType, size, precision, scale):
- """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
-
- Note that if a BLOB in the result is too large, Oracle generates an
- error indicating truncation. The execute_lob_stmt() function works
- around this.
- """
- if defaultType == cx_Oracle.BLOB:
- return cursor.var(cx_Oracle.LONG_BINARY)
-
-
class OracleAdapter(Adapter):
"""Oracle adapter for RelStorage."""
@@ -133,12 +92,31 @@
Adapter._scripts['prepack_follow_child_refs'],
}
- def __init__(self, user, password, dsn, twophase=False, arraysize=64):
+ def __init__(self, user, password, dsn, twophase=False, arraysize=64,
+ use_inline_lobs=None):
+ """Create an Oracle adapter.
+
+ The user, password, and dsn parameters are provided to cx_Oracle
+ at connection time.
+
+ If twophase is true, all commits go through an Oracle-level two-phase
+ commit process. This is disabled by default. Even when this option
+ is disabled, the ZODB two-phase commit is still in effect.
+
+ arraysize sets the number of rows to buffer in cx_Oracle. The default
+ is 64.
+
+ use_inline_lobs enables Oracle to send BLOBs inline in response to
+ queries. It depends on features in cx_Oracle 5. The default is None,
+ telling the adapter to auto-detect the presence of cx_Oracle 5.
+ """
self._params = (user, password, dsn)
- self._twophase = twophase
+ self._twophase = bool(twophase)
self._arraysize = arraysize
+ if use_inline_lobs is None:
+ use_inline_lobs = (cx_Oracle.version >= '5.0')
+ self._use_inline_lobs = bool(use_inline_lobs)
-
def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
"""Execute a statement from a script with the given parameters.
@@ -159,13 +137,7 @@
params = ()
try:
- if use_inline_lobs:
- del cursor.outputtypehandler
- try:
- cursor.execute(stmt, params)
- finally:
- if use_inline_lobs:
- cursor.outputtypehandler = lob_handler
+ cursor.execute(stmt, params)
except:
log.warning("script statement failed: %r; parameters: %r",
stmt, params)
@@ -356,8 +328,6 @@
conn = cx_Oracle.connect(*self._params, **kw)
cursor = conn.cursor()
cursor.arraysize = self._arraysize
- if use_inline_lobs:
- cursor.outputtypehandler = lob_handler
if transaction_mode:
cursor.execute("SET TRANSACTION %s" % transaction_mode)
return conn, cursor
@@ -433,6 +403,39 @@
return tid
return None
+ def execute_lob_stmt(self, cursor, stmt, args=(), default=None):
+ """Execute a statement and return one row with all LOBs inline.
+
+ Returns the value of the default parameter if the result was empty.
+ """
+ if self._use_inline_lobs:
+ try:
+ cursor.outputtypehandler = lob_handler
+ try:
+ cursor.execute(stmt, args)
+ for row in cursor:
+ return row
+ finally:
+ del cursor.outputtypehandler
+ except cx_Oracle.DatabaseError, e:
+ # ORA-01406: fetched column value was truncated
+ error, = e
+ if ((isinstance(error, str) and not error.endswith(' 1406'))
+ or error.code != 1406):
+ raise
+ # Execute the query, but alter it slightly without
+ # changing its meaning, so that the query cache
+ # will see it as a statement that has to be compiled
+ # with different output type parameters.
+ cursor.execute(stmt + ' ', args)
+ for row in cursor:
+ return tuple(map(read_lob, row))
+ else:
+ cursor.execute(stmt, args)
+ for row in cursor:
+ return tuple(map(read_lob, row))
+ return default
+
def load_current(self, cursor, oid):
"""Returns the current pickle and integer tid for an object.
@@ -444,7 +447,8 @@
JOIN object_state USING(zoid, tid)
WHERE zoid = :1
"""
- return execute_lob_stmt(cursor, stmt, (oid,), default=(None, None))
+ return self.execute_lob_stmt(
+ cursor, stmt, (oid,), default=(None, None))
def load_revision(self, cursor, oid, tid):
"""Returns the pickle for an object on a particular transaction.
@@ -457,7 +461,8 @@
WHERE zoid = :1
AND tid = :2
"""
- (state,) = execute_lob_stmt(cursor, stmt, (oid, tid), default=(None,))
+ (state,) = self.execute_lob_stmt(
+ cursor, stmt, (oid, tid), default=(None,))
return state
def exists(self, cursor, oid):
@@ -481,7 +486,7 @@
AND tid < :tid
)
"""
- return execute_lob_stmt(cursor, stmt, {'oid': oid, 'tid': tid},
+ return self.execute_lob_stmt(cursor, stmt, {'oid': oid, 'tid': tid},
default=(None, None))
def get_object_tid_after(self, cursor, oid, tid):
@@ -633,7 +638,7 @@
JOIN current_object ON (temp_store.zoid = current_object.zoid)
WHERE temp_store.prev_tid != current_object.tid
"""
- return execute_lob_stmt(cursor, stmt)
+ return self.execute_lob_stmt(cursor, stmt)
def move_from_temp(self, cursor, tid):
"""Move the temporarily stored objects to permanent storage.
Modified: relstorage/branches/1.1-jarn/setup.py
===================================================================
--- relstorage/branches/1.1-jarn/setup.py 2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/branches/1.1-jarn/setup.py 2008-12-15 22:53:54 UTC (rev 94090)
@@ -27,7 +27,7 @@
with RelStorage.
"""
-VERSION = "1.1jarn2"
+VERSION = "1.1jarn4"
classifiers = """\
Development Status :: 4 - Beta
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/trunk/CHANGES.txt 2008-12-15 22:53:54 UTC (rev 94090)
@@ -17,7 +17,10 @@
- Implemented the database size query in MySQL, based on a patch from
Kazuhiko Shiozaki. Thanks!
+- Optimized Oracle object retrieval by causing BLOBs to be sent inline
+ when possible, based on a patch by Helge Tesdal.
+
RelStorage 1.1c1
- Added optional memcache integration. This is useful when the connection
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2008-12-15 22:53:54 UTC (rev 94090)
@@ -15,8 +15,6 @@
import logging
import re
-import thread
-import time
import cx_Oracle
from ZODB.POSException import StorageError
@@ -24,7 +22,28 @@
log = logging.getLogger("relstorage.adapters.oracle")
+def lob_handler(cursor, name, defaultType, size, precision, scale):
+ """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
+ Note that if a BLOB in the result is too large, Oracle generates an
+ error indicating truncation. The execute_lob_stmt() method works
+ around this.
+ """
+ if defaultType == cx_Oracle.BLOB:
+ # Default size for BLOB is 4, we want the whole blob inline.
+ # Typical chunk size is 8132, we choose a multiple - 32528
+ return cursor.var(cx_Oracle.LONG_BINARY, 32528, cursor.arraysize)
+
+def read_lob(value):
+ """Handle an Oracle LOB by returning its byte stream.
+
+ Returns other objects unchanged.
+ """
+ if isinstance(value, cx_Oracle.LOB):
+ return value.read()
+ return value
+
+
class OracleAdapter(Adapter):
"""Oracle adapter for RelStorage."""
@@ -73,12 +92,31 @@
Adapter._scripts['prepack_follow_child_refs'],
}
- def __init__(self, user, password, dsn, twophase=False, arraysize=64):
+ def __init__(self, user, password, dsn, twophase=False, arraysize=64,
+ use_inline_lobs=None):
+ """Create an Oracle adapter.
+
+ The user, password, and dsn parameters are provided to cx_Oracle
+ at connection time.
+
+ If twophase is true, all commits go through an Oracle-level two-phase
+ commit process. This is disabled by default. Even when this option
+ is disabled, the ZODB two-phase commit is still in effect.
+
+ arraysize sets the number of rows to buffer in cx_Oracle. The default
+ is 64.
+
+ use_inline_lobs enables Oracle to send BLOBs inline in response to
+ queries. It depends on features in cx_Oracle 5. The default is None,
+ telling the adapter to auto-detect the presence of cx_Oracle 5.
+ """
self._params = (user, password, dsn)
- self._twophase = twophase
+ self._twophase = bool(twophase)
self._arraysize = arraysize
+ if use_inline_lobs is None:
+ use_inline_lobs = (cx_Oracle.version >= '5.0')
+ self._use_inline_lobs = bool(use_inline_lobs)
-
def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
"""Execute a statement from a script with the given parameters.
@@ -365,39 +403,67 @@
return tid
return None
+ def execute_lob_stmt(self, cursor, stmt, args=(), default=None):
+ """Execute a statement and return one row with all LOBs inline.
+
+ Returns the value of the default parameter if the result was empty.
+ """
+ if self._use_inline_lobs:
+ try:
+ cursor.outputtypehandler = lob_handler
+ try:
+ cursor.execute(stmt, args)
+ for row in cursor:
+ return row
+ finally:
+ del cursor.outputtypehandler
+ except cx_Oracle.DatabaseError, e:
+ # ORA-01406: fetched column value was truncated
+ error, = e
+ if ((isinstance(error, str) and not error.endswith(' 1406'))
+ or error.code != 1406):
+ raise
+ # Execute the query, but alter it slightly without
+ # changing its meaning, so that the query cache
+ # will see it as a statement that has to be compiled
+ # with different output type parameters.
+ cursor.execute(stmt + ' ', args)
+ for row in cursor:
+ return tuple(map(read_lob, row))
+ else:
+ cursor.execute(stmt, args)
+ for row in cursor:
+ return tuple(map(read_lob, row))
+ return default
+
def load_current(self, cursor, oid):
"""Returns the current pickle and integer tid for an object.
oid is an integer. Returns (None, None) if object does not exist.
"""
- cursor.execute("""
+ stmt = """
SELECT state, tid
FROM current_object
JOIN object_state USING(zoid, tid)
WHERE zoid = :1
- """, (oid,))
- for state, tid in cursor:
- if state is not None:
- state = state.read()
- # else this object's creation has been undone
- return state, tid
- return None, None
+ """
+ return self.execute_lob_stmt(
+ cursor, stmt, (oid,), default=(None, None))
def load_revision(self, cursor, oid, tid):
"""Returns the pickle for an object on a particular transaction.
Returns None if no such state exists.
"""
- cursor.execute("""
+ stmt = """
SELECT state
FROM object_state
WHERE zoid = :1
AND tid = :2
- """, (oid, tid))
- for (state,) in cursor:
- if state is not None:
- return state.read()
- return None
+ """
+ (state,) = self.execute_lob_stmt(
+ cursor, stmt, (oid, tid), default=(None,))
+ return state
def exists(self, cursor, oid):
"""Returns a true value if the given object exists."""
@@ -420,13 +486,8 @@
AND tid < :tid
)
"""
- cursor.execute(stmt, {'oid': oid, 'tid': tid})
- for state, tid in cursor:
- if state is not None:
- state = state.read()
- # else this object's creation has been undone
- return state, tid
- return None, None
+ return self.execute_lob_stmt(cursor, stmt, {'oid': oid, 'tid': tid},
+ default=(None, None))
def get_object_tid_after(self, cursor, oid, tid):
"""Returns the tid of the next change after an object revision.
@@ -577,13 +638,10 @@
JOIN current_object ON (temp_store.zoid = current_object.zoid)
WHERE temp_store.prev_tid != current_object.tid
"""
- cursor.execute(stmt)
- for oid, prev_tid, attempted_prev_tid, data in cursor:
- return oid, prev_tid, attempted_prev_tid, data.read()
- return None
+ return self.execute_lob_stmt(cursor, stmt)
def move_from_temp(self, cursor, tid):
- """Moved the temporarily stored objects to permanent storage.
+ """Move the temporarily stored objects to permanent storage.
Returns the list of oids stored.
"""
More information about the Checkins
mailing list