[Checkins] SVN: gocept.zeoraid/trunk/ - General improvements on reloading: make code more readable and avoid
Christian Theune
ct at gocept.com
Sat Nov 14 09:15:35 EST 2009
Log message for revision 105643:
- General improvements on reloading: make code more readable and avoid
crashing when configuration file doesn't parse.
- Restructured output of the controller's `details` command.
- Fix #464339: Storages were not added on reload.
- Fix #330008: Reload now refuses to apply a configuration change if it would
cause all optimal storages to disappear.
- Fix #316285: Reload failed removing degraded back-ends.
Changed:
U gocept.zeoraid/trunk/CHANGES.txt
U gocept.zeoraid/trunk/doc/CMDLINE.txt
U gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
-=-
Modified: gocept.zeoraid/trunk/CHANGES.txt
===================================================================
--- gocept.zeoraid/trunk/CHANGES.txt 2009-11-14 12:08:16 UTC (rev 105642)
+++ gocept.zeoraid/trunk/CHANGES.txt 2009-11-14 14:15:34 UTC (rev 105643)
@@ -5,6 +5,18 @@
1.0b5 (unreleased)
------------------
+- General improvements on reloading: make code more readable and avoid crashing
+ when configuration file doesn't parse.
+
+- Restructured output of the controller's `details` command.
+
+- Fix #464339: Storages were not added on reload.
+
+- Fix #330008: Reload now refuses to apply a configuration change if it would
+ cause all optimal storages to disappear.
+
+- Fix #316285: Reload failed removing degraded back-ends.
+
- Add a note to the deployment documentation that strongly advises people to
use a deployment recipe for setting up their ZEO servers to avoid buildout
killing volatile files.
Modified: gocept.zeoraid/trunk/doc/CMDLINE.txt
===================================================================
--- gocept.zeoraid/trunk/doc/CMDLINE.txt 2009-11-14 12:08:16 UTC (rev 105642)
+++ gocept.zeoraid/trunk/doc/CMDLINE.txt 2009-11-14 14:15:34 UTC (rev 105643)
@@ -53,8 +53,8 @@
New back-ends are added to the RAID in `degraded` state and have to be
manually recovered.
-Back-ends that are missing from the new configuration are `disabled` but
-remain in the RAID.
+Back-ends that are missing from the new configuration are closed and removed
+from the RAID configuration.
Example:
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py 2009-11-14 12:08:16 UTC (rev 105642)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py 2009-11-14 14:15:34 UTC (rev 105643)
@@ -78,15 +78,29 @@
return STATUS_TO_NAGIOS[status]
def cmd_details(self):
+ col1 = 25
+
ok, recovering, failed, recovery_status = self.raid.raid_details()
- print "RAID status:"
- print "\t", self.raid.raid_status()
- print "Storage status:"
- print "\toptimal\t\t", ok
- print "\trecovering\t", recovering, recovery_status
- print "\tfailed\t\t", failed
- return STATUS_TO_NAGIOS[self.cmd_status()]
+ print " %s| Status" % ('%s:%s' % (self.host, self.port)).ljust(col1)
+ print " %s+-------------------" % ('-' * col1)
+ print " %s| %s" % (('RAID %s' % self.storage).ljust(col1),
+ self.raid.raid_status().ljust(col1))
+ print " %s+-------------------" % ('-' * col1)
+ storages = {}
+ for storage in ok:
+ storages[storage] = 'optimal'
+ for storage in failed:
+ storages[storage] = 'failed'
+ if recovering:
+ storages[recovering] = ('recovering: %s transaction %s' %
+ recovery_status)
+
+ for id in sorted(storages):
+ print ' %s| %s' % (str(id).ljust(col1), storages[id])
+
+ return STATUS_TO_NAGIOS[self.raid.raid_status()]
+
def cmd_recover(self, storage):
print self.raid.raid_recover(storage)
return NAGIOS_OK
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2009-11-14 12:08:16 UTC (rev 105642)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2009-11-14 14:15:34 UTC (rev 105643)
@@ -40,6 +40,7 @@
def ensure_open_storage(method):
+
def check_open(self, *args, **kw):
if self.closed:
raise gocept.zeoraid.interfaces.RAIDClosedError(
@@ -49,6 +50,7 @@
def ensure_writable(method):
+
def check_writable(self, *args, **kw):
if self.isReadOnly():
raise ZODB.POSException.ReadOnlyError()
@@ -167,7 +169,7 @@
# Set up list of degraded storages
self.storages_degraded = []
# Degrade all remaining (non-optimal) storages
- for name in reduce(lambda x,y:x+y, tids.values(), []):
+ for name in reduce(lambda x, y: x + y, tids.values(), []):
self._degrade_storage(name)
# No storage is recovering initially
@@ -252,23 +254,27 @@
"""Allocate a new object id."""
self._write_lock.acquire()
try:
- oids = []
- for storage in self.storages_optimal[:]:
- reliable, oid = self.__apply_storage(storage, 'new_oid')
- if reliable:
- oids.append((oid, storage))
- if not oids:
- raise gocept.zeoraid.interfaces.RAIDError(
- "RAID storage is failed.")
-
- min_oid = sorted(oids)[0][0]
- for oid, storage in oids:
- if oid > min_oid:
- self._degrade_storage(storage)
- return min_oid
+ return self._new_oid()
finally:
self._write_lock.release()
+ def _new_oid(self):
+ # Not write-lock protected implementation of new_oid
+ oids = []
+ for storage in self.storages_optimal[:]:
+ reliable, oid = self.__apply_storage(storage, 'new_oid')
+ if reliable:
+ oids.append((oid, storage))
+ if not oids:
+ raise gocept.zeoraid.interfaces.RAIDError(
+ "RAID storage is failed.")
+
+ min_oid = sorted(oids)[0][0]
+ for oid, storage in oids:
+ if oid > min_oid:
+ self._degrade_storage(storage)
+ return min_oid
+
@ensure_writable
def pack(self, t, referencesf):
"""Pack the storage."""
@@ -607,7 +613,13 @@
'Cannot reload config without running inside ZEO.')
options = ZEOOptions()
- options.realize(['-C', self.zeo.options.configfile])
+ try:
+ options.realize(['-C', self.zeo.options.configfile])
+ except Exception, e:
+ raise RuntimeError(
+ 'Could not reload configuration file: '
+ 'please check your configuration.')
+
for candidate in options.storages:
if candidate.name == self.__name__:
storage = candidate
@@ -615,18 +627,33 @@
else:
raise RuntimeError(
'No storage section found for RAID %s.' % self.__name__)
- new_storages = dict((opt.name, opt)
- for opt in storage.config.storages)
- new_names = set(new_storages)
- old_names = set(self.openers)
+ configured_storages = dict(
+ (opt.name, opt) for opt in storage.config.storages)
- for name in old_names - new_names:
- self._close_storage(name)
+ configured_names = set(configured_storages.keys())
+ current_names = set(self.openers.keys())
- for name in new_names - old_names:
- self.openers[name] = new_storages[name]
+ added = configured_names - current_names
+ removed = current_names - configured_names
+
+ # Check whether we would remove all optimal storages. If that's so,
+ # then we do not perform the reconfiguration.
+ remaining_optimal = set(self.storages_optimal) - removed
+ if not remaining_optimal:
+ raise RuntimeError(
+ 'Cannot perform reconfiguration: '
+ 'all optimal storages would be removed.')
+
+ for name in added:
+ self.openers[name] = configured_storages[name]
self.storages_degraded.append(name)
+ for name in removed:
+ self._close_storage(name)
+ del self.openers[name]
+ if name in self.storages_degraded:
+ self.storages_degraded.remove(name)
+
# internal
def _open_storage(self, name):
@@ -638,11 +665,12 @@
def _close_storage(self, name):
if name in self.storages_optimal:
self.storages_optimal.remove(name)
- storage = self.storages.pop(name)
- t = threading.Thread(target=storage.close)
- self._threads.add(t)
- t.setDaemon(True)
- t.start()
+ if name in self.storages:
+ storage = self.storages.pop(name)
+ t = threading.Thread(target=storage.close)
+ self._threads.add(t)
+ t.setDaemon(True)
+ t.start()
def _degrade_storage(self, name, fail=True):
self._close_storage(name)
@@ -726,6 +754,7 @@
else:
# Provide a fallback if `args` is given as a simple tuple.
static_arguments = args
+
def dummy_generator():
while True:
yield static_arguments
@@ -814,33 +843,37 @@
def _finalize_recovery(self, storage):
self._write_lock.acquire()
try:
+ # Synchronize OIDs: check which storage is further behind in giving
+ # out OIDs. Because ZEO allocates OIDs in large batches, this might
+ # also be the recovering storage. Give it exactly one try to catch
+ # up. If we miss the target, we simply degrade the recovering
+ # storage again.
+ max_optimal = self._new_oid()
+ max_recovering = self.storages[self.storage_recovering].new_oid()
+ if max_optimal != max_recovering:
+ if max_optimal > max_recovering:
+ target = max_optimal
+ catch_up = self.storages[self.storage_recovering].new_oid
+ else:
+ target = max_recovering
+ catch_up = self._new_oid
+
+ oid = None
+ while oid < target:
+ oid = catch_up()
+
+ if oid != target:
+ logging.warn(
+ 'Degrading recovering storage %r due to '
+ 'new OID mismatch' % self.storage_recovering)
+ self._degrade_storage(self.storage_recovering)
+ return
+
self.storages_optimal.append(self.storage_recovering)
- self._synchronise_oids()
self.storage_recovering = None
finally:
self._write_lock.release()
- def _synchronise_oids(self):
- # Try allocating the same OID from all storages. This is done by
- # determining the maximum and making all other storages increase
- # their OID until they hit the maximum. While any storage yields
- # an OID above the maximum, we try again with that value.
- max_oid = None
- lagging = self.storages_optimal[:]
- while lagging:
- storage = lagging.pop()
- while True:
- reliable, oid = self.__apply_storage(storage, 'new_oid')
- if not reliable:
- break
- if oid < max_oid:
- continue
- if oid > max_oid:
- max_oid = oid
- lagging = [s for s in self.storages_optimal
- if s != storage]
- break
-
def _new_tid(self, old_tid):
"""Generates a new TID."""
if old_tid is None:
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2009-11-14 12:08:16 UTC (rev 105642)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2009-11-14 14:15:34 UTC (rev 105643)
@@ -41,6 +41,7 @@
def fail(obj, name):
old_method = getattr(obj, name)
+
def failing_method(*args, **kw):
setattr(obj, name, old_method)
raise Exception()
@@ -790,6 +791,7 @@
open(blob_file_name, 'w').write('I am a happy blob.')
t = transaction.Transaction()
self._storage.tpc_begin(t)
+
def fail(*args, **kw):
raise Exception()
self._backend(0).storeBlob = fail
@@ -809,6 +811,7 @@
open(blob_file_name, 'w').write('I am a happy blob.')
t = transaction.Transaction()
self._storage.tpc_begin(t)
+
def fail(*args, **kw):
raise Exception()
self._backend(0).storeBlob = fail
@@ -932,6 +935,7 @@
storage.close()
def test_supportsUndo_required(self):
+
class Opener(object):
name = 'foo'
@@ -1182,6 +1186,7 @@
def test_timeoutBackend(self):
self._storage.timeout = 2
+
def slow_tpc_begin(*args):
time.sleep(4)
self._backend(0).tpc_begin = slow_tpc_begin
@@ -1198,6 +1203,7 @@
def test_blob_cache_cannot_link(self):
called_broken = []
+
def broken_link(foo, bar):
called_broken.append(True)
raise OSError
@@ -1215,6 +1221,7 @@
def test_blob_cache_locking(self):
return_value = []
+
def try_loadBlob():
return_value.append(self._storage.loadBlob(oid, tid))
@@ -1490,25 +1497,114 @@
s5.close()
- def test_reload_remove(self):
- storage = self._storages.pop(3).open()
+ def test_recover_same_storage(self):
+ # This is a somewhat convoluted example for accidentally connecting a
+ # new storage to the same ZEO server that is already used and trying
+ # to recover. This will almost recover but will cause the newly
+ # connected storage to be disconnected again because OIDs can't be
+ # synchronized.
+ removed_storage = self._storages.pop(3)
# configure the RAID to no longer use the removed backend
self.update_config()
self._storage.raid_reload()
- self.assertEquals('optimal', self._storage.raid_status())
+ self.assertEquals([['1', '0', '2', '4'], None, [], ''],
+ self._storage.raid_details())
+ # Re-insert the storage
+ self._storages.append(removed_storage)
+ self.update_config()
+ self._storage.raid_reload()
+ self.assertEquals([['1', '0', '2', '4'], None, ['3'], ''],
+ self._storage.raid_details())
+
+ self._storage.raid_recover('3')
+
+ while self._storage.raid_status() != 'degraded':
+ time.sleep(0.1)
+ # This is unobvious: Storage 4 fails because the _apply_all call for
+ # new_oid causes storage 4 to give an inconsistent result thus being
+ # dropped from the pool of good storages. Storage 3 however meets the
+ # OID target then thus being picked up.
+ self.assertEquals([['1', '0', '2', '3'], None, ['4'], ('recovered',)],
+ self._storage.raid_details())
+
+ def test_reload_remove_readd(self):
+ removed_storage = self._storages.pop()
+
+ # configure the RAID to no longer use the removed backend
+ self.update_config()
+ self._storage.raid_reload()
+ self.assertEquals([['1', '0', '3', '2'], None, [], ''],
+ self._storage.raid_details())
+
# ensure that we can still write to the RAID
oid = self._storage.new_oid()
self._dostore(oid=oid)
# ensure that the transaction did not arrive at the removed backend
- self.assertRaises(ZODB.POSException.POSKeyError, storage.load, oid)
+ s = removed_storage.open()
+ self.assertRaises(ZODB.POSException.POSKeyError, s.load, oid)
+ s.close()
- storage.close()
+ # Re-insert the storage
+ self._storages.append(removed_storage)
+ self.update_config()
+ self._storage.raid_reload()
+ self.assertEquals([['1', '0', '3', '2'], None, ['4'], ''],
+ self._storage.raid_details())
+ self._storage.raid_recover('4')
+ while self._storage.raid_status() != 'optimal':
+ time.sleep(0.1)
+
+ self.assertEquals(
+ [['1', '0', '3', '2', '4'], None, [], ('recovered',)],
+ self._storage.raid_details())
+
+ # Now, after recovery, the OID should be visible
+ s = removed_storage.open()
+ s.load(oid)
+ s.close()
+
+ def test_reload_remove_disabled(self):
+ removed_storage = self._storages.pop()
+ self._storage.raid_disable(removed_storage.name)
+
+ self.assertEquals([['1', '0', '3', '2'], None, ['4'], ''],
+ self._storage.raid_details())
+
+ # configure the RAID to no longer use the removed backend
+ self.update_config()
+
+ self._storage.raid_reload()
+ self.assertEquals([['1', '0', '3', '2'], None, [], ''],
+ self._storage.raid_details())
+
+ def test_reload_broken_config(self):
+ self.update_config()
+ f = open(self.zeo_configfile, 'w')
+ f.write('fdashkjfhdaskkf')
+ f.close()
+
+ self.assertRaises(RuntimeError, self._storage.raid_reload)
+ self.assertEquals([['1', '0', '3', '2', '4'], None, [], ''],
+ self._storage.raid_details())
+
+ def test_reload_no_remaining_storages(self):
+ remove = self._storages[:-1]
+ self._storages = self._storages[-1:]
+ self._storage.raid_disable(self._storages[0].name)
+
+ self.update_config()
+ self.assertRaises(RuntimeError, self._storage.raid_reload)
+ self.assertEquals([['1', '0', '3', '2'], None, ['4'], ''],
+ self._storage.raid_details())
+
+
def test_suite():
+
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ZEOReplicationStorageTests, "check"))
suite.addTest(unittest.makeSuite(FailingStorageTests))
More information about the checkins
mailing list