[Checkins] SVN: relstorage/trunk/ History-preserving storages now replace objects on restore instead of just inserting them.

Shane Hathaway shane at hathawaymix.org
Mon Apr 12 13:17:07 EDT 2010


Log message for revision 110758:
  History-preserving storages now replace objects on restore instead of
  just inserting them.
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/adapters/batch.py
  U   relstorage/trunk/relstorage/adapters/interfaces.py
  U   relstorage/trunk/relstorage/adapters/mover.py
  U   relstorage/trunk/relstorage/adapters/schema.py
  U   relstorage/trunk/relstorage/adapters/tests/test_batch.py
  U   relstorage/trunk/relstorage/storage.py
  U   relstorage/trunk/relstorage/tests/RecoveryStorage.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/CHANGES.txt	2010-04-12 17:17:07 UTC (rev 110758)
@@ -1,3 +1,9 @@
+Next Release
+------------
+
+- History-preserving storages now replace objects on restore instead of
+  just inserting them.
+
 1.4.0b3 (2010-02-02)
 --------------------
 

Modified: relstorage/trunk/relstorage/adapters/batch.py
===================================================================
--- relstorage/trunk/relstorage/adapters/batch.py	2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/adapters/batch.py	2010-04-12 17:17:07 UTC (rev 110758)
@@ -16,6 +16,7 @@
 
 import re
 
+
 class RowBatcher(object):
     """Generic row batcher.
 
@@ -27,19 +28,27 @@
     database_name = None
     support_batch_insert = True
 
-    def __init__(self, cursor):
+    def __init__(self, cursor, row_limit=None):
         self.cursor = cursor
+        if row_limit is not None:
+            self.row_limit = row_limit
         self.rows_added = 0
         self.size_added = 0
-        self.deletes = {}  # {(table, varname): set([value])}
+        self.deletes = {}  # {(table, columns_tuple): set([(column_value,)])}
         self.inserts = {}  # {(command, header, row_schema): {rowkey: [row]}}
 
-    def delete_from(self, table, varname, value):
-        key = (table, varname)
-        values = self.deletes.get(key)
-        if values is None:
-            self.deletes[key] = values = set()
-        values.add(str(value))
+    def delete_from(self, table, **kw):
+        if not kw:
+            raise AssertionError("Need at least one column value")
+        columns = kw.keys()
+        columns.sort()
+        columns = tuple(columns)
+        key = (table, columns)
+        rows = self.deletes.get(key)
+        if rows is None:
+            self.deletes[key] = rows = set()
+        row = tuple(str(kw[column]) for column in columns)
+        rows.add(row)
         self.rows_added += 1
         if self.rows_added >= self.row_limit:
             self.flush()
@@ -68,12 +77,22 @@
         self.size_added = 0
 
     def _do_deletes(self):
-        for (table, varname), values in sorted(self.deletes.items()):
-            v = list(values)
-            v.sort()
-            value_str = ','.join(v)
-            stmt = "DELETE FROM %s WHERE %s IN (%s)" % (
-                table, varname, value_str)
+        for (table, columns), rows in sorted(self.deletes.items()):
+            rows = list(rows)
+            rows.sort()
+            if len(columns) == 1:
+                value_str = ','.join(v for (v,) in rows)
+                stmt = "DELETE FROM %s WHERE %s IN (%s)" % (
+                    table, columns[0], value_str)
+            else:
+                lines = []
+                for row in rows:
+                    line = []
+                    for i, column in enumerate(columns):
+                        line.append("%s = %s" % (column, row[i]))
+                    lines.append(" AND ".join(line))
+                stmt = "DELETE FROM %s WHERE %s" % (
+                    table, " OR ".join(lines))
             self.cursor.execute(stmt)
 
     def _do_inserts(self):
@@ -98,8 +117,8 @@
 
 class PostgreSQLRowBatcher(RowBatcher):
 
-    def __init__(self, cursor, version_detector):
-        super(PostgreSQLRowBatcher, self).__init__(cursor)
+    def __init__(self, cursor, version_detector, row_limit=None):
+        super(PostgreSQLRowBatcher, self).__init__(cursor, row_limit)
         self.support_batch_insert = (
             version_detector.get_version(cursor) >= (8, 2))
 
@@ -116,8 +135,8 @@
     Expects :name parameters and a dictionary for each row.
     """
 
-    def __init__(self, cursor, inputsizes):
-        super(OracleRowBatcher, self).__init__(cursor)
+    def __init__(self, cursor, inputsizes, row_limit=None):
+        super(OracleRowBatcher, self).__init__(cursor, row_limit)
         self.inputsizes = inputsizes
         self.array_ops = {}  # {(operation, row_schema): {rowkey: [row]}}
 

Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py	2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/adapters/interfaces.py	2010-04-12 17:17:07 UTC (rev 110758)
@@ -216,9 +216,13 @@
         initialization is required.
         """
 
-    def make_batcher(cursor):
-        """Return an object to be used for batch store operations."""
+    def make_batcher(cursor, row_limit):
+        """Return an object to be used for batch store operations.
 
+        row_limit is the maximum number of rows to queue before
+        calling the database.
+        """
+
     def store_temp(cursor, batcher, oid, prev_tid, data):
         """Store an object in the temporary table.
 

Modified: relstorage/trunk/relstorage/adapters/mover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py	2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/adapters/mover.py	2010-04-12 17:17:07 UTC (rev 110758)
@@ -385,14 +385,14 @@
 
 
 
-    def postgresql_make_batcher(self, cursor):
-        return PostgreSQLRowBatcher(cursor, self.version_detector)
+    def postgresql_make_batcher(self, cursor, row_limit):
+        return PostgreSQLRowBatcher(cursor, self.version_detector, row_limit)
 
-    def mysql_make_batcher(self, cursor):
-        return MySQLRowBatcher(cursor)
+    def mysql_make_batcher(self, cursor, row_limit):
+        return MySQLRowBatcher(cursor, row_limit)
 
-    def oracle_make_batcher(self, cursor):
-        return OracleRowBatcher(cursor, self.inputsizes)
+    def oracle_make_batcher(self, cursor, row_limit):
+        return OracleRowBatcher(cursor, self.inputsizes, row_limit)
 
 
 
@@ -403,7 +403,7 @@
             md5sum = compute_md5sum(data)
         else:
             md5sum = None
-        batcher.delete_from('temp_store', 'zoid', oid)
+        batcher.delete_from('temp_store', zoid=oid)
         batcher.insert_into(
             "temp_store (zoid, prev_tid, md5, state)",
             "%s, %s, %s, decode(%s, 'base64')",
@@ -480,6 +480,7 @@
             encoded = None
 
         if self.keep_history:
+            batcher.delete_from("object_state", zoid=oid, tid=tid)
             row_schema = """
                 %s, %s,
                 COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
@@ -493,7 +494,7 @@
                 size=len(data or ''),
                 )
         else:
-            batcher.delete_from('object_state', 'zoid', oid)
+            batcher.delete_from('object_state', zoid=oid)
             if data:
                 batcher.insert_into(
                     "object_state (zoid, tid, state)",
@@ -530,6 +531,7 @@
                 (oid, tid, oid, md5sum, encoded),
                 rowkey=(oid, tid),
                 size=len(data or ''),
+                command='REPLACE',
                 )
         else:
             if data:
@@ -542,7 +544,7 @@
                     command='REPLACE',
                     )
             else:
-                batcher.delete_from('object_state', 'zoid', oid)
+                batcher.delete_from('object_state', zoid=oid)
 
     def oracle_restore(self, cursor, batcher, oid, tid, data):
         """Store an object directly, without conflict detection.
@@ -600,7 +602,7 @@
                     size=len(data or ''),
                     )
             else:
-                batcher.delete_from('object_state', 'zoid', oid)
+                batcher.delete_from('object_state', zoid=oid)
                 if data:
                     batcher.insert_into(
                         "object_state (zoid, tid, state)",

Modified: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py	2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/adapters/schema.py	2010-04-12 17:17:07 UTC (rev 110758)
@@ -377,6 +377,10 @@
         states IN statelist) IS
     BEGIN
         FORALL indx IN zoids.first..zoids.last
+            DELETE FROM object_state
+            WHERE zoid = zoids(indx)
+                AND tid = tids(indx);
+        FORALL indx IN zoids.first..zoids.last
             INSERT INTO object_state (zoid, tid, prev_tid, md5, state) VALUES
             (zoids(indx), tids(indx),
             COALESCE((SELECT tid

Modified: relstorage/trunk/relstorage/adapters/tests/test_batch.py
===================================================================
--- relstorage/trunk/relstorage/adapters/tests/test_batch.py	2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/adapters/tests/test_batch.py	2010-04-12 17:17:07 UTC (rev 110758)
@@ -23,18 +23,29 @@
     def test_delete_defer(self):
         cursor = MockCursor()
         batcher = self.getClass()(cursor)
-        batcher.delete_from("mytable", "id", 2)
+        batcher.delete_from("mytable", id=2)
         self.assertEqual(cursor.executed, [])
         self.assertEqual(batcher.rows_added, 1)
         self.assertEqual(batcher.size_added, 0)
-        self.assertEqual(batcher.deletes, {('mytable', 'id'): set(["2"])})
+        self.assertEqual(batcher.deletes,
+            {('mytable', ('id',)): set([("2",)])})
 
+    def test_delete_multiple_column(self):
+        cursor = MockCursor()
+        batcher = self.getClass()(cursor)
+        batcher.delete_from("mytable", id=2, tid=10)
+        self.assertEqual(cursor.executed, [])
+        self.assertEqual(batcher.rows_added, 1)
+        self.assertEqual(batcher.size_added, 0)
+        self.assertEqual(batcher.deletes,
+            {('mytable', ('id', 'tid')): set([("2", "10")])})
+
     def test_delete_auto_flush(self):
         cursor = MockCursor()
         batcher = self.getClass()(cursor)
         batcher.row_limit = 2
-        batcher.delete_from("mytable", "id", 2)
-        batcher.delete_from("mytable", "id", 1)
+        batcher.delete_from("mytable", id=2)
+        batcher.delete_from("mytable", id=1)
         self.assertEqual(cursor.executed,
             [('DELETE FROM mytable WHERE id IN (1,2)', None)])
         self.assertEqual(batcher.rows_added, 0)
@@ -132,7 +143,7 @@
     def test_flush(self):
         cursor = MockCursor()
         batcher = self.getClass()(cursor)
-        batcher.delete_from("mytable", "id", 1)
+        batcher.delete_from("mytable", id=1)
         batcher.insert_into(
             "mytable (id, name)",
             "%s, id || %s",

Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py	2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/storage.py	2010-04-12 17:17:07 UTC (rev 110758)
@@ -137,6 +137,9 @@
     # so they can be executed in batch (to minimize latency).
     _batcher = None
 
+    # _batcher_row_limit: The number of rows to queue before
+    # calling the database.
+    _batcher_row_limit = 100
 
     def __init__(self, adapter, name=None, create=True,
             options=None, cache=None, **kwoptions):
@@ -632,7 +635,7 @@
             adapter = self._adapter
             self._cache.tpc_begin()
             self._batcher = self._adapter.mover.make_batcher(
-                self._store_cursor)
+                self._store_cursor, self._batcher_row_limit)
 
             if tid is not None:
                 # hold the commit lock and add the transaction now

Modified: relstorage/trunk/relstorage/tests/RecoveryStorage.py
===================================================================
--- relstorage/trunk/relstorage/tests/RecoveryStorage.py	2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/tests/RecoveryStorage.py	2010-04-12 17:17:07 UTC (rev 110758)
@@ -16,15 +16,19 @@
 # This is copied from ZODB.tests.RecoveryStorage and expanded to fit
 # history-free storages.
 
+from relstorage.util import is_blob_record
+from transaction import Transaction
+from ZODB import DB
+from ZODB.serialize import referencesf
+from ZODB.tests.StorageTestBase import handle_serials
+from ZODB.tests.StorageTestBase import MinPO
+from ZODB.tests.StorageTestBase import snooze
+from ZODB.tests.StorageTestBase import zodb_pickle
+from ZODB.tests.StorageTestBase import zodb_unpickle
 import itertools
 import time
 import transaction
-from transaction import Transaction
-from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
-from ZODB import DB
 import ZODB.POSException
-from ZODB.serialize import referencesf
-from relstorage.util import is_blob_record
 
 
 class IteratorDeepCompare:
@@ -58,12 +62,15 @@
             eq(e1, e2)
 
             # compare the objects in the transaction, but disregard
-            # the order of the objects since that is not important.
-            recs1 = [(r.oid, r) for r in txn1]
+            # the order of the objects and any duplicated records
+            # since those are not important.
+            recs1 = dict([(r.oid, r) for r in txn1])
+            recs2 = dict([(r.oid, r) for r in txn2])
+            eq(len(recs1), len(recs2))
+            recs1 = recs1.items()
             recs1.sort()
-            recs2 = [(r.oid, r) for r in txn2]
+            recs2 = recs2.items()
             recs2.sort()
-            eq(len(recs1), len(recs2))
             for (oid1, rec1), (oid2, rec2) in itertools.izip(recs1, recs2):
                 eq(rec1.oid, rec2.oid)
                 eq(rec1.tid, rec2.tid)
@@ -179,7 +186,30 @@
         raises(KeyError, self._dst.load, obj1._p_oid, '')
         raises(KeyError, self._dst.load, obj2._p_oid, '')
 
+    def checkRestoreAfterDoubleCommit(self):
+        oid = self._storage.new_oid()
+        revid = '\0'*8
+        data1 = zodb_pickle(MinPO(11))
+        data2 = zodb_pickle(MinPO(12))
+        # Begin the transaction
+        t = transaction.Transaction()
+        try:
+            self._storage.tpc_begin(t)
+            # Store an object
+            self._storage.store(oid, revid, data1, '', t)
+            # Store it again
+            r1 = self._storage.store(oid, revid, data2, '', t)
+            # Finish the transaction
+            r2 = self._storage.tpc_vote(t)
+            revid = handle_serials(oid, r1, r2)
+            self._storage.tpc_finish(t)
+        except:
+            self._storage.tpc_abort(t)
+            raise
+        self._dst.copyTransactionsFrom(self._storage)
+        self.compare(self._storage, self._dst)
 
+
 class UndoableRecoveryStorage(BasicRecoveryStorage):
     """These tests require the source storage to be undoable"""
 

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2010-04-12 17:17:07 UTC (rev 110758)
@@ -44,6 +44,7 @@
         from relstorage.storage import RelStorage
         adapter = self.make_adapter()
         self._storage = RelStorage(adapter, **kwargs)
+        self._storage._batcher_row_limit = 1
 
     def setUp(self):
         self.open(create=1)



More information about the checkins mailing list