[Checkins] SVN: relstorage/trunk/ History-preserving storages now replace objects on restore instead of just inserting them.
Shane Hathaway
shane at hathawaymix.org
Mon Apr 12 13:17:07 EDT 2010
Log message for revision 110758:
History-preserving storages now replace objects on restore instead of
just inserting them.
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/relstorage/adapters/batch.py
U relstorage/trunk/relstorage/adapters/interfaces.py
U relstorage/trunk/relstorage/adapters/mover.py
U relstorage/trunk/relstorage/adapters/schema.py
U relstorage/trunk/relstorage/adapters/tests/test_batch.py
U relstorage/trunk/relstorage/storage.py
U relstorage/trunk/relstorage/tests/RecoveryStorage.py
U relstorage/trunk/relstorage/tests/reltestbase.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/CHANGES.txt 2010-04-12 17:17:07 UTC (rev 110758)
@@ -1,3 +1,9 @@
+Next Release
+------------
+
+- History-preserving storages now replace objects on restore instead of
+ just inserting them.
+
1.4.0b3 (2010-02-02)
--------------------
Modified: relstorage/trunk/relstorage/adapters/batch.py
===================================================================
--- relstorage/trunk/relstorage/adapters/batch.py 2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/adapters/batch.py 2010-04-12 17:17:07 UTC (rev 110758)
@@ -16,6 +16,7 @@
import re
+
class RowBatcher(object):
"""Generic row batcher.
@@ -27,19 +28,27 @@
database_name = None
support_batch_insert = True
- def __init__(self, cursor):
+ def __init__(self, cursor, row_limit=None):
self.cursor = cursor
+ if row_limit is not None:
+ self.row_limit = row_limit
self.rows_added = 0
self.size_added = 0
- self.deletes = {} # {(table, varname): set([value])}
+ self.deletes = {} # {(table, columns_tuple): set([(column_value,)])}
self.inserts = {} # {(command, header, row_schema): {rowkey: [row]}}
- def delete_from(self, table, varname, value):
- key = (table, varname)
- values = self.deletes.get(key)
- if values is None:
- self.deletes[key] = values = set()
- values.add(str(value))
+ def delete_from(self, table, **kw):
+ if not kw:
+ raise AssertionError("Need at least one column value")
+ columns = kw.keys()
+ columns.sort()
+ columns = tuple(columns)
+ key = (table, columns)
+ rows = self.deletes.get(key)
+ if rows is None:
+ self.deletes[key] = rows = set()
+ row = tuple(str(kw[column]) for column in columns)
+ rows.add(row)
self.rows_added += 1
if self.rows_added >= self.row_limit:
self.flush()
@@ -68,12 +77,22 @@
self.size_added = 0
def _do_deletes(self):
- for (table, varname), values in sorted(self.deletes.items()):
- v = list(values)
- v.sort()
- value_str = ','.join(v)
- stmt = "DELETE FROM %s WHERE %s IN (%s)" % (
- table, varname, value_str)
+ for (table, columns), rows in sorted(self.deletes.items()):
+ rows = list(rows)
+ rows.sort()
+ if len(columns) == 1:
+ value_str = ','.join(v for (v,) in rows)
+ stmt = "DELETE FROM %s WHERE %s IN (%s)" % (
+ table, columns[0], value_str)
+ else:
+ lines = []
+ for row in rows:
+ line = []
+ for i, column in enumerate(columns):
+ line.append("%s = %s" % (column, row[i]))
+ lines.append(" AND ".join(line))
+ stmt = "DELETE FROM %s WHERE %s" % (
+ table, " OR ".join(lines))
self.cursor.execute(stmt)
def _do_inserts(self):
@@ -98,8 +117,8 @@
class PostgreSQLRowBatcher(RowBatcher):
- def __init__(self, cursor, version_detector):
- super(PostgreSQLRowBatcher, self).__init__(cursor)
+ def __init__(self, cursor, version_detector, row_limit=None):
+ super(PostgreSQLRowBatcher, self).__init__(cursor, row_limit)
self.support_batch_insert = (
version_detector.get_version(cursor) >= (8, 2))
@@ -116,8 +135,8 @@
Expects :name parameters and a dictionary for each row.
"""
- def __init__(self, cursor, inputsizes):
- super(OracleRowBatcher, self).__init__(cursor)
+ def __init__(self, cursor, inputsizes, row_limit=None):
+ super(OracleRowBatcher, self).__init__(cursor, row_limit)
self.inputsizes = inputsizes
self.array_ops = {} # {(operation, row_schema): {rowkey: [row]}}
Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py 2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/adapters/interfaces.py 2010-04-12 17:17:07 UTC (rev 110758)
@@ -216,9 +216,13 @@
initialization is required.
"""
- def make_batcher(cursor):
- """Return an object to be used for batch store operations."""
+ def make_batcher(cursor, row_limit):
+ """Return an object to be used for batch store operations.
+ row_limit is the maximum number of rows to queue before
+ calling the database.
+ """
+
def store_temp(cursor, batcher, oid, prev_tid, data):
"""Store an object in the temporary table.
Modified: relstorage/trunk/relstorage/adapters/mover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py 2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/adapters/mover.py 2010-04-12 17:17:07 UTC (rev 110758)
@@ -385,14 +385,14 @@
- def postgresql_make_batcher(self, cursor):
- return PostgreSQLRowBatcher(cursor, self.version_detector)
+ def postgresql_make_batcher(self, cursor, row_limit):
+ return PostgreSQLRowBatcher(cursor, self.version_detector, row_limit)
- def mysql_make_batcher(self, cursor):
- return MySQLRowBatcher(cursor)
+ def mysql_make_batcher(self, cursor, row_limit):
+ return MySQLRowBatcher(cursor, row_limit)
- def oracle_make_batcher(self, cursor):
- return OracleRowBatcher(cursor, self.inputsizes)
+ def oracle_make_batcher(self, cursor, row_limit):
+ return OracleRowBatcher(cursor, self.inputsizes, row_limit)
@@ -403,7 +403,7 @@
md5sum = compute_md5sum(data)
else:
md5sum = None
- batcher.delete_from('temp_store', 'zoid', oid)
+ batcher.delete_from('temp_store', zoid=oid)
batcher.insert_into(
"temp_store (zoid, prev_tid, md5, state)",
"%s, %s, %s, decode(%s, 'base64')",
@@ -480,6 +480,7 @@
encoded = None
if self.keep_history:
+ batcher.delete_from("object_state", zoid=oid, tid=tid)
row_schema = """
%s, %s,
COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
@@ -493,7 +494,7 @@
size=len(data or ''),
)
else:
- batcher.delete_from('object_state', 'zoid', oid)
+ batcher.delete_from('object_state', zoid=oid)
if data:
batcher.insert_into(
"object_state (zoid, tid, state)",
@@ -530,6 +531,7 @@
(oid, tid, oid, md5sum, encoded),
rowkey=(oid, tid),
size=len(data or ''),
+ command='REPLACE',
)
else:
if data:
@@ -542,7 +544,7 @@
command='REPLACE',
)
else:
- batcher.delete_from('object_state', 'zoid', oid)
+ batcher.delete_from('object_state', zoid=oid)
def oracle_restore(self, cursor, batcher, oid, tid, data):
"""Store an object directly, without conflict detection.
@@ -600,7 +602,7 @@
size=len(data or ''),
)
else:
- batcher.delete_from('object_state', 'zoid', oid)
+ batcher.delete_from('object_state', zoid=oid)
if data:
batcher.insert_into(
"object_state (zoid, tid, state)",
Modified: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py 2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/adapters/schema.py 2010-04-12 17:17:07 UTC (rev 110758)
@@ -377,6 +377,10 @@
states IN statelist) IS
BEGIN
FORALL indx IN zoids.first..zoids.last
+ DELETE FROM object_state
+ WHERE zoid = zoids(indx)
+ AND tid = tids(indx);
+ FORALL indx IN zoids.first..zoids.last
INSERT INTO object_state (zoid, tid, prev_tid, md5, state) VALUES
(zoids(indx), tids(indx),
COALESCE((SELECT tid
Modified: relstorage/trunk/relstorage/adapters/tests/test_batch.py
===================================================================
--- relstorage/trunk/relstorage/adapters/tests/test_batch.py 2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/adapters/tests/test_batch.py 2010-04-12 17:17:07 UTC (rev 110758)
@@ -23,18 +23,29 @@
def test_delete_defer(self):
cursor = MockCursor()
batcher = self.getClass()(cursor)
- batcher.delete_from("mytable", "id", 2)
+ batcher.delete_from("mytable", id=2)
self.assertEqual(cursor.executed, [])
self.assertEqual(batcher.rows_added, 1)
self.assertEqual(batcher.size_added, 0)
- self.assertEqual(batcher.deletes, {('mytable', 'id'): set(["2"])})
+ self.assertEqual(batcher.deletes,
+ {('mytable', ('id',)): set([("2",)])})
+ def test_delete_multiple_column(self):
+ cursor = MockCursor()
+ batcher = self.getClass()(cursor)
+ batcher.delete_from("mytable", id=2, tid=10)
+ self.assertEqual(cursor.executed, [])
+ self.assertEqual(batcher.rows_added, 1)
+ self.assertEqual(batcher.size_added, 0)
+ self.assertEqual(batcher.deletes,
+ {('mytable', ('id', 'tid')): set([("2", "10")])})
+
def test_delete_auto_flush(self):
cursor = MockCursor()
batcher = self.getClass()(cursor)
batcher.row_limit = 2
- batcher.delete_from("mytable", "id", 2)
- batcher.delete_from("mytable", "id", 1)
+ batcher.delete_from("mytable", id=2)
+ batcher.delete_from("mytable", id=1)
self.assertEqual(cursor.executed,
[('DELETE FROM mytable WHERE id IN (1,2)', None)])
self.assertEqual(batcher.rows_added, 0)
@@ -132,7 +143,7 @@
def test_flush(self):
cursor = MockCursor()
batcher = self.getClass()(cursor)
- batcher.delete_from("mytable", "id", 1)
+ batcher.delete_from("mytable", id=1)
batcher.insert_into(
"mytable (id, name)",
"%s, id || %s",
Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py 2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/storage.py 2010-04-12 17:17:07 UTC (rev 110758)
@@ -137,6 +137,9 @@
# so they can be executed in batch (to minimize latency).
_batcher = None
+ # _batcher_row_limit: The number of rows to queue before
+ # calling the database.
+ _batcher_row_limit = 100
def __init__(self, adapter, name=None, create=True,
options=None, cache=None, **kwoptions):
@@ -632,7 +635,7 @@
adapter = self._adapter
self._cache.tpc_begin()
self._batcher = self._adapter.mover.make_batcher(
- self._store_cursor)
+ self._store_cursor, self._batcher_row_limit)
if tid is not None:
# hold the commit lock and add the transaction now
Modified: relstorage/trunk/relstorage/tests/RecoveryStorage.py
===================================================================
--- relstorage/trunk/relstorage/tests/RecoveryStorage.py 2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/tests/RecoveryStorage.py 2010-04-12 17:17:07 UTC (rev 110758)
@@ -16,15 +16,19 @@
# This is copied from ZODB.tests.RecoveryStorage and expanded to fit
# history-free storages.
+from relstorage.util import is_blob_record
+from transaction import Transaction
+from ZODB import DB
+from ZODB.serialize import referencesf
+from ZODB.tests.StorageTestBase import handle_serials
+from ZODB.tests.StorageTestBase import MinPO
+from ZODB.tests.StorageTestBase import snooze
+from ZODB.tests.StorageTestBase import zodb_pickle
+from ZODB.tests.StorageTestBase import zodb_unpickle
import itertools
import time
import transaction
-from transaction import Transaction
-from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
-from ZODB import DB
import ZODB.POSException
-from ZODB.serialize import referencesf
-from relstorage.util import is_blob_record
class IteratorDeepCompare:
@@ -58,12 +62,15 @@
eq(e1, e2)
# compare the objects in the transaction, but disregard
- # the order of the objects since that is not important.
- recs1 = [(r.oid, r) for r in txn1]
+ # the order of the objects and any duplicated records
+ # since those are not important.
+ recs1 = dict([(r.oid, r) for r in txn1])
+ recs2 = dict([(r.oid, r) for r in txn2])
+ eq(len(recs1), len(recs2))
+ recs1 = recs1.items()
recs1.sort()
- recs2 = [(r.oid, r) for r in txn2]
+ recs2 = recs2.items()
recs2.sort()
- eq(len(recs1), len(recs2))
for (oid1, rec1), (oid2, rec2) in itertools.izip(recs1, recs2):
eq(rec1.oid, rec2.oid)
eq(rec1.tid, rec2.tid)
@@ -179,7 +186,30 @@
raises(KeyError, self._dst.load, obj1._p_oid, '')
raises(KeyError, self._dst.load, obj2._p_oid, '')
+ def checkRestoreAfterDoubleCommit(self):
+ oid = self._storage.new_oid()
+ revid = '\0'*8
+ data1 = zodb_pickle(MinPO(11))
+ data2 = zodb_pickle(MinPO(12))
+ # Begin the transaction
+ t = transaction.Transaction()
+ try:
+ self._storage.tpc_begin(t)
+ # Store an object
+ self._storage.store(oid, revid, data1, '', t)
+ # Store it again
+ r1 = self._storage.store(oid, revid, data2, '', t)
+ # Finish the transaction
+ r2 = self._storage.tpc_vote(t)
+ revid = handle_serials(oid, r1, r2)
+ self._storage.tpc_finish(t)
+ except:
+ self._storage.tpc_abort(t)
+ raise
+ self._dst.copyTransactionsFrom(self._storage)
+ self.compare(self._storage, self._dst)
+
class UndoableRecoveryStorage(BasicRecoveryStorage):
"""These tests require the source storage to be undoable"""
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2010-04-12 17:06:48 UTC (rev 110757)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2010-04-12 17:17:07 UTC (rev 110758)
@@ -44,6 +44,7 @@
from relstorage.storage import RelStorage
adapter = self.make_adapter()
self._storage = RelStorage(adapter, **kwargs)
+ self._storage._batcher_row_limit = 1
def setUp(self):
self.open(create=1)
More information about the checkins mailing list