[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ Implement majority voting for results in co-op mode.

Christian Theune ct at gocept.com
Wed Oct 6 10:28:09 EDT 2010


Log message for revision 117305:
  Implement majority voting for results in co-op mode. This included a lot of
  detailed changes that are reflected in the tests. (Sorry for the bulk
  check-in.)
  

Changed:
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py

-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2010-10-06 13:20:13 UTC (rev 117304)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2010-10-06 14:28:08 UTC (rev 117305)
@@ -20,6 +20,7 @@
 import ZODB.blob
 import ZODB.interfaces
 import ZODB.utils
+import cPickle
 import gocept.zeoraid.interfaces
 import gocept.zeoraid.recovery
 import logging
@@ -51,6 +52,7 @@
 
 def ensure_writable(method):
 
+    @ensure_open_storage
     def check_writable(self, *args, **kw):
         if self.isReadOnly():
             raise ZODB.POSException.ReadOnlyError()
@@ -148,8 +150,13 @@
         # Remember the openers so closed storages can be re-opened as needed.
         self.openers = dict((opener.name, opener) for opener in openers)
 
+        # Do not fail the whole storage while opening storages the first time
+        # as we have intermittent states that would never allow a RAID to
+        # bootstrap.
+        self._fail_after_degrade = False
         for name in self.openers:
             self._open_storage(name)
+        self._fail_after_degrade = True
 
         # Evaluate the consistency of the opened storages. We compare the last
         # known TIDs of all storages. All storages whose TID equals the newest
@@ -193,6 +200,7 @@
             try:
                 AllStoragesOperation(self, expect_connected=False).close()
             except Exception, e:
+                logging.exception('asdf')
                 if not zeoraid_exception(e):
                     raise e
         finally:
@@ -225,8 +233,6 @@
 
     def lastTransaction(self):
         """Return the id of the last committed transaction."""
-        if self.raid_status() == 'failed':
-            raise gocept.zeoraid.interfaces.RAIDError('RAID is failed.')
         # Although this is a read operation we apply it to all storages as a
         # safety belt to ensure consistency.
         return AllStoragesOperation(self).lastTransaction()
@@ -235,7 +241,7 @@
         """The approximate number of objects in the storage."""
         try:
             return self._reader.__len__()
-        except RuntimeError, e:
+        except ZEO.Exceptions.ClientStorageError, e:
             if zeoraid_exception(e):
                 return 0
             raise e
@@ -269,9 +275,6 @@
             reliable, oid = self._apply_storage(storage, 'new_oid')
             if reliable:
                 oids.append((oid, storage))
-        if not oids:
-            raise gocept.zeoraid.interfaces.RAIDError(
-                "RAID storage is failed.")
 
         min_oid = sorted(oids)[0][0]
         for oid, storage in oids:
@@ -584,7 +587,6 @@
 
     # IRAIDStorage
 
-    @ensure_open_storage
     def raid_status(self):
         if self.storage_recovering:
             return 'recovering'
@@ -613,8 +615,10 @@
 
     @ensure_open_storage
     def raid_disable(self, name):
-        self._degrade_storage(
-            name, reason='disabled by controller', fail=False)
+        try:
+            self._degrade_storage(name, reason='disabled by controller')
+        except ZEO.Exceptions.ClientStorageError:
+            pass
         return 'disabled %r' % (name,)
 
     @ensure_open_storage
@@ -709,8 +713,7 @@
             # We were trying to open a storage. Even if we fail we can't be
             # more broke than before, so don't ever fail due to this.
             self._degrade_storage(
-                name, reason='an error occured opening the storage',
-                fail=False)
+                name, reason='an error occured opening the storage')
             if storage is not None:
                 try:
                     storage.close()
@@ -729,14 +732,24 @@
             t.setDaemon(True)
             t.start()
 
-    def _degrade_storage(self, name, reason, fail=True):
+    def _degrade_storage(self, name, reason):
         self._close_storage(name)
         self.storages_degraded.append(name)
         self.degrade_reasons[name] = reason
-        logger.critical('RAID %r degraded due failure of back-end %r. '
+        logger.critical('RAID %r degraded due to failure of back-end %r. '
                         'Reason: %s' % (self.__name__, name, reason))
-        if not self.storages_optimal and fail:
-            raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
+        if not self._fail_after_degrade:
+            return
+        fail = False
+        if self.cluster_mode == 'single':
+            if not self.storages_optimal:
+                fail = 'No storages remain optimal'
+        elif self.cluster_mode == 'coop':
+            if len(self.storages_optimal) <= len(self.openers) * 0.5:
+                fail = 'Less than 50% of the configured storages remain optimal.'
+        if fail:
+            self.close()
+            raise gocept.zeoraid.interfaces.RAIDClosedError(fail)
 
     def _apply_storage(self, storage_name, method_name, args=(), kw={},
                         expect_connected=True):
@@ -811,9 +824,8 @@
         except Exception:
             logger.exception('Error opening storage %s' % name)
             self.storage_recovering = None
-            self.storages_degraded.append(name)
-            self.degrade_reasons[name] = (
-                'an error occured opening the storage')
+            self._degrade_storage(name,
+                'An error occurred while opening the storage.')
             raise
         self.storages[name] = target
         recovery = gocept.zeoraid.recovery.Recovery(
@@ -826,9 +838,8 @@
         except Exception:
             logger.exception('Error recovering storage %s' % name)
             self.storage_recovering = None
-            self.storages_degraded.append(name)
-            self.degrade_reasons[name] = (
-                'an error occured recovering the storage')
+            self._degrade_storage(name,
+                'An error occured recovering the storage')
             raise
 
     def _finalize_recovery(self, storage):
@@ -928,6 +939,8 @@
         self.raid = raid
 
     def __getattr__(self, name):
+        if name.startswith('_') and name not in ['__len__']:
+            raise AttributeError(name)
         return lambda *args, **kw: self(name, args, kw)
 
     @property
@@ -995,6 +1008,10 @@
         applicable_storages = [storage for storage in applicable_storages
                                if storage not in self.exclude]
 
+        if not applicable_storages:
+            raise gocept.zeoraid.interfaces.RAIDError(
+                'No applicable storages for operation %s available.' %
+                method_name)
         # Run _apply_storage on all applicable storages in parallel.
         threads = []
         for storage_name in applicable_storages:
@@ -1007,7 +1024,6 @@
 
         # Wait for threads to finish and pick up results.
         results = {}
-        exceptions = []
         for thread in threads:
             # XXX The timeout should be calculated such that the total time
             # spent in this loop doesn't grow with the number of storages.
@@ -1021,51 +1037,54 @@
                 self.raid._threads.add(thread)
                 continue
             if thread.exception:
-                exceptions.append(thread.exception)
+                results[thread.storage_name] = OperationExceptionResult(
+                    thread.exception)
             elif thread.reliable:
-                results[thread.storage_name] = thread.result
+                results[thread.storage_name] = OperationResult(
+                    thread.result,
+                    self.filter_results(thread.result,
+                                        self.raid.storages.get(thread.storage_name)))
 
-        # Analyse result consistency.
-        consistent = True
-        if exceptions and results:
-            consistent = False
-        elif exceptions:
-            # Since we can only get one kind of exceptions at the moment, they
-            # must be consistent anyway.
-            pass
-        elif results:
-            consistent = self._check_result_consistency(results)
-        if not consistent:
-            self.raid.close()
-            raise gocept.zeoraid.interfaces.RAIDError(
-                "RAID is inconsistent and was closed.")
+        try:
+            result = self._extract_result(results)
+        except RuntimeError:
+            logger.debug(
+                'Received inconsistent results for method %s: %r' %
+                (method_name, results))
+            raise
 
-        # Select result.
-        if exceptions:
-            raise exceptions[0]
-        if results:
-            return results.values()[0]
+        return result()
 
-        # We did not get any reliable result, making this call effectively a
-        # no-op.
-        if self.ignore_noop:
-            return
-        raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+    def _extract_result(self, results):
+        """Extract a consistent result from a set of results.
 
-    def _check_result_consistency(self, results):
-        filtered_results = [
-            self.filter_results(result, self.raid.storages[storage])
-            for storage, result in results.items()]
-        ref = filtered_results[0]
-        for test in filtered_results[1:]:
-            if test != ref:
-                logger.debug(
-                    'Got inconsistent results for method %s: %r' %
-                    (method_name, results))
-                return False
-        return True
+        We select the result that was returned by the most storages and rely
+        on the implementation of _degrade_storage to disable the RAID
+        completely if too few storages were involved delivering that result.
 
+        We expect _degrade_storage to raise an exception in the latter case.
 
+        """
+        result_classes = NonHashingDict()
+        # Classify results by their value, keep track of the storages that
+        # returned the respective result.
+        for storage, result in results.items():
+            storages = result_classes.setdefault(result, [])
+            storages.append(storage)
+        # Select the result that was returned most.
+        max_same_result = 0
+        for result, storages in result_classes.items():
+            if len(storages) > max_same_result:
+                max_same_result = len(storages)
+                extracted_result = result
+        # Degrade all storages with different results.
+        for result, storages in result_classes.items():
+            if result != extracted_result:
+                for storage in storages:
+                    self.raid._degrade_storage(storage, 'inconsistent result')
+        return extracted_result
+
+
 def optimistic_copy(source, target):
     """Try creating a hard link to source at target. Fall back to copying the
     file.
@@ -1119,3 +1138,51 @@
 
     """
     return bool(getattr(e, 'created_by_zeoraid', False))
+
+
+class NonHashingDict(object):
+
+    def __init__(self):
+        self.data = []
+
+    def setdefault(self, key, default=None):
+        for candidate_key, value in self.data:
+            if key == candidate_key:
+                return value
+        self.data.append((key, default))
+        return default
+
+    def items(self):
+        return self.data[:]
+
+
+class OperationResult(object):
+
+    def __init__(self, original, filtered):
+        self.original = original
+        self.filtered = filtered
+
+    def __eq__(self, other):
+        if isinstance(other, OperationResult):
+            return self.filtered == other.filtered
+        return NotImplemented
+
+    def __call__(self):
+        return self.original
+
+
+class OperationExceptionResult(object):
+
+    def __init__(self, exception):
+        self.exception = exception
+
+    def __eq__(self, other):
+        if not isinstance(other, OperationExceptionResult):
+            return NotImplemented
+        try:
+            return cPickle.dumps(self.exception) == cPickle.dumps(other.exception)
+        except cPickle.PicklingError:
+            return False
+
+    def __call__(self):
+        raise self.exception

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2010-10-06 13:20:13 UTC (rev 117304)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2010-10-06 14:28:08 UTC (rev 117305)
@@ -67,9 +67,8 @@
 
 class Opener(object):
 
-    name = 'foo'
-
-    def __init__(self, undo=True):
+    def __init__(self, name, undo=True):
+        self.name = name
         self.undo = undo
 
     def open(self):
@@ -154,7 +153,8 @@
 
 class FailingStorageTestSetup(StorageTestBase.StorageTestBase):
 
-    backend_count = 2
+    backend_count = 3
+    cluster_mode = 'coop'
 
     def _backend(self, index):
         return self._storage.storages[
@@ -184,7 +184,8 @@
         blob_dir = tempfile.mkdtemp()
         self.temp_paths.append(blob_dir)
         self._storage = gocept.zeoraid.storage.RAIDStorage(
-            'teststorage', self._storages, blob_dir=blob_dir)
+            'teststorage', self._storages,
+            cluster_mode=self.cluster_mode, blob_dir=blob_dir)
 
         self.orig_choice = random.choice
         random.choice = lambda seq: seq[0]
@@ -226,7 +227,7 @@
                 wait_timeout=60))
         self._storage = gocept.zeoraid.storage.RAIDStorage(
             'teststorage', self._storages, blob_dir=blob_dir,
-            shared_blob_dir=True)
+            cluster_mode=self.cluster_mode, shared_blob_dir=True)
 
         self.orig_choice = random.choice
         random.choice = lambda seq: seq[0]
@@ -326,9 +327,11 @@
         self._disable_storage(0)
         self.assertEquals(1, len(self._backend(0).history(oid, '')))
         self.assertEquals(1, len(self._storage.history(oid, '')))
+        self.assertEquals('degraded', self._storage.raid_status())
 
         self._disable_storage(0)
-        self.assertRaises(RuntimeError, self._storage.history, oid, '')
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.history, oid, '')
+        self.assertEquals('failed', self._storage.raid_status())
 
     def test_history_degrading(self):
         oid = self._storage.new_oid()
@@ -343,7 +346,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('history')
-        self.assertRaises(RuntimeError, self._storage.history, oid, '')
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.history, oid, '')
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_lastTransaction(self):
@@ -361,7 +364,7 @@
         self.assertEquals(None, self._storage.lastTransaction())
         self._disable_storage(0)
         self.assertEquals('failed', self._storage.raid_status())
-        self.assertRaises(RuntimeError, self._storage.lastTransaction)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.lastTransaction)
 
     def test_len_degrading(self):
         # Brrrr. ClientStorage doesn't seem to implement __len__ correctly.
@@ -415,7 +418,7 @@
         self.assertEquals((data_record, serial), self._backend(0).load(oid))
 
         self._disable_storage(0)
-        self.assertRaises(RuntimeError, self._storage.load, oid)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.load, oid)
 
     def test_load_can_be_failed(self):
         # ClientStorage does not directly call `load` but
@@ -443,7 +446,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('load')
-        self.assertRaises(RuntimeError, self._storage.load, oid)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.load, oid)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_loadBefore_degrading1(self):
@@ -477,7 +480,7 @@
                           self._backend(0).loadBefore(oid, revid2))
 
         self._disable_storage(0)
-        self.assertRaises(RuntimeError, self._storage.loadBefore, oid, revid2)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.loadBefore, oid, revid2)
 
     def test_loadBefore_degrading2(self):
         oid = self._storage.new_oid()
@@ -498,7 +501,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('loadBefore')
-        self.assertRaises(RuntimeError, self._storage.loadBefore, oid, revid2)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.loadBefore, oid, revid2)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_loadSerial_degrading1(self):
@@ -534,7 +537,7 @@
                           self._backend(0).loadSerial(oid, revid))
 
         self._disable_storage(0)
-        self.assertRaises(RuntimeError, self._storage.loadSerial, oid, revid)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.loadSerial, oid, revid)
 
     def test_loadSerial_degrading2(self):
         oid = self._storage.new_oid()
@@ -557,7 +560,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('loadSerial')
-        self.assertRaises(RuntimeError, self._storage.loadSerial, oid, revid)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.loadSerial, oid, revid)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_new_oid_degrading1(self):
@@ -565,7 +568,7 @@
         self._disable_storage(0)
         self.assertEquals(8, len(self._storage.new_oid()))
         self._disable_storage(0)
-        self.assertRaises(RuntimeError, self._storage.new_oid)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.new_oid)
 
     def test_new_oid_degrading2(self):
         self.assertEquals(8, len(self._storage.new_oid()))
@@ -578,7 +581,7 @@
 
         self._backend(0)._oids = None
         self._backend(0).fail('new_oid')
-        self.assertRaises(RuntimeError, self._storage.new_oid)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.new_oid)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_new_oid_unsynchronised_degrading(self):
@@ -620,7 +623,7 @@
         self._dostore(oid=oid, revid=revid3, data=4)
         self.assertEquals(256, self._storage.getSize())
         self._disable_storage(0)
-        self.assertRaises(RuntimeError,
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
                           self._storage.pack,
                           time.time(), ZODB.serialize.referencesf)
 
@@ -645,7 +648,7 @@
         self.assertEquals(256, self._storage.getSize())
 
         self._backend(0).fail('pack')
-        self.assertRaises(RuntimeError,
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
                           self._storage.pack,
                           time.time(), ZODB.serialize.referencesf)
         self.assertEquals('failed', self._storage.raid_status())
@@ -660,7 +663,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('store')
-        self.assertRaises(RuntimeError,
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
                           self._dostoreNP,
                           oid=oid, revid=revid, data='bar')
         self.assertEquals('failed', self._storage.raid_status())
@@ -675,7 +678,7 @@
 
         oid = self._storage.new_oid()
         self._backend(0).fail('tpc_begin')
-        self.assertRaises(RuntimeError, self._dostoreNP, oid=oid, data='bar')
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._dostoreNP, oid=oid, data='bar')
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_tpc_vote_degrading(self):
@@ -688,7 +691,7 @@
 
         oid = self._storage.new_oid()
         self._backend(0).fail('tpc_vote')
-        self.assertRaises(RuntimeError, self._dostoreNP, oid=oid, data='bar')
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._dostoreNP, oid=oid, data='bar')
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_tpc_finish_degrading(self):
@@ -701,7 +704,7 @@
 
         oid = self._storage.new_oid()
         self._backend(0).fail('tpc_finish')
-        self.assertRaises(RuntimeError, self._dostoreNP, oid=oid, data='bar')
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._dostoreNP, oid=oid, data='bar')
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_tpc_abort_not_degrading(self):
@@ -773,7 +776,7 @@
         self._storage.tpc_begin(t)
         self._disable_storage(0)
         self._disable_storage(0)
-        self.assertRaises(RuntimeError,
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
                           self._storage.storeBlob,
                           oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
 
@@ -809,7 +812,7 @@
         # when tpc_vote ist called.
         self._storage.storeBlob(
             oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
-        self.assertRaises(RuntimeError,
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
                           self._storage.tpc_vote, t)
 
     def test_storeBlob_degrading3(self):
@@ -843,7 +846,7 @@
             raise Exception()
         self._backend(0).storeBlob = fail
         self._backend(1).storeBlob = fail
-        self.assertRaises(RuntimeError,
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
                           self._storage.storeBlob,
                           oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
 
@@ -872,7 +875,7 @@
         os.remove(stored_file_name)
 
         self._disable_storage(0)
-        self.assertRaises(RuntimeError,
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
                           self._storage.loadBlob, oid, last_transaction)
 
     def test_loadBlob_degrading2(self):
@@ -905,7 +908,7 @@
         os.unlink(b0_filename)
 
         self._backend(0).fail('loadBlob')
-        self.assertRaises(RuntimeError,
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
                           self._storage.loadBlob, oid, last_transaction)
         self.assertEquals('failed', self._storage.raid_status())
 
@@ -964,7 +967,7 @@
     def test_supportsUndo_required(self):
         self.assertRaises(RuntimeError,
                           gocept.zeoraid.storage.RAIDStorage,
-                          'name', [Opener(undo=False)])
+                          'name', [Opener('1', undo=False)])
 
     def test_supportsUndo(self):
         self.assertEquals(True, self._storage.supportsUndo())
@@ -992,7 +995,7 @@
         t = transaction.Transaction()
         self._storage.tpc_begin(t)
         self._disable_storage(0)
-        self.assertRaises(RuntimeError, self._storage.undo, info[2]['id'], t)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undo, info[2]['id'], t)
 
     def test_undo_degrading2(self):
         oid = self._storage.new_oid()
@@ -1018,7 +1021,7 @@
         t = transaction.Transaction()
         self._storage.tpc_begin(t)
         self._backend(0).fail('undo')
-        self.assertRaises(RuntimeError, self._storage.undo, info[2]['id'], t)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undo, info[2]['id'], t)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_undoLog_degrading1(self):
@@ -1034,7 +1037,7 @@
         self.assertEquals(2, len(info))
 
         self._disable_storage(0)
-        self.assertRaises(RuntimeError, self._storage.undoLog)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undoLog)
 
     def test_undoLog_degrading2(self):
         oid = self._storage.new_oid()
@@ -1050,7 +1053,7 @@
         self.assertEquals(2, len(info))
 
         self._backend(0).fail('undoLog')
-        self.assertRaises(RuntimeError, self._storage.undoLog)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undoLog)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_undoInfo_degrading1(self):
@@ -1066,7 +1069,7 @@
         self.assertEquals(2, len(info))
 
         self._disable_storage(0)
-        self.assertRaises(RuntimeError, self._storage.undoInfo)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undoInfo)
 
     def test_undoInfo_degrading2(self):
         oid = self._storage.new_oid()
@@ -1082,7 +1085,7 @@
         self.assertEquals(2, len(info))
 
         self._backend(0).fail('undoInfo')
-        self.assertRaises(RuntimeError, self._storage.undoInfo)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undoInfo)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_record_iternext(self):
@@ -1112,7 +1115,7 @@
         self.assertEquals('0', data)
 
         self._disable_storage(0)
-        self.assertRaises(RuntimeError, self._storage.record_iternext, next)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.record_iternext, next)
 
     def test_record_iternext_degrading2(self):
         for x in range(5):
@@ -1125,7 +1128,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('record_iternext')
-        self.assertRaises(RuntimeError, self._storage.record_iternext, next)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.record_iternext, next)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_gettid_degrading1(self):
@@ -1137,7 +1140,7 @@
         self.assertEquals(revid, tid)
 
         self._disable_storage(0)
-        self.assertRaises(RuntimeError, self._storage.getTid, oid)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.getTid, oid)
 
     def test_gettid_degrading2(self):
         oid = self._storage.new_oid()
@@ -1149,7 +1152,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('getTid')
-        self.assertRaises(RuntimeError, self._storage.getTid, oid)
+        self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.getTid, oid)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_tpc_transaction_finishing(self):
@@ -1227,9 +1230,11 @@
         self.assertRaises(
             ValueError,
             self._storage._recover_impl, self._storage.storages_degraded[0])
+        values = self._storage.raid_details().values()
         self.assertEquals(
-            {'1': 'failed: an error occured recovering the storage',
-             '0': 'optimal'}, self._storage.raid_details())
+            1,
+            values.count('failed: An error occured recovering the storage'))
+        self.assertEquals(self.backend_count-1, values.count('optimal'))
 
     def test_timeoutBackend(self):
         self._storage.timeout = 2
@@ -1245,8 +1250,8 @@
         self.assertRaises(RuntimeError, self._storage.raid_reload)
 
 
-class FailingStorageTests(FailingStorageTestBase,
-                          FailingStorageTestSetup):
+class FailingStorageCoopTests(FailingStorageTestBase,
+                              FailingStorageTestSetup):
 
     def test_blob_cache_cannot_link(self):
         called_broken = []
@@ -1312,9 +1317,15 @@
         thread.join()
 
 
-class FailingStorageSharedBlobTests(FailingStorageTestBase,
-                                    FailingStorageSharedBlobTestSetup):
+class FailingStorageSingleTests(FailingStorageCoopTests):
 
+    backend_count = 2
+    cluster_mode = 'single'
+
+
+class FailingStorageCoopSharedBlobTests(FailingStorageTestBase,
+                                        FailingStorageSharedBlobTestSetup):
+
     def test_loadBlob_file_missing(self):
         oid, tid = self.store_blob()
         stored_file_name = self._storage.loadBlob(oid, tid)
@@ -1395,11 +1406,17 @@
         self._storage.tpc_begin(t)
         fail(self._backend(0), 'storeBlob')
         fail(self._backend(1), 'storeBlob')
-        self.assertRaises(RuntimeError,
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
                           self._storage.storeBlob,
                           oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
 
 
+class FailingStorageSingleSharedBlobTests(FailingStorageCoopSharedBlobTests):
+
+    backend_count = 2
+    cluster_mode = 'single'
+
+
 class ZEOReplicationStorageTests(ZEOStorageBackendTests,
                                  ReplicationStorageTests,
                                  ThreadTests.ThreadTests):
@@ -1662,30 +1679,138 @@
 class ClusterModeTests(unittest.TestCase):
 
     def setUp(self):
-        self.storage = gocept.zeoraid.storage.RAIDStorage(
-            'test', [Opener('1'), Opener('2')])
-        self.storage._apply_storage = mock.Mock()
+        self.raid = gocept.zeoraid.storage.RAIDStorage(
+            'test', [Opener('%s' % s) for s in range(5)])
+        self.raid._apply_storage = mock.Mock(return_value=(True, None))
 
-    def test_single_mode_read(self):
-        self.storage.cluster_mode = 'single'
-        self.assert_(
-            isinstance(self.storage._reader,
-                       gocept.zeoraid.storage.SingleStorageOperation))
 
-    def test_coop_mode_read(self):
-        self.storage.cluster_mode = 'coop'
+class ClusterModeCoopTests(ClusterModeTests):
+
+    def setUp(self):
+        super(ClusterModeCoopTests, self).setUp()
+        self.raid.cluster_mode = 'coop'
+
+    def test_read_operation_chooses_all_storages_op(self):
         self.assert_(
-            isinstance(self.storage._reader,
+            isinstance(self.raid._reader,
                        gocept.zeoraid.storage.AllStoragesOperation))
 
+    def test_degrade_fails_with_too_few_active_storages(self):
+        self.raid._degrade_storage('0', 'test')
+        self.raid._degrade_storage('1', 'test')
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
+                          lambda: self.raid._degrade_storage('2', 'test'))
 
+class ClusterModeSingleTests(ClusterModeTests):
+
+    def setUp(self):
+        super(ClusterModeSingleTests, self).setUp()
+        self.raid.cluster_mode = 'single'
+
+    def test_read_operation_chooses_single_storage_op(self):
+        self.assert_(
+            isinstance(self.raid._reader,
+                       gocept.zeoraid.storage.SingleStorageOperation))
+
+    def test_degrade_fails_with_no_active_storage(self):
+        self.raid._degrade_storage('0', 'test')
+        self.raid._degrade_storage('1', 'test')
+        self.raid._degrade_storage('2', 'test')
+        self.raid._degrade_storage('3', 'test')
+        self.assertRaises(ZEO.Exceptions.ClientStorageError,
+                          lambda: self.raid._degrade_storage('4', 'test'))
+
+class AllStorageConsistencyCheck(unittest.TestCase):
+
+    def setUp(self):
+        self.raid = mock.Mock()
+        self.op = gocept.zeoraid.storage.AllStoragesOperation(self.raid)
+
+    def test_equal_results_should_be_consistent(self):
+        self.assert_(self.op._extract_result(
+            {'1': mock.sentinel.result,
+             '2': mock.sentinel.result}))
+        self.assertEquals(0, self.raid._degrade_storage.call_count)
+
+    def test_two_different_out_of_two_results_should_be_inconsistent(self):
+        self.op._extract_result(
+            {'1': mock.sentinel.result1,
+             '2': mock.sentinel.result2})
+        self.assert_([(('1', 'inconsistent result'), {}),
+                      (('2', 'inconsistent result'), {})],
+                     self.raid._degrade_storage.call_args_list)
+
+    def test_one_different_out_of_three_results_should_be_consistent(self):
+        self.assertEquals(mock.sentinel.result2,
+                          self.op._extract_result(
+                              {'1': mock.sentinel.result1,
+                               '2': mock.sentinel.result2,
+                               '3': mock.sentinel.result2}))
+        self.raid._degrade_storage.assert_called_with('1', 'inconsistent result')
+
+    def test_three_different_out_of_three_results_should_be_inconsistent(self):
+        self.op._extract_result(
+            {'1': mock.sentinel.result1,
+             '2': mock.sentinel.result2,
+             '3': mock.sentinel.result3})
+        # We only expect two degrades to happen as we do select one of the
+        # results as "good" and rely on the actual implementation of
+        # _degrade_storage to raise an exception in that case.
+        self.assertEquals(2, self.raid._degrade_storage.call_count)
+
+    def test_extract_non_hashable_result_works(self):
+        non_hashable = [1, 2, 3]
+        self.assertEquals(non_hashable,
+                  self.op._extract_result({'1': non_hashable}))
+
+
+class OperationExceptionResultTests(unittest.TestCase):
+
+    def result(self, exception):
+        return gocept.zeoraid.storage.OperationExceptionResult(exception)
+
+    def test_same_exception_type_and_dict_are_equal(self):
+        self.assertEquals(self.result(RuntimeError('foo')),
+                          self.result(RuntimeError('foo')))
+
+    def test_unpickleable_exceptions_are_not_equal(self):
+        class UnpickleableException(object):
+            __slots__ = ()
+        self.assertNotEqual(self.result(UnpickleableException()),
+                            self.result(UnpickleableException()))
+
+    def test_same_exception_type_and_different_dict_are_not_equal(self):
+        self.assertNotEqual(self.result(RuntimeError('foo')),
+                            self.result(RuntimeError('bar')))
+
+    def test_different_exception_type_and_same_dict_are_not_equal(self):
+        self.assertNotEqual(self.result(RuntimeError('foo')),
+                            self.result(TypeError('foo')))
+
+    def test_different_exception_type_and_different_dict_are_not_equal(self):
+        self.assertNotEqual(self.result(RuntimeError('foo')),
+                            self.result(TypeError('bar')))
+
+    def test_call_should_raise(self):
+        exception = RuntimeError('foo')
+        try:
+            self.result(exception)()
+        except RuntimeError, e:
+            self.assertEquals(exception, e)
+        else:
+            self.fail('No exception raised')
+
 def test_suite():
-
     suite = unittest.TestSuite()
     suite.addTest(unittest.makeSuite(ZEOReplicationStorageTests, "check"))
-    suite.addTest(unittest.makeSuite(FailingStorageTests))
-    suite.addTest(unittest.makeSuite(FailingStorageSharedBlobTests))
+    suite.addTest(unittest.makeSuite(FailingStorageSingleTests))
+    suite.addTest(unittest.makeSuite(FailingStorageSingleSharedBlobTests))
+    suite.addTest(unittest.makeSuite(FailingStorageCoopTests))
+    suite.addTest(unittest.makeSuite(FailingStorageCoopSharedBlobTests))
     suite.addTest(unittest.makeSuite(LoggingStorageDistributedTests))
     suite.addTest(unittest.makeSuite(ExtensionMethodsTests))
-    suite.addTest(unittest.makeSuite(ClusterModeTests))
+    suite.addTest(unittest.makeSuite(ClusterModeSingleTests))
+    suite.addTest(unittest.makeSuite(ClusterModeCoopTests))
+    suite.addTest(unittest.makeSuite(AllStorageConsistencyCheck))
+    suite.addTest(unittest.makeSuite(OperationExceptionResultTests))
     return suite

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py	2010-10-06 13:20:13 UTC (rev 117304)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py	2010-10-06 14:28:08 UTC (rev 117305)
@@ -205,7 +205,7 @@
 
         return gocept.zeoraid.storage.RAIDStorage(
             'raid',
-            [self.source_opener, self.target_opener],
+            [self.source_opener, self.target_opener], cluster_mode='single',
             blob_dir=blob_dir, shared_blob_dir=self.shared)
 
     def test_verify_both_empty(self):

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py	2010-10-06 13:20:13 UTC (rev 117304)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py	2010-10-06 14:28:08 UTC (rev 117305)
@@ -70,6 +70,7 @@
         config = """\
         %%import gocept.zeoraid
         <raidstorage 1>
+            cluster-mode single
             <filestorage 1>
             path %s
             </filestorage>



More information about the checkins mailing list