[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ implemented and started testing the online recovery

Thomas Lotze tl at gocept.com
Tue Feb 19 07:47:57 EST 2008


Log message for revision 84053:
  implemented and started testing the online recovery

Changed:
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py

-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py	2008-02-19 11:37:14 UTC (rev 84052)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py	2008-02-19 12:47:56 UTC (rev 84053)
@@ -11,59 +11,94 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-"""ZEORaid online recovery implementation."""
+"""Online storage recovery."""
 
+import transaction
+import ZODB.utils
 
-class Recovery(object):
-    """ZEORaid online recovery implementation.
-    """
 
-    _current = 0
-    raid_transaction_count = 0
-    target_transaction_count = 0
+def continuous_storage_iterator(storage):
+    seen = ZODB.utils.z64
+    while seen < storage.lastTransaction():
+        iterator = storage.iterator(seen)
+        if seen > ZODB.utils.z64:
+            # We can only get an iterator starting with a given transaction,
+            # which we have already seen, so we skip it now.
+            iterator.next()
+        for txn_info in iterator:
+            yield txn_info
+        seen = txn_info.tid
 
-    def __init__(self, raid_storage, target_name):
-        self.raid_storage = raid_storage
-        self.target_name = target_name
-        self.target = raid_storage.storages[target_name]
-        self.target_transaction_count = ...
-        # initialize counting up self.raid_transaction_count
 
+class Recovery(object):
+    """Online storage recovery."""
 
-    def get_raid_transaction_info(self, n):
-        """Retrieves the n-th transaction info from the RAID, counting from
-        the beginning of the storage's history.
-        """
+    def __init__(self, source, target, finalize):
+        self.source = source
+        self.target = target
+        self.finalize = finalize
 
-    def get_target_transaction_info(self, n):
-        """Retrieves the n-th transaction info from the target storage,
-        counting from the beginning of the storage's history.
-        """
-
     def __call__(self):
         """Performs recovery."""
         # Verify old transactions that may already be stored in the target
-        # storage.
-        for self._current in xrange(self.target_transaction_count):
-            if (self.get_raid_transaction_info(self._current) !=
-                self.get_target_transaction_info(self._current)):
-                raise XXX
+        # storage. When comparing transaction records, ignore storage records
+        # in order to avoid transferring too much data.
+        source_iter = continuous_storage_iterator(self.source)
+        target_iter = self.target.iterator()
 
-        # Recover all transaction from that point on until self._current
-        # equals self.raid_transaction_count.
-        # self._current now points to the first transaction to be copied.
-        # We need to do a "while True" loop in order to be able to check on
-        # our progress and finalize recovery atomically.
         while True:
-            commit lock
             try:
-                if self._current == self.raid_transaction_count:
-                    no longer degraded
+                target_txn = target_iter.next()
+            except StopIteration:
+                break
+            try:
+                source_txn = source_iter.next()
+            except StopIteration:
+                # An exhausted source storage would be OK if the target
+                # storage is exhausted at the same time. In that case, we will
+                # already have left the loop though.
+                raise ValueError('The target storage contains already more '
+                                 'transactions than the source storage.')
+
+            for name in 'tid', 'status', 'user', 'description', 'extension':
+                source_value = getattr(source_txn, name)
+                target_value = getattr(target_txn, name)
+                if source_value != target_value:
+                    raise ValueError(
+                        '%r mismatch: %r (source) != %r (target) '
+                        'in source transaction %r.' % (
+                        name, source_value, target_value, source_txn.tid))
+
+            yield ('verify', source_txn.tid)
+
+        yield ('verified',)
+
+        # Recover from that point on until the target storage has all
+        # transactions that exist in the source storage at the time of
+        # finalization. Therefore we need to check continuously for new
+        # remaining transactions under the commit lock and finalize recovery
+        # atomically.
+        while True:
+            t = transaction.Transaction()
+            self.source.tpc_begin(t)
+            try:
+                try:
+                    txn_info = source_iter.next()
+                except StopIteration:
+                    self.finalize(self.target)
                     break
             finally:
-                commit unlock
+                self.source.tpc_abort(t)
 
-            # Recover transaction self._current.
-            foo
+            self.target.tpc_begin(txn_info, txn_info.tid, txn_info.status)
 
-            self._current += 1
+            for r in txn_info:
+                self.target.restore(r.oid, r.tid, r.data, r.version,
+                                    r.data_txn, txn_info)
+
+            self.target.tpc_vote(txn_info)
+            self.target.tpc_finish(txn_info)
+
+            yield ('restore', txn_info.tid)
+
+        yield ('recovered',)

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py	2008-02-19 12:47:56 UTC (rev 84053)
@@ -0,0 +1,134 @@
+##############################################################################
+#
+# Copyright (c) 2007-2008 Zope Foundation and contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test harness for online recovery."""
+
+import unittest
+import tempfile
+
+import transaction
+import ZODB.FileStorage
+import ZODB.utils
+import ZODB.tests.MinPO
+import ZODB.tests.StorageTestBase
+
+import gocept.zeoraid.recovery
+
+
+class OnlineRecovery(unittest.TestCase):
+
+    def store(self, storages, tid=None, status=' ', user=None,
+              description=None, extension={}):
+        oid = storages[0].new_oid()
+        data = ZODB.tests.MinPO.MinPO(7)
+        data = ZODB.tests.StorageTestBase.zodb_pickle(data)
+        # Begin the transaction
+        t = transaction.Transaction()
+        if user is not None:
+            t.user = user
+        if description is not None:
+            t.description = description
+        for name, value in extension.items():
+            t.setExtendedInfo(name, value)
+        try:
+            for storage in storages:
+                storage.tpc_begin(t, tid, status)
+                # Store an object
+                r1 = storage.store(oid, ZODB.utils.z64, data, '', t)
+                # Finish the transaction
+                r2 = storage.tpc_vote(t)
+                tid = ZODB.tests.StorageTestBase.handle_serials(oid, r1, r2)
+            for storage in storages:
+                storage.tpc_finish(t)
+        except:
+            for storage in storages:
+                storage.tpc_abort(t)
+            raise
+        return tid
+
+    def setUp(self):
+        self.source = ZODB.FileStorage.FileStorage(tempfile.mktemp())
+        self.target = ZODB.FileStorage.FileStorage(tempfile.mktemp())
+        self.recovery = gocept.zeoraid.recovery.Recovery(
+            self.source, self.target, lambda target: None)
+
+    def tearDown(self):
+        self.source.close()
+        self.source.cleanup()
+        self.target.close()
+        self.target.cleanup()
+
+    def test_verify_both_empty(self):
+        self.assertEquals([('verified',), ('recovered',)],
+                          list(self.recovery()))
+
+    def test_verify_empty_target(self):
+        self.store([self.source])
+        recovery = self.recovery()
+        self.assertEquals('verified', recovery.next()[0])
+
+    def test_verify_shorter_target(self):
+        self.store([self.source, self.target])
+        self.store([self.source])
+        recovery = self.recovery()
+        self.assertEquals('verify', recovery.next()[0])
+        self.assertEquals('verified', recovery.next()[0])
+
+    def test_verify_equal_length(self):
+        self.store([self.source, self.target])
+        recovery = self.recovery()
+        self.assertEquals('verify', recovery.next()[0])
+        self.assertEquals('verified', recovery.next()[0])
+
+    def test_verify_too_long_target(self):
+        self.store([self.source, self.target])
+        self.store([self.target])
+        recovery = self.recovery()
+        self.assertEquals('verify', recovery.next()[0])
+        self.assertRaises(ValueError, recovery.next)
+
+    def test_verify_tid_mismatch(self):
+        self.store([self.source])
+        self.store([self.target])
+        recovery = self.recovery()
+        self.assertRaises(ValueError, recovery.next)
+
+    def test_verify_status_mismatch(self):
+        tid = self.store([self.source])
+        self.store([self.target], tid=tid, status='p')
+        recovery = self.recovery()
+        self.assertRaises(ValueError, recovery.next)
+
+    def test_verify_user_mismatch(self):
+        tid = self.store([self.source])
+        self.store([self.target], tid=tid, user='Hans')
+        recovery = self.recovery()
+        self.assertRaises(ValueError, recovery.next)
+
+    def test_verify_description_mismatch(self):
+        tid = self.store([self.source])
+        self.store([self.target], tid=tid, description='foo bar')
+        recovery = self.recovery()
+        self.assertRaises(ValueError, recovery.next)
+
+    def test_verify_extension_mismatch(self):
+        tid = self.store([self.source])
+        self.store([self.target], tid=tid, extension=dict(foo=3))
+        recovery = self.recovery()
+        self.assertRaises(ValueError, recovery.next)
+
+
+def test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(OnlineRecovery))
+    return suite


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native



More information about the Checkins mailing list