[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ merged and cleaned up dirceu-distributed-write-calls branch, added test

Thomas Lotze tl at gocept.com
Thu Jan 8 04:08:17 EST 2009


Log message for revision 94611:
  merged and cleaned up dirceu-distributed-write-calls branch, added test

Changed:
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py

-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2009-01-08 07:26:31 UTC (rev 94610)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2009-01-08 09:08:16 UTC (rev 94611)
@@ -57,6 +57,31 @@
     return check_writable
 
 
+class ThreadedApplyStorage(threading.Thread):
+
+    reliable = None
+    result = None
+    exception = None
+
+    def __init__(self, storage_name, method_name, args, kw,
+	         expect_connected, apply_storage):
+        super(ThreadedApplyStorage, self).__init__()
+        self.storage_name = storage_name
+        self.method_name = method_name
+        self.args = args
+        self.kw = kw
+        self.expect_connected = expect_connected
+        self.__apply_storage = apply_storage
+
+    def run(self):
+        try:
+            self.reliable, self.result = self.__apply_storage(
+                self.storage_name, self.method_name, self.args,
+                self.kw, self.expect_connected)
+        except Exception, e:
+            self.exception = e
+
+
 class RAIDStorage(object):
     """The RAID storage is a drop-in replacement for the client storages that
     are configured.
@@ -90,6 +115,9 @@
     # for generating new TIDs.
     _last_tid = None
 
+    # Timeout for threaded/parallel operations on backend storages.
+    timeout = 60
+
     def __init__(self, name, openers, read_only=False, blob_dir=None,
                  shared_blob_dir=False):
         self.__name__ = name
@@ -169,9 +197,9 @@
             del self.storages_optimal[:]
 
         for thread in self._threads:
-            # We give all the threads a chance to get done within one second.
+            # We give all the threads a chance to get done quickly.
             # This is mostly a convenience for the tests to not annoy.
-            thread.join(1)
+            thread.join(5)
 
     def getName(self):
         """The name of the storage."""
@@ -587,10 +615,18 @@
         if not self.storages_optimal and fail:
             raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
 
-    def __apply_storage(self, name, method_name, args=(), kw={},
+    def __apply_storage(self, storage_name, method_name, args=(), kw={},
                         expect_connected=True):
+        """Calls a method on a given backend storage.
+
+        Returns a tuple (reliable, result).
+
+        All exceptions raised by this method indicate a user error. Storage
+        failure is signalled by declaring the result unreliable.
+
+        """
         # XXX storage might be degraded by now, need to check.
-        storage = self.storages[name]
+        storage = self.storages[storage_name]
         method = getattr(storage, method_name)
         reliable = True
         result = None
@@ -613,7 +649,7 @@
             reliable = False
 
         if not reliable:
-            self._degrade_storage(name)
+            self._degrade_storage(storage_name)
         return (reliable, result)
 
     @ensure_open_storage
@@ -642,7 +678,7 @@
                             ignore_noop=False):
         """Calls the given method on all optimal backend storages in order.
 
-        `args` can be given as an n-tupel with the positional arguments that
+        `args` can be given as an n-tuple with the positional arguments that
         should be passed to each storage.
 
         Alternatively `args` can be a callable that returns an iterable. The
@@ -650,9 +686,6 @@
         N-th storage.
 
         """
-        results = []
-        exceptions = []
-
         if callable(args):
             argument_iterable = args()
         else:
@@ -667,18 +700,32 @@
         applicable_storages = [storage for storage in applicable_storages
                                if storage not in exclude]
 
-        for name in applicable_storages:
-            try:
-                args = argument_iterable.next()
-                reliable, result = self.__apply_storage(
-                    name, method_name, args, kw, expect_connected)
-            except Exception, e:
-                exceptions.append(e)
-                raise
-            else:
-                if reliable:
-                    results.append(result)
+        # Run __apply_storage on all applicable storages in parallel.
+        threads = []
+        for storage_name in applicable_storages:
+            args = argument_iterable.next()
+            t = ThreadedApplyStorage(storage_name, method_name, args, kw,
+                                     expect_connected, self.__apply_storage)
+            threads.append(t)
+            t.start()
 
+        # Wait for threads to finish and pick up results.
+        results = []
+        exceptions = []
+        for thread in threads:
+            # XXX The timeout should be calculated such that the total time
+            # spent in this loop doesn't grow with the number of storages.
+            thread.join(self.timeout)
+            if thread.isAlive():
+                # Storage timed out.
+                self._degrade_storage(thread.storage_name)
+                self._threads.add(thread)
+                continue
+            if thread.exception:
+                exceptions.append(thread.exception)
+            elif thread.reliable:
+                results.append(thread.result)
+
         # Analyse result consistency.
         consistent = True
         if exceptions and results:

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2009-01-08 07:26:31 UTC (rev 94610)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2009-01-08 09:08:16 UTC (rev 94611)
@@ -1230,7 +1230,16 @@
         self._storage.new_oid()
         self.assertEquals('optimal', self._storage.raid_status())
 
+    def test_timeoutBackend(self):
+        self._storage.timeout = 2
+        def slow_tpc_begin(*args):
+            time.sleep(4)
+        self._backend(0).tpc_begin = slow_tpc_begin
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        self.assertEquals('degraded', self._storage.raid_status())
 
+
 class FailingStorageTests(FailingStorageTestBase,
                           FailingStorageTestSetup):
 



More information about the Checkins mailing list