[Checkins] SVN: relstorage/trunk/ Store operations now use multi-insert and multi-delete SQL

Shane Hathaway shane at hathawaymix.org
Thu Oct 8 05:50:05 EDT 2009


Log message for revision 104916:
  Store operations now use multi-insert and multi-delete SQL
  statements to reduce the effect of network latency.
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  A   relstorage/trunk/relstorage/adapters/batch.py
  U   relstorage/trunk/relstorage/adapters/interfaces.py
  U   relstorage/trunk/relstorage/adapters/mover.py
  U   relstorage/trunk/relstorage/pylibmc_wrapper.py
  U   relstorage/trunk/relstorage/storage.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2009-10-08 09:39:52 UTC (rev 104915)
+++ relstorage/trunk/CHANGES.txt	2009-10-08 09:50:05 UTC (rev 104916)
@@ -16,6 +16,9 @@
 
 - Added a wrapper module for pylibmc.
 
+- Store operations now use multi-insert and multi-delete SQL
+  statements to reduce the effect of network latency.
+
 - Renamed relstorage.py to storage.py to overcome import issues.
   Also moved the Options class to options.py.
 

Added: relstorage/trunk/relstorage/adapters/batch.py
===================================================================
--- relstorage/trunk/relstorage/adapters/batch.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/batch.py	2009-10-08 09:50:05 UTC (rev 104916)
@@ -0,0 +1,139 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Batch table row insert/delete support.
+"""
+
+import re
+
+class RowBatcher(object):
+    """Generic row batcher.
+
+    Expects '%s' parameters and a tuple for each row.
+    """
+
+    row_limit = 100
+    size_limit = 1<<20
+    database_name = None
+
+    def __init__(self, cursor):
+        self.cursor = cursor
+        self.rows_added = 0
+        self.size_added = 0
+        self.deletes = {}  # {(table, varname): set([value])}
+        self.inserts = {}  # {(command, header, row_schema): {rowkey: [row]}}
+
+    def delete_from(self, table, varname, value):
+        key = (table, varname)
+        values = self.deletes.get(key)
+        if values is None:
+            self.deletes[key] = values = set()
+        values.add(str(value))
+        self.rows_added += 1
+        if self.rows_added >= self.row_limit:
+            self.flush()
+
+    def insert_into(self, header, row_schema, row, rowkey, size,
+            command='INSERT'):
+        key = (command, header, row_schema)
+        rows = self.inserts.get(key)
+        if rows is None:
+            self.inserts[key] = rows = {}
+        rows[rowkey] = row  # note that this may replace a row
+        self.rows_added += 1
+        self.size_added += size
+        if (self.rows_added >= self.row_limit
+            or self.size_added >= self.size_limit):
+            self.flush()
+
+    def flush(self):
+        if self.deletes:
+            self.do_deletes()
+            self.deletes.clear()
+        if self.inserts:
+            self.do_inserts()
+            self.inserts.clear()
+        self.rows_added = 0
+        self.size_added = 0
+
+    def do_deletes(self):
+        for (table, varname), values in sorted(self.deletes.items()):
+            value_str = ','.join(values)
+            stmt = "DELETE FROM %s WHERE %s IN (%s)" % (
+                table, varname, value_str)
+            self.cursor.execute(stmt)
+
+    def do_inserts(self):
+        items = sorted(self.inserts.items())
+        for (command, header, row_schema), rows in items:
+            parts = []
+            params = []
+            s = "(%s)" % row_schema
+            for row in rows.values():
+                parts.append(s)
+                params.extend(row)
+            parts = ',\n'.join(parts)
+            stmt = "%s INTO %s VALUES %s" % (command, header, parts)
+            self.cursor.execute(stmt, tuple(params))
+
+
+oracle_rowvar_re = re.compile(":([a-zA-Z0-9_]+)")
+
+class OracleRowBatcher(RowBatcher):
+    """Oracle-specific row batcher.
+
+    Expects :name parameters and a dictionary for each row.
+    """
+
+    def __init__(self, cursor, inputsizes):
+        super(OracleRowBatcher, self).__init__(cursor)
+        self.inputsizes = inputsizes
+
+    def do_inserts(self):
+
+        def replace_var(match):
+            name = match.group(1)
+            new_name = '%s_%d' % (name, rownum)
+            if name in self.inputsizes:
+                stmt_inputsizes[new_name] = self.inputsizes[name]
+            params[new_name] = row[name]
+            return ':%s' % new_name
+
+        items = sorted(self.inserts.items())
+        for (command, header, row_schema), rows in items:
+            stmt_inputsizes = {}
+
+            if len(rows) == 1:
+                # use the single insert syntax
+                row = rows.values()[0]
+                stmt = "INSERT INTO %s VALUES (%s)" % (header, row_schema)
+                for name in self.inputsizes:
+                    if name in row:
+                        stmt_inputsizes[name] = self.inputsizes[name]
+                if stmt_inputsizes:
+                    self.cursor.setinputsizes(**stmt_inputsizes)
+                self.cursor.execute(stmt, row)
+
+            else:
+                # use the multi-insert syntax
+                parts = []
+                params = {}
+                for rownum, row in enumerate(rows.values()):
+                    mod_row = oracle_rowvar_re.sub(replace_var, row_schema)
+                    parts.append("INTO %s VALUES (%s)" % (header, mod_row))
+
+                parts = '\n'.join(parts)
+                stmt = "INSERT ALL\n%s\nSELECT * FROM DUAL" % parts
+                if stmt_inputsizes:
+                    self.cursor.setinputsizes(**stmt_inputsizes)
+                self.cursor.execute(stmt, params)

Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py	2009-10-08 09:39:52 UTC (rev 104915)
+++ relstorage/trunk/relstorage/adapters/interfaces.py	2009-10-08 09:50:05 UTC (rev 104916)
@@ -216,19 +216,21 @@
         initialization is required.
         """
 
-    def store_temp(cursor, oid, prev_tid, data):
-        """Store an object in the temporary table."""
+    def make_batcher(cursor):
+        """Return an object to be used for batch store operations."""
 
-    def replace_temp(cursor, oid, prev_tid, data):
-        """Replace an object in the temporary table.
+    def store_temp(cursor, batcher, oid, prev_tid, data):
+        """Store an object in the temporary table.
 
-        This happens after conflict resolution.
+        batcher is an object returned by self.make_batcher().
         """
 
-    def restore(cursor, oid, tid, data):
+    def restore(cursor, batcher, oid, tid, data, stmt_buf):
         """Store an object directly, without conflict detection.
 
         Used for copying transactions into this database.
+
+        batcher is an object returned by self.make_batcher().
         """
 
     def detect_conflict(cursor):
@@ -238,6 +240,12 @@
         attempted_data).  If there is no conflict, returns None.
         """
 
+    def replace_temp(cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table.
+
+        This happens after conflict resolution.
+        """
+
     def move_from_temp(cursor, tid):
         """Moved the temporarily stored objects to permanent storage.
 

Modified: relstorage/trunk/relstorage/adapters/mover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py	2009-10-08 09:39:52 UTC (rev 104915)
+++ relstorage/trunk/relstorage/adapters/mover.py	2009-10-08 09:50:05 UTC (rev 104916)
@@ -17,6 +17,8 @@
 from base64 import decodestring
 from base64 import encodestring
 from relstorage.adapters.interfaces import IObjectMover
+from relstorage.adapters.batch import OracleRowBatcher
+from relstorage.adapters.batch import RowBatcher
 from zope.interface import implements
 
 try:
@@ -44,9 +46,9 @@
         'get_object_tid_after',
         'on_store_opened',
         'store_temp',
-        'replace_temp',
         'restore',
         'detect_conflict',
+        'replace_temp',
         'move_from_temp',
         'update_current',
         )
@@ -59,15 +61,24 @@
         self.runner = runner
         self.Binary = Binary
         self.inputsize_BLOB = inputsize_BLOB
-        self.inputsize_BINARY = inputsize_BINARY
+        self.inputsizes = {
+            'blobdata': inputsize_BLOB,
+            'rawdata': inputsize_BINARY,
+            }
 
         for method_name in self._method_names:
             method = getattr(self, '%s_%s' % (database_name, method_name))
             setattr(self, method_name, method)
 
+    def make_batcher(self, cursor):
+        if self.database_name == 'oracle':
+            return OracleRowBatcher(cursor, self.inputsizes)
+        else:
+            return RowBatcher(cursor)
 
 
 
+
     def postgresql_load_current(self, cursor, oid):
         """Returns the current pickle and integer tid for an object.
 
@@ -381,121 +392,67 @@
 
 
 
-    def postgresql_store_temp(self, cursor, oid, prev_tid, data):
+    def postgresql_store_temp(self, cursor, batcher, oid, prev_tid, data):
         """Store an object in the temporary table."""
         if self.keep_history:
             md5sum = compute_md5sum(data)
         else:
             md5sum = None
-        stmt = """
-        DELETE FROM temp_store WHERE zoid = %s;
-        INSERT INTO temp_store (zoid, prev_tid, md5, state)
-        VALUES (%s, %s, %s, decode(%s, 'base64'))
-        """
-        cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
+        batcher.delete_from('temp_store', 'zoid', oid)
+        batcher.insert_into(
+            "temp_store (zoid, prev_tid, md5, state)",
+            "%s, %s, %s, decode(%s, 'base64')",
+            (oid, prev_tid, md5sum, encodestring(data)),
+            rowkey=oid,
+            size=len(data),
+            )
 
-    def mysql_store_temp(self, cursor, oid, prev_tid, data):
+    def mysql_store_temp(self, cursor, batcher, oid, prev_tid, data):
         """Store an object in the temporary table."""
         if self.keep_history:
             md5sum = compute_md5sum(data)
         else:
             md5sum = None
-        stmt = """
-        REPLACE INTO temp_store (zoid, prev_tid, md5, state)
-        VALUES (%s, %s, %s, %s)
-        """
-        cursor.execute(stmt, (oid, prev_tid, md5sum, self.Binary(data)))
+        batcher.insert_into(
+            "temp_store (zoid, prev_tid, md5, state)",
+            "%s, %s, %s, %s",
+            (oid, prev_tid, md5sum, self.Binary(data)),
+            rowkey=oid,
+            size=len(data),
+            command='REPLACE',
+            )
 
-    def oracle_store_temp(self, cursor, oid, prev_tid, data):
+    def oracle_store_temp(self, cursor, batcher, oid, prev_tid, data):
         """Store an object in the temporary table."""
         if self.keep_history:
             md5sum = compute_md5sum(data)
         else:
             md5sum = None
-        cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
+
+        batcher.delete_from('temp_store', 'zoid', oid)
+
+        row = {'oid': oid, 'prev_tid': prev_tid, 'md5sum': md5sum}
         if len(data) <= 2000:
             # Send data inline for speed.  Oracle docs say maximum size
-            # of a RAW is 2000 bytes.  inputsize_BINARY corresponds with RAW.
-            stmt = """
-            INSERT INTO temp_store (zoid, prev_tid, md5, state)
-            VALUES (:oid, :prev_tid, :md5sum, :rawdata)
-            """
-            cursor.setinputsizes(rawdata=self.inputsize_BINARY)
-            cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
-                md5sum=md5sum, rawdata=data)
+            # of a RAW is 2000 bytes.
+            row_schema = ":oid, :prev_tid, :md5sum, :rawdata"
+            row['rawdata'] = data
         else:
             # Send data as a BLOB
-            stmt = """
-            INSERT INTO temp_store (zoid, prev_tid, md5, state)
-            VALUES (:oid, :prev_tid, :md5sum, :blobdata)
-            """
-            cursor.setinputsizes(blobdata=self.inputsize_BLOB)
-            cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
-                md5sum=md5sum, blobdata=data)
+            row_schema = ":oid, :prev_tid, :md5sum, :blobdata"
+            row['blobdata'] = data
+        batcher.insert_into(
+            "temp_store (zoid, prev_tid, md5, state)",
+            row_schema,
+            row,
+            rowkey=oid,
+            size=len(data),
+            )
 
 
 
 
-    def postgresql_replace_temp(self, cursor, oid, prev_tid, data):
-        """Replace an object in the temporary table.
-
-        This happens after conflict resolution.
-        """
-        if self.keep_history:
-            md5sum = compute_md5sum(data)
-        else:
-            md5sum = None
-        stmt = """
-        UPDATE temp_store SET
-            prev_tid = %s,
-            md5 = %s,
-            state = decode(%s, 'base64')
-        WHERE zoid = %s
-        """
-        cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
-
-    def mysql_replace_temp(self, cursor, oid, prev_tid, data):
-        """Replace an object in the temporary table.
-
-        This happens after conflict resolution.
-        """
-        if self.keep_history:
-            md5sum = compute_md5sum(data)
-        else:
-            md5sum = None
-        stmt = """
-        UPDATE temp_store SET
-            prev_tid = %s,
-            md5 = %s,
-            state = %s
-        WHERE zoid = %s
-        """
-        cursor.execute(stmt, (prev_tid, md5sum, self.Binary(data), oid))
-
-    def oracle_replace_temp(self, cursor, oid, prev_tid, data):
-        """Replace an object in the temporary table.
-
-        This happens after conflict resolution.
-        """
-        if self.keep_history:
-            md5sum = compute_md5sum(data)
-        else:
-            md5sum = None
-        stmt = """
-        UPDATE temp_store SET
-            prev_tid = :prev_tid,
-            md5 = :md5sum,
-            state = :blobdata
-        WHERE zoid = :oid
-        """
-        cursor.setinputsizes(blobdata=self.inputsize_BLOB)
-        cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
-            md5sum=md5sum, blobdata=self.Binary(data))
-
-
-
-
-    def postgresql_restore(self, cursor, oid, tid, data):
+    def postgresql_restore(self, cursor, batcher, oid, tid, data):
         """Store an object directly, without conflict detection.
 
         Used for copying transactions into this database.
@@ -511,27 +468,30 @@
             encoded = None
 
         if self.keep_history:
-            stmt = """
-            INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-            VALUES (%s, %s,
+            row_schema = """
+                %s, %s,
                 COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
-                %s, decode(%s, 'base64'))
+                %s, decode(%s, 'base64')
             """
-            cursor.execute(stmt, (oid, tid, oid, md5sum, encoded))
+            batcher.insert_into(
+                "object_state (zoid, tid, prev_tid, md5, state)",
+                row_schema,
+                (oid, tid, oid, md5sum, encoded),
+                rowkey=(oid, tid),
+                size=len(data or ''),
+                )
         else:
-            stmt = """
-            DELETE FROM object_state
-            WHERE zoid = %s
-            """
-            cursor.execute(stmt, (oid,))
+            batcher.delete_from('object_state', 'zoid', oid)
             if data:
-                stmt = """
-                INSERT INTO object_state (zoid, tid, state)
-                VALUES (%s, %s, decode(%s, 'base64'))
-                """
-                cursor.execute(stmt, (oid, tid, encoded))
+                batcher.insert_into(
+                    "object_state (zoid, tid, state)",
+                    "%s, %s, decode(%s, 'base64')",
+                    (oid, tid, encoded),
+                    rowkey=oid,
+                    size=len(data),
+                    )
 
-    def mysql_restore(self, cursor, oid, tid, data):
+    def mysql_restore(self, cursor, batcher, oid, tid, data):
         """Store an object directly, without conflict detection.
 
         Used for copying transactions into this database.
@@ -547,28 +507,32 @@
             encoded = None
 
         if self.keep_history:
-            stmt = """
-            INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-            VALUES (%s, %s,
+            row_schema = """
+                %s, %s,
                 COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
-                %s, %s)
+                %s, %s
             """
-            cursor.execute(stmt, (oid, tid, oid, md5sum, encoded))
+            batcher.insert_into(
+                "object_state (zoid, tid, prev_tid, md5, state)",
+                row_schema,
+                (oid, tid, oid, md5sum, encoded),
+                rowkey=(oid, tid),
+                size=len(data or ''),
+                )
         else:
             if data:
-                stmt = """
-                REPLACE INTO object_state (zoid, tid, state)
-                VALUES (%s, %s, %s)
-                """
-                cursor.execute(stmt, (oid, tid, encoded))
+                batcher.insert_into(
+                    "object_state (zoid, tid, state)",
+                    "%s, %s, %s",
+                    (oid, tid, encoded),
+                    rowkey=oid,
+                    size=len(data),
+                    command='REPLACE',
+                    )
             else:
-                stmt = """
-                DELETE FROM object_state
-                WHERE zoid = %s
-                """
-                cursor.execute(stmt, (oid,))
+                batcher.delete_from('object_state', 'zoid', oid)
 
-    def oracle_restore(self, cursor, oid, tid, data):
+    def oracle_restore(self, cursor, batcher, oid, tid, data):
         """Store an object directly, without conflict detection.
 
         Used for copying transactions into this database.
@@ -577,52 +541,43 @@
             md5sum = compute_md5sum(data)
         else:
             md5sum = None
-            stmt = "DELETE FROM object_state WHERE zoid = :1"
-            cursor.execute(stmt, (oid,))
 
-        if not data or len(data) <= 2000:
-            # Send data inline for speed.  Oracle docs say maximum size
-            # of a RAW is 2000 bytes.  inputsize_BINARY corresponds with RAW.
-            if self.keep_history:
-                stmt = """
-                INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-                VALUES (:oid, :tid,
-                    COALESCE(
-                        (SELECT tid FROM current_object WHERE zoid = :oid), 0),
-                    :md5sum, :rawdata)
-                """
-                cursor.setinputsizes(rawdata=self.inputsize_BINARY)
-                cursor.execute(stmt, oid=oid, tid=tid,
-                    md5sum=md5sum, rawdata=data)
+        if self.keep_history:
+            row = {'oid': oid, 'tid': tid, 'md5sum': md5sum}
+            row_schema = """
+                :oid, :tid,
+                COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
+                :md5sum, :rawdata
+            """
+            if not data or len(data) <= 2000:
+                row['rawdata'] = data
             else:
-                if data:
-                    stmt = """
-                    INSERT INTO object_state (zoid, tid, state)
-                    VALUES (:oid, :tid, :rawdata)
-                    """
-                    cursor.setinputsizes(rawdata=self.inputsize_BINARY)
-                    cursor.execute(stmt, oid=oid, tid=tid, rawdata=data)
+                row_schema = row_schema.replace(":rawdata", ":blobdata")
+                row['blobdata'] = data
+            batcher.insert_into(
+                "object_state (zoid, tid, prev_tid, md5, state)",
+                row_schema,
+                row,
+                rowkey=(oid, tid),
+                size=len(data or ''),
+                )
         else:
-            # Send data as a BLOB
-            if self.keep_history:
-                stmt = """
-                INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-                VALUES (:oid, :tid,
-                    COALESCE(
-                        (SELECT tid FROM current_object WHERE zoid = :oid), 0),
-                    :md5sum, :blobdata)
-                """
-                cursor.setinputsizes(blobdata=self.inputsize_BLOB)
-                cursor.execute(stmt, oid=oid, tid=tid,
-                    md5sum=md5sum, blobdata=data)
-            else:
-                cursor.execute(stmt, (oid,))
-                stmt = """
-                INSERT INTO object_state (zoid, tid, state)
-                VALUES (:oid, :tid, :blobdata)
-                """
-                cursor.setinputsizes(blobdata=self.inputsize_BLOB)
-                cursor.execute(stmt, oid=oid, tid=tid, blobdata=data)
+            batcher.delete_from('object_state', 'zoid', oid)
+            if data:
+                row = {'oid': oid, 'tid': tid}
+                if len(data) <= 2000:
+                    row_schema = ":oid, :tid, :rawdata"
+                    row['rawdata'] = data
+                else:
+                    row_schema = ":oid, :tid, :blobdata"
+                    row['blobdata'] = data
+                batcher.insert_into(
+                    "object_state (zoid, tid, state)",
+                    row_schema,
+                    row,
+                    rowkey=oid,
+                    size=len(data),
+                    )
 
 
 
@@ -716,6 +671,65 @@
 
 
 
+    def postgresql_replace_temp(self, cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table.
+
+        This happens after conflict resolution.
+        """
+        if self.keep_history:
+            md5sum = compute_md5sum(data)
+        else:
+            md5sum = None
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = %s,
+            md5 = %s,
+            state = decode(%s, 'base64')
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
+
+    def mysql_replace_temp(self, cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table.
+
+        This happens after conflict resolution.
+        """
+        if self.keep_history:
+            md5sum = compute_md5sum(data)
+        else:
+            md5sum = None
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = %s,
+            md5 = %s,
+            state = %s
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (prev_tid, md5sum, self.Binary(data), oid))
+
+    def oracle_replace_temp(self, cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table.
+
+        This happens after conflict resolution.
+        """
+        if self.keep_history:
+            md5sum = compute_md5sum(data)
+        else:
+            md5sum = None
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = :prev_tid,
+            md5 = :md5sum,
+            state = :blobdata
+        WHERE zoid = :oid
+        """
+        cursor.setinputsizes(blobdata=self.inputsize_BLOB)
+        cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+            md5sum=md5sum, blobdata=self.Binary(data))
+
+
+
+
     def generic_move_from_temp(self, cursor, tid):
         """Moved the temporarily stored objects to permanent storage.
 

Modified: relstorage/trunk/relstorage/pylibmc_wrapper.py
===================================================================
--- relstorage/trunk/relstorage/pylibmc_wrapper.py	2009-10-08 09:39:52 UTC (rev 104915)
+++ relstorage/trunk/relstorage/pylibmc_wrapper.py	2009-10-08 09:50:05 UTC (rev 104916)
@@ -35,6 +35,11 @@
 
     def __init__(self, servers):
         self._client = pylibmc.Client(servers, binary=True)
+        self._client.set_behaviors({
+            "tcp_nodelay": True,
+            #"no block": True,
+            #"buffer requests": True,
+            })
 
     def get(self, key):
         try:

Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py	2009-10-08 09:39:52 UTC (rev 104915)
+++ relstorage/trunk/relstorage/storage.py	2009-10-08 09:50:05 UTC (rev 104916)
@@ -158,6 +158,10 @@
         # currently uncommitted transaction.
         self._txn_blobs = None
 
+        # _batcher: An object that accumulates store operations
+        # so they can be executed in batch (to minimize latency).
+        self._batcher = None
+
         if options.blob_dir:
             from ZODB.blob import FilesystemHelper
             self.fshelper = FilesystemHelper(options.blob_dir)
@@ -508,7 +512,8 @@
         try:
             self._max_stored_oid = max(self._max_stored_oid, oid_int)
             # save the data in a temporary table
-            adapter.mover.store_temp(cursor, oid_int, prev_tid_int, data)
+            adapter.mover.store_temp(
+                cursor, self._batcher, oid_int, prev_tid_int, data)
             return None
         finally:
             self._lock_release()
@@ -538,7 +543,8 @@
         try:
             self._max_stored_oid = max(self._max_stored_oid, oid_int)
             # save the data.  Note that data can be None.
-            adapter.mover.restore(cursor, oid_int, tid_int, data)
+            adapter.mover.restore(
+                cursor, self._batcher, oid_int, tid_int, data)
         finally:
             self._lock_release()
 
@@ -566,8 +572,10 @@
             self._ude = user, desc, ext
             self._tstatus = status
 
+            self._restart_store()
             adapter = self._adapter
-            self._restart_store()
+            self._batcher = self._adapter.mover.make_batcher(
+                self._store_cursor)
 
             if tid is not None:
                 # get the commit lock and add the transaction now
@@ -622,6 +630,7 @@
         # method was called.
         self._prepared_txn = None
         self._max_stored_oid = 0
+        self._batcher = None
 
 
     def _finish_store(self):
@@ -690,6 +699,10 @@
         assert cursor is not None
         conn = self._store_conn
 
+        # execute all remaining batch store operations
+        self._batcher.flush()
+
+        # Reserve all OIDs used by this transaction
         if self._max_stored_oid > self._max_new_oid:
             self._adapter.oidallocator.set_min_oid(
                 cursor, self._max_stored_oid + 1)
@@ -775,6 +788,7 @@
             self._prepared_txn = None
             self._tid = None
             self._transaction = None
+            self._batcher = None
 
     def _abort(self):
         # the lock is held here
@@ -796,6 +810,7 @@
             self._prepared_txn = None
             self._tid = None
             self._transaction = None
+            self._batcher = None
 
     def lastTransaction(self):
         return self._ltid



More information about the checkins mailing list