[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
Cleanup of __init__, refactored TID generation.
Christian Theune
ct at gocept.com
Thu Jan 10 04:22:54 EST 2008
Log message for revision 82776:
Cleanup of __init__, refactored TID generation.
Changed:
U gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-10 06:55:04 UTC (rev 82775)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-10 09:22:54 UTC (rev 82776)
@@ -41,13 +41,25 @@
closed = False
_transaction = None
+ # This flag signals whether any `store` operation should be logged. This
+ # is necessary to support the two-phase recovery process. It is set to
+ # `true` when a recovery starts and set back to `false` when it is
+ # finished.
+ _log_stores = False
+
+ # The last transaction that we know of. This is used to keep a global
+ # knowledge of the current assumed state and verify storages that might
+ # have fallen out of sync. It is also used as a point of reference
+ # for generating new TIDs.
+ _last_tid = None
+
def __init__(self, name, openers, read_only=False):
self.__name__ = name
self.read_only = read_only
self.storages = {}
- self._log_stores = False
# Allocate locks
+ # XXX document locks
l = threading.RLock()
self._lock_acquire = l.acquire
self._lock_release = l.release
@@ -55,50 +67,40 @@
self._commit_lock_acquire = l.acquire
self._commit_lock_release = l.release
- # Remember the openers to for recovering a storage later
- self.openers = {}
- # Open the storages
- for opener in openers:
- self.openers[opener.name] = opener
- self._open_storage(opener.name)
+ # Remember the openers so closed storages can be re-opened as needed.
+ self.openers = dict((opener.name, opener) for opener in openers)
- self.storages_optimal = []
- self.storages_degraded = []
- self.storages_recovering = []
+ for name in self.openers:
+ self._open_storage(name)
+ # Evaluate the consistency of the opened storages. We compare the last
+ # known TIDs of all storages. All storages whose TID equals the newest
+ # of these TIDs are considered optimal.
tids = {}
for name, storage in self.storages.items():
try:
tid = storage.lastTransaction()
- except ZEO.ClientStorage.ClientDisconnected:
- self._degrade_storage(name, fail=False)
+ except StorageDegraded:
continue
- if tid is None:
- # Not connected yet.
- # XXX or empty ...
- self._degrade_storage(name, fail=False)
- continue
- s = tids.setdefault(tid, [])
- s.append(name)
-
- self._unrecovered_transactions = {}
- self._last_tid = None
+ tids.setdefault(tid, [])
+ tids[tid].append(name)
- # Activate all optimal storages
- if tids:
- self._last_tid = max(tids.keys())
- self.storages_optimal.extend(tids[self._last_tid])
- del tids[self._last_tid]
+ if not tids:
+ # No storage is working.
+ raise gocept.zeoraid.interfaces.RAIDError(
+ "Can't start without at least one working storage.")
- # Deactive all degraded storages
- for degraded_storages in tids.values():
- self.storages_degraded.extend(degraded_storages)
+ # Set up list of optimal storages
+ self._last_tid = max(tids)
+ self.storages_optimal = tids.pop(self._last_tid)
- t = time.time()
- self.ts = persistent.TimeStamp.TimeStamp(*(time.gmtime(t)[:5] + (t%60,)))
+ # Set up list of degraded storages
+ self.storages_degraded = []
+ for degraded_storages in tids.values():
+ self.storages_degraded.extend(degraded_storages)
- if not self.storages_optimal:
- raise gocept.zeoraid.interfaces.RAIDError("Can't start without at least one optimal storage.")
+ # No storages are recovering initially
+ self.storages_recovering = []
# IStorage
@@ -246,15 +248,10 @@
if last_tid != self._last_tid:
self._degrade_storage(name)
- # Create a common tid for all storages if we don't have one yet.
if tid is None:
- now = time.time()
- t = persistent.TimeStamp.TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
- self.ts = t.laterThan(self.ts)
- self._tid = repr(self.ts)
- else:
- self._ts = persistent.TimeStamp.TimeStamp(tid)
- self._tid = tid
+ # No TID was given, so we create a new one.
+ tid = self._new_tid(self._last_tid)
+ self._tid = tid
self._apply_all_storages('tpc_begin', transaction, self._tid, status)
finally:
@@ -614,3 +611,12 @@
storage.store(oid, oldserial, data, '', t)
storage.tpc_vote(t)
storage.tpc_finish(t)
+
+    def _new_tid(self, old_tid):
+        """Generates a new TID."""
+        old_ts = persistent.TimeStamp.TimeStamp(old_tid)
+        now = time.time()
+        new_ts = persistent.TimeStamp.TimeStamp(
+            *(time.gmtime(now)[:5] + (now % 60,)))
+        new_ts = new_ts.laterThan(old_ts)
+        return repr(new_ts)
More information about the Checkins
mailing list