[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ added
new_oid tests, fixed _apply methods
Thomas Lotze
tl at gocept.com
Wed Jan 16 05:22:35 EST 2008
Log message for revision 82917:
added new_oid tests, fixed _apply methods
Changed:
U gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-16 08:54:10 UTC (rev 82916)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-16 10:22:35 UTC (rev 82917)
@@ -17,6 +17,7 @@
import ZODB.utils
import persistent.TimeStamp
import transaction
+import transaction.interfaces
import gocept.zeoraid.interfaces
import gocept.zeoraid.compatibility
@@ -150,7 +151,7 @@
for degraded_storages in tids.values():
self.storages_degraded.extend(degraded_storages)
- # Degrade storages that don't have the right max OID.
+ # XXX Degrade storages that don't have the right max OID.
# No storages are recovering initially
self.storages_recovering = []
@@ -180,9 +181,7 @@
def history(self, oid, version='', size=1):
"""Return a sequence of history information dictionaries."""
assert version is ''
- return self._apply_single_storage(
- 'history', (oid, size),
- allowed_exceptions=ZODB.POSException.POSKeyError)
+ return self._apply_single_storage('history', (oid, size))
def isReadOnly(self):
"""Test whether a storage allows committing new transactions."""
@@ -201,20 +200,15 @@
def load(self, oid, version=''):
"""Load data for an object id and version."""
assert version is ''
- return self._apply_single_storage(
- 'load', (oid,), allowed_exceptions=ZODB.POSException.POSKeyError)
+ return self._apply_single_storage('load', (oid,))
def loadBefore(self, oid, tid):
"""Load the object data written before a transaction id."""
- return self._apply_single_storage(
- 'loadBefore', (oid, tid),
- allowed_exceptions=ZODB.POSException.POSKeyError)
+ return self._apply_single_storage('loadBefore', (oid, tid))
def loadSerial(self, oid, serial):
"""Load the object record for the give transaction id."""
- return self._apply_single_storage(
- 'loadSerial', (oid, serial),
- allowed_exceptions=ZODB.POSException.POSKeyError)
+ return self._apply_single_storage('loadSerial', (oid, serial))
# XXX
@ensure_writable
@@ -489,33 +483,35 @@
raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
@ensure_open_storage
- def _apply_single_storage(self, method_name, args=(), kw={},
- allowed_exceptions=()):
+ def _apply_single_storage(self, method_name, args=(), kw={}):
# Try to find a storage that we can talk to. Stop after we found a
# reliable result.
- failed = 0
for name in self.storages_optimal[:]:
# XXX storage might be degraded by now, need to check.
storage = self.storages[name]
method = getattr(storage, method_name)
try:
result = method(*args, **kw)
- except allowed_exceptions:
- # These exceptions are valid answers from the storage, such as
- # POSKeyError. They don't indicate storage failure.
+ except ZODB.POSException.StorageError:
+ # Handle StorageErrors first, otherwise they would be
+ # swallowed when POSErrors are.
+ self._degrade_storage(name)
+ continue
+ except (ZODB.POSException.POSError,
+ transaction.interfaces.TransactionError), e:
+ # These exceptions are valid answers from the storage. They
+ # don't indicate storage failure.
raise
except Exception:
- # XXX Logging
- if failed:
- raise
- failed += 1
- else:
- if storage.is_connected():
- # We have a result that is reliable.
- return result
- # There was no result or it is not reliable, the storage needs to
- # be degraded and we try another storage.
- self._degrade_storage(name)
+ # We have no result.
+ self._degrade_storage(name)
+ continue
+ if not storage.is_connected():
+ # We cannot rely on the result.
+ self._degrade_storage(name)
+ continue
+ # Everything went fine.
+ return result
# We could not determine a result from any storage.
raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
@@ -524,28 +520,63 @@
def _apply_all_storages(self, method_name, args=(), kw={},
expect_connected=True):
results = []
- storages = self.storages_optimal[:]
- if not storages:
- raise gocept.zeoraid.interfaces.RAIDError(
- "RAID storage is failed.")
-
- for name in self.storages_optimal:
+ exceptions = []
+ for name in self.storages_optimal[:]:
storage = self.storages[name]
+ method = getattr(storage, method_name)
try:
- method = getattr(storage, method_name)
- results.append(method(*args, **kw))
- except ZEO.ClientStorage.ClientDisconnected:
+ result = method(*args, **kw)
+ except ZODB.POSException.StorageError:
+ # Handle StorageErrors first, otherwise they would be
+ # swallowed when POSErrors are.
self._degrade_storage(name)
- else:
- if expect_connected and not storage.is_connected():
- self._degrade_storage(name)
+ continue
+ except (ZODB.POSException.POSError,
+ transaction.interfaces.TransactionError), e:
+ # These exceptions are valid answers from the storage. They
+ # don't indicate storage failure.
+ exceptions.append(e)
+ continue
+ except Exception:
+ # We have no result.
+ self._degrade_storage(name)
+ continue
+ if expect_connected and not storage.is_connected():
+ # We cannot rely on the result.
+ self._degrade_storage(name)
+ continue
+ # Everything went fine.
+ results.append(result)
- res = results[:]
- for test1 in res:
- for test2 in res:
- assert test1 == test2, "Results not consistent. Asynchronous storage?"
- return results[0]
+ # Analyse result consistency.
+ consistent = True
+ if exceptions and results:
+ consistent = False
+ elif exceptions:
+ # Since we can only get one kind of exceptions at the moment, they
+ # must be consistent anyway.
+ pass
+ elif results:
+ ref = results[0]
+ for test in results:
+ if test != ref:
+ consistent = False
+ break
+ if not consistent:
+ self.close()
+ raise gocept.zeoraid.interfaces.RAIDError(
+ "RAID is inconsistent and was closed.")
+ # Select result.
+ if exceptions:
+ raise exceptions[0]
+ if results:
+ return results[0]
+
+ # We could not determine a result from any storage because all of them
+ # failed.
+ raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+
def _recover_impl(self, name):
try:
# First pass: Transfer all oids without hindering running transactions
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-01-16 08:54:10 UTC (rev 82916)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-01-16 10:22:35 UTC (rev 82917)
@@ -438,7 +438,6 @@
self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
self._storage.loadSerial, oid, revid)
-
def test_loadSerial_degrading2(self):
oid = self._storage.new_oid()
revid = self._dostoreNP(oid=oid, revid=None, data='foo')
@@ -464,7 +463,30 @@
self._storage.loadSerial, oid, revid)
self.assertEquals('failed', self._storage.raid_status())
+ def test_new_oid_degrading1(self):
+ self.assertEquals(8, len(self._storage.new_oid()))
+ self._disable_storage(0)
+ self.assertEquals(8, len(self._storage.new_oid()))
+ self._disable_storage(0)
+ self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+ self._storage.new_oid)
+ def test_new_oid_degrading2(self):
+ self.assertEquals(8, len(self._storage.new_oid()))
+ self.assertEquals('optimal', self._storage.raid_status())
+
+ self._backend(0)._oids = None
+ self._backend(0).fail('new_oid')
+ self.assertEquals(8, len(self._storage.new_oid()))
+ self.assertEquals('degraded', self._storage.raid_status())
+
+ self._backend(0)._oids = None
+ self._backend(0).fail('new_oid')
+ self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+ self._storage.new_oid)
+ self.assertEquals('failed', self._storage.raid_status())
+
+
class ZEOReplicationStorageTests(ZEOStorageBackendTests,
ReplicationStorageTests,
ThreadTests.ThreadTests):
@@ -476,6 +498,3 @@
suite.addTest(unittest.makeSuite(ZEOReplicationStorageTests, "check"))
suite.addTest(unittest.makeSuite(FailingStorageTests2Backends))
return suite
-
-if __name__=='__main__':
- unittest.main()
More information about the Checkins
mailing list