[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ Implement majority voting for results in co-op mode. This included a lot of
Christian Theune
ct at gocept.com
Wed Oct 6 10:28:09 EDT 2010
Log message for revision 117305:
Implement majority voting for results in co-op mode. This included a lot of
detailed changes that are reflected in the tests. (Sorry for the bulk
check-in.)
Changed:
U gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py
-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2010-10-06 13:20:13 UTC (rev 117304)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2010-10-06 14:28:08 UTC (rev 117305)
@@ -20,6 +20,7 @@
import ZODB.blob
import ZODB.interfaces
import ZODB.utils
+import cPickle
import gocept.zeoraid.interfaces
import gocept.zeoraid.recovery
import logging
@@ -51,6 +52,7 @@
def ensure_writable(method):
+ @ensure_open_storage
def check_writable(self, *args, **kw):
if self.isReadOnly():
raise ZODB.POSException.ReadOnlyError()
@@ -148,8 +150,13 @@
# Remember the openers so closed storages can be re-opened as needed.
self.openers = dict((opener.name, opener) for opener in openers)
+ # Do not fail the whole storage while opening storages the first time
+ # as we have intermittent states that would never allow a RAID to
+ # bootstrap.
+ self._fail_after_degrade = False
for name in self.openers:
self._open_storage(name)
+ self._fail_after_degrade = True
# Evaluate the consistency of the opened storages. We compare the last
# known TIDs of all storages. All storages whose TID equals the newest
@@ -193,6 +200,7 @@
try:
AllStoragesOperation(self, expect_connected=False).close()
except Exception, e:
+ logging.exception('asdf')
if not zeoraid_exception(e):
raise e
finally:
@@ -225,8 +233,6 @@
def lastTransaction(self):
"""Return the id of the last committed transaction."""
- if self.raid_status() == 'failed':
- raise gocept.zeoraid.interfaces.RAIDError('RAID is failed.')
# Although this is a read operation we apply it to all storages as a
# safety belt to ensure consistency.
return AllStoragesOperation(self).lastTransaction()
@@ -235,7 +241,7 @@
"""The approximate number of objects in the storage."""
try:
return self._reader.__len__()
- except RuntimeError, e:
+ except ZEO.Exceptions.ClientStorageError, e:
if zeoraid_exception(e):
return 0
raise e
@@ -269,9 +275,6 @@
reliable, oid = self._apply_storage(storage, 'new_oid')
if reliable:
oids.append((oid, storage))
- if not oids:
- raise gocept.zeoraid.interfaces.RAIDError(
- "RAID storage is failed.")
min_oid = sorted(oids)[0][0]
for oid, storage in oids:
@@ -584,7 +587,6 @@
# IRAIDStorage
- @ensure_open_storage
def raid_status(self):
if self.storage_recovering:
return 'recovering'
@@ -613,8 +615,10 @@
@ensure_open_storage
def raid_disable(self, name):
- self._degrade_storage(
- name, reason='disabled by controller', fail=False)
+ try:
+ self._degrade_storage(name, reason='disabled by controller')
+ except ZEO.Exceptions.ClientStorageError:
+ pass
return 'disabled %r' % (name,)
@ensure_open_storage
@@ -709,8 +713,7 @@
# We were trying to open a storage. Even if we fail we can't be
# more broke than before, so don't ever fail due to this.
self._degrade_storage(
- name, reason='an error occured opening the storage',
- fail=False)
+ name, reason='an error occured opening the storage')
if storage is not None:
try:
storage.close()
@@ -729,14 +732,24 @@
t.setDaemon(True)
t.start()
- def _degrade_storage(self, name, reason, fail=True):
+ def _degrade_storage(self, name, reason):
self._close_storage(name)
self.storages_degraded.append(name)
self.degrade_reasons[name] = reason
- logger.critical('RAID %r degraded due failure of back-end %r. '
+ logger.critical('RAID %r degraded due to failure of back-end %r. '
'Reason: %s' % (self.__name__, name, reason))
- if not self.storages_optimal and fail:
- raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
+ if not self._fail_after_degrade:
+ return
+ fail = False
+ if self.cluster_mode == 'single':
+ if not self.storages_optimal:
+ fail = 'No storages remain optimal'
+ elif self.cluster_mode == 'coop':
+ if len(self.storages_optimal) <= len(self.openers) * 0.5:
+ fail = 'Less than 50% of the configured storages remain optimal.'
+ if fail:
+ self.close()
+ raise gocept.zeoraid.interfaces.RAIDClosedError(fail)
def _apply_storage(self, storage_name, method_name, args=(), kw={},
expect_connected=True):
@@ -811,9 +824,8 @@
except Exception:
logger.exception('Error opening storage %s' % name)
self.storage_recovering = None
- self.storages_degraded.append(name)
- self.degrade_reasons[name] = (
- 'an error occured opening the storage')
+ self._degrade_storage(name,
+ 'An error occurred while opening the storage.')
raise
self.storages[name] = target
recovery = gocept.zeoraid.recovery.Recovery(
@@ -826,9 +838,8 @@
except Exception:
logger.exception('Error recovering storage %s' % name)
self.storage_recovering = None
- self.storages_degraded.append(name)
- self.degrade_reasons[name] = (
- 'an error occured recovering the storage')
+ self._degrade_storage(name,
+ 'An error occured recovering the storage')
raise
def _finalize_recovery(self, storage):
@@ -928,6 +939,8 @@
self.raid = raid
def __getattr__(self, name):
+ if name.startswith('_') and name not in ['__len__']:
+ raise AttributeError(name)
return lambda *args, **kw: self(name, args, kw)
@property
@@ -995,6 +1008,10 @@
applicable_storages = [storage for storage in applicable_storages
if storage not in self.exclude]
+ if not applicable_storages:
+ raise gocept.zeoraid.interfaces.RAIDError(
+ 'No applicable storages for operation %s available.' %
+ method_name)
# Run _apply_storage on all applicable storages in parallel.
threads = []
for storage_name in applicable_storages:
@@ -1007,7 +1024,6 @@
# Wait for threads to finish and pick up results.
results = {}
- exceptions = []
for thread in threads:
# XXX The timeout should be calculated such that the total time
# spent in this loop doesn't grow with the number of storages.
@@ -1021,51 +1037,54 @@
self.raid._threads.add(thread)
continue
if thread.exception:
- exceptions.append(thread.exception)
+ results[thread.storage_name] = OperationExceptionResult(
+ thread.exception)
elif thread.reliable:
- results[thread.storage_name] = thread.result
+ results[thread.storage_name] = OperationResult(
+ thread.result,
+ self.filter_results(thread.result,
+ self.raid.storages.get(thread.storage_name)))
- # Analyse result consistency.
- consistent = True
- if exceptions and results:
- consistent = False
- elif exceptions:
- # Since we can only get one kind of exceptions at the moment, they
- # must be consistent anyway.
- pass
- elif results:
- consistent = self._check_result_consistency(results)
- if not consistent:
- self.raid.close()
- raise gocept.zeoraid.interfaces.RAIDError(
- "RAID is inconsistent and was closed.")
+ try:
+ result = self._extract_result(results)
+ except RuntimeError:
+ logger.debug(
+ 'Received inconsistent results for method %s: %r' %
+ (method_name, results))
+ raise
- # Select result.
- if exceptions:
- raise exceptions[0]
- if results:
- return results.values()[0]
+ return result()
- # We did not get any reliable result, making this call effectively a
- # no-op.
- if self.ignore_noop:
- return
- raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+ def _extract_result(self, results):
+ """Extract a consistent result from a set of results.
- def _check_result_consistency(self, results):
- filtered_results = [
- self.filter_results(result, self.raid.storages[storage])
- for storage, result in results.items()]
- ref = filtered_results[0]
- for test in filtered_results[1:]:
- if test != ref:
- logger.debug(
- 'Got inconsistent results for method %s: %r' %
- (method_name, results))
- return False
- return True
+ We select the result that was returned by the most storages and rely
+ on the implementation of _degrade_storage to disable the RAID
+ completely if too few storages were involved delivering that result.
+ We expect _degrade_storage to raise an exception in the latter case.
+ """
+ result_classes = NonHashingDict()
+ # Classify results by their value, keep track of the storages that
+ # returned the respective result.
+ for storage, result in results.items():
+ storages = result_classes.setdefault(result, [])
+ storages.append(storage)
+ # Select the result that was returned most.
+ max_same_result = 0
+ for result, storages in result_classes.items():
+ if len(storages) > max_same_result:
+ max_same_result = len(storages)
+ extracted_result = result
+ # Degrade all storages with different results.
+ for result, storages in result_classes.items():
+ if result != extracted_result:
+ for storage in storages:
+ self.raid._degrade_storage(storage, 'inconsistent result')
+ return extracted_result
+
+
def optimistic_copy(source, target):
"""Try creating a hard link to source at target. Fall back to copying the
file.
@@ -1119,3 +1138,51 @@
"""
return bool(getattr(e, 'created_by_zeoraid', False))
+
+
+class NonHashingDict(object):
+
+ def __init__(self):
+ self.data = []
+
+ def setdefault(self, key, default=None):
+ for candidate_key, value in self.data:
+ if key == candidate_key:
+ return value
+ self.data.append((key, default))
+ return default
+
+ def items(self):
+ return self.data[:]
+
+
+class OperationResult(object):
+
+ def __init__(self, original, filtered):
+ self.original = original
+ self.filtered = filtered
+
+ def __eq__(self, other):
+ if isinstance(other, OperationResult):
+ return self.filtered == other.filtered
+ return NotImplemented
+
+ def __call__(self):
+ return self.original
+
+
+class OperationExceptionResult(object):
+
+ def __init__(self, exception):
+ self.exception = exception
+
+ def __eq__(self, other):
+ if not isinstance(other, OperationExceptionResult):
+ return NotImplemented
+ try:
+ return cPickle.dumps(self.exception) == cPickle.dumps(other.exception)
+ except cPickle.PicklingError:
+ return False
+
+ def __call__(self):
+ raise self.exception
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2010-10-06 13:20:13 UTC (rev 117304)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2010-10-06 14:28:08 UTC (rev 117305)
@@ -67,9 +67,8 @@
class Opener(object):
- name = 'foo'
-
- def __init__(self, undo=True):
+ def __init__(self, name, undo=True):
+ self.name = name
self.undo = undo
def open(self):
@@ -154,7 +153,8 @@
class FailingStorageTestSetup(StorageTestBase.StorageTestBase):
- backend_count = 2
+ backend_count = 3
+ cluster_mode = 'coop'
def _backend(self, index):
return self._storage.storages[
@@ -184,7 +184,8 @@
blob_dir = tempfile.mkdtemp()
self.temp_paths.append(blob_dir)
self._storage = gocept.zeoraid.storage.RAIDStorage(
- 'teststorage', self._storages, blob_dir=blob_dir)
+ 'teststorage', self._storages,
+ cluster_mode=self.cluster_mode, blob_dir=blob_dir)
self.orig_choice = random.choice
random.choice = lambda seq: seq[0]
@@ -226,7 +227,7 @@
wait_timeout=60))
self._storage = gocept.zeoraid.storage.RAIDStorage(
'teststorage', self._storages, blob_dir=blob_dir,
- shared_blob_dir=True)
+ cluster_mode=self.cluster_mode, shared_blob_dir=True)
self.orig_choice = random.choice
random.choice = lambda seq: seq[0]
@@ -326,9 +327,11 @@
self._disable_storage(0)
self.assertEquals(1, len(self._backend(0).history(oid, '')))
self.assertEquals(1, len(self._storage.history(oid, '')))
+ self.assertEquals('degraded', self._storage.raid_status())
self._disable_storage(0)
- self.assertRaises(RuntimeError, self._storage.history, oid, '')
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.history, oid, '')
+ self.assertEquals('failed', self._storage.raid_status())
def test_history_degrading(self):
oid = self._storage.new_oid()
@@ -343,7 +346,7 @@
self.assertEquals('degraded', self._storage.raid_status())
self._backend(0).fail('history')
- self.assertRaises(RuntimeError, self._storage.history, oid, '')
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.history, oid, '')
self.assertEquals('failed', self._storage.raid_status())
def test_lastTransaction(self):
@@ -361,7 +364,7 @@
self.assertEquals(None, self._storage.lastTransaction())
self._disable_storage(0)
self.assertEquals('failed', self._storage.raid_status())
- self.assertRaises(RuntimeError, self._storage.lastTransaction)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.lastTransaction)
def test_len_degrading(self):
# Brrrr. ClientStorage doesn't seem to implement __len__ correctly.
@@ -415,7 +418,7 @@
self.assertEquals((data_record, serial), self._backend(0).load(oid))
self._disable_storage(0)
- self.assertRaises(RuntimeError, self._storage.load, oid)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.load, oid)
def test_load_can_be_failed(self):
# ClientStorage does not directly call `load` but
@@ -443,7 +446,7 @@
self.assertEquals('degraded', self._storage.raid_status())
self._backend(0).fail('load')
- self.assertRaises(RuntimeError, self._storage.load, oid)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.load, oid)
self.assertEquals('failed', self._storage.raid_status())
def test_loadBefore_degrading1(self):
@@ -477,7 +480,7 @@
self._backend(0).loadBefore(oid, revid2))
self._disable_storage(0)
- self.assertRaises(RuntimeError, self._storage.loadBefore, oid, revid2)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.loadBefore, oid, revid2)
def test_loadBefore_degrading2(self):
oid = self._storage.new_oid()
@@ -498,7 +501,7 @@
self.assertEquals('degraded', self._storage.raid_status())
self._backend(0).fail('loadBefore')
- self.assertRaises(RuntimeError, self._storage.loadBefore, oid, revid2)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.loadBefore, oid, revid2)
self.assertEquals('failed', self._storage.raid_status())
def test_loadSerial_degrading1(self):
@@ -534,7 +537,7 @@
self._backend(0).loadSerial(oid, revid))
self._disable_storage(0)
- self.assertRaises(RuntimeError, self._storage.loadSerial, oid, revid)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.loadSerial, oid, revid)
def test_loadSerial_degrading2(self):
oid = self._storage.new_oid()
@@ -557,7 +560,7 @@
self.assertEquals('degraded', self._storage.raid_status())
self._backend(0).fail('loadSerial')
- self.assertRaises(RuntimeError, self._storage.loadSerial, oid, revid)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.loadSerial, oid, revid)
self.assertEquals('failed', self._storage.raid_status())
def test_new_oid_degrading1(self):
@@ -565,7 +568,7 @@
self._disable_storage(0)
self.assertEquals(8, len(self._storage.new_oid()))
self._disable_storage(0)
- self.assertRaises(RuntimeError, self._storage.new_oid)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.new_oid)
def test_new_oid_degrading2(self):
self.assertEquals(8, len(self._storage.new_oid()))
@@ -578,7 +581,7 @@
self._backend(0)._oids = None
self._backend(0).fail('new_oid')
- self.assertRaises(RuntimeError, self._storage.new_oid)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.new_oid)
self.assertEquals('failed', self._storage.raid_status())
def test_new_oid_unsynchronised_degrading(self):
@@ -620,7 +623,7 @@
self._dostore(oid=oid, revid=revid3, data=4)
self.assertEquals(256, self._storage.getSize())
self._disable_storage(0)
- self.assertRaises(RuntimeError,
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
self._storage.pack,
time.time(), ZODB.serialize.referencesf)
@@ -645,7 +648,7 @@
self.assertEquals(256, self._storage.getSize())
self._backend(0).fail('pack')
- self.assertRaises(RuntimeError,
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
self._storage.pack,
time.time(), ZODB.serialize.referencesf)
self.assertEquals('failed', self._storage.raid_status())
@@ -660,7 +663,7 @@
self.assertEquals('degraded', self._storage.raid_status())
self._backend(0).fail('store')
- self.assertRaises(RuntimeError,
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
self._dostoreNP,
oid=oid, revid=revid, data='bar')
self.assertEquals('failed', self._storage.raid_status())
@@ -675,7 +678,7 @@
oid = self._storage.new_oid()
self._backend(0).fail('tpc_begin')
- self.assertRaises(RuntimeError, self._dostoreNP, oid=oid, data='bar')
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._dostoreNP, oid=oid, data='bar')
self.assertEquals('failed', self._storage.raid_status())
def test_tpc_vote_degrading(self):
@@ -688,7 +691,7 @@
oid = self._storage.new_oid()
self._backend(0).fail('tpc_vote')
- self.assertRaises(RuntimeError, self._dostoreNP, oid=oid, data='bar')
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._dostoreNP, oid=oid, data='bar')
self.assertEquals('failed', self._storage.raid_status())
def test_tpc_finish_degrading(self):
@@ -701,7 +704,7 @@
oid = self._storage.new_oid()
self._backend(0).fail('tpc_finish')
- self.assertRaises(RuntimeError, self._dostoreNP, oid=oid, data='bar')
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._dostoreNP, oid=oid, data='bar')
self.assertEquals('failed', self._storage.raid_status())
def test_tpc_abort_not_degrading(self):
@@ -773,7 +776,7 @@
self._storage.tpc_begin(t)
self._disable_storage(0)
self._disable_storage(0)
- self.assertRaises(RuntimeError,
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
self._storage.storeBlob,
oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
@@ -809,7 +812,7 @@
# when tpc_vote ist called.
self._storage.storeBlob(
oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
- self.assertRaises(RuntimeError,
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
self._storage.tpc_vote, t)
def test_storeBlob_degrading3(self):
@@ -843,7 +846,7 @@
raise Exception()
self._backend(0).storeBlob = fail
self._backend(1).storeBlob = fail
- self.assertRaises(RuntimeError,
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
self._storage.storeBlob,
oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
@@ -872,7 +875,7 @@
os.remove(stored_file_name)
self._disable_storage(0)
- self.assertRaises(RuntimeError,
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
self._storage.loadBlob, oid, last_transaction)
def test_loadBlob_degrading2(self):
@@ -905,7 +908,7 @@
os.unlink(b0_filename)
self._backend(0).fail('loadBlob')
- self.assertRaises(RuntimeError,
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
self._storage.loadBlob, oid, last_transaction)
self.assertEquals('failed', self._storage.raid_status())
@@ -964,7 +967,7 @@
def test_supportsUndo_required(self):
self.assertRaises(RuntimeError,
gocept.zeoraid.storage.RAIDStorage,
- 'name', [Opener(undo=False)])
+ 'name', [Opener('1', undo=False)])
def test_supportsUndo(self):
self.assertEquals(True, self._storage.supportsUndo())
@@ -992,7 +995,7 @@
t = transaction.Transaction()
self._storage.tpc_begin(t)
self._disable_storage(0)
- self.assertRaises(RuntimeError, self._storage.undo, info[2]['id'], t)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undo, info[2]['id'], t)
def test_undo_degrading2(self):
oid = self._storage.new_oid()
@@ -1018,7 +1021,7 @@
t = transaction.Transaction()
self._storage.tpc_begin(t)
self._backend(0).fail('undo')
- self.assertRaises(RuntimeError, self._storage.undo, info[2]['id'], t)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undo, info[2]['id'], t)
self.assertEquals('failed', self._storage.raid_status())
def test_undoLog_degrading1(self):
@@ -1034,7 +1037,7 @@
self.assertEquals(2, len(info))
self._disable_storage(0)
- self.assertRaises(RuntimeError, self._storage.undoLog)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undoLog)
def test_undoLog_degrading2(self):
oid = self._storage.new_oid()
@@ -1050,7 +1053,7 @@
self.assertEquals(2, len(info))
self._backend(0).fail('undoLog')
- self.assertRaises(RuntimeError, self._storage.undoLog)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undoLog)
self.assertEquals('failed', self._storage.raid_status())
def test_undoInfo_degrading1(self):
@@ -1066,7 +1069,7 @@
self.assertEquals(2, len(info))
self._disable_storage(0)
- self.assertRaises(RuntimeError, self._storage.undoInfo)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undoInfo)
def test_undoInfo_degrading2(self):
oid = self._storage.new_oid()
@@ -1082,7 +1085,7 @@
self.assertEquals(2, len(info))
self._backend(0).fail('undoInfo')
- self.assertRaises(RuntimeError, self._storage.undoInfo)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.undoInfo)
self.assertEquals('failed', self._storage.raid_status())
def test_record_iternext(self):
@@ -1112,7 +1115,7 @@
self.assertEquals('0', data)
self._disable_storage(0)
- self.assertRaises(RuntimeError, self._storage.record_iternext, next)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.record_iternext, next)
def test_record_iternext_degrading2(self):
for x in range(5):
@@ -1125,7 +1128,7 @@
self.assertEquals('degraded', self._storage.raid_status())
self._backend(0).fail('record_iternext')
- self.assertRaises(RuntimeError, self._storage.record_iternext, next)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.record_iternext, next)
self.assertEquals('failed', self._storage.raid_status())
def test_gettid_degrading1(self):
@@ -1137,7 +1140,7 @@
self.assertEquals(revid, tid)
self._disable_storage(0)
- self.assertRaises(RuntimeError, self._storage.getTid, oid)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.getTid, oid)
def test_gettid_degrading2(self):
oid = self._storage.new_oid()
@@ -1149,7 +1152,7 @@
self.assertEquals('degraded', self._storage.raid_status())
self._backend(0).fail('getTid')
- self.assertRaises(RuntimeError, self._storage.getTid, oid)
+ self.assertRaises(ZEO.Exceptions.ClientStorageError, self._storage.getTid, oid)
self.assertEquals('failed', self._storage.raid_status())
def test_tpc_transaction_finishing(self):
@@ -1227,9 +1230,11 @@
self.assertRaises(
ValueError,
self._storage._recover_impl, self._storage.storages_degraded[0])
+ values = self._storage.raid_details().values()
self.assertEquals(
- {'1': 'failed: an error occured recovering the storage',
- '0': 'optimal'}, self._storage.raid_details())
+ 1,
+ values.count('failed: An error occured recovering the storage'))
+ self.assertEquals(self.backend_count-1, values.count('optimal'))
def test_timeoutBackend(self):
self._storage.timeout = 2
@@ -1245,8 +1250,8 @@
self.assertRaises(RuntimeError, self._storage.raid_reload)
-class FailingStorageTests(FailingStorageTestBase,
- FailingStorageTestSetup):
+class FailingStorageCoopTests(FailingStorageTestBase,
+ FailingStorageTestSetup):
def test_blob_cache_cannot_link(self):
called_broken = []
@@ -1312,9 +1317,15 @@
thread.join()
-class FailingStorageSharedBlobTests(FailingStorageTestBase,
- FailingStorageSharedBlobTestSetup):
+class FailingStorageSingleTests(FailingStorageCoopTests):
+ backend_count = 2
+ cluster_mode = 'single'
+
+
+class FailingStorageCoopSharedBlobTests(FailingStorageTestBase,
+ FailingStorageSharedBlobTestSetup):
+
def test_loadBlob_file_missing(self):
oid, tid = self.store_blob()
stored_file_name = self._storage.loadBlob(oid, tid)
@@ -1395,11 +1406,17 @@
self._storage.tpc_begin(t)
fail(self._backend(0), 'storeBlob')
fail(self._backend(1), 'storeBlob')
- self.assertRaises(RuntimeError,
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
self._storage.storeBlob,
oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
+class FailingStorageSingleSharedBlobTests(FailingStorageCoopSharedBlobTests):
+
+ backend_count = 2
+ cluster_mode = 'single'
+
+
class ZEOReplicationStorageTests(ZEOStorageBackendTests,
ReplicationStorageTests,
ThreadTests.ThreadTests):
@@ -1662,30 +1679,138 @@
class ClusterModeTests(unittest.TestCase):
def setUp(self):
- self.storage = gocept.zeoraid.storage.RAIDStorage(
- 'test', [Opener('1'), Opener('2')])
- self.storage._apply_storage = mock.Mock()
+ self.raid = gocept.zeoraid.storage.RAIDStorage(
+ 'test', [Opener('%s' % s) for s in range(5)])
+ self.raid._apply_storage = mock.Mock(return_value=(True, None))
- def test_single_mode_read(self):
- self.storage.cluster_mode = 'single'
- self.assert_(
- isinstance(self.storage._reader,
- gocept.zeoraid.storage.SingleStorageOperation))
- def test_coop_mode_read(self):
- self.storage.cluster_mode = 'coop'
+class ClusterModeCoopTests(ClusterModeTests):
+
+ def setUp(self):
+ super(ClusterModeCoopTests, self).setUp()
+ self.raid.cluster_mode = 'coop'
+
+ def test_read_operation_chooses_all_storages_op(self):
self.assert_(
- isinstance(self.storage._reader,
+ isinstance(self.raid._reader,
gocept.zeoraid.storage.AllStoragesOperation))
+ def test_degrade_fails_with_too_few_active_storages(self):
+ self.raid._degrade_storage('0', 'test')
+ self.raid._degrade_storage('1', 'test')
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
+ lambda: self.raid._degrade_storage('2', 'test'))
+class ClusterModeSingleTests(ClusterModeTests):
+
+ def setUp(self):
+ super(ClusterModeSingleTests, self).setUp()
+ self.raid.cluster_mode = 'single'
+
+ def test_read_operation_chooses_single_storage_op(self):
+ self.assert_(
+ isinstance(self.raid._reader,
+ gocept.zeoraid.storage.SingleStorageOperation))
+
+ def test_degrade_fails_with_no_active_storage(self):
+ self.raid._degrade_storage('0', 'test')
+ self.raid._degrade_storage('1', 'test')
+ self.raid._degrade_storage('2', 'test')
+ self.raid._degrade_storage('3', 'test')
+ self.assertRaises(ZEO.Exceptions.ClientStorageError,
+ lambda: self.raid._degrade_storage('4', 'test'))
+
+class AllStorageConsistencyCheck(unittest.TestCase):
+
+ def setUp(self):
+ self.raid = mock.Mock()
+ self.op = gocept.zeoraid.storage.AllStoragesOperation(self.raid)
+
+ def test_equal_results_should_be_consistent(self):
+ self.assert_(self.op._extract_result(
+ {'1': mock.sentinel.result,
+ '2': mock.sentinel.result}))
+ self.assertEquals(0, self.raid._degrade_storage.call_count)
+
+ def test_two_different_out_of_two_results_should_be_inconsistent(self):
+ self.op._extract_result(
+ {'1': mock.sentinel.result1,
+ '2': mock.sentinel.result2})
+ self.assert_([(('1', 'inconsistent result'), {}),
+ (('2', 'inconsistent result'), {})],
+ self.raid._degrade_storage.call_args_list)
+
+ def test_one_different_out_of_three_results_should_be_consistent(self):
+ self.assertEquals(mock.sentinel.result2,
+ self.op._extract_result(
+ {'1': mock.sentinel.result1,
+ '2': mock.sentinel.result2,
+ '3': mock.sentinel.result2}))
+ self.raid._degrade_storage.assert_called_with('1', 'inconsistent result')
+
+ def test_three_different_out_of_three_results_should_be_inconsistent(self):
+ self.op._extract_result(
+ {'1': mock.sentinel.result1,
+ '2': mock.sentinel.result2,
+ '3': mock.sentinel.result3})
+ # We only expect two degrades to happen as we do select one of the
+ # results as "good" and rely on the actual implementation of
+ # _degrade_storage to raise an exception in that case.
+ self.assertEquals(2, self.raid._degrade_storage.call_count)
+
+ def test_extract_non_hashable_result_works(self):
+ non_hashable = [1, 2, 3]
+ self.assertEquals(non_hashable,
+ self.op._extract_result({'1': non_hashable}))
+
+
+class OperationExceptionResultTests(unittest.TestCase):
+
+ def result(self, exception):
+ return gocept.zeoraid.storage.OperationExceptionResult(exception)
+
+ def test_same_exception_type_and_dict_are_equal(self):
+ self.assertEquals(self.result(RuntimeError('foo')),
+ self.result(RuntimeError('foo')))
+
+ def test_unpickleable_exceptions_are_not_equal(self):
+ class UnpickleableException(object):
+ __slots__ = ()
+ self.assertNotEqual(self.result(UnpickleableException()),
+ self.result(UnpickleableException()))
+
+ def test_same_exception_type_and_different_dict_are_not_equal(self):
+ self.assertNotEqual(self.result(RuntimeError('foo')),
+ self.result(RuntimeError('bar')))
+
+ def test_different_exception_type_and_same_dict_are_not_equal(self):
+ self.assertNotEqual(self.result(RuntimeError('foo')),
+ self.result(TypeError('foo')))
+
+ def test_different_exception_type_and_different_dict_are_not_equal(self):
+ self.assertNotEqual(self.result(RuntimeError('foo')),
+ self.result(TypeError('bar')))
+
+ def test_call_should_raise(self):
+ exception = RuntimeError('foo')
+ try:
+ self.result(exception)()
+ except RuntimeError, e:
+ self.assertEquals(exception, e)
+ else:
+ self.fail('No exception raised')
+
def test_suite():
-
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ZEOReplicationStorageTests, "check"))
- suite.addTest(unittest.makeSuite(FailingStorageTests))
- suite.addTest(unittest.makeSuite(FailingStorageSharedBlobTests))
+ suite.addTest(unittest.makeSuite(FailingStorageSingleTests))
+ suite.addTest(unittest.makeSuite(FailingStorageSingleSharedBlobTests))
+ suite.addTest(unittest.makeSuite(FailingStorageCoopTests))
+ suite.addTest(unittest.makeSuite(FailingStorageCoopSharedBlobTests))
suite.addTest(unittest.makeSuite(LoggingStorageDistributedTests))
suite.addTest(unittest.makeSuite(ExtensionMethodsTests))
- suite.addTest(unittest.makeSuite(ClusterModeTests))
+ suite.addTest(unittest.makeSuite(ClusterModeSingleTests))
+ suite.addTest(unittest.makeSuite(ClusterModeCoopTests))
+ suite.addTest(unittest.makeSuite(AllStorageConsistencyCheck))
+ suite.addTest(unittest.makeSuite(OperationExceptionResultTests))
return suite
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py 2010-10-06 13:20:13 UTC (rev 117304)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py 2010-10-06 14:28:08 UTC (rev 117305)
@@ -205,7 +205,7 @@
return gocept.zeoraid.storage.RAIDStorage(
'raid',
- [self.source_opener, self.target_opener],
+ [self.source_opener, self.target_opener], cluster_mode='single',
blob_dir=blob_dir, shared_blob_dir=self.shared)
def test_verify_both_empty(self):
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py 2010-10-06 13:20:13 UTC (rev 117304)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py 2010-10-06 14:28:08 UTC (rev 117305)
@@ -70,6 +70,7 @@
config = """\
%%import gocept.zeoraid
<raidstorage 1>
+ cluster-mode single
<filestorage 1>
path %s
</filestorage>
More information about the checkins
mailing list