[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ added new_oid tests, fixed _apply methods

Thomas Lotze tl at gocept.com
Wed Jan 16 05:22:35 EST 2008


Log message for revision 82917:
  added new_oid tests, fixed _apply methods

Changed:
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py

-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-16 08:54:10 UTC (rev 82916)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-16 10:22:35 UTC (rev 82917)
@@ -17,6 +17,7 @@
 import ZODB.utils
 import persistent.TimeStamp
 import transaction
+import transaction.interfaces
 
 import gocept.zeoraid.interfaces
 import gocept.zeoraid.compatibility
@@ -150,7 +151,7 @@
         for degraded_storages in tids.values():
             self.storages_degraded.extend(degraded_storages)
 
-        # Degrade storages that don't have the right max OID.
+        # XXX Degrade storages that don't have the right max OID.
 
         # No storages are recovering initially
         self.storages_recovering = []
@@ -180,9 +181,7 @@
     def history(self, oid, version='', size=1):
         """Return a sequence of history information dictionaries."""
         assert version is ''
-        return self._apply_single_storage(
-            'history', (oid, size),
-            allowed_exceptions=ZODB.POSException.POSKeyError)
+        return self._apply_single_storage('history', (oid, size))
 
     def isReadOnly(self):
         """Test whether a storage allows committing new transactions."""
@@ -201,20 +200,15 @@
     def load(self, oid, version=''):
         """Load data for an object id and version."""
         assert version is ''
-        return self._apply_single_storage(
-            'load', (oid,), allowed_exceptions=ZODB.POSException.POSKeyError)
+        return self._apply_single_storage('load', (oid,))
 
     def loadBefore(self, oid, tid):
         """Load the object data written before a transaction id."""
-        return self._apply_single_storage(
-            'loadBefore', (oid, tid),
-            allowed_exceptions=ZODB.POSException.POSKeyError)
+        return self._apply_single_storage('loadBefore', (oid, tid))
 
     def loadSerial(self, oid, serial):
         """Load the object record for the give transaction id."""
-        return self._apply_single_storage(
-            'loadSerial', (oid, serial),
-            allowed_exceptions=ZODB.POSException.POSKeyError)
+        return self._apply_single_storage('loadSerial', (oid, serial))
 
     # XXX
     @ensure_writable
@@ -489,33 +483,35 @@
             raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
 
     @ensure_open_storage
-    def _apply_single_storage(self, method_name, args=(), kw={},
-                              allowed_exceptions=()):
+    def _apply_single_storage(self, method_name, args=(), kw={}):
         # Try to find a storage that we can talk to. Stop after we found a
         # reliable result.
-        failed = 0
         for name in self.storages_optimal[:]:
             # XXX storage might be degraded by now, need to check.
             storage = self.storages[name]
             method = getattr(storage, method_name)
             try:
                 result = method(*args, **kw)
-            except allowed_exceptions:
-                # These exceptions are valid answers from the storage, such as
-                # POSKeyError. They don't indicate storage failure.
+            except ZODB.POSException.StorageError:
+                # Handle StorageErrors first, otherwise they would be
+                # swallowed when POSErrors are.
+                self._degrade_storage(name)
+                continue
+            except (ZODB.POSException.POSError,
+                    transaction.interfaces.TransactionError), e:
+                # These exceptions are valid answers from the storage. They
+                # don't indicate storage failure.
                 raise
             except Exception:
-                # XXX Logging
-                if failed:
-                    raise
-                failed += 1
-            else:
-                if storage.is_connected():
-                    # We have a result that is reliable.
-                    return result
-            # There was no result or it is not reliable, the storage needs to
-            # be degraded and we try another storage.
-            self._degrade_storage(name)
+                # We have no result.
+                self._degrade_storage(name)
+                continue
+            if not storage.is_connected():
+                # We cannot rely on the result.
+                self._degrade_storage(name)
+                continue
+            # Everything went fine.
+            return result
 
         # We could not determine a result from any storage.
         raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
@@ -524,28 +520,63 @@
     def _apply_all_storages(self, method_name, args=(), kw={},
                             expect_connected=True):
         results = []
-        storages = self.storages_optimal[:]
-        if not storages:
-            raise gocept.zeoraid.interfaces.RAIDError(
-                "RAID storage is failed.")
-
-        for name in self.storages_optimal:
+        exceptions = []
+        for name in self.storages_optimal[:]:
             storage = self.storages[name]
+            method = getattr(storage, method_name)
             try:
-                method = getattr(storage, method_name)
-                results.append(method(*args, **kw))
-            except ZEO.ClientStorage.ClientDisconnected:
+                result = method(*args, **kw)
+            except ZODB.POSException.StorageError:
+                # Handle StorageErrors first, otherwise they would be
+                # swallowed when POSErrors are.
                 self._degrade_storage(name)
-            else:
-                if expect_connected and not storage.is_connected():
-                    self._degrade_storage(name)
+                continue
+            except (ZODB.POSException.POSError,
+                    transaction.interfaces.TransactionError), e:
+                # These exceptions are valid answers from the storage. They
+                # don't indicate storage failure.
+                exceptions.append(e)
+                continue
+            except Exception:
+                # We have no result.
+                self._degrade_storage(name)
+                continue
+            if expect_connected and not storage.is_connected():
+                # We cannot rely on the result.
+                self._degrade_storage(name)
+                continue
+            # Everything went fine.
+            results.append(result)
 
-        res = results[:]
-        for test1 in res:
-            for test2 in res:
-                assert test1 == test2, "Results not consistent. Asynchronous storage?"
-        return results[0]
+        # Analyse result consistency.
+        consistent = True
+        if exceptions and results:
+            consistent = False
+        elif exceptions:
+            # Since we can only get one kind of exception at the moment, they
+            # must be consistent anyway.
+            pass
+        elif results:
+            ref = results[0]
+            for test in results:
+                if test != ref:
+                    consistent = False
+                    break
+        if not consistent:
+            self.close()
+            raise gocept.zeoraid.interfaces.RAIDError(
+                "RAID is inconsistent and was closed.")
 
+        # Select result.
+        if exceptions:
+            raise exceptions[0]
+        if results:
+            return results[0]
+
+        # We could not determine a result from any storage because all of them
+        # failed.
+        raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+
     def _recover_impl(self, name):
         try:
             # First pass: Transfer all oids without hindering running transactions

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-16 08:54:10 UTC (rev 82916)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-16 10:22:35 UTC (rev 82917)
@@ -438,7 +438,6 @@
         self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
                           self._storage.loadSerial, oid, revid)
 
-
     def test_loadSerial_degrading2(self):
         oid = self._storage.new_oid()
         revid = self._dostoreNP(oid=oid, revid=None, data='foo')
@@ -464,7 +463,30 @@
                           self._storage.loadSerial, oid, revid)
         self.assertEquals('failed', self._storage.raid_status())
 
+    def test_new_oid_degrading1(self):
+        self.assertEquals(8, len(self._storage.new_oid()))
+        self._disable_storage(0)
+        self.assertEquals(8, len(self._storage.new_oid()))
+        self._disable_storage(0)
+        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+                          self._storage.new_oid)
 
+    def test_new_oid_degrading2(self):
+        self.assertEquals(8, len(self._storage.new_oid()))
+        self.assertEquals('optimal', self._storage.raid_status())
+
+        self._backend(0)._oids = None
+        self._backend(0).fail('new_oid')
+        self.assertEquals(8, len(self._storage.new_oid()))
+        self.assertEquals('degraded', self._storage.raid_status())
+
+        self._backend(0)._oids = None
+        self._backend(0).fail('new_oid')
+        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+                          self._storage.new_oid)
+        self.assertEquals('failed', self._storage.raid_status())
+
+
 class ZEOReplicationStorageTests(ZEOStorageBackendTests,
                                  ReplicationStorageTests,
                                  ThreadTests.ThreadTests):
@@ -476,6 +498,3 @@
     suite.addTest(unittest.makeSuite(ZEOReplicationStorageTests, "check"))
     suite.addTest(unittest.makeSuite(FailingStorageTests2Backends))
     return suite
-
-if __name__=='__main__':
-    unittest.main()



More information about the Checkins mailing list