[Checkins] SVN: relstorage/trunk/ Store operations now use multi-insert and multi-delete SQL
Shane Hathaway
shane at hathawaymix.org
Thu Oct 8 05:50:05 EDT 2009
Log message for revision 104916:
Store operations now use multi-insert and multi-delete SQL
statements to reduce the effect of network latency.
Changed:
U relstorage/trunk/CHANGES.txt
A relstorage/trunk/relstorage/adapters/batch.py
U relstorage/trunk/relstorage/adapters/interfaces.py
U relstorage/trunk/relstorage/adapters/mover.py
U relstorage/trunk/relstorage/pylibmc_wrapper.py
U relstorage/trunk/relstorage/storage.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2009-10-08 09:39:52 UTC (rev 104915)
+++ relstorage/trunk/CHANGES.txt 2009-10-08 09:50:05 UTC (rev 104916)
@@ -16,6 +16,9 @@
- Added a wrapper module for pylibmc.
+- Store operations now use multi-insert and multi-delete SQL
+ statements to reduce the effect of network latency.
+
- Renamed relstorage.py to storage.py to overcome import issues.
Also moved the Options class to options.py.
Added: relstorage/trunk/relstorage/adapters/batch.py
===================================================================
--- relstorage/trunk/relstorage/adapters/batch.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/batch.py 2009-10-08 09:50:05 UTC (rev 104916)
@@ -0,0 +1,139 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Batch table row insert/delete support.
+"""
+
+import re
+
+class RowBatcher(object):
+ """Generic row batcher.
+
+ Expects '%s' parameters and a tuple for each row.
+ """
+
+ row_limit = 100
+ size_limit = 1<<20
+ database_name = None
+
+ def __init__(self, cursor):
+ self.cursor = cursor
+ self.rows_added = 0
+ self.size_added = 0
+ self.deletes = {} # {(table, varname): set([value])}
+ self.inserts = {} # {(command, header, row_schema): {rowkey: [row]}}
+
+ def delete_from(self, table, varname, value):
+ key = (table, varname)
+ values = self.deletes.get(key)
+ if values is None:
+ self.deletes[key] = values = set()
+ values.add(str(value))
+ self.rows_added += 1
+ if self.rows_added >= self.row_limit:
+ self.flush()
+
+ def insert_into(self, header, row_schema, row, rowkey, size,
+ command='INSERT'):
+ key = (command, header, row_schema)
+ rows = self.inserts.get(key)
+ if rows is None:
+ self.inserts[key] = rows = {}
+ rows[rowkey] = row # note that this may replace a row
+ self.rows_added += 1
+ self.size_added += size
+ if (self.rows_added >= self.row_limit
+ or self.size_added >= self.size_limit):
+ self.flush()
+
+ def flush(self):
+ if self.deletes:
+ self.do_deletes()
+ self.deletes.clear()
+ if self.inserts:
+ self.do_inserts()
+ self.inserts.clear()
+ self.rows_added = 0
+ self.size_added = 0
+
+ def do_deletes(self):
+ for (table, varname), values in sorted(self.deletes.items()):
+ value_str = ','.join(values)
+ stmt = "DELETE FROM %s WHERE %s IN (%s)" % (
+ table, varname, value_str)
+ self.cursor.execute(stmt)
+
+ def do_inserts(self):
+ items = sorted(self.inserts.items())
+ for (command, header, row_schema), rows in items:
+ parts = []
+ params = []
+ s = "(%s)" % row_schema
+ for row in rows.values():
+ parts.append(s)
+ params.extend(row)
+ parts = ',\n'.join(parts)
+ stmt = "%s INTO %s VALUES %s" % (command, header, parts)
+ self.cursor.execute(stmt, tuple(params))
+
+
+oracle_rowvar_re = re.compile(":([a-zA-Z0-9_]+)")
+
+class OracleRowBatcher(RowBatcher):
+ """Oracle-specific row batcher.
+
+ Expects :name parameters and a dictionary for each row.
+ """
+
+ def __init__(self, cursor, inputsizes):
+ super(OracleRowBatcher, self).__init__(cursor)
+ self.inputsizes = inputsizes
+
+ def do_inserts(self):
+
+ def replace_var(match):
+ name = match.group(1)
+ new_name = '%s_%d' % (name, rownum)
+ if name in self.inputsizes:
+ stmt_inputsizes[new_name] = self.inputsizes[name]
+ params[new_name] = row[name]
+ return ':%s' % new_name
+
+ items = sorted(self.inserts.items())
+ for (command, header, row_schema), rows in items:
+ stmt_inputsizes = {}
+
+ if len(rows) == 1:
+ # use the single insert syntax
+ row = rows.values()[0]
+ stmt = "INSERT INTO %s VALUES (%s)" % (header, row_schema)
+ for name in self.inputsizes:
+ if name in row:
+ stmt_inputsizes[name] = self.inputsizes[name]
+ if stmt_inputsizes:
+ self.cursor.setinputsizes(**stmt_inputsizes)
+ self.cursor.execute(stmt, row)
+
+ else:
+ # use the multi-insert syntax
+ parts = []
+ params = {}
+ for rownum, row in enumerate(rows.values()):
+ mod_row = oracle_rowvar_re.sub(replace_var, row_schema)
+ parts.append("INTO %s VALUES (%s)" % (header, mod_row))
+
+ parts = '\n'.join(parts)
+ stmt = "INSERT ALL\n%s\nSELECT * FROM DUAL" % parts
+ if stmt_inputsizes:
+ self.cursor.setinputsizes(**stmt_inputsizes)
+ self.cursor.execute(stmt, params)
Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py 2009-10-08 09:39:52 UTC (rev 104915)
+++ relstorage/trunk/relstorage/adapters/interfaces.py 2009-10-08 09:50:05 UTC (rev 104916)
@@ -216,19 +216,21 @@
initialization is required.
"""
- def store_temp(cursor, oid, prev_tid, data):
- """Store an object in the temporary table."""
+ def make_batcher(cursor):
+ """Return an object to be used for batch store operations."""
- def replace_temp(cursor, oid, prev_tid, data):
- """Replace an object in the temporary table.
+ def store_temp(cursor, batcher, oid, prev_tid, data):
+ """Store an object in the temporary table.
- This happens after conflict resolution.
+ batcher is an object returned by self.make_batcher().
"""
- def restore(cursor, oid, tid, data):
+ def restore(cursor, batcher, oid, tid, data):
"""Store an object directly, without conflict detection.
Used for copying transactions into this database.
+
+ batcher is an object returned by self.make_batcher().
"""
def detect_conflict(cursor):
@@ -238,6 +240,12 @@
attempted_data). If there is no conflict, returns None.
"""
+ def replace_temp(cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table.
+
+ This happens after conflict resolution.
+ """
+
def move_from_temp(cursor, tid):
"""Moved the temporarily stored objects to permanent storage.
Modified: relstorage/trunk/relstorage/adapters/mover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py 2009-10-08 09:39:52 UTC (rev 104915)
+++ relstorage/trunk/relstorage/adapters/mover.py 2009-10-08 09:50:05 UTC (rev 104916)
@@ -17,6 +17,8 @@
from base64 import decodestring
from base64 import encodestring
from relstorage.adapters.interfaces import IObjectMover
+from relstorage.adapters.batch import OracleRowBatcher
+from relstorage.adapters.batch import RowBatcher
from zope.interface import implements
try:
@@ -44,9 +46,9 @@
'get_object_tid_after',
'on_store_opened',
'store_temp',
- 'replace_temp',
'restore',
'detect_conflict',
+ 'replace_temp',
'move_from_temp',
'update_current',
)
@@ -59,15 +61,24 @@
self.runner = runner
self.Binary = Binary
self.inputsize_BLOB = inputsize_BLOB
- self.inputsize_BINARY = inputsize_BINARY
+ self.inputsizes = {
+ 'blobdata': inputsize_BLOB,
+ 'rawdata': inputsize_BINARY,
+ }
for method_name in self._method_names:
method = getattr(self, '%s_%s' % (database_name, method_name))
setattr(self, method_name, method)
+ def make_batcher(self, cursor):
+ if self.database_name == 'oracle':
+ return OracleRowBatcher(cursor, self.inputsizes)
+ else:
+ return RowBatcher(cursor)
+
def postgresql_load_current(self, cursor, oid):
"""Returns the current pickle and integer tid for an object.
@@ -381,121 +392,67 @@
- def postgresql_store_temp(self, cursor, oid, prev_tid, data):
+ def postgresql_store_temp(self, cursor, batcher, oid, prev_tid, data):
"""Store an object in the temporary table."""
if self.keep_history:
md5sum = compute_md5sum(data)
else:
md5sum = None
- stmt = """
- DELETE FROM temp_store WHERE zoid = %s;
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (%s, %s, %s, decode(%s, 'base64'))
- """
- cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
+ batcher.delete_from('temp_store', 'zoid', oid)
+ batcher.insert_into(
+ "temp_store (zoid, prev_tid, md5, state)",
+ "%s, %s, %s, decode(%s, 'base64')",
+ (oid, prev_tid, md5sum, encodestring(data)),
+ rowkey=oid,
+ size=len(data),
+ )
- def mysql_store_temp(self, cursor, oid, prev_tid, data):
+ def mysql_store_temp(self, cursor, batcher, oid, prev_tid, data):
"""Store an object in the temporary table."""
if self.keep_history:
md5sum = compute_md5sum(data)
else:
md5sum = None
- stmt = """
- REPLACE INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (%s, %s, %s, %s)
- """
- cursor.execute(stmt, (oid, prev_tid, md5sum, self.Binary(data)))
+ batcher.insert_into(
+ "temp_store (zoid, prev_tid, md5, state)",
+ "%s, %s, %s, %s",
+ (oid, prev_tid, md5sum, self.Binary(data)),
+ rowkey=oid,
+ size=len(data),
+ command='REPLACE',
+ )
- def oracle_store_temp(self, cursor, oid, prev_tid, data):
+ def oracle_store_temp(self, cursor, batcher, oid, prev_tid, data):
"""Store an object in the temporary table."""
if self.keep_history:
md5sum = compute_md5sum(data)
else:
md5sum = None
- cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
+
+ batcher.delete_from('temp_store', 'zoid', oid)
+
+ row = {'oid': oid, 'prev_tid': prev_tid, 'md5sum': md5sum}
if len(data) <= 2000:
# Send data inline for speed. Oracle docs say maximum size
- # of a RAW is 2000 bytes. inputsize_BINARY corresponds with RAW.
- stmt = """
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (:oid, :prev_tid, :md5sum, :rawdata)
- """
- cursor.setinputsizes(rawdata=self.inputsize_BINARY)
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, rawdata=data)
+ # of a RAW is 2000 bytes.
+ row_schema = ":oid, :prev_tid, :md5sum, :rawdata"
+ row['rawdata'] = data
else:
# Send data as a BLOB
- stmt = """
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (:oid, :prev_tid, :md5sum, :blobdata)
- """
- cursor.setinputsizes(blobdata=self.inputsize_BLOB)
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, blobdata=data)
+ row_schema = ":oid, :prev_tid, :md5sum, :blobdata"
+ row['blobdata'] = data
+ batcher.insert_into(
+ "temp_store (zoid, prev_tid, md5, state)",
+ row_schema,
+ row,
+ rowkey=oid,
+ size=len(data),
+ )
- def postgresql_replace_temp(self, cursor, oid, prev_tid, data):
- """Replace an object in the temporary table.
-
- This happens after conflict resolution.
- """
- if self.keep_history:
- md5sum = compute_md5sum(data)
- else:
- md5sum = None
- stmt = """
- UPDATE temp_store SET
- prev_tid = %s,
- md5 = %s,
- state = decode(%s, 'base64')
- WHERE zoid = %s
- """
- cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
-
- def mysql_replace_temp(self, cursor, oid, prev_tid, data):
- """Replace an object in the temporary table.
-
- This happens after conflict resolution.
- """
- if self.keep_history:
- md5sum = compute_md5sum(data)
- else:
- md5sum = None
- stmt = """
- UPDATE temp_store SET
- prev_tid = %s,
- md5 = %s,
- state = %s
- WHERE zoid = %s
- """
- cursor.execute(stmt, (prev_tid, md5sum, self.Binary(data), oid))
-
- def oracle_replace_temp(self, cursor, oid, prev_tid, data):
- """Replace an object in the temporary table.
-
- This happens after conflict resolution.
- """
- if self.keep_history:
- md5sum = compute_md5sum(data)
- else:
- md5sum = None
- stmt = """
- UPDATE temp_store SET
- prev_tid = :prev_tid,
- md5 = :md5sum,
- state = :blobdata
- WHERE zoid = :oid
- """
- cursor.setinputsizes(blobdata=self.inputsize_BLOB)
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, blobdata=self.Binary(data))
-
-
-
-
- def postgresql_restore(self, cursor, oid, tid, data):
+ def postgresql_restore(self, cursor, batcher, oid, tid, data):
"""Store an object directly, without conflict detection.
Used for copying transactions into this database.
@@ -511,27 +468,30 @@
encoded = None
if self.keep_history:
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (%s, %s,
+ row_schema = """
+ %s, %s,
COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
- %s, decode(%s, 'base64'))
+ %s, decode(%s, 'base64')
"""
- cursor.execute(stmt, (oid, tid, oid, md5sum, encoded))
+ batcher.insert_into(
+ "object_state (zoid, tid, prev_tid, md5, state)",
+ row_schema,
+ (oid, tid, oid, md5sum, encoded),
+ rowkey=(oid, tid),
+ size=len(data or ''),
+ )
else:
- stmt = """
- DELETE FROM object_state
- WHERE zoid = %s
- """
- cursor.execute(stmt, (oid,))
+ batcher.delete_from('object_state', 'zoid', oid)
if data:
- stmt = """
- INSERT INTO object_state (zoid, tid, state)
- VALUES (%s, %s, decode(%s, 'base64'))
- """
- cursor.execute(stmt, (oid, tid, encoded))
+ batcher.insert_into(
+ "object_state (zoid, tid, state)",
+ "%s, %s, decode(%s, 'base64')",
+ (oid, tid, encoded),
+ rowkey=oid,
+ size=len(data),
+ )
- def mysql_restore(self, cursor, oid, tid, data):
+ def mysql_restore(self, cursor, batcher, oid, tid, data):
"""Store an object directly, without conflict detection.
Used for copying transactions into this database.
@@ -547,28 +507,32 @@
encoded = None
if self.keep_history:
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (%s, %s,
+ row_schema = """
+ %s, %s,
COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
- %s, %s)
+ %s, %s
"""
- cursor.execute(stmt, (oid, tid, oid, md5sum, encoded))
+ batcher.insert_into(
+ "object_state (zoid, tid, prev_tid, md5, state)",
+ row_schema,
+ (oid, tid, oid, md5sum, encoded),
+ rowkey=(oid, tid),
+ size=len(data or ''),
+ )
else:
if data:
- stmt = """
- REPLACE INTO object_state (zoid, tid, state)
- VALUES (%s, %s, %s)
- """
- cursor.execute(stmt, (oid, tid, encoded))
+ batcher.insert_into(
+ "object_state (zoid, tid, state)",
+ "%s, %s, %s",
+ (oid, tid, encoded),
+ rowkey=oid,
+ size=len(data),
+ command='REPLACE',
+ )
else:
- stmt = """
- DELETE FROM object_state
- WHERE zoid = %s
- """
- cursor.execute(stmt, (oid,))
+ batcher.delete_from('object_state', 'zoid', oid)
- def oracle_restore(self, cursor, oid, tid, data):
+ def oracle_restore(self, cursor, batcher, oid, tid, data):
"""Store an object directly, without conflict detection.
Used for copying transactions into this database.
@@ -577,52 +541,43 @@
md5sum = compute_md5sum(data)
else:
md5sum = None
- stmt = "DELETE FROM object_state WHERE zoid = :1"
- cursor.execute(stmt, (oid,))
- if not data or len(data) <= 2000:
- # Send data inline for speed. Oracle docs say maximum size
- # of a RAW is 2000 bytes. inputsize_BINARY corresponds with RAW.
- if self.keep_history:
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (:oid, :tid,
- COALESCE(
- (SELECT tid FROM current_object WHERE zoid = :oid), 0),
- :md5sum, :rawdata)
- """
- cursor.setinputsizes(rawdata=self.inputsize_BINARY)
- cursor.execute(stmt, oid=oid, tid=tid,
- md5sum=md5sum, rawdata=data)
+ if self.keep_history:
+ row = {'oid': oid, 'tid': tid, 'md5sum': md5sum}
+ row_schema = """
+ :oid, :tid,
+ COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
+ :md5sum, :rawdata
+ """
+ if not data or len(data) <= 2000:
+ row['rawdata'] = data
else:
- if data:
- stmt = """
- INSERT INTO object_state (zoid, tid, state)
- VALUES (:oid, :tid, :rawdata)
- """
- cursor.setinputsizes(rawdata=self.inputsize_BINARY)
- cursor.execute(stmt, oid=oid, tid=tid, rawdata=data)
+ row_schema = row_schema.replace(":rawdata", ":blobdata")
+ row['blobdata'] = data
+ batcher.insert_into(
+ "object_state (zoid, tid, prev_tid, md5, state)",
+ row_schema,
+ row,
+ rowkey=(oid, tid),
+ size=len(data or ''),
+ )
else:
- # Send data as a BLOB
- if self.keep_history:
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (:oid, :tid,
- COALESCE(
- (SELECT tid FROM current_object WHERE zoid = :oid), 0),
- :md5sum, :blobdata)
- """
- cursor.setinputsizes(blobdata=self.inputsize_BLOB)
- cursor.execute(stmt, oid=oid, tid=tid,
- md5sum=md5sum, blobdata=data)
- else:
- cursor.execute(stmt, (oid,))
- stmt = """
- INSERT INTO object_state (zoid, tid, state)
- VALUES (:oid, :tid, :blobdata)
- """
- cursor.setinputsizes(blobdata=self.inputsize_BLOB)
- cursor.execute(stmt, oid=oid, tid=tid, blobdata=data)
+ batcher.delete_from('object_state', 'zoid', oid)
+ if data:
+ row = {'oid': oid, 'tid': tid}
+ if len(data) <= 2000:
+ row_schema = ":oid, :tid, :rawdata"
+ row['rawdata'] = data
+ else:
+ row_schema = ":oid, :tid, :blobdata"
+ row['blobdata'] = data
+ batcher.insert_into(
+ "object_state (zoid, tid, state)",
+ row_schema,
+ row,
+ rowkey=oid,
+ size=len(data),
+ )
@@ -716,6 +671,65 @@
+ def postgresql_replace_temp(self, cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table.
+
+ This happens after conflict resolution.
+ """
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = %s,
+ md5 = %s,
+ state = decode(%s, 'base64')
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
+
+ def mysql_replace_temp(self, cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table.
+
+ This happens after conflict resolution.
+ """
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = %s,
+ md5 = %s,
+ state = %s
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (prev_tid, md5sum, self.Binary(data), oid))
+
+ def oracle_replace_temp(self, cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table.
+
+ This happens after conflict resolution.
+ """
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = :prev_tid,
+ md5 = :md5sum,
+ state = :blobdata
+ WHERE zoid = :oid
+ """
+ cursor.setinputsizes(blobdata=self.inputsize_BLOB)
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, blobdata=self.Binary(data))
+
+
+
+
def generic_move_from_temp(self, cursor, tid):
"""Moved the temporarily stored objects to permanent storage.
Modified: relstorage/trunk/relstorage/pylibmc_wrapper.py
===================================================================
--- relstorage/trunk/relstorage/pylibmc_wrapper.py 2009-10-08 09:39:52 UTC (rev 104915)
+++ relstorage/trunk/relstorage/pylibmc_wrapper.py 2009-10-08 09:50:05 UTC (rev 104916)
@@ -35,6 +35,11 @@
def __init__(self, servers):
self._client = pylibmc.Client(servers, binary=True)
+ self._client.set_behaviors({
+ "tcp_nodelay": True,
+ #"no block": True,
+ #"buffer requests": True,
+ })
def get(self, key):
try:
Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py 2009-10-08 09:39:52 UTC (rev 104915)
+++ relstorage/trunk/relstorage/storage.py 2009-10-08 09:50:05 UTC (rev 104916)
@@ -158,6 +158,10 @@
# currently uncommitted transaction.
self._txn_blobs = None
+ # _batcher: An object that accumulates store operations
+ # so they can be executed in batch (to minimize latency).
+ self._batcher = None
+
if options.blob_dir:
from ZODB.blob import FilesystemHelper
self.fshelper = FilesystemHelper(options.blob_dir)
@@ -508,7 +512,8 @@
try:
self._max_stored_oid = max(self._max_stored_oid, oid_int)
# save the data in a temporary table
- adapter.mover.store_temp(cursor, oid_int, prev_tid_int, data)
+ adapter.mover.store_temp(
+ cursor, self._batcher, oid_int, prev_tid_int, data)
return None
finally:
self._lock_release()
@@ -538,7 +543,8 @@
try:
self._max_stored_oid = max(self._max_stored_oid, oid_int)
# save the data. Note that data can be None.
- adapter.mover.restore(cursor, oid_int, tid_int, data)
+ adapter.mover.restore(
+ cursor, self._batcher, oid_int, tid_int, data)
finally:
self._lock_release()
@@ -566,8 +572,10 @@
self._ude = user, desc, ext
self._tstatus = status
+ self._restart_store()
adapter = self._adapter
- self._restart_store()
+ self._batcher = self._adapter.mover.make_batcher(
+ self._store_cursor)
if tid is not None:
# get the commit lock and add the transaction now
@@ -622,6 +630,7 @@
# method was called.
self._prepared_txn = None
self._max_stored_oid = 0
+ self._batcher = None
def _finish_store(self):
@@ -690,6 +699,10 @@
assert cursor is not None
conn = self._store_conn
+ # execute all remaining batch store operations
+ self._batcher.flush()
+
+ # Reserve all OIDs used by this transaction
if self._max_stored_oid > self._max_new_oid:
self._adapter.oidallocator.set_min_oid(
cursor, self._max_stored_oid + 1)
@@ -775,6 +788,7 @@
self._prepared_txn = None
self._tid = None
self._transaction = None
+ self._batcher = None
def _abort(self):
# the lock is held here
@@ -796,6 +810,7 @@
self._prepared_txn = None
self._tid = None
self._transaction = None
+ self._batcher = None
def lastTransaction(self):
return self._ltid
More information about the checkins
mailing list