[Checkins] SVN: gocept.zeoraid/trunk/ - General improvements on reloading: make code more readable and avoid

Christian Theune ct at gocept.com
Sat Nov 14 09:15:35 EST 2009


Log message for revision 105643:
  - General improvements on reloading: make code more readable and avoid
    crashing when the configuration file doesn't parse.
  
  - Restructured output of the controller's `details` command.
  
  - Fix #464339: Storages were not added on reload.
  
  - Fix #330008: Reload now refuses to apply a configuration change if it would
    cause all optimal storages to disappear.
  
  - Fix #316285: Reload failed removing degraded back-ends.
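  
  For illustration, the #330008 guard boils down to diffing the configured
  back-end names against the currently known ones and refusing the change
  when no optimal storage would survive. A minimal standalone sketch (the
  names `reload_guard`, `current_optimal`, `current_names` and
  `configured_names` are illustrative and not part of the actual API):
  
      def reload_guard(current_optimal, current_names, configured_names):
          """Sketch: compute the add/remove sets for a reload and refuse
          the change if it would drop every optimal storage."""
          added = configured_names - current_names
          removed = current_names - configured_names
          if not set(current_optimal) - removed:
              raise RuntimeError(
                  'Cannot perform reconfiguration: '
                  'all optimal storages would be removed.')
          return added, removed
  
      # Example: back-ends 1 and 2 are optimal, the new config keeps only 3:
      # reload_guard(['1', '2'], set(['1', '2', '3']), set(['3']))
      # raises RuntimeError instead of silently dropping both optimal storages.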
  

Changed:
  U   gocept.zeoraid/trunk/CHANGES.txt
  U   gocept.zeoraid/trunk/doc/CMDLINE.txt
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py

-=-
Modified: gocept.zeoraid/trunk/CHANGES.txt
===================================================================
--- gocept.zeoraid/trunk/CHANGES.txt	2009-11-14 12:08:16 UTC (rev 105642)
+++ gocept.zeoraid/trunk/CHANGES.txt	2009-11-14 14:15:34 UTC (rev 105643)
@@ -5,6 +5,18 @@
 1.0b5 (unreleased)
 ------------------
 
+- General improvements on reloading: make code more readable and avoid crashing
+  when the configuration file doesn't parse.
+
+- Restructured output of the controller's `details` command.
+
+- Fix #464339: Storages were not added on reload.
+
+- Fix #330008: Reload now refuses to apply a configuration change if it would
+  cause all optimal storages to disappear.
+
+- Fix #316285: Reload failed removing degraded back-ends.
+
 - Add a note to the deployment documentation that strongly advises people to
   use a deployment recipe for setting up their ZEO servers to avoid buildout
   killing volatile files.

Modified: gocept.zeoraid/trunk/doc/CMDLINE.txt
===================================================================
--- gocept.zeoraid/trunk/doc/CMDLINE.txt	2009-11-14 12:08:16 UTC (rev 105642)
+++ gocept.zeoraid/trunk/doc/CMDLINE.txt	2009-11-14 14:15:34 UTC (rev 105643)
@@ -53,8 +53,8 @@
 New back-ends are added to the RAID in `degraded` state and have to be
 manually recovered.
 
-Back-ends that are missing from the new configuration are `disabled` but
-remain in the RAID.
+Back-ends that are missing from the new configuration are closed and removed
+from the RAID configuration.
 
 Example:
 

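The behaviour described in CMDLINE.txt above maps onto a small amount of
bookkeeping during `raid_reload()`. A simplified sketch (the helper
`apply_reload` and its parameters are illustrative; the real logic lives in
storage.py, shown further below):

    def apply_reload(openers, storages_degraded, configured, close_storage):
        """Sketch: new back-ends start out degraded, removed back-ends are
        closed and dropped from the configuration."""
        added = set(configured) - set(openers)
        removed = set(openers) - set(configured)
        for name in added:
            openers[name] = configured[name]
            storages_degraded.append(name)   # has to be recovered manually
        for name in removed:
            close_storage(name)
            del openers[name]
            if name in storages_degraded:
                storages_degraded.remove(name)
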
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py	2009-11-14 12:08:16 UTC (rev 105642)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py	2009-11-14 14:15:34 UTC (rev 105643)
@@ -78,15 +78,29 @@
         return STATUS_TO_NAGIOS[status]
 
     def cmd_details(self):
+        col1 = 25
+
         ok, recovering, failed, recovery_status = self.raid.raid_details()
-        print "RAID status:"
-        print "\t", self.raid.raid_status()
-        print "Storage status:"
-        print "\toptimal\t\t", ok
-        print "\trecovering\t", recovering, recovery_status
-        print "\tfailed\t\t", failed
-        return STATUS_TO_NAGIOS[self.cmd_status()]
+        print " %s| Status" % ('%s:%s' % (self.host, self.port)).ljust(col1)
+        print " %s+-------------------" % ('-' * col1)
+        print " %s| %s" % (('RAID %s' % self.storage).ljust(col1),
+                           self.raid.raid_status().ljust(col1))
+        print " %s+-------------------" % ('-' * col1)
 
+        storages = {}
+        for storage in ok:
+            storages[storage] = 'optimal'
+        for storage in failed:
+            storages[storage] = 'failed'
+        if recovering:
+            storages[recovering] = ('recovering: %s transaction %s' %
+                                    recovery_status)
+
+        for id in sorted(storages):
+            print ' %s| %s' % (str(id).ljust(col1), storages[id])
+
+        return STATUS_TO_NAGIOS[self.raid.raid_status()]
+
     def cmd_recover(self, storage):
         print self.raid.raid_recover(storage)
         return NAGIOS_OK

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2009-11-14 12:08:16 UTC (rev 105642)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2009-11-14 14:15:34 UTC (rev 105643)
@@ -40,6 +40,7 @@
 
 
 def ensure_open_storage(method):
+
     def check_open(self, *args, **kw):
         if self.closed:
             raise gocept.zeoraid.interfaces.RAIDClosedError(
@@ -49,6 +50,7 @@
 
 
 def ensure_writable(method):
+
     def check_writable(self, *args, **kw):
         if self.isReadOnly():
             raise ZODB.POSException.ReadOnlyError()
@@ -167,7 +169,7 @@
         # Set up list of degraded storages
         self.storages_degraded = []
         # Degrade all remaining (non-optimal) storages
-        for name in reduce(lambda x,y:x+y, tids.values(), []):
+        for name in reduce(lambda x, y: x + y, tids.values(), []):
             self._degrade_storage(name)
 
         # No storage is recovering initially
@@ -252,23 +254,27 @@
         """Allocate a new object id."""
         self._write_lock.acquire()
         try:
-            oids = []
-            for storage in self.storages_optimal[:]:
-                reliable, oid = self.__apply_storage(storage, 'new_oid')
-                if reliable:
-                    oids.append((oid, storage))
-            if not oids:
-                raise gocept.zeoraid.interfaces.RAIDError(
-                    "RAID storage is failed.")
-
-            min_oid = sorted(oids)[0][0]
-            for oid, storage in oids:
-                if oid > min_oid:
-                    self._degrade_storage(storage)
-            return min_oid
+            return self._new_oid()
         finally:
             self._write_lock.release()
 
+    def _new_oid(self):
+        # Not write-lock protected implementation of new_oid
+        oids = []
+        for storage in self.storages_optimal[:]:
+            reliable, oid = self.__apply_storage(storage, 'new_oid')
+            if reliable:
+                oids.append((oid, storage))
+        if not oids:
+            raise gocept.zeoraid.interfaces.RAIDError(
+                "RAID storage is failed.")
+
+        min_oid = sorted(oids)[0][0]
+        for oid, storage in oids:
+            if oid > min_oid:
+                self._degrade_storage(storage)
+        return min_oid
+
     @ensure_writable
     def pack(self, t, referencesf):
         """Pack the storage."""
@@ -607,7 +613,13 @@
                 'Cannot reload config without running inside ZEO.')
 
         options = ZEOOptions()
-        options.realize(['-C', self.zeo.options.configfile])
+        try:
+            options.realize(['-C', self.zeo.options.configfile])
+        except Exception, e:
+            raise RuntimeError(
+                'Could not reload configuration file: '
+                'please check your configuration.')
+
         for candidate in options.storages:
             if candidate.name == self.__name__:
                 storage = candidate
@@ -615,18 +627,33 @@
         else:
             raise RuntimeError(
                 'No storage section found for RAID %s.' % self.__name__)
-        new_storages = dict((opt.name, opt)
-                            for opt in storage.config.storages)
-        new_names = set(new_storages)
-        old_names = set(self.openers)
+        configured_storages = dict(
+            (opt.name, opt) for opt in storage.config.storages)
 
-        for name in old_names - new_names:
-            self._close_storage(name)
+        configured_names = set(configured_storages.keys())
+        current_names = set(self.openers.keys())
 
-        for name in new_names - old_names:
-            self.openers[name] = new_storages[name]
+        added = configured_names - current_names
+        removed = current_names - configured_names
+
+        # Check whether we would remove all optimal storages. If that's so,
+        # then we do not perform the reconfiguration.
+        remaining_optimal = set(self.storages_optimal) - removed
+        if not remaining_optimal:
+            raise RuntimeError(
+                'Cannot perform reconfiguration: '
+                'all optimal storages would be removed.')
+
+        for name in added:
+            self.openers[name] = configured_storages[name]
             self.storages_degraded.append(name)
 
+        for name in removed:
+            self._close_storage(name)
+            del self.openers[name]
+            if name in self.storages_degraded:
+                self.storages_degraded.remove(name)
+
     # internal
 
     def _open_storage(self, name):
@@ -638,11 +665,12 @@
     def _close_storage(self, name):
         if name in self.storages_optimal:
             self.storages_optimal.remove(name)
-        storage = self.storages.pop(name)
-        t = threading.Thread(target=storage.close)
-        self._threads.add(t)
-        t.setDaemon(True)
-        t.start()
+        if name in self.storages:
+            storage = self.storages.pop(name)
+            t = threading.Thread(target=storage.close)
+            self._threads.add(t)
+            t.setDaemon(True)
+            t.start()
 
     def _degrade_storage(self, name, fail=True):
         self._close_storage(name)
@@ -726,6 +754,7 @@
         else:
             # Provide a fallback if `args` is given as a simple tuple.
             static_arguments = args
+
             def dummy_generator():
                 while True:
                     yield static_arguments
@@ -814,33 +843,37 @@
     def _finalize_recovery(self, storage):
         self._write_lock.acquire()
         try:
+            # Synchronize OIDs: determine which side is further along in
+            # handing out OIDs. Because ZEO allocates OIDs in large batches,
+            # this might also be the recovering storage. Give the lagging
+            # side exactly one try to catch up. If we miss the target, we
+            # simply degrade the recovering storage again.
+            max_optimal = self._new_oid()
+            max_recovering = self.storages[self.storage_recovering].new_oid()
+            if max_optimal != max_recovering:
+                if max_optimal > max_recovering:
+                    target = max_optimal
+                    catch_up = self.storages[self.storage_recovering].new_oid
+                else:
+                    target = max_recovering
+                    catch_up = self._new_oid
+
+                oid = None
+                while oid < target:
+                    oid = catch_up()
+
+                if oid != target:
+                    logging.warn(
+                        'Degrading recovering storage %r due to '
+                        'new OID mismatch' % self.storage_recovering)
+                    self._degrade_storage(self.storage_recovering)
+                    return
+
             self.storages_optimal.append(self.storage_recovering)
-            self._synchronise_oids()
             self.storage_recovering = None
         finally:
             self._write_lock.release()
 
-    def _synchronise_oids(self):
-        # Try allocating the same OID from all storages. This is done by
-        # determining the maximum and making all other storages increase
-        # their OID until they hit the maximum. While any storage yields
-        # an OID above the maximum, we try again with that value.
-        max_oid = None
-        lagging = self.storages_optimal[:]
-        while lagging:
-            storage = lagging.pop()
-            while True:
-                reliable, oid = self.__apply_storage(storage, 'new_oid')
-                if not reliable:
-                    break
-                if oid < max_oid:
-                    continue
-                if oid > max_oid:
-                    max_oid = oid
-                    lagging = [s for s in self.storages_optimal
-                               if s != storage]
-                break
-
     def _new_tid(self, old_tid):
         """Generates a new TID."""
         if old_tid is None:

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2009-11-14 12:08:16 UTC (rev 105642)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2009-11-14 14:15:34 UTC (rev 105643)
@@ -41,6 +41,7 @@
 
 def fail(obj, name):
     old_method = getattr(obj, name)
+
     def failing_method(*args, **kw):
         setattr(obj, name, old_method)
         raise Exception()
@@ -790,6 +791,7 @@
         open(blob_file_name, 'w').write('I am a happy blob.')
         t = transaction.Transaction()
         self._storage.tpc_begin(t)
+
         def fail(*args, **kw):
             raise Exception()
         self._backend(0).storeBlob = fail
@@ -809,6 +811,7 @@
         open(blob_file_name, 'w').write('I am a happy blob.')
         t = transaction.Transaction()
         self._storage.tpc_begin(t)
+
         def fail(*args, **kw):
             raise Exception()
         self._backend(0).storeBlob = fail
@@ -932,6 +935,7 @@
         storage.close()
 
     def test_supportsUndo_required(self):
+
         class Opener(object):
             name = 'foo'
 
@@ -1182,6 +1186,7 @@
 
     def test_timeoutBackend(self):
         self._storage.timeout = 2
+
         def slow_tpc_begin(*args):
             time.sleep(4)
         self._backend(0).tpc_begin = slow_tpc_begin
@@ -1198,6 +1203,7 @@
 
     def test_blob_cache_cannot_link(self):
         called_broken = []
+
         def broken_link(foo, bar):
             called_broken.append(True)
             raise OSError
@@ -1215,6 +1221,7 @@
 
     def test_blob_cache_locking(self):
         return_value = []
+
         def try_loadBlob():
             return_value.append(self._storage.loadBlob(oid, tid))
 
@@ -1490,25 +1497,114 @@
 
         s5.close()
 
-    def test_reload_remove(self):
-        storage = self._storages.pop(3).open()
+    def test_recover_same_storage(self):
+        # A somewhat convoluted example of accidentally connecting a new
+        # storage to the same ZEO server that is already in use and then
+        # trying to recover. Recovery almost succeeds, but the newly
+        # connected storage ends up disconnected again because the OIDs
+        # cannot be synchronized.
+        removed_storage = self._storages.pop(3)
 
         # configure the RAID to no longer use the removed backend
         self.update_config()
         self._storage.raid_reload()
-        self.assertEquals('optimal', self._storage.raid_status())
+        self.assertEquals([['1', '0', '2', '4'], None, [], ''],
+                          self._storage.raid_details())
 
+        # Re-insert the storage
+        self._storages.append(removed_storage)
+        self.update_config()
+        self._storage.raid_reload()
+        self.assertEquals([['1', '0', '2', '4'], None, ['3'], ''],
+                          self._storage.raid_details())
+
+        self._storage.raid_recover('3')
+
+        while self._storage.raid_status() != 'degraded':
+            time.sleep(0.1)
+        # This is not obvious: storage 4 fails because the _apply_all call
+        # for new_oid makes it return an inconsistent result, so it is
+        # dropped from the pool of good storages. Storage 3, however, meets
+        # the OID target and is therefore picked up.
+        self.assertEquals([['1', '0', '2', '3'], None, ['4'], ('recovered',)],
+                          self._storage.raid_details())
+
+    def test_reload_remove_readd(self):
+        removed_storage = self._storages.pop()
+
+        # configure the RAID to no longer use the removed backend
+        self.update_config()
+        self._storage.raid_reload()
+        self.assertEquals([['1', '0', '3', '2'], None, [], ''],
+                          self._storage.raid_details())
+
         # ensure that we can still write to the RAID
         oid = self._storage.new_oid()
         self._dostore(oid=oid)
 
         # ensure that the transaction did not arrive at the removed backend
-        self.assertRaises(ZODB.POSException.POSKeyError, storage.load, oid)
+        s = removed_storage.open()
+        self.assertRaises(ZODB.POSException.POSKeyError, s.load, oid)
+        s.close()
 
-        storage.close()
+        # Re-insert the storage
+        self._storages.append(removed_storage)
+        self.update_config()
+        self._storage.raid_reload()
+        self.assertEquals([['1', '0', '3', '2'], None, ['4'], ''],
+                          self._storage.raid_details())
 
+        self._storage.raid_recover('4')
 
+        while self._storage.raid_status() != 'optimal':
+            time.sleep(0.1)
+
+        self.assertEquals(
+            [['1', '0', '3', '2', '4'], None, [], ('recovered',)],
+            self._storage.raid_details())
+
+        # Now, after recovery, the OID should be visible
+        s = removed_storage.open()
+        s.load(oid)
+        s.close()
+
+    def test_reload_remove_disabled(self):
+        removed_storage = self._storages.pop()
+        self._storage.raid_disable(removed_storage.name)
+
+        self.assertEquals([['1', '0', '3', '2'], None, ['4'], ''],
+                          self._storage.raid_details())
+
+        # configure the RAID to no longer use the removed backend
+        self.update_config()
+
+        self._storage.raid_reload()
+        self.assertEquals([['1', '0', '3', '2'], None, [], ''],
+                          self._storage.raid_details())
+
+    def test_reload_broken_config(self):
+        self.update_config()
+        f = open(self.zeo_configfile, 'w')
+        f.write('fdashkjfhdaskkf')
+        f.close()
+
+        self.assertRaises(RuntimeError, self._storage.raid_reload)
+        self.assertEquals([['1', '0', '3', '2', '4'], None, [], ''],
+                          self._storage.raid_details())
+
+    def test_reload_no_remaining_storages(self):
+        remove = self._storages[:-1]
+        self._storages = self._storages[-1:]
+        self._storage.raid_disable(self._storages[0].name)
+
+        self.update_config()
+        self.assertRaises(RuntimeError, self._storage.raid_reload)
+        self.assertEquals([['1', '0', '3', '2'], None, ['4'], ''],
+                          self._storage.raid_details())
+
+
 def test_suite():
+
     suite = unittest.TestSuite()
     suite.addTest(unittest.makeSuite(ZEOReplicationStorageTests, "check"))
     suite.addTest(unittest.makeSuite(FailingStorageTests))


