[Checkins] SVN: relstorage/trunk/ Oracle: Used PL/SQL bulk insert operations to improve write performance.

Shane Hathaway shane at hathawaymix.org
Wed Oct 28 06:23:59 EDT 2009


Log message for revision 105322:
  Oracle: Used PL/SQL bulk insert operations to improve write performance.
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/adapters/batch.py
  U   relstorage/trunk/relstorage/adapters/mover.py
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/schema.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2009-10-28 10:22:46 UTC (rev 105321)
+++ relstorage/trunk/CHANGES.txt	2009-10-28 10:23:58 UTC (rev 105322)
@@ -32,11 +32,14 @@
   objects, making the adapter code more modular.  Added interfaces
   that describe the duties of each part.
 
-- Oracle: sped up restore operations by sending short blobs inline.
+- Oracle: Sped up restore operations by sending short blobs inline.
 
-- Oracle: use a timeout on commit locks.  This requires installation
+- Oracle: Use a timeout on commit locks.  This requires installation
   of a small PL/SQL package that can access DBMS_LOCK.  See README.txt.
 
+- Oracle: Used PL/SQL bulk insert operations to improve write
+  performance.
+
 - PostgreSQL: use the documented ALTER SEQUENCE RESTART WITH
   statement instead of ALTER SEQUENCE START WITH.
 

Modified: relstorage/trunk/relstorage/adapters/batch.py
===================================================================
--- relstorage/trunk/relstorage/adapters/batch.py	2009-10-28 10:22:46 UTC (rev 105321)
+++ relstorage/trunk/relstorage/adapters/batch.py	2009-10-28 10:23:58 UTC (rev 105322)
@@ -119,6 +119,7 @@
     def __init__(self, cursor, inputsizes):
         super(OracleRowBatcher, self).__init__(cursor)
         self.inputsizes = inputsizes
+        self.array_ops = {}  # {(operation, row_schema): {rowkey: [row]}}
 
     def _do_inserts(self):
 
@@ -159,3 +160,37 @@
                     self.cursor.setinputsizes(**stmt_inputsizes)
                 self.cursor.execute(stmt, params)
 
+    def add_array_op(self, operation, row_schema, row, rowkey, size):
+        key = (operation, row_schema)
+        rows = self.array_ops.get(key)
+        if rows is None:
+            self.array_ops[key] = rows = {}
+        rows[rowkey] = row  # note that this may replace a row
+        self.rows_added += 1
+        self.size_added += size
+        if (self.rows_added >= self.row_limit
+            or self.size_added >= self.size_limit):
+            self.flush()
+
+    def flush(self):
+        if self.deletes:
+            self._do_deletes()
+            self.deletes.clear()
+        if self.inserts:
+            self._do_inserts()
+            self.inserts.clear()
+        if self.array_ops:
+            self._do_array_ops()
+            self.array_ops.clear()
+        self.rows_added = 0
+        self.size_added = 0
+
+    def _do_array_ops(self):
+        items = sorted(self.array_ops.items())
+        for (operation, row_schema), rows in items:
+            r = rows.values()
+            params = []
+            datatypes = [self.inputsizes[name] for name in row_schema.split()]
+            for i, column in enumerate(zip(*r)):
+                params.append(self.cursor.arrayvar(datatypes[i], list(column)))
+            self.cursor.execute(operation, tuple(params))

Modified: relstorage/trunk/relstorage/adapters/mover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py	2009-10-28 10:22:46 UTC (rev 105321)
+++ relstorage/trunk/relstorage/adapters/mover.py	2009-10-28 10:23:58 UTC (rev 105322)
@@ -56,18 +56,13 @@
         )
 
     def __init__(self, database_name, keep_history, runner=None,
-            Binary=None, inputsize_BLOB=None, inputsize_BINARY=None,
-            version_detector=None):
-        # The inputsize parameters are for Oracle only.
+            Binary=None, inputsizes=None, version_detector=None):
+        # The inputsizes parameter is for Oracle only.
         self.database_name = database_name
         self.keep_history = keep_history
         self.runner = runner
         self.Binary = Binary
-        self.inputsize_BLOB = inputsize_BLOB
-        self.inputsizes = {
-            'blobdata': inputsize_BLOB,
-            'rawdata': inputsize_BINARY,
-            }
+        self.inputsizes = inputsizes
         self.version_detector = version_detector
 
         for method_name in self._method_names:
@@ -439,25 +434,32 @@
         else:
             md5sum = None
 
-        batcher.delete_from('temp_store', 'zoid', oid)
-
-        row = {'oid': oid, 'prev_tid': prev_tid, 'md5sum': md5sum}
         if len(data) <= 2000:
             # Send data inline for speed.  Oracle docs say maximum size
             # of a RAW is 2000 bytes.
-            row_schema = ":oid, :prev_tid, :md5sum, :rawdata"
-            row['rawdata'] = data
+            stmt = "BEGIN relstorage_op.store_temp(:1, :2, :3, :4); END;"
+            batcher.add_array_op(
+                stmt,
+                'oid prev_tid md5sum rawdata',
+                (oid, prev_tid, md5sum, data),
+                rowkey=oid,
+                size=len(data),
+                )
         else:
             # Send data as a BLOB
-            row_schema = ":oid, :prev_tid, :md5sum, :blobdata"
-            row['blobdata'] = data
-        batcher.insert_into(
-            "temp_store (zoid, prev_tid, md5, state)",
-            row_schema,
-            row,
-            rowkey=oid,
-            size=len(data),
-            )
+            row = {
+                'oid': oid,
+                'prev_tid': prev_tid,
+                'md5sum': md5sum,
+                'blobdata': data,
+                }
+            batcher.insert_into(
+                "temp_store (zoid, prev_tid, md5, state)",
+                ":oid, :prev_tid, :md5sum, :blobdata",
+                row,
+                rowkey=oid,
+                size=len(data),
+                )
 
 
 
@@ -552,42 +554,61 @@
         else:
             md5sum = None
 
-        if self.keep_history:
-            row = {'oid': oid, 'tid': tid, 'md5sum': md5sum}
-            row_schema = """
-                :oid, :tid,
-                COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
-                :md5sum, :rawdata
-            """
-            if not data or len(data) <= 2000:
-                row['rawdata'] = data
+        if not data or len(data) <= 2000:
+            # Send data inline for speed.  Oracle docs say maximum size
+            # of a RAW is 2000 bytes.
+            if self.keep_history:
+                stmt = "BEGIN relstorage_op.restore(:1, :2, :3, :4); END;"
+                batcher.add_array_op(
+                    stmt,
+                    'oid tid md5sum rawdata',
+                    (oid, tid, md5sum, data),
+                    rowkey=(oid, tid),
+                    size=len(data or ''),
+                    )
             else:
-                row_schema = row_schema.replace(":rawdata", ":blobdata")
-                row['blobdata'] = data
-            batcher.insert_into(
-                "object_state (zoid, tid, prev_tid, md5, state)",
-                row_schema,
-                row,
-                rowkey=(oid, tid),
-                size=len(data or ''),
-                )
+                stmt = "BEGIN relstorage_op.restore(:1, :2, :3); END;"
+                batcher.add_array_op(
+                    stmt,
+                    'oid tid rawdata',
+                    (oid, tid, data),
+                    rowkey=(oid, tid),
+                    size=len(data or ''),
+                    )
+
         else:
-            batcher.delete_from('object_state', 'zoid', oid)
-            if data:
-                row = {'oid': oid, 'tid': tid}
-                if len(data) <= 2000:
-                    row_schema = ":oid, :tid, :rawdata"
-                    row['rawdata'] = data
-                else:
-                    row_schema = ":oid, :tid, :blobdata"
-                    row['blobdata'] = data
+            # Send as a BLOB
+            if self.keep_history:
+                row = {
+                    'oid': oid,
+                    'tid': tid,
+                    'md5sum': md5sum,
+                    'blobdata': data,
+                    }
+                row_schema = """
+                    :oid, :tid,
+                    COALESCE((SELECT tid
+                              FROM current_object
+                              WHERE zoid = :oid), 0),
+                    :md5sum, :blobdata
+                """
                 batcher.insert_into(
-                    "object_state (zoid, tid, state)",
+                    "object_state (zoid, tid, prev_tid, md5, state)",
                     row_schema,
                     row,
-                    rowkey=oid,
-                    size=len(data),
+                    rowkey=(oid, tid),
+                    size=len(data or ''),
                     )
+            else:
+                batcher.delete_from('object_state', 'zoid', oid)
+                if data:
+                    batcher.insert_into(
+                        "object_state (zoid, tid, state)",
+                        ":oid, :tid, :blobdata",
+                        {'oid': oid, 'tid': tid, 'blobdata': data},
+                        rowkey=oid,
+                        size=len(data),
+                        )
 
 
 
@@ -733,7 +754,7 @@
             state = :blobdata
         WHERE zoid = :oid
         """
-        cursor.setinputsizes(blobdata=self.inputsize_BLOB)
+        cursor.setinputsizes(blobdata=self.inputsizes['blobdata'])
         cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
             md5sum=md5sum, blobdata=self.Binary(data))
 

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2009-10-28 10:22:46 UTC (rev 105321)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2009-10-28 10:23:58 UTC (rev 105322)
@@ -98,8 +98,14 @@
             keep_history=self.keep_history,
             runner=self.runner,
             Binary=cx_Oracle.Binary,
-            inputsize_BLOB=cx_Oracle.BLOB,
-            inputsize_BINARY=cx_Oracle.BINARY,
+            inputsizes={
+                'blobdata': cx_Oracle.BLOB,
+                'rawdata': cx_Oracle.BINARY,
+                'oid': cx_Oracle.NUMBER,
+                'tid': cx_Oracle.NUMBER,
+                'prev_tid': cx_Oracle.NUMBER,
+                'md5sum': cx_Oracle.STRING,
+                },
             )
         self.connmanager.set_on_store_opened(self.mover.on_store_opened)
         self.oidallocator = OracleOIDAllocator(

Modified: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py	2009-10-28 10:22:46 UTC (rev 105321)
+++ relstorage/trunk/relstorage/adapters/schema.py	2009-10-28 10:23:58 UTC (rev 105322)
@@ -338,6 +338,57 @@
         CREATE SEQUENCE zoid_seq;
 """
 
+oracle_history_preserving_plsql = """
+CREATE OR REPLACE PACKAGE relstorage_op AS
+    TYPE numlist IS TABLE OF NUMBER(20) INDEX BY BINARY_INTEGER;
+    TYPE md5list IS TABLE OF VARCHAR2(32) INDEX BY BINARY_INTEGER;
+    TYPE statelist IS TABLE OF RAW(2000) INDEX BY BINARY_INTEGER;
+    PROCEDURE store_temp(
+        zoids IN numlist,
+        prev_tids IN numlist,
+        md5s IN md5list,
+        states IN statelist);
+    PROCEDURE restore(
+        zoids IN numlist,
+        tids IN numlist,
+        md5s IN md5list,
+        states IN statelist);
+END relstorage_op;
+/
+
+CREATE OR REPLACE PACKAGE BODY relstorage_op AS
+    PROCEDURE store_temp(
+        zoids IN numlist,
+        prev_tids IN numlist,
+        md5s IN md5list,
+        states IN statelist) IS
+    BEGIN
+        FORALL indx IN zoids.first..zoids.last
+            DELETE FROM temp_store WHERE zoid = zoids(indx);
+        FORALL indx IN zoids.first..zoids.last
+            INSERT INTO temp_store (zoid, prev_tid, md5, state) VALUES
+            (zoids(indx), prev_tids(indx), md5s(indx), states(indx));
+    END store_temp;
+
+    PROCEDURE restore(
+        zoids IN numlist,
+        tids IN numlist,
+        md5s IN md5list,
+        states IN statelist) IS
+    BEGIN
+        FORALL indx IN zoids.first..zoids.last
+            INSERT INTO object_state (zoid, tid, prev_tid, md5, state) VALUES
+            (zoids(indx), tids(indx),
+            COALESCE((SELECT tid
+                      FROM current_object
+                      WHERE zoid = zoids(indx)), 0),
+            md5s(indx), states(indx));
+    END restore;
+END relstorage_op;
+/
+"""
+
+
 history_free_schema = """
 
 # commit_lock: Held during commit.  Another kind of lock is used for MySQL.
@@ -511,7 +562,53 @@
         CREATE SEQUENCE zoid_seq;
 """
 
+oracle_history_free_plsql = """
+CREATE OR REPLACE PACKAGE relstorage_op AS
+    TYPE numlist IS TABLE OF NUMBER(20) INDEX BY BINARY_INTEGER;
+    TYPE md5list IS TABLE OF VARCHAR2(32) INDEX BY BINARY_INTEGER;
+    TYPE statelist IS TABLE OF RAW(2000) INDEX BY BINARY_INTEGER;
+    PROCEDURE store_temp(
+        zoids IN numlist,
+        prev_tids IN numlist,
+        md5s IN md5list,
+        states IN statelist);
+    PROCEDURE restore(
+        zoids IN numlist,
+        tids IN numlist,
+        states IN statelist);
+END relstorage_op;
+/
 
+CREATE OR REPLACE PACKAGE BODY relstorage_op AS
+    PROCEDURE store_temp(
+        zoids IN numlist,
+        prev_tids IN numlist,
+        md5s IN md5list,
+        states IN statelist) IS
+    BEGIN
+        FORALL indx IN zoids.first..zoids.last
+            DELETE FROM temp_store WHERE zoid = zoids(indx);
+        FORALL indx IN zoids.first..zoids.last
+            INSERT INTO temp_store (zoid, prev_tid, md5, state) VALUES
+            (zoids(indx), prev_tids(indx), md5s(indx), states(indx));
+    END store_temp;
+
+    PROCEDURE restore(
+        zoids IN numlist,
+        tids IN numlist,
+        states IN statelist) IS
+    BEGIN
+        FORALL indx IN zoids.first..zoids.last
+            DELETE FROM object_state WHERE zoid = zoids(indx);
+        FORALL indx IN zoids.first..zoids.last
+            INSERT INTO object_state (zoid, tid, state) VALUES
+            (zoids(indx), tids(indx), states(indx));
+    END restore;
+END relstorage_op;
+/
+"""
+
+
 def filter_script(script, database_name):
     res = []
     match = False
@@ -672,6 +769,23 @@
 
     database_name = 'oracle'
 
+    def create(self, cursor):
+        """Create the database tables and the relstorage_op PL/SQL package."""
+        super(OracleSchemaInstaller, self).create(cursor)
+        if self.keep_history:
+            plsql = oracle_history_preserving_plsql
+        else:
+            plsql = oracle_history_free_plsql
+
+        lines = []
+        for line in plsql.splitlines():
+            if line.strip() == '/':
+                # end of a statement
+                cursor.execute('\n'.join(lines))
+                lines = []
+            elif line.strip():
+                lines.append(line)
+
     def list_tables(self, cursor):
         cursor.execute("SELECT table_name FROM user_tables")
         return [name.lower() for (name,) in cursor]



More information about the checkins mailing list