[Checkins] SVN: relstorage/trunk/ Oracle: Used PL/SQL bulk insert operations to improve write performance.
Shane Hathaway
shane at hathawaymix.org
Wed Oct 28 06:23:59 EDT 2009
Log message for revision 105322:
Oracle: Used PL/SQL bulk insert operations to improve write performance.
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/relstorage/adapters/batch.py
U relstorage/trunk/relstorage/adapters/mover.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/schema.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2009-10-28 10:22:46 UTC (rev 105321)
+++ relstorage/trunk/CHANGES.txt 2009-10-28 10:23:58 UTC (rev 105322)
@@ -32,11 +32,14 @@
objects, making the adapter code more modular. Added interfaces
that describe the duties of each part.
-- Oracle: sped up restore operations by sending short blobs inline.
+- Oracle: Sped up restore operations by sending short blobs inline.
-- Oracle: use a timeout on commit locks. This requires installation
+- Oracle: Use a timeout on commit locks. This requires installation
of a small PL/SQL package that can access DBMS_LOCK. See README.txt.
+- Oracle: Used PL/SQL bulk insert operations to improve write
+ performance.
+
- PostgreSQL: use the documented ALTER SEQUENCE RESTART WITH
statement instead of ALTER SEQUENCE START WITH.
Modified: relstorage/trunk/relstorage/adapters/batch.py
===================================================================
--- relstorage/trunk/relstorage/adapters/batch.py 2009-10-28 10:22:46 UTC (rev 105321)
+++ relstorage/trunk/relstorage/adapters/batch.py 2009-10-28 10:23:58 UTC (rev 105322)
@@ -119,6 +119,7 @@
def __init__(self, cursor, inputsizes):
super(OracleRowBatcher, self).__init__(cursor)
self.inputsizes = inputsizes
+ self.array_ops = {} # {(operation, row_schema): {rowkey: [row]}}
def _do_inserts(self):
@@ -159,3 +160,37 @@
self.cursor.setinputsizes(**stmt_inputsizes)
self.cursor.execute(stmt, params)
+ def add_array_op(self, operation, row_schema, row, rowkey, size):
+ key = (operation, row_schema)
+ rows = self.array_ops.get(key)
+ if rows is None:
+ self.array_ops[key] = rows = {}
+ rows[rowkey] = row # note that this may replace a row
+ self.rows_added += 1
+ self.size_added += size
+ if (self.rows_added >= self.row_limit
+ or self.size_added >= self.size_limit):
+ self.flush()
+
+ def flush(self):
+ if self.deletes:
+ self._do_deletes()
+ self.deletes.clear()
+ if self.inserts:
+ self._do_inserts()
+ self.inserts.clear()
+ if self.array_ops:
+ self._do_array_ops()
+ self.array_ops.clear()
+ self.rows_added = 0
+ self.size_added = 0
+
+ def _do_array_ops(self):
+ items = sorted(self.array_ops.items())
+ for (operation, row_schema), rows in items:
+ r = rows.values()
+ params = []
+ datatypes = [self.inputsizes[name] for name in row_schema.split()]
+ for i, column in enumerate(zip(*r)):
+ params.append(self.cursor.arrayvar(datatypes[i], list(column)))
+ self.cursor.execute(operation, tuple(params))
Modified: relstorage/trunk/relstorage/adapters/mover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py 2009-10-28 10:22:46 UTC (rev 105321)
+++ relstorage/trunk/relstorage/adapters/mover.py 2009-10-28 10:23:58 UTC (rev 105322)
@@ -56,18 +56,13 @@
)
def __init__(self, database_name, keep_history, runner=None,
- Binary=None, inputsize_BLOB=None, inputsize_BINARY=None,
- version_detector=None):
- # The inputsize parameters are for Oracle only.
+ Binary=None, inputsizes=None, version_detector=None):
+ # The inputsizes parameter is for Oracle only.
self.database_name = database_name
self.keep_history = keep_history
self.runner = runner
self.Binary = Binary
- self.inputsize_BLOB = inputsize_BLOB
- self.inputsizes = {
- 'blobdata': inputsize_BLOB,
- 'rawdata': inputsize_BINARY,
- }
+ self.inputsizes = inputsizes
self.version_detector = version_detector
for method_name in self._method_names:
@@ -439,25 +434,32 @@
else:
md5sum = None
- batcher.delete_from('temp_store', 'zoid', oid)
-
- row = {'oid': oid, 'prev_tid': prev_tid, 'md5sum': md5sum}
if len(data) <= 2000:
# Send data inline for speed. Oracle docs say maximum size
# of a RAW is 2000 bytes.
- row_schema = ":oid, :prev_tid, :md5sum, :rawdata"
- row['rawdata'] = data
+ stmt = "BEGIN relstorage_op.store_temp(:1, :2, :3, :4); END;"
+ batcher.add_array_op(
+ stmt,
+ 'oid prev_tid md5sum rawdata',
+ (oid, prev_tid, md5sum, data),
+ rowkey=oid,
+ size=len(data),
+ )
else:
# Send data as a BLOB
- row_schema = ":oid, :prev_tid, :md5sum, :blobdata"
- row['blobdata'] = data
- batcher.insert_into(
- "temp_store (zoid, prev_tid, md5, state)",
- row_schema,
- row,
- rowkey=oid,
- size=len(data),
- )
+ row = {
+ 'oid': oid,
+ 'prev_tid': prev_tid,
+ 'md5sum': md5sum,
+ 'blobdata': data,
+ }
+ batcher.insert_into(
+ "temp_store (zoid, prev_tid, md5, state)",
+ ":oid, :prev_tid, :md5sum, :blobdata",
+ row,
+ rowkey=oid,
+ size=len(data),
+ )
@@ -552,42 +554,61 @@
else:
md5sum = None
- if self.keep_history:
- row = {'oid': oid, 'tid': tid, 'md5sum': md5sum}
- row_schema = """
- :oid, :tid,
- COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
- :md5sum, :rawdata
- """
- if not data or len(data) <= 2000:
- row['rawdata'] = data
+ if not data or len(data) <= 2000:
+ # Send data inline for speed. Oracle docs say maximum size
+ # of a RAW is 2000 bytes.
+ if self.keep_history:
+ stmt = "BEGIN relstorage_op.restore(:1, :2, :3, :4); END;"
+ batcher.add_array_op(
+ stmt,
+ 'oid tid md5sum rawdata',
+ (oid, tid, md5sum, data),
+ rowkey=(oid, tid),
+ size=len(data or ''),
+ )
else:
- row_schema = row_schema.replace(":rawdata", ":blobdata")
- row['blobdata'] = data
- batcher.insert_into(
- "object_state (zoid, tid, prev_tid, md5, state)",
- row_schema,
- row,
- rowkey=(oid, tid),
- size=len(data or ''),
- )
+ stmt = "BEGIN relstorage_op.restore(:1, :2, :3); END;"
+ batcher.add_array_op(
+ stmt,
+ 'oid tid rawdata',
+ (oid, tid, data),
+ rowkey=(oid, tid),
+ size=len(data or ''),
+ )
+
else:
- batcher.delete_from('object_state', 'zoid', oid)
- if data:
- row = {'oid': oid, 'tid': tid}
- if len(data) <= 2000:
- row_schema = ":oid, :tid, :rawdata"
- row['rawdata'] = data
- else:
- row_schema = ":oid, :tid, :blobdata"
- row['blobdata'] = data
+ # Send as a BLOB
+ if self.keep_history:
+ row = {
+ 'oid': oid,
+ 'tid': tid,
+ 'md5sum': md5sum,
+ 'blobdata': data,
+ }
+ row_schema = """
+ :oid, :tid,
+ COALESCE((SELECT tid
+ FROM current_object
+ WHERE zoid = :oid), 0),
+ :md5sum, :blobdata
+ """
batcher.insert_into(
- "object_state (zoid, tid, state)",
+ "object_state (zoid, tid, prev_tid, md5, state)",
row_schema,
row,
- rowkey=oid,
- size=len(data),
+ rowkey=(oid, tid),
+ size=len(data or ''),
)
+ else:
+ batcher.delete_from('object_state', 'zoid', oid)
+ if data:
+ batcher.insert_into(
+ "object_state (zoid, tid, state)",
+ ":oid, :tid, :blobdata",
+ {'oid': oid, 'tid': tid, 'blobdata': data},
+ rowkey=oid,
+ size=len(data),
+ )
@@ -733,7 +754,7 @@
state = :blobdata
WHERE zoid = :oid
"""
- cursor.setinputsizes(blobdata=self.inputsize_BLOB)
+ cursor.setinputsizes(blobdata=self.inputsizes['blobdata'])
cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
md5sum=md5sum, blobdata=self.Binary(data))
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2009-10-28 10:22:46 UTC (rev 105321)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2009-10-28 10:23:58 UTC (rev 105322)
@@ -98,8 +98,14 @@
keep_history=self.keep_history,
runner=self.runner,
Binary=cx_Oracle.Binary,
- inputsize_BLOB=cx_Oracle.BLOB,
- inputsize_BINARY=cx_Oracle.BINARY,
+ inputsizes={
+ 'blobdata': cx_Oracle.BLOB,
+ 'rawdata': cx_Oracle.BINARY,
+ 'oid': cx_Oracle.NUMBER,
+ 'tid': cx_Oracle.NUMBER,
+ 'prev_tid': cx_Oracle.NUMBER,
+ 'md5sum': cx_Oracle.STRING,
+ },
)
self.connmanager.set_on_store_opened(self.mover.on_store_opened)
self.oidallocator = OracleOIDAllocator(
Modified: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py 2009-10-28 10:22:46 UTC (rev 105321)
+++ relstorage/trunk/relstorage/adapters/schema.py 2009-10-28 10:23:58 UTC (rev 105322)
@@ -338,6 +338,57 @@
CREATE SEQUENCE zoid_seq;
"""
+oracle_history_preserving_plsql = """
+CREATE OR REPLACE PACKAGE relstorage_op AS
+ TYPE numlist IS TABLE OF NUMBER(20) INDEX BY BINARY_INTEGER;
+ TYPE md5list IS TABLE OF VARCHAR2(32) INDEX BY BINARY_INTEGER;
+ TYPE statelist IS TABLE OF RAW(2000) INDEX BY BINARY_INTEGER;
+ PROCEDURE store_temp(
+ zoids IN numlist,
+ prev_tids IN numlist,
+ md5s IN md5list,
+ states IN statelist);
+ PROCEDURE restore(
+ zoids IN numlist,
+ tids IN numlist,
+ md5s IN md5list,
+ states IN statelist);
+END relstorage_op;
+/
+
+CREATE OR REPLACE PACKAGE BODY relstorage_op AS
+ PROCEDURE store_temp(
+ zoids IN numlist,
+ prev_tids IN numlist,
+ md5s IN md5list,
+ states IN statelist) IS
+ BEGIN
+ FORALL indx IN zoids.first..zoids.last
+ DELETE FROM temp_store WHERE zoid = zoids(indx);
+ FORALL indx IN zoids.first..zoids.last
+ INSERT INTO temp_store (zoid, prev_tid, md5, state) VALUES
+ (zoids(indx), prev_tids(indx), md5s(indx), states(indx));
+ END store_temp;
+
+ PROCEDURE restore(
+ zoids IN numlist,
+ tids IN numlist,
+ md5s IN md5list,
+ states IN statelist) IS
+ BEGIN
+ FORALL indx IN zoids.first..zoids.last
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state) VALUES
+ (zoids(indx), tids(indx),
+ COALESCE((SELECT tid
+ FROM current_object
+ WHERE zoid = zoids(indx)), 0),
+ md5s(indx), states(indx));
+ END restore;
+END relstorage_op;
+/
+"""
+
+
history_free_schema = """
# commit_lock: Held during commit. Another kind of lock is used for MySQL.
@@ -511,7 +562,53 @@
CREATE SEQUENCE zoid_seq;
"""
+oracle_history_free_plsql = """
+CREATE OR REPLACE PACKAGE relstorage_op AS
+ TYPE numlist IS TABLE OF NUMBER(20) INDEX BY BINARY_INTEGER;
+ TYPE md5list IS TABLE OF VARCHAR2(32) INDEX BY BINARY_INTEGER;
+ TYPE statelist IS TABLE OF RAW(2000) INDEX BY BINARY_INTEGER;
+ PROCEDURE store_temp(
+ zoids IN numlist,
+ prev_tids IN numlist,
+ md5s IN md5list,
+ states IN statelist);
+ PROCEDURE restore(
+ zoids IN numlist,
+ tids IN numlist,
+ states IN statelist);
+END relstorage_op;
+/
+CREATE OR REPLACE PACKAGE BODY relstorage_op AS
+ PROCEDURE store_temp(
+ zoids IN numlist,
+ prev_tids IN numlist,
+ md5s IN md5list,
+ states IN statelist) IS
+ BEGIN
+ FORALL indx IN zoids.first..zoids.last
+ DELETE FROM temp_store WHERE zoid = zoids(indx);
+ FORALL indx IN zoids.first..zoids.last
+ INSERT INTO temp_store (zoid, prev_tid, md5, state) VALUES
+ (zoids(indx), prev_tids(indx), md5s(indx), states(indx));
+ END store_temp;
+
+ PROCEDURE restore(
+ zoids IN numlist,
+ tids IN numlist,
+ states IN statelist) IS
+ BEGIN
+ FORALL indx IN zoids.first..zoids.last
+ DELETE FROM object_state WHERE zoid = zoids(indx);
+ FORALL indx IN zoids.first..zoids.last
+ INSERT INTO object_state (zoid, tid, state) VALUES
+ (zoids(indx), tids(indx), states(indx));
+ END restore;
+END relstorage_op;
+/
+"""
+
+
def filter_script(script, database_name):
res = []
match = False
@@ -672,6 +769,23 @@
database_name = 'oracle'
+ def create(self, cursor):
+ """Create the database tables and the relstorage_op PL/SQL package."""
+ super(OracleSchemaInstaller, self).create(cursor)
+ if self.keep_history:
+ plsql = oracle_history_preserving_plsql
+ else:
+ plsql = oracle_history_free_plsql
+
+ lines = []
+ for line in plsql.splitlines():
+ if line.strip() == '/':
+ # end of a statement
+ cursor.execute('\n'.join(lines))
+ lines = []
+ elif line.strip():
+ lines.append(line)
+
def list_tables(self, cursor):
cursor.execute("SELECT table_name FROM user_tables")
return [name.lower() for (name,) in cursor]
More information about the checkins mailing list