[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ - added tests for loading objects under RAID degradation

Thomas Lotze tl at gocept.com
Tue Jan 15 04:39:46 EST 2008


Log message for revision 82889:
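  - added tests for loading objects under RAID degradation
  - load() now treats KeyError (POSKeyError) from a backend storage as a valid
    answer instead of a storage failure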
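  - refactored _apply_single_storage and _apply_all_storages to take the
    proxied call's arguments as explicit args (tuple) and kw (dict) parameters
    instead of *args/**kw, so RAID-specific options such as expect_connected
    and allowed_exceptions can no longer collide with the proxied arguments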
  - added logging (logger 'gocept.zeoraid') of failures during recovery and of
    failures while degrading a storage after a failed recovery
  

Changed:
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py

-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-15 04:22:22 UTC (rev 82888)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-15 09:39:46 UTC (rev 82889)
@@ -6,6 +6,7 @@
 
 import threading
 import time
+import logging
 
 import zope.interface
 
@@ -20,6 +21,7 @@
 import gocept.zeoraid.interfaces
 import gocept.zeoraid.compatibility
 
+logger = logging.getLogger('gocept.zeoraid')
 
 def ensure_open_storage(method):
     def check_open(self, *args, **kw):
@@ -151,7 +153,7 @@
             # of tests.
             return
         try:
-            self._apply_all_storages('close', _raid_expect_connected=False)
+            self._apply_all_storages('close', expect_connected=False)
         finally:
             self.closed = True
             del self.storages_optimal[:]
@@ -167,7 +169,7 @@
     def history(self, oid, version='', size=1):
         """Return a sequence of history information dictionaries."""
         assert version is ''
-        return self._apply_single_storage('history', oid, size)
+        return self._apply_single_storage('history', (oid, size))
 
     def isReadOnly(self):
         """Test whether a storage allows committing new transactions."""
@@ -186,15 +188,16 @@
     def load(self, oid, version=''):
         """Load data for an object id and version."""
         assert version is ''
-        return self._apply_single_storage('load', oid)
+        return self._apply_single_storage('load', (oid,),
+                                          allowed_exceptions=KeyError)
 
     def loadBefore(self, oid, tid):
         """Load the object data written before a transaction id."""
-        return self._apply_single_storage('loadBefore', oid, tid)
+        return self._apply_single_storage('loadBefore', (oid, tid))
 
     def loadSerial(self, oid, serial):
         """Load the object record for the give transaction id."""
-        return self._apply_single_storage('loadSerial', oid, serial)
+        return self._apply_single_storage('loadSerial', (oid, serial))
 
     # XXX
     def new_oid(self):
@@ -210,13 +213,13 @@
     def pack(self, t, referencesf):
         if self.isReadOnly():
             raise ZODB.POSException.ReadOnlyError()
-        self._apply_all_storages('pack', t, referencesf)
+        self._apply_all_storages('pack', (t, referencesf))
 
     # XXX
     def registerDB(self, db, limit=None):
         # XXX Is it safe to register a DB with multiple storages or do we need some kind
         # of wrapper here?
-        self._apply_all_storages('registerDB', db)
+        self._apply_all_storages('registerDB', (db,))
 
     # XXX
     def sortKey(self):
@@ -233,7 +236,8 @@
         self._lock_acquire()
         try:
             # XXX ClientStorage doesn't adhere to the interface correctly (yet).
-            self._apply_all_storages('store', oid, oldserial, data, '', transaction)
+            self._apply_all_storages('store',
+                                     (oid, oldserial, data, '', transaction))
             if self._log_stores:
                 oids = self._unrecovered_transactions.setdefault(self._tid, [])
                 oids.append(oid)
@@ -254,7 +258,7 @@
                     # which we have to remove again because we aborted it.
                     if self._tid in self._unrecovered_transactions:
                         del self._unrecovered_transactions[self._tid]
-                self._apply_all_storages('tpc_abort', transaction)
+                self._apply_all_storages('tpc_abort', (transaction,))
                 self._transaction = None
             finally:
                 self._commit_lock_release()
@@ -296,7 +300,8 @@
                 tid = self._new_tid(self._last_tid)
             self._tid = tid
 
-            self._apply_all_storages('tpc_begin', transaction, self._tid, status)
+            self._apply_all_storages('tpc_begin',
+                                     (transaction, self._tid, status))
         finally:
             self._lock_release()
 
@@ -309,7 +314,7 @@
             try:
                 if callback is not None:
                     callback(self._tid)
-                self._apply_all_storages('tpc_finish', transaction)
+                self._apply_all_storages('tpc_finish', (transaction,))
                 self._last_tid = self._tid
                 return self._tid
             finally:
@@ -324,7 +329,7 @@
         try:
             if transaction is not self._transaction:
                 return
-            self._apply_all_storages('tpc_vote', transaction)
+            self._apply_all_storages('tpc_vote', (transaction,))
         finally:
             self._lock_release()
 
@@ -368,18 +373,19 @@
             raise ZODB.POSException.ReadOnlyError()
         self._lock_acquire()
         try:
-            return self._apply_all_storages('undo', transaction_id, transaction)
+            return self._apply_all_storages('undo',
+                                            (transaction_id, transaction))
         finally:
             self._lock_release()
 
     # XXX
     def undoLog(self, first=0, last=-20, filter=None):
-        return self._apply_single_storage('undoLog', first, last, filter)
+        return self._apply_single_storage('undoLog', (first, last, filter))
 
     # XXX
     def undoInfo(self, first=0, last=-20, specification=None):
-        return self._apply_single_storage('undoInfo', first, last,
-                                          specification)
+        return self._apply_single_storage('undoInfo',
+                                          (first, last, specification))
 
     # IStorageCurrentRecordIteration
 
@@ -400,7 +406,7 @@
 
     # XXX
     def getTid(self, oid):
-        return self._apply_single_storage('getTid', oid)
+        return self._apply_single_storage('getTid', (oid,))
 
     # XXX
     def getExtensionMethods(self):
@@ -448,7 +454,7 @@
         self.storages_recovering.append(name)
         t = threading.Thread(target=self._recover_impl, args=(name,))
         t.start()
-        return 'recovering %r' % name
+        return 'recovering %r' % (name,)
 
     # internal
 
@@ -470,8 +476,10 @@
             raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
 
     @ensure_open_storage
-    def _apply_single_storage(self, method_name, *args, **kw):
-        # Try to find a storage that we can talk to. Stop after we found a reliable result.
+    def _apply_single_storage(self, method_name, args=(), kw={},
+                              allowed_exceptions=()):
+        # Try to find a storage that we can talk to. Stop after we found a
+        # reliable result.
         failed = 0
         for name in self.storages_optimal[:]:
             # XXX storage might be degraded by now, need to check.
@@ -479,6 +487,10 @@
             method = getattr(storage, method_name)
             try:
                 result = method(*args, **kw)
+            except allowed_exceptions:
+                # These exceptions are valid answers from the storage, such as
+                # POSKeyError. They don't indicate storage failure.
+                raise
             except Exception:
                 # XXX Logging
                 if failed:
@@ -496,14 +508,13 @@
         raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
 
     @ensure_open_storage
-    def _apply_all_storages(self, method_name, *args, **kw):
-        # kw might contain special parameters. We need to do this
-        # to avoid interfering with the actual arguments that we proxy.
-        expect_connected = kw.pop('_raid_expect_connected', True)
+    def _apply_all_storages(self, method_name, args=(), kw={},
+                            expect_connected=True):
         results = []
         storages = self.storages_optimal[:]
         if not storages:
-            raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+            raise gocept.zeoraid.interfaces.RAIDError(
+                "RAID storage is failed.")
 
         for name in self.storages_optimal:
             storage = self.storages[name]
@@ -535,9 +546,15 @@
             begin = time.time()
             self._recover_second(name)
             end = time.time()
-        except:
+        except Exception:
             # *something* went wrong. Put the storage back to degraded.
-            self._degrade_storage(name)
+            logger.exception('Failure recovering %r: ' % (name,))
+            try:
+                self._degrade_storage(name)
+            except Exception:
+                logger.exception(
+                    'Failure degrading %r after failed recovery: ' % (name,))
+                raise
             raise
 
     def _recover_second(self, name):
@@ -632,7 +649,8 @@
                 break
 
             init = False
-            oid, tid, data, next_oid = self._apply_single_storage('record_iternext', next_oid)
+            oid, tid, data, next_oid = self._apply_single_storage(
+                'record_iternext', (next_oid,))
 
             if tid > max_transaction:
                 continue

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-15 04:22:22 UTC (rev 82888)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-15 09:39:46 UTC (rev 82889)
@@ -54,7 +54,6 @@
 
             self._servers.append(adminaddr)
             self._storages.append(ZEOOpener(zport, storage='1',
-                                            cache_size=2000000,
                                             min_disconnect_poll=0.5, wait=1,
                                             wait_timeout=60))
         self.open()
@@ -129,7 +128,7 @@
                 zconf, port)
             self._servers.append(adminaddr)
             self._storages.append(ZEOOpener(zport, storage='1',
-                                            cache_size=2000000,
+                                            cache_size=50,
                                             min_disconnect_poll=0.5, wait=1,
                                             wait_timeout=60))
         self._storage = gocept.zeoraid.storage.RAIDStorage('teststorage',
@@ -257,7 +256,55 @@
         self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
                           self._storage.__len__)
 
+    def test_load_degrading1(self):
+        oid = self._storage.new_oid()
+        # These KeyErrors should be POSKeyErrors by IStorage.
+        self.assertRaises(KeyError,
+                          self._storage.load, oid)
+        self.assertRaises(KeyError,
+                          self._backend(0).load, oid)
+        self.assertRaises(KeyError,
+                          self._backend(1).load, oid)
 
+        self._dostore(oid=oid, revid='\x00\x00\x00\x00\x00\x00\x00\x01')
+        data_record, serial = self._storage.load(oid)
+        self.assertEquals('((U\x10ZODB.tests.MinPOq\x01U\x05MinPOq\x02tq\x03Nt.}q\x04U\x05valueq\x05K\x07s.',
+                          data_record)
+        self.assertEquals(self._storage.lastTransaction(), serial)
+        self.assertEquals((data_record, serial), self._backend(0).load(oid))
+        self.assertEquals((data_record, serial), self._backend(1).load(oid))
+
+        self._storage.raid_disable(self._storage.storages_optimal[0])
+        self.assertEquals((data_record, serial), self._storage.load(oid))
+        self.assertEquals((data_record, serial), self._backend(0).load(oid))
+
+        oid = self._storage.new_oid()
+        self._dostore(oid=oid, revid='\x00\x00\x00\x00\x00\x00\x00\x02')
+        data_record, serial = self._storage.load(oid)
+        self.assertEquals(self._storage.lastTransaction(), serial)
+        self.assertEquals((data_record, serial), self._backend(0).load(oid))
+
+        self._storage.raid_disable(self._storage.storages_optimal[0])
+        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+                          self._storage.load, oid)
+
+    def test_load_degrading2(self):
+        oid = self._storage.new_oid()
+        self._dostore(oid=oid, revid='\x00\x00\x00\x00\x00\x00\x00\x01')
+        self._backend(0).fail('load')
+        data_record, serial = self._storage.load(oid)
+        self.assertEquals('((U\x10ZODB.tests.MinPOq\x01U\x05MinPOq\x02tq\x03Nt.}q\x04U\x05valueq\x05K\x07s.',
+                          data_record)
+        self.assertEquals(self._storage.lastTransaction(), serial)
+        self.assertEquals((data_record, serial), self._backend(0).load(oid))
+        self.assertEquals('degraded', self._storage.raid_status())
+
+        self._backend(0).fail('load')
+        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+                          self._storage.load, oid)
+        self.assertEquals('failed', self._storage.raid_status())
+
+
 class ZEOReplicationStorageTests(ZEOStorageBackendTests,
                                  ReplicationStorageTests,
                                  ThreadTests.ThreadTests):



More information about the Checkins mailing list