[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/
implemented and started testing the online recovery
Thomas Lotze
tl at gocept.com
Tue Feb 19 07:47:57 EST 2008
Log message for revision 84053:
implemented and started testing the online recovery
Changed:
U gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py
A gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py 2008-02-19 11:37:14 UTC (rev 84052)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py 2008-02-19 12:47:56 UTC (rev 84053)
@@ -11,59 +11,94 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""ZEORaid online recovery implementation."""
+"""Online storage recovery."""
+import transaction
+import ZODB.utils
-class Recovery(object):
- """ZEORaid online recovery implementation.
- """
- _current = 0
- raid_transaction_count = 0
- target_transaction_count = 0
+def continuous_storage_iterator(storage):
+ seen = ZODB.utils.z64
+ while seen < storage.lastTransaction():
+ iterator = storage.iterator(seen)
+ if seen > ZODB.utils.z64:
+ # We can only get an iterator starting with a given transaction,
+ # which we have already seen, so we skip it now.
+ iterator.next()
+ for txn_info in iterator:
+ yield txn_info
+ seen = txn_info.tid
- def __init__(self, raid_storage, target_name):
- self.raid_storage = raid_storage
- self.target_name = target_name
- self.target = raid_storage.storages[target_name]
- self.target_transaction_count = ...
- # initialize counting up self.raid_transaction_count
+class Recovery(object):
+ """Online storage recovery."""
- def get_raid_transaction_info(self, n):
- """Retrieves the n-th transaction info from the RAID, counting from
- the beginning of the storage's history.
- """
+ def __init__(self, source, target, finalize):
+ self.source = source
+ self.target = target
+ self.finalize = finalize
- def get_target_transaction_info(self, n):
- """Retrieves the n-th transaction info from the target storage,
- counting from the beginning of the storage's history.
- """
-
def __call__(self):
"""Performs recovery."""
# Verify old transactions that may already be stored in the target
- # storage.
- for self._current in xrange(self.target_transaction_count):
- if (self.get_raid_transaction_info(self._current) !=
- self.get_target_transaction_info(self._current)):
- raise XXX
+ # storage. When comparing transaction records, ignore storage records
+ # in order to avoid transferring too much data.
+ source_iter = continuous_storage_iterator(self.source)
+ target_iter = self.target.iterator()
- # Recover all transaction from that point on until self._current
- # equals self.raid_transaction_count.
- # self._current now points to the first transaction to be copied.
- # We need to do a "while True" loop in order to be able to check on
- # our progress and finalize recovery atomically.
while True:
- commit lock
try:
- if self._current == self.raid_transaction_count:
- no longer degraded
+ target_txn = target_iter.next()
+ except StopIteration:
+ break
+ try:
+ source_txn = source_iter.next()
+ except StopIteration:
+ # An exhausted source storage would be OK if the target
+ # storage is exhausted at the same time. In that case, we will
+ # already have left the loop though.
+ raise ValueError('The target storage contains already more '
+ 'transactions than the source storage.')
+
+ for name in 'tid', 'status', 'user', 'description', 'extension':
+ source_value = getattr(source_txn, name)
+ target_value = getattr(target_txn, name)
+ if source_value != target_value:
+ raise ValueError(
+ '%r mismatch: %r (source) != %r (target) '
+ 'in source transaction %r.' % (
+ name, source_value, target_value, source_txn.tid))
+
+ yield ('verify', source_txn.tid)
+
+ yield ('verified',)
+
+ # Recover from that point on until the target storage has all
+ # transactions that exist in the source storage at the time of
+ # finalization. Therefore we need to check continuously for new
+ # remaining transactions under the commit lock and finalize recovery
+ # atomically.
+ while True:
+ t = transaction.Transaction()
+ self.source.tpc_begin(t)
+ try:
+ try:
+ txn_info = source_iter.next()
+ except StopIteration:
+ self.finalize(self.target)
break
finally:
- commit unlock
+ self.source.tpc_abort(t)
- # Recover transaction self._current.
- foo
+ self.target.tpc_begin(txn_info, txn_info.tid, txn_info.status)
- self._current += 1
+ for r in txn_info:
+ self.target.restore(r.oid, r.tid, r.data, r.version,
+ r.data_txn, txn_info)
+
+ self.target.tpc_vote(txn_info)
+ self.target.tpc_finish(txn_info)
+
+ yield ('restore', txn_info.tid)
+
+ yield ('recovered',)
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py 2008-02-19 12:47:56 UTC (rev 84053)
@@ -0,0 +1,134 @@
+##############################################################################
+#
+# Copyright (c) 2007-2008 Zope Foundation and contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test harness for online recovery."""
+
+import unittest
+import tempfile
+
+import transaction
+import ZODB.FileStorage
+import ZODB.utils
+import ZODB.tests.MinPO
+import ZODB.tests.StorageTestBase
+
+import gocept.zeoraid.recovery
+
+
+class OnlineRecovery(unittest.TestCase):
+
+ def store(self, storages, tid=None, status=' ', user=None,
+ description=None, extension={}):
+ oid = storages[0].new_oid()
+ data = ZODB.tests.MinPO.MinPO(7)
+ data = ZODB.tests.StorageTestBase.zodb_pickle(data)
+ # Begin the transaction
+ t = transaction.Transaction()
+ if user is not None:
+ t.user = user
+ if description is not None:
+ t.description = description
+ for name, value in extension.items():
+ t.setExtendedInfo(name, value)
+ try:
+ for storage in storages:
+ storage.tpc_begin(t, tid, status)
+ # Store an object
+ r1 = storage.store(oid, ZODB.utils.z64, data, '', t)
+ # Finish the transaction
+ r2 = storage.tpc_vote(t)
+ tid = ZODB.tests.StorageTestBase.handle_serials(oid, r1, r2)
+ for storage in storages:
+ storage.tpc_finish(t)
+ except:
+ for storage in storages:
+ storage.tpc_abort(t)
+ raise
+ return tid
+
+ def setUp(self):
+ self.source = ZODB.FileStorage.FileStorage(tempfile.mktemp())
+ self.target = ZODB.FileStorage.FileStorage(tempfile.mktemp())
+ self.recovery = gocept.zeoraid.recovery.Recovery(
+ self.source, self.target, lambda target: None)
+
+ def tearDown(self):
+ self.source.close()
+ self.source.cleanup()
+ self.target.close()
+ self.target.cleanup()
+
+ def test_verify_both_empty(self):
+ self.assertEquals([('verified',), ('recovered',)],
+ list(self.recovery()))
+
+ def test_verify_empty_target(self):
+ self.store([self.source])
+ recovery = self.recovery()
+ self.assertEquals('verified', recovery.next()[0])
+
+ def test_verify_shorter_target(self):
+ self.store([self.source, self.target])
+ self.store([self.source])
+ recovery = self.recovery()
+ self.assertEquals('verify', recovery.next()[0])
+ self.assertEquals('verified', recovery.next()[0])
+
+ def test_verify_equal_length(self):
+ self.store([self.source, self.target])
+ recovery = self.recovery()
+ self.assertEquals('verify', recovery.next()[0])
+ self.assertEquals('verified', recovery.next()[0])
+
+ def test_verify_too_long_target(self):
+ self.store([self.source, self.target])
+ self.store([self.target])
+ recovery = self.recovery()
+ self.assertEquals('verify', recovery.next()[0])
+ self.assertRaises(ValueError, recovery.next)
+
+ def test_verify_tid_mismatch(self):
+ self.store([self.source])
+ self.store([self.target])
+ recovery = self.recovery()
+ self.assertRaises(ValueError, recovery.next)
+
+ def test_verify_status_mismatch(self):
+ tid = self.store([self.source])
+ self.store([self.target], tid=tid, status='p')
+ recovery = self.recovery()
+ self.assertRaises(ValueError, recovery.next)
+
+ def test_verify_user_mismatch(self):
+ tid = self.store([self.source])
+ self.store([self.target], tid=tid, user='Hans')
+ recovery = self.recovery()
+ self.assertRaises(ValueError, recovery.next)
+
+ def test_verify_description_mismatch(self):
+ tid = self.store([self.source])
+ self.store([self.target], tid=tid, description='foo bar')
+ recovery = self.recovery()
+ self.assertRaises(ValueError, recovery.next)
+
+ def test_verify_extension_mismatch(self):
+ tid = self.store([self.source])
+ self.store([self.target], tid=tid, extension=dict(foo=3))
+ recovery = self.recovery()
+ self.assertRaises(ValueError, recovery.next)
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(OnlineRecovery))
+ return suite
Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
More information about the Checkins
mailing list