[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py Cleanup of __init__, refactored TID generation.

Christian Theune ct at gocept.com
Thu Jan 10 04:22:54 EST 2008


Log message for revision 82776:
  Cleanup of __init__, refactored TID generation.
  

Changed:
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py

-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-10 06:55:04 UTC (rev 82775)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-10 09:22:54 UTC (rev 82776)
@@ -41,13 +41,25 @@
     closed = False
     _transaction = None
 
+    # This flag signals whether any `store` operation should be logged. This
+    # is necessary to support the two-phase recovery process. It is set to
+    # `True` when a recovery starts and set back to `False` when it is
+    # finished.
+    _log_stores = False
+
+    # The last transaction that we know of. This is used to keep a global
+    # knowledge of the current assumed state and verify storages that might
+    # have fallen out of sync. It is also used as a point of reference
+    # for generating new TIDs.
+    _last_tid = None
+
     def __init__(self, name, openers, read_only=False):
         self.__name__ = name
         self.read_only = read_only
         self.storages = {}
-        self._log_stores = False
 
         # Allocate locks
+        # XXX document locks
         l = threading.RLock()
         self._lock_acquire = l.acquire
         self._lock_release = l.release
@@ -55,50 +67,40 @@
         self._commit_lock_acquire = l.acquire
         self._commit_lock_release = l.release
 
-        # Remember the openers to for recovering a storage later
-        self.openers = {}
-        # Open the storages
-        for opener in openers:
-            self.openers[opener.name] = opener
-            self._open_storage(opener.name)
+        # Remember the openers so closed storages can be re-opened as needed.
+        self.openers = dict((opener.name, opener) for opener in openers)
 
-        self.storages_optimal = []
-        self.storages_degraded = []
-        self.storages_recovering = []
+        for name in self.openers:
+            self._open_storage(name)
 
+        # Evaluate the consistency of the opened storages. We compare the last
+        # known TIDs of all storages. All storages whose TID equals the newest
+        # of these TIDs are considered optimal.
         tids = {}
         for name, storage in self.storages.items():
             try:
                 tid = storage.lastTransaction()
-            except ZEO.ClientStorage.ClientDisconnected:
-                self._degrade_storage(name, fail=False)
+            except StorageDegraded:
                 continue
-            if tid is None:
-                # Not connected yet.
-                # XXX or empty ...
-                self._degrade_storage(name, fail=False)
-                continue
-            s = tids.setdefault(tid, [])
-            s.append(name)
- 
-        self._unrecovered_transactions = {}
-        self._last_tid = None
+            tids.setdefault(tid, [])
+            tids[tid].append(name)
 
-        # Activate all optimal storages
-        if tids:
-            self._last_tid = max(tids.keys())
-            self.storages_optimal.extend(tids[self._last_tid])
-            del tids[self._last_tid]
+        if not tids:
+            # No storage is working.
+            raise gocept.zeoraid.interfaces.RAIDError(
+                "Can't start without at least one working storage.")
 
-            # Deactive all degraded storages
-            for degraded_storages in tids.values():
-                self.storages_degraded.extend(degraded_storages)
+        # Set up list of optimal storages
+        self._last_tid = max(tids)
+        self.storages_optimal = tids.pop(self._last_tid)
 
-        t = time.time()
-        self.ts = persistent.TimeStamp.TimeStamp(*(time.gmtime(t)[:5] + (t%60,)))
+        # Set up list of degraded storages
+        self.storages_degraded = []
+        for degraded_storages in tids.values():
+            self.storages_degraded.extend(degraded_storages)
 
-        if not self.storages_optimal:
-            raise gocept.zeoraid.interfaces.RAIDError("Can't start without at least one optimal storage.")
+        # No storages are recovering initially
+        self.storages_recovering = []
 
     # IStorage
 
@@ -246,15 +248,10 @@
                 if last_tid != self._last_tid:
                     self._degrade_storage(name)
 
-            # Create a common tid for all storages if we don't have one yet.
             if tid is None:
-                now = time.time()
-                t = persistent.TimeStamp.TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
-                self.ts = t.laterThan(self.ts)
-                self._tid = repr(self.ts)
-            else:
-                self._ts = persistent.TimeStamp.TimeStamp(tid)
-                self._tid = tid
+                # No TID was given, so we create a new one.
+                tid = self._new_tid(self._last_tid)
+            self._tid = tid
 
             self._apply_all_storages('tpc_begin', transaction, self._tid, status)
         finally:
@@ -614,3 +611,12 @@
             storage.store(oid, oldserial, data, '', t)
             storage.tpc_vote(t)
             storage.tpc_finish(t)
+
+    def _new_tid(self, old_tid):
+        """Generates a new TID."""
+        old_ts = persistent.TimeStamp.TimeStamp(old_tid)
+        now = time.time()
+        new_ts = persistent.TimeStamp.TimeStamp(
+            *(time.gmtime(now)[:5] + (now % 60,)))
+        new_ts = new_ts.laterThan(old_ts)
+        return repr(new_ts)



More information about the Checkins mailing list