[Checkins] SVN: relstorage/ Integrated inline BLOB retrieval for the Oracle adapter.

Shane Hathaway shane at hathawaymix.org
Mon Dec 15 17:53:54 EST 2008


Log message for revision 94090:
  Integrated inline BLOB retrieval for the Oracle adapter.
  
  Decided to use normal methods instead of conditionally defined
  module-level functions, so that whoever creates an OracleAdapter
  instance can decide whether or not to use this optimization.
  

Changed:
  U   relstorage/branches/1.1/CHANGES.txt
  U   relstorage/branches/1.1/relstorage/adapters/oracle.py
  U   relstorage/branches/1.1-jarn/relstorage/adapters/oracle.py
  U   relstorage/branches/1.1-jarn/setup.py
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/adapters/oracle.py

-=-
Modified: relstorage/branches/1.1/CHANGES.txt
===================================================================
--- relstorage/branches/1.1/CHANGES.txt	2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/branches/1.1/CHANGES.txt	2008-12-15 22:53:54 UTC (rev 94090)
@@ -17,7 +17,10 @@
 - Implemented the database size query in MySQL, based on a patch from
   Kazuhiko Shiozaki.  Thanks!
 
+- Optimized Oracle object retrieval by causing BLOBs to be sent inline
+  when possible, based on a patch by Helge Tesdal.
 
+
 RelStorage 1.1c1
 
 - Added optional memcache integration.  This is useful when the connection

Modified: relstorage/branches/1.1/relstorage/adapters/oracle.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/oracle.py	2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/branches/1.1/relstorage/adapters/oracle.py	2008-12-15 22:53:54 UTC (rev 94090)
@@ -15,8 +15,6 @@
 
 import logging
 import re
-import thread
-import time
 import cx_Oracle
 from ZODB.POSException import StorageError
 
@@ -24,7 +22,28 @@
 
 log = logging.getLogger("relstorage.adapters.oracle")
 
+def lob_handler(cursor, name, defaultType, size, precision, scale):
+    """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
 
+    Note that if a BLOB in the result is too large, Oracle generates an
+    error indicating truncation.  The execute_lob_stmt() method works
+    around this.
+    """
+    if defaultType == cx_Oracle.BLOB:
+        # Default size for BLOB is 4, we want the whole blob inline.
+        # Typical chunk size is 8132, we choose a multiple - 32528
+        return cursor.var(cx_Oracle.LONG_BINARY, 32528, cursor.arraysize)
+
+def read_lob(value):
+    """Handle an Oracle LOB by returning its byte stream.
+
+    Returns other objects unchanged.
+    """
+    if isinstance(value, cx_Oracle.LOB):
+        return value.read()
+    return value
+
+
 class OracleAdapter(Adapter):
     """Oracle adapter for RelStorage."""
 
@@ -73,12 +92,31 @@
             Adapter._scripts['prepack_follow_child_refs'],
     }
 
-    def __init__(self, user, password, dsn, twophase=False, arraysize=64):
+    def __init__(self, user, password, dsn, twophase=False, arraysize=64,
+            use_inline_lobs=None):
+        """Create an Oracle adapter.
+
+        The user, password, and dsn parameters are provided to cx_Oracle
+        at connection time.
+
+        If twophase is true, all commits go through an Oracle-level two-phase
+        commit process.  This is disabled by default.  Even when this option
+        is disabled, the ZODB two-phase commit is still in effect.
+
+        arraysize sets the number of rows to buffer in cx_Oracle.  The default
+        is 64.
+
+        use_inline_lobs enables Oracle to send BLOBs inline in response to
+        queries.  It depends on features in cx_Oracle 5.  The default is None,
+        telling the adapter to auto-detect the presence of cx_Oracle 5.
+        """
         self._params = (user, password, dsn)
-        self._twophase = twophase
+        self._twophase = bool(twophase)
         self._arraysize = arraysize
+        if use_inline_lobs is None:
+            use_inline_lobs = (cx_Oracle.version >= '5.0')
+        self._use_inline_lobs = bool(use_inline_lobs)
 
-
     def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
         """Execute a statement from a script with the given parameters.
 
@@ -365,39 +403,67 @@
             return tid
         return None
 
+    def execute_lob_stmt(self, cursor, stmt, args=(), default=None):
+        """Execute a statement and return one row with all LOBs inline.
+
+        Returns the value of the default parameter if the result was empty.
+        """
+        if self._use_inline_lobs:
+            try:
+                cursor.outputtypehandler = lob_handler
+                try:
+                    cursor.execute(stmt, args)
+                    for row in cursor:
+                        return row
+                finally:
+                    del cursor.outputtypehandler
+            except cx_Oracle.DatabaseError, e:
+                # ORA-01406: fetched column value was truncated
+                error, = e
+                if ((isinstance(error, str) and not error.endswith(' 1406'))
+                        or error.code != 1406):
+                    raise
+                # Execute the query, but alter it slightly without
+                # changing its meaning, so that the query cache
+                # will see it as a statement that has to be compiled
+                # with different output type parameters.
+                cursor.execute(stmt + ' ', args)
+                for row in cursor:
+                    return tuple(map(read_lob, row))
+        else:
+            cursor.execute(stmt, args)
+            for row in cursor:
+                return tuple(map(read_lob, row))
+        return default
+
     def load_current(self, cursor, oid):
         """Returns the current pickle and integer tid for an object.
 
         oid is an integer.  Returns (None, None) if object does not exist.
         """
-        cursor.execute("""
+        stmt = """
         SELECT state, tid
         FROM current_object
             JOIN object_state USING(zoid, tid)
         WHERE zoid = :1
-        """, (oid,))
-        for state, tid in cursor:
-            if state is not None:
-                state = state.read()
-            # else this object's creation has been undone
-            return state, tid
-        return None, None
+        """
+        return self.execute_lob_stmt(
+            cursor, stmt, (oid,), default=(None, None))
 
     def load_revision(self, cursor, oid, tid):
         """Returns the pickle for an object on a particular transaction.
 
         Returns None if no such state exists.
         """
-        cursor.execute("""
+        stmt = """
         SELECT state
         FROM object_state
         WHERE zoid = :1
             AND tid = :2
-        """, (oid, tid))
-        for (state,) in cursor:
-            if state is not None:
-                return state.read()
-        return None
+        """
+        (state,) = self.execute_lob_stmt(
+            cursor, stmt, (oid, tid), default=(None,))
+        return state
 
     def exists(self, cursor, oid):
         """Returns a true value if the given object exists."""
@@ -420,13 +486,8 @@
                     AND tid < :tid
             )
         """
-        cursor.execute(stmt, {'oid': oid, 'tid': tid})
-        for state, tid in cursor:
-            if state is not None:
-                state = state.read()
-            # else this object's creation has been undone
-            return state, tid
-        return None, None
+        return self.execute_lob_stmt(cursor, stmt, {'oid': oid, 'tid': tid},
+             default=(None, None))
 
     def get_object_tid_after(self, cursor, oid, tid):
         """Returns the tid of the next change after an object revision.
@@ -577,13 +638,10 @@
             JOIN current_object ON (temp_store.zoid = current_object.zoid)
         WHERE temp_store.prev_tid != current_object.tid
         """
-        cursor.execute(stmt)
-        for oid, prev_tid, attempted_prev_tid, data in cursor:
-            return oid, prev_tid, attempted_prev_tid, data.read()
-        return None
+        return self.execute_lob_stmt(cursor, stmt)
 
     def move_from_temp(self, cursor, tid):
-        """Moved the temporarily stored objects to permanent storage.
+        """Move the temporarily stored objects to permanent storage.
 
         Returns the list of oids stored.
         """

Modified: relstorage/branches/1.1-jarn/relstorage/adapters/oracle.py
===================================================================
--- relstorage/branches/1.1-jarn/relstorage/adapters/oracle.py	2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/branches/1.1-jarn/relstorage/adapters/oracle.py	2008-12-15 22:53:54 UTC (rev 94090)
@@ -15,18 +15,26 @@
 
 import logging
 import re
-import thread
-import time
 import cx_Oracle
 from ZODB.POSException import StorageError
 
 from common import Adapter
 
 log = logging.getLogger("relstorage.adapters.oracle")
-use_inline_lobs = (cx_Oracle.version >= '5.0')
 
+def lob_handler(cursor, name, defaultType, size, precision, scale):
+    """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
 
-def inline(value):
+    Note that if a BLOB in the result is too large, Oracle generates an
+    error indicating truncation.  The execute_lob_stmt() method works
+    around this.
+    """
+    if defaultType == cx_Oracle.BLOB:
+        # Default size for BLOB is 4, we want the whole blob inline.
+        # Typical chunk size is 8132, we choose a multiple - 32528
+        return cursor.var(cx_Oracle.LONG_BINARY, 32528, cursor.arraysize)
+
+def read_lob(value):
     """Handle an Oracle LOB by returning its byte stream.
 
     Returns other objects unchanged.
@@ -36,55 +44,6 @@
     return value
 
 
-if use_inline_lobs:
-    def execute_lob_stmt(cursor, stmt, args=(), default=None):
-        """Execute a statement and return one row with all LOBs inline.
-
-        Returns the value of the default parameter if the result was empty.
-        """
-        try:
-            cursor.execute(stmt, args)
-            for row in cursor:
-                return row
-        except cx_Oracle.DatabaseError, e:
-            # ORA-01406: fetched column value was truncated
-            if not e.args[0].endswith(' 1406'):
-                raise
-            del cursor.outputtypehandler
-            try:
-                # Execute the query, but alter it slightly without
-                # changing its meaning, so that the query cache
-                # will see it as a statement that has to be compiled
-                # with different output type parameters.
-                cursor.execute(stmt + ' ', args)
-            finally:
-                cursor.outputtypehandler = lob_handler
-            for row in cursor:
-                return tuple(map(inline, row))
-        return default
-else:
-    def execute_lob_stmt(cursor, stmt, args=(), default=None):
-        """Execute a statement and return one row with all LOBs inline.
-
-        Returns the value of the default parameter if the result was empty.
-        """
-        cursor.execute(stmt, args)
-        for row in cursor:
-            return tuple(map(inline, row))
-        return default
-
-
-def lob_handler(cursor, name, defaultType, size, precision, scale):
-    """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
-
-    Note that if a BLOB in the result is too large, Oracle generates an
-    error indicating truncation.  The execute_lob_stmt() function works
-    around this.
-    """
-    if defaultType == cx_Oracle.BLOB:
-        return cursor.var(cx_Oracle.LONG_BINARY)
-
-
 class OracleAdapter(Adapter):
     """Oracle adapter for RelStorage."""
 
@@ -133,12 +92,31 @@
             Adapter._scripts['prepack_follow_child_refs'],
     }
 
-    def __init__(self, user, password, dsn, twophase=False, arraysize=64):
+    def __init__(self, user, password, dsn, twophase=False, arraysize=64,
+            use_inline_lobs=None):
+        """Create an Oracle adapter.
+
+        The user, password, and dsn parameters are provided to cx_Oracle
+        at connection time.
+
+        If twophase is true, all commits go through an Oracle-level two-phase
+        commit process.  This is disabled by default.  Even when this option
+        is disabled, the ZODB two-phase commit is still in effect.
+
+        arraysize sets the number of rows to buffer in cx_Oracle.  The default
+        is 64.
+
+        use_inline_lobs enables Oracle to send BLOBs inline in response to
+        queries.  It depends on features in cx_Oracle 5.  The default is None,
+        telling the adapter to auto-detect the presence of cx_Oracle 5.
+        """
         self._params = (user, password, dsn)
-        self._twophase = twophase
+        self._twophase = bool(twophase)
         self._arraysize = arraysize
+        if use_inline_lobs is None:
+            use_inline_lobs = (cx_Oracle.version >= '5.0')
+        self._use_inline_lobs = bool(use_inline_lobs)
 
-
     def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
         """Execute a statement from a script with the given parameters.
 
@@ -159,13 +137,7 @@
             params = ()
 
         try:
-            if use_inline_lobs:
-                del cursor.outputtypehandler
-            try:
-                cursor.execute(stmt, params)
-            finally:
-                if use_inline_lobs:
-                    cursor.outputtypehandler = lob_handler
+            cursor.execute(stmt, params)
         except:
             log.warning("script statement failed: %r; parameters: %r",
                 stmt, params)
@@ -356,8 +328,6 @@
             conn = cx_Oracle.connect(*self._params, **kw)
             cursor = conn.cursor()
             cursor.arraysize = self._arraysize
-            if use_inline_lobs:
-                cursor.outputtypehandler = lob_handler
             if transaction_mode:
                 cursor.execute("SET TRANSACTION %s" % transaction_mode)
             return conn, cursor
@@ -433,6 +403,39 @@
             return tid
         return None
 
+    def execute_lob_stmt(self, cursor, stmt, args=(), default=None):
+        """Execute a statement and return one row with all LOBs inline.
+
+        Returns the value of the default parameter if the result was empty.
+        """
+        if self._use_inline_lobs:
+            try:
+                cursor.outputtypehandler = lob_handler
+                try:
+                    cursor.execute(stmt, args)
+                    for row in cursor:
+                        return row
+                finally:
+                    del cursor.outputtypehandler
+            except cx_Oracle.DatabaseError, e:
+                # ORA-01406: fetched column value was truncated
+                error, = e
+                if ((isinstance(error, str) and not error.endswith(' 1406'))
+                        or error.code != 1406):
+                    raise
+                # Execute the query, but alter it slightly without
+                # changing its meaning, so that the query cache
+                # will see it as a statement that has to be compiled
+                # with different output type parameters.
+                cursor.execute(stmt + ' ', args)
+                for row in cursor:
+                    return tuple(map(read_lob, row))
+        else:
+            cursor.execute(stmt, args)
+            for row in cursor:
+                return tuple(map(read_lob, row))
+        return default
+
     def load_current(self, cursor, oid):
         """Returns the current pickle and integer tid for an object.
 
@@ -444,7 +447,8 @@
             JOIN object_state USING(zoid, tid)
         WHERE zoid = :1
         """
-        return execute_lob_stmt(cursor, stmt, (oid,), default=(None, None))
+        return self.execute_lob_stmt(
+            cursor, stmt, (oid,), default=(None, None))
 
     def load_revision(self, cursor, oid, tid):
         """Returns the pickle for an object on a particular transaction.
@@ -457,7 +461,8 @@
         WHERE zoid = :1
             AND tid = :2
         """
-        (state,) = execute_lob_stmt(cursor, stmt, (oid, tid), default=(None,))
+        (state,) = self.execute_lob_stmt(
+            cursor, stmt, (oid, tid), default=(None,))
         return state
 
     def exists(self, cursor, oid):
@@ -481,7 +486,7 @@
                     AND tid < :tid
             )
         """
-        return execute_lob_stmt(cursor, stmt, {'oid': oid, 'tid': tid},
+        return self.execute_lob_stmt(cursor, stmt, {'oid': oid, 'tid': tid},
              default=(None, None))
 
     def get_object_tid_after(self, cursor, oid, tid):
@@ -633,7 +638,7 @@
             JOIN current_object ON (temp_store.zoid = current_object.zoid)
         WHERE temp_store.prev_tid != current_object.tid
         """
-        return execute_lob_stmt(cursor, stmt)
+        return self.execute_lob_stmt(cursor, stmt)
 
     def move_from_temp(self, cursor, tid):
         """Move the temporarily stored objects to permanent storage.

Modified: relstorage/branches/1.1-jarn/setup.py
===================================================================
--- relstorage/branches/1.1-jarn/setup.py	2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/branches/1.1-jarn/setup.py	2008-12-15 22:53:54 UTC (rev 94090)
@@ -27,7 +27,7 @@
 with RelStorage.
 """
 
-VERSION = "1.1jarn2"
+VERSION = "1.1jarn4"
 
 classifiers = """\
 Development Status :: 4 - Beta

Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/trunk/CHANGES.txt	2008-12-15 22:53:54 UTC (rev 94090)
@@ -17,7 +17,10 @@
 - Implemented the database size query in MySQL, based on a patch from
   Kazuhiko Shiozaki.  Thanks!
 
+- Optimized Oracle object retrieval by causing BLOBs to be sent inline
+  when possible, based on a patch by Helge Tesdal.
 
+
 RelStorage 1.1c1
 
 - Added optional memcache integration.  This is useful when the connection

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2008-12-15 22:49:12 UTC (rev 94089)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2008-12-15 22:53:54 UTC (rev 94090)
@@ -15,8 +15,6 @@
 
 import logging
 import re
-import thread
-import time
 import cx_Oracle
 from ZODB.POSException import StorageError
 
@@ -24,7 +22,28 @@
 
 log = logging.getLogger("relstorage.adapters.oracle")
 
+def lob_handler(cursor, name, defaultType, size, precision, scale):
+    """cx_Oracle outputtypehandler that causes Oracle to send BLOBs inline.
 
+    Note that if a BLOB in the result is too large, Oracle generates an
+    error indicating truncation.  The execute_lob_stmt() method works
+    around this.
+    """
+    if defaultType == cx_Oracle.BLOB:
+        # Default size for BLOB is 4, we want the whole blob inline.
+        # Typical chunk size is 8132, we choose a multiple - 32528
+        return cursor.var(cx_Oracle.LONG_BINARY, 32528, cursor.arraysize)
+
+def read_lob(value):
+    """Handle an Oracle LOB by returning its byte stream.
+
+    Returns other objects unchanged.
+    """
+    if isinstance(value, cx_Oracle.LOB):
+        return value.read()
+    return value
+
+
 class OracleAdapter(Adapter):
     """Oracle adapter for RelStorage."""
 
@@ -73,12 +92,31 @@
             Adapter._scripts['prepack_follow_child_refs'],
     }
 
-    def __init__(self, user, password, dsn, twophase=False, arraysize=64):
+    def __init__(self, user, password, dsn, twophase=False, arraysize=64,
+            use_inline_lobs=None):
+        """Create an Oracle adapter.
+
+        The user, password, and dsn parameters are provided to cx_Oracle
+        at connection time.
+
+        If twophase is true, all commits go through an Oracle-level two-phase
+        commit process.  This is disabled by default.  Even when this option
+        is disabled, the ZODB two-phase commit is still in effect.
+
+        arraysize sets the number of rows to buffer in cx_Oracle.  The default
+        is 64.
+
+        use_inline_lobs enables Oracle to send BLOBs inline in response to
+        queries.  It depends on features in cx_Oracle 5.  The default is None,
+        telling the adapter to auto-detect the presence of cx_Oracle 5.
+        """
         self._params = (user, password, dsn)
-        self._twophase = twophase
+        self._twophase = bool(twophase)
         self._arraysize = arraysize
+        if use_inline_lobs is None:
+            use_inline_lobs = (cx_Oracle.version >= '5.0')
+        self._use_inline_lobs = bool(use_inline_lobs)
 
-
     def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
         """Execute a statement from a script with the given parameters.
 
@@ -365,39 +403,67 @@
             return tid
         return None
 
+    def execute_lob_stmt(self, cursor, stmt, args=(), default=None):
+        """Execute a statement and return one row with all LOBs inline.
+
+        Returns the value of the default parameter if the result was empty.
+        """
+        if self._use_inline_lobs:
+            try:
+                cursor.outputtypehandler = lob_handler
+                try:
+                    cursor.execute(stmt, args)
+                    for row in cursor:
+                        return row
+                finally:
+                    del cursor.outputtypehandler
+            except cx_Oracle.DatabaseError, e:
+                # ORA-01406: fetched column value was truncated
+                error, = e
+                if ((isinstance(error, str) and not error.endswith(' 1406'))
+                        or error.code != 1406):
+                    raise
+                # Execute the query, but alter it slightly without
+                # changing its meaning, so that the query cache
+                # will see it as a statement that has to be compiled
+                # with different output type parameters.
+                cursor.execute(stmt + ' ', args)
+                for row in cursor:
+                    return tuple(map(read_lob, row))
+        else:
+            cursor.execute(stmt, args)
+            for row in cursor:
+                return tuple(map(read_lob, row))
+        return default
+
     def load_current(self, cursor, oid):
         """Returns the current pickle and integer tid for an object.
 
         oid is an integer.  Returns (None, None) if object does not exist.
         """
-        cursor.execute("""
+        stmt = """
         SELECT state, tid
         FROM current_object
             JOIN object_state USING(zoid, tid)
         WHERE zoid = :1
-        """, (oid,))
-        for state, tid in cursor:
-            if state is not None:
-                state = state.read()
-            # else this object's creation has been undone
-            return state, tid
-        return None, None
+        """
+        return self.execute_lob_stmt(
+            cursor, stmt, (oid,), default=(None, None))
 
     def load_revision(self, cursor, oid, tid):
         """Returns the pickle for an object on a particular transaction.
 
         Returns None if no such state exists.
         """
-        cursor.execute("""
+        stmt = """
         SELECT state
         FROM object_state
         WHERE zoid = :1
             AND tid = :2
-        """, (oid, tid))
-        for (state,) in cursor:
-            if state is not None:
-                return state.read()
-        return None
+        """
+        (state,) = self.execute_lob_stmt(
+            cursor, stmt, (oid, tid), default=(None,))
+        return state
 
     def exists(self, cursor, oid):
         """Returns a true value if the given object exists."""
@@ -420,13 +486,8 @@
                     AND tid < :tid
             )
         """
-        cursor.execute(stmt, {'oid': oid, 'tid': tid})
-        for state, tid in cursor:
-            if state is not None:
-                state = state.read()
-            # else this object's creation has been undone
-            return state, tid
-        return None, None
+        return self.execute_lob_stmt(cursor, stmt, {'oid': oid, 'tid': tid},
+             default=(None, None))
 
     def get_object_tid_after(self, cursor, oid, tid):
         """Returns the tid of the next change after an object revision.
@@ -577,13 +638,10 @@
             JOIN current_object ON (temp_store.zoid = current_object.zoid)
         WHERE temp_store.prev_tid != current_object.tid
         """
-        cursor.execute(stmt)
-        for oid, prev_tid, attempted_prev_tid, data in cursor:
-            return oid, prev_tid, attempted_prev_tid, data.read()
-        return None
+        return self.execute_lob_stmt(cursor, stmt)
 
     def move_from_temp(self, cursor, tid):
-        """Moved the temporarily stored objects to permanent storage.
+        """Move the temporarily stored objects to permanent storage.
 
         Returns the list of oids stored.
         """



More information about the Checkins mailing list