[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ - added
tests for loading objects under RAID degradation
Thomas Lotze
tl at gocept.com
Tue Jan 15 04:39:46 EST 2008
Log message for revision 82889:
- added tests for loading objects under RAID degradation
- refactored _apply_single_storage and _apply_all_storages to accept saner arguments
- added logging of failures during recovery
Changed:
U gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-15 04:22:22 UTC (rev 82888)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-15 09:39:46 UTC (rev 82889)
@@ -6,6 +6,7 @@
import threading
import time
+import logging
import zope.interface
@@ -20,6 +21,7 @@
import gocept.zeoraid.interfaces
import gocept.zeoraid.compatibility
+logger = logging.getLogger('gocept.zeoraid')
def ensure_open_storage(method):
def check_open(self, *args, **kw):
@@ -151,7 +153,7 @@
# of tests.
return
try:
- self._apply_all_storages('close', _raid_expect_connected=False)
+ self._apply_all_storages('close', expect_connected=False)
finally:
self.closed = True
del self.storages_optimal[:]
@@ -167,7 +169,7 @@
def history(self, oid, version='', size=1):
"""Return a sequence of history information dictionaries."""
assert version is ''
- return self._apply_single_storage('history', oid, size)
+ return self._apply_single_storage('history', (oid, size))
def isReadOnly(self):
"""Test whether a storage allows committing new transactions."""
@@ -186,15 +188,16 @@
def load(self, oid, version=''):
"""Load data for an object id and version."""
assert version is ''
- return self._apply_single_storage('load', oid)
+ return self._apply_single_storage('load', (oid,),
+ allowed_exceptions=KeyError)
def loadBefore(self, oid, tid):
"""Load the object data written before a transaction id."""
- return self._apply_single_storage('loadBefore', oid, tid)
+ return self._apply_single_storage('loadBefore', (oid, tid))
def loadSerial(self, oid, serial):
"""Load the object record for the give transaction id."""
- return self._apply_single_storage('loadSerial', oid, serial)
+ return self._apply_single_storage('loadSerial', (oid, serial))
# XXX
def new_oid(self):
@@ -210,13 +213,13 @@
def pack(self, t, referencesf):
if self.isReadOnly():
raise ZODB.POSException.ReadOnlyError()
- self._apply_all_storages('pack', t, referencesf)
+ self._apply_all_storages('pack', (t, referencesf))
# XXX
def registerDB(self, db, limit=None):
# XXX Is it safe to register a DB with multiple storages or do we need some kind
# of wrapper here?
- self._apply_all_storages('registerDB', db)
+ self._apply_all_storages('registerDB', (db,))
# XXX
def sortKey(self):
@@ -233,7 +236,8 @@
self._lock_acquire()
try:
# XXX ClientStorage doesn't adhere to the interface correctly (yet).
- self._apply_all_storages('store', oid, oldserial, data, '', transaction)
+ self._apply_all_storages('store',
+ (oid, oldserial, data, '', transaction))
if self._log_stores:
oids = self._unrecovered_transactions.setdefault(self._tid, [])
oids.append(oid)
@@ -254,7 +258,7 @@
# which we have to remove again because we aborted it.
if self._tid in self._unrecovered_transactions:
del self._unrecovered_transactions[self._tid]
- self._apply_all_storages('tpc_abort', transaction)
+ self._apply_all_storages('tpc_abort', (transaction,))
self._transaction = None
finally:
self._commit_lock_release()
@@ -296,7 +300,8 @@
tid = self._new_tid(self._last_tid)
self._tid = tid
- self._apply_all_storages('tpc_begin', transaction, self._tid, status)
+ self._apply_all_storages('tpc_begin',
+ (transaction, self._tid, status))
finally:
self._lock_release()
@@ -309,7 +314,7 @@
try:
if callback is not None:
callback(self._tid)
- self._apply_all_storages('tpc_finish', transaction)
+ self._apply_all_storages('tpc_finish', (transaction,))
self._last_tid = self._tid
return self._tid
finally:
@@ -324,7 +329,7 @@
try:
if transaction is not self._transaction:
return
- self._apply_all_storages('tpc_vote', transaction)
+ self._apply_all_storages('tpc_vote', (transaction,))
finally:
self._lock_release()
@@ -368,18 +373,19 @@
raise ZODB.POSException.ReadOnlyError()
self._lock_acquire()
try:
- return self._apply_all_storages('undo', transaction_id, transaction)
+ return self._apply_all_storages('undo',
+ (transaction_id, transaction))
finally:
self._lock_release()
# XXX
def undoLog(self, first=0, last=-20, filter=None):
- return self._apply_single_storage('undoLog', first, last, filter)
+ return self._apply_single_storage('undoLog', (first, last, filter))
# XXX
def undoInfo(self, first=0, last=-20, specification=None):
- return self._apply_single_storage('undoInfo', first, last,
- specification)
+ return self._apply_single_storage('undoInfo',
+ (first, last, specification))
# IStorageCurrentRecordIteration
@@ -400,7 +406,7 @@
# XXX
def getTid(self, oid):
- return self._apply_single_storage('getTid', oid)
+ return self._apply_single_storage('getTid', (oid,))
# XXX
def getExtensionMethods(self):
@@ -448,7 +454,7 @@
self.storages_recovering.append(name)
t = threading.Thread(target=self._recover_impl, args=(name,))
t.start()
- return 'recovering %r' % name
+ return 'recovering %r' % (name,)
# internal
@@ -470,8 +476,10 @@
raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
@ensure_open_storage
- def _apply_single_storage(self, method_name, *args, **kw):
- # Try to find a storage that we can talk to. Stop after we found a reliable result.
+ def _apply_single_storage(self, method_name, args=(), kw={},
+ allowed_exceptions=()):
+ # Try to find a storage that we can talk to. Stop after we found a
+ # reliable result.
failed = 0
for name in self.storages_optimal[:]:
# XXX storage might be degraded by now, need to check.
@@ -479,6 +487,10 @@
method = getattr(storage, method_name)
try:
result = method(*args, **kw)
+ except allowed_exceptions:
+ # These exceptions are valid answers from the storage, such as
+ # POSKeyError. They don't indicate storage failure.
+ raise
except Exception:
# XXX Logging
if failed:
@@ -496,14 +508,13 @@
raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
@ensure_open_storage
- def _apply_all_storages(self, method_name, *args, **kw):
- # kw might contain special parameters. We need to do this
- # to avoid interfering with the actual arguments that we proxy.
- expect_connected = kw.pop('_raid_expect_connected', True)
+ def _apply_all_storages(self, method_name, args=(), kw={},
+ expect_connected=True):
results = []
storages = self.storages_optimal[:]
if not storages:
- raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+ raise gocept.zeoraid.interfaces.RAIDError(
+ "RAID storage is failed.")
for name in self.storages_optimal:
storage = self.storages[name]
@@ -535,9 +546,15 @@
begin = time.time()
self._recover_second(name)
end = time.time()
- except:
+ except Exception:
# *something* went wrong. Put the storage back to degraded.
- self._degrade_storage(name)
+ logger.exception('Failure recovering %r: ' % (name,))
+ try:
+ self._degrade_storage(name)
+ except Exception:
+ logger.exception(
+ 'Failure degrading %r after failed recovery: ' % (name,))
+ raise
raise
def _recover_second(self, name):
@@ -632,7 +649,8 @@
break
init = False
- oid, tid, data, next_oid = self._apply_single_storage('record_iternext', next_oid)
+ oid, tid, data, next_oid = self._apply_single_storage(
+ 'record_iternext', (next_oid,))
if tid > max_transaction:
continue
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-01-15 04:22:22 UTC (rev 82888)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-01-15 09:39:46 UTC (rev 82889)
@@ -54,7 +54,6 @@
self._servers.append(adminaddr)
self._storages.append(ZEOOpener(zport, storage='1',
- cache_size=2000000,
min_disconnect_poll=0.5, wait=1,
wait_timeout=60))
self.open()
@@ -129,7 +128,7 @@
zconf, port)
self._servers.append(adminaddr)
self._storages.append(ZEOOpener(zport, storage='1',
- cache_size=2000000,
+ cache_size=50,
min_disconnect_poll=0.5, wait=1,
wait_timeout=60))
self._storage = gocept.zeoraid.storage.RAIDStorage('teststorage',
@@ -257,7 +256,55 @@
self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
self._storage.__len__)
+ def test_load_degrading1(self):
+ oid = self._storage.new_oid()
+ # These KeyErrors should be POSKeyErrors by IStorage.
+ self.assertRaises(KeyError,
+ self._storage.load, oid)
+ self.assertRaises(KeyError,
+ self._backend(0).load, oid)
+ self.assertRaises(KeyError,
+ self._backend(1).load, oid)
+ self._dostore(oid=oid, revid='\x00\x00\x00\x00\x00\x00\x00\x01')
+ data_record, serial = self._storage.load(oid)
+ self.assertEquals('((U\x10ZODB.tests.MinPOq\x01U\x05MinPOq\x02tq\x03Nt.}q\x04U\x05valueq\x05K\x07s.',
+ data_record)
+ self.assertEquals(self._storage.lastTransaction(), serial)
+ self.assertEquals((data_record, serial), self._backend(0).load(oid))
+ self.assertEquals((data_record, serial), self._backend(1).load(oid))
+
+ self._storage.raid_disable(self._storage.storages_optimal[0])
+ self.assertEquals((data_record, serial), self._storage.load(oid))
+ self.assertEquals((data_record, serial), self._backend(0).load(oid))
+
+ oid = self._storage.new_oid()
+ self._dostore(oid=oid, revid='\x00\x00\x00\x00\x00\x00\x00\x02')
+ data_record, serial = self._storage.load(oid)
+ self.assertEquals(self._storage.lastTransaction(), serial)
+ self.assertEquals((data_record, serial), self._backend(0).load(oid))
+
+ self._storage.raid_disable(self._storage.storages_optimal[0])
+ self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+ self._storage.load, oid)
+
+ def test_load_degrading2(self):
+ oid = self._storage.new_oid()
+ self._dostore(oid=oid, revid='\x00\x00\x00\x00\x00\x00\x00\x01')
+ self._backend(0).fail('load')
+ data_record, serial = self._storage.load(oid)
+ self.assertEquals('((U\x10ZODB.tests.MinPOq\x01U\x05MinPOq\x02tq\x03Nt.}q\x04U\x05valueq\x05K\x07s.',
+ data_record)
+ self.assertEquals(self._storage.lastTransaction(), serial)
+ self.assertEquals((data_record, serial), self._backend(0).load(oid))
+ self.assertEquals('degraded', self._storage.raid_status())
+
+ self._backend(0).fail('load')
+ self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+ self._storage.load, oid)
+ self.assertEquals('failed', self._storage.raid_status())
+
+
class ZEOReplicationStorageTests(ZEOStorageBackendTests,
ReplicationStorageTests,
ThreadTests.ThreadTests):
More information about the Checkins
mailing list