[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ Implemented and tested recovery in the RAID storage

Thomas Lotze tl at gocept.com
Thu Feb 21 08:31:10 EST 2008


Log message for revision 84111:
  Implemented and tested recovery in the RAID storage.

Changed:
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py

-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py	2008-02-21 13:30:41 UTC (rev 84110)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/recovery.py	2008-02-21 13:31:10 UTC (rev 84111)
@@ -84,6 +84,8 @@
 
         yield ('verified',)
 
+        restorable = hasattr(self.target, 'restore')
+
         # Recover from that point on until the target storage has all
         # transactions that exist in the source storage at the time of
         # finalization. Therefore we need to check continuously for new
@@ -104,8 +106,12 @@
             self.target.tpc_begin(txn_info, txn_info.tid, txn_info.status)
 
             for r in txn_info:
-                self.target.restore(r.oid, r.tid, r.data, r.version,
-                                    r.data_txn, txn_info)
+                if restorable:
+                    self.target.restore(r.oid, r.tid, r.data, r.version,
+                                        r.data_txn, txn_info)
+                else:
+                    self.target.store(r.oid, r.tid, r.data, r.version,
+                                      txn_info)
 
             self.target.tpc_vote(txn_info)
             self.target.tpc_finish(txn_info)

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-02-21 13:30:41 UTC (rev 84110)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-02-21 13:31:10 UTC (rev 84111)
@@ -33,6 +33,7 @@
 import ZODB.blob
 
 import gocept.zeoraid.interfaces
+import gocept.zeoraid.recovery
 
 logger = logging.getLogger('gocept.zeoraid')
 
@@ -67,6 +68,7 @@
                               ZODB.interfaces.IBlobStorage,
                               ZODB.interfaces.IStorageUndoable,
                               ZODB.interfaces.IStorageCurrentRecordIteration,
+                              ZODB.interfaces.IStorageIteration,
                               ZEO.interfaces.IServeable,
                               )
 
@@ -135,11 +137,9 @@
         for degraded_storages in tids.values():
             self.storages_degraded.extend(degraded_storages)
 
-        # XXX Degrade storages that don't have the right max OID.
+        # No storage is recovering initially
+        self.storage_recovering = None
 
-        # No storages are recovering initially
-        self.storages_recovering = []
-
     # IStorage
 
     def close(self):
@@ -423,6 +423,14 @@
         """Iterate over the records in a storage."""
         return self._apply_single_storage('record_iternext', (next,))
 
+    # IStorageIteration
+
+    def iterator(self, start=None, stop=None):
+        """Return an IStorageTransactionInformation iterator."""
+        # XXX This should really include fail-over for iterators over storages
+        # that degrade or recover while this iterator is running.
+        return self._apply_single_storage('iterator', (start, stop))
+
     # IServeable
 
     # Note: We opt to not implement lastInvalidations until ClientStorage does.
@@ -449,7 +457,7 @@
     # XXX
     @ensure_open_storage
     def raid_status(self):
-        if self.storages_recovering:
+        if self.storage_recovering:
             return 'recovering'
         if not self.storages_degraded:
             return 'optimal'
@@ -460,7 +468,7 @@
     # XXX
     @ensure_open_storage
     def raid_details(self):
-        return [self.storages_optimal, self.storages_recovering, self.storages_degraded]
+        return [self.storages_optimal, self.storage_recovering, self.storages_degraded]
 
     # XXX
     @ensure_open_storage
@@ -471,11 +479,10 @@
     # XXX
     @ensure_open_storage
     def raid_recover(self, name):
-        # XXX: Need to sync `max oid` after recovery
         if name not in self.storages_degraded:
             return
         self.storages_degraded.remove(name)
-        self.storages_recovering.append(name)
+        self.storage_recovering = name
         t = threading.Thread(target=self._recover_impl, args=(name,))
         self._threads.add(t)
         t.setDaemon(True)
@@ -613,156 +620,43 @@
         raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
 
     def _recover_impl(self, name):
-        try:
-            # First pass: Transfer all oids without hindering running transactions
-            begin = time.time()
-            self._recover_first(name)
-            end = time.time()
-
-            # Second pass: Start the TPC on a reference storage to block other
-            # transactions so we can catch up. The second pass should be
-            # significantly faster than the first.
-            begin = time.time()
-            self._recover_second(name)
-            end = time.time()
-        except Exception:
-            # *something* went wrong. Put the storage back to degraded.
-            logger.exception('Failure recovering %r: ' % (name,))
-            try:
-                self._degrade_storage(name)
-            except Exception:
-                logger.exception(
-                    'Failure degrading %r after failed recovery: ' % (name,))
-                raise
-            raise
-
-    def _recover_second(self, name):
-        storage = self.storages[name]
-        reference_storage = self.storages[self.storages_optimal[0]]
-        # Start a transation on the reference storage to acquire the
-        # commit log # and prevent other people from committing in the second phase.
-        # XXX This needs to be optimized in a way that the second phase
-        # gets re-run as long as possible, only holding the commit lock if 
-        # no transactions remain that need to be replayed and putting the 
-        # recovered storage back into the array of optimal storages.
-        while 1:
-            tm = transaction.TransactionManager()
-            t = tm.get()
-            last_transaction = storage.lastTransaction()
-            reference_storage.tpc_begin(t)
-            unrecovered_transactions = self._unrecovered_transactions
-            if unrecovered_transactions:
-                # We acquired the commit lock and there are transactions that
-                # have been committed and were not yet transferred to the 
-                # recovering storage. We have to try to replay those and then
-                # check again. We can remove the commit lock for now.
-                self._unrecovered_transactions = {}
-                reference_storage.tpc_abort(t)
-
-                # RRR: Refactor into its own method?
-                tm2 = transaction.TransactionManager()
-                t2 = tm2.get()
-
-                # Get the unrecovered transactions in the order they were
-                # recorded.
-                tids = sorted(unrecovered_transactions.keys())
-                for tid in tids:
-                    oids = unrecovered_transactions[tid]
-                    # We create one transaction for all oids that belong to one
-                    # transaction.
-                    storage.tpc_begin(t2, tid=tid)
-                    for oid in oids:
-                        data, tid_ = reference_storage.load(oid, '')
-                        if tid_ > tid:
-                            # If the current tid of the object is newer
-                            # than the one we logged, we can ignore it, because
-                            # there will be another entry for this oid in a 
-                            # later transaction.
-                            continue
-                        try:
-                            oldserial = storage.getTid(oid)
-                        except ZODB.POSException.POSKeyError:
-                            # This means that the object is new and didn't have an
-                            # old transaction yet. 
-                            # XXX Might this also happen with non-undoable storages?
-                            oldserial = ZODB.utils.z64
-                        storage.store(oid, oldserial, data, '', t2)
-                    storage.tpc_vote(t2)
-                    storage.tpc_finish(t2)
-                # /RRR
-            else:
-                # We acquired the commit lock and no committed transactions
-                # are waiting in the log. This means the recovering storage
-                # has caught up by now and we can put it into optimal state
-                # again.
-                self.storages_recovering.remove(name)
-                if self._db:
-                    # We are registered with a database already. We need to
-                    # re-register the recovered storage to make invalidations
-                    # pass through.
-                    self.storages[name].registerDB(self._db)
-                self.storages_optimal.append(name)
-                # We can also stop logging stores now.
-                self._log_stores = False
-                reference_storage.tpc_abort(t)
-                break
-
-    def _recover_first(self, name):
-        """The inner loop of the recovery code. Does the actual work."""
-        # Re-open storage
         storage = self.openers[name].open()
         self.storages[name] = storage
-        # XXX Bring the storage to the current stage. This only copies the
-        # current data, so RAID currently does support neither undo nor versions.
-        next_oid = None
-        tm = transaction.TransactionManager()
-        t = tm.get()
-        # XXX we assume that the last written transaction actually is consistent. We need
-        # a consistency check.
-        last_transaction = storage.lastTransaction()
-        # This flag starts logging all succcessfull stores and updates those oids
-        # in the second pass again.
-        max_transaction = self.storages[self.storages_optimal[0]].lastTransaction()
-        self._unrecovered_transactions = {}
-        self._log_stores = True
-        # The init flag allows us to phrase the break condition of the 
-        # following loop a little bit more elegantly.
-        init = True
-        while 1:
-            if next_oid is None and not init:
-                break
+        recovery = gocept.zeoraid.recovery.Recovery(
+            self, storage, self._finalize_recovery)
+        for msg in recovery():
+            logger.info(str(msg))
 
-            init = False
-            oid, tid, data, next_oid = self._apply_single_storage(
-                'record_iternext', (next_oid,))
+    def _finalize_recovery(self, storage):
+        self._write_lock.acquire()
+        try:
+            self.storages_optimal.append(self.storage_recovering)
+            self._synchronise_oids()
+            self.storage_recovering = None
+        finally:
+            self._write_lock.release()
 
-            if tid > max_transaction:
-                continue
+    def _synchronise_oids(self):
+        # Try allocating the same OID from all storages. This is done by
+        # determining the maximum and making all other storages increase
+        # their OID until they hit the maximum. While any storage yields
+        # an OID above the maximum, we try again with that value.
+        max_oid = None
+        lagging = self.storages_optimal[:]
+        while lagging:
+            storage = lagging.pop()
+            while True:
+                reliable, oid = self.__apply_storage(storage, 'new_oid')
+                if not reliable:
+                    break
+                if oid < max_oid:
+                    continue
+                if oid > max_oid:
+                    max_oid = oid
+                    lagging = [s for s in self.storages_optimal
+                               if s != storage]
+                break
 
-            if tid <= last_transaction:
-                try:
-                    old_data = storage.loadSerial(oid, tid)
-                except ZODB.POSException.POSKeyError:
-                    pass
-                else:
-                    if old_data == data:
-                        continue
-
-            # There is a newer version of the object available or the existing
-            # version was incorrect. Overwrite it with the right data.
-            try:
-                oldserial = storage.getTid(oid)
-            except ZODB.POSException.POSKeyError:
-                oldserial = ZODB.utils.z64
-
-
-            assert oldserial <= tid, "last_transaction and oldserial are not in-sync"
-
-            storage.tpc_begin(t, tid=tid)
-            storage.store(oid, oldserial, data, '', t)
-            storage.tpc_vote(t)
-            storage.tpc_finish(t)
-
     def _new_tid(self, old_tid):
         """Generates a new TID."""
         if old_tid is None:

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-02-21 13:30:41 UTC (rev 84110)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-02-21 13:31:10 UTC (rev 84111)
@@ -31,6 +31,7 @@
              MTStorage, ReadOnlyStorage, RecoveryStorage
 
 import gocept.zeoraid.storage
+import gocept.zeoraid.tests.test_recovery
 
 from ZEO.ClientStorage import ClientStorage
 from ZEO.tests import forker, CommitLockTests, ThreadTests
@@ -1079,6 +1080,23 @@
         self._disable_storage(0)
         self._storage.getExtensionMethods()
 
+    def test_recover(self):
+        self._dostore()
+        self._dostore()
+        self._dostore()
+        self._disable_storage(0)
+        self._dostore()
+        self._dostore()
+        self._storage.raid_recover(self._storage.storages_degraded[0])
+        while self._storage.storage_recovering:
+            time.sleep(0.02)
+        self.assertEquals('optimal', self._storage.raid_status())
+        gocept.zeoraid.tests.test_recovery.compare(
+            self, self._backend(0), self._backend(1))
+        self._storage.new_oid()
+        self.assertEquals('optimal', self._storage.raid_status())
+
+
 class ZEOReplicationStorageTests(ZEOStorageBackendTests,
                                  ReplicationStorageTests,
                                  ThreadTests.ThreadTests):

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py	2008-02-21 13:30:41 UTC (rev 84110)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py	2008-02-21 13:31:10 UTC (rev 84111)
@@ -27,6 +27,25 @@
 import gocept.zeoraid.recovery
 
 
+def compare(test, source, target):
+    recovery = gocept.zeoraid.recovery.Recovery(
+        source, target, lambda target: None)
+    protocol = list(recovery())
+    test.assertEquals([('verified',), ('recovered',)], protocol[-2:])
+    for source_txn, target_txn in zip(source.iterator(),
+                                      target.iterator()):
+        # We need not compare the transaction metadata because that has
+        # already been done by the recovery's verification run.
+        source_records = list(source_txn)
+        target_records = list(target_txn)
+        test.assertEquals(len(source_records), len(target_records))
+        for source_record, target_record in zip(source_records,
+                                                target_records):
+            for name in 'oid', 'tid', 'data', 'version', 'data_txn':
+                test.assertEquals(getattr(source_record, name),
+                                  getattr(target_record, name))
+
+
 class ContinuousStorageIterator(ZODB.tests.StorageTestBase.StorageTestBase):
 
     def setUp(self):
@@ -100,22 +119,7 @@
         return tid
 
     def compare(self, source, target):
-        recovery = gocept.zeoraid.recovery.Recovery(
-            source, target, lambda target: None)
-        protocol = list(recovery())
-        self.assertEquals([('verified',), ('recovered',)], protocol[-2:])
-        for source_txn, target_txn in zip(source.iterator(),
-                                          target.iterator()):
-            # We need not compare the transaction metadata because that has
-            # already been done by the recovery's verification run.
-            source_records = list(source_txn)
-            target_records = list(target_txn)
-            self.assertEquals(len(source_records), len(target_records))
-            for source_record, target_record in zip(source_records,
-                                                    target_records):
-                for name in 'oid', 'tid', 'data', 'version', 'data_txn':
-                    self.assertEquals(getattr(source_record, name),
-                                      getattr(target_record, name))
+        compare(self, source, target)
 
     def setUp(self):
         self.source = ZODB.FileStorage.FileStorage(tempfile.mktemp())



More information about the Checkins mailing list