[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ - Removed `log_store` functionality for now. Recovery will be rewritten soon.

Christian Theune ct at gocept.com
Thu Jan 17 07:21:39 EST 2008


Log message for revision 82932:
  - Removed `log_store` functionality for now. Recovery will be rewritten soon.
  - Tested and cleaned up tpc_* methods
  - Removed unnecessary cleanup method
  

Changed:
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py

-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-17 06:18:09 UTC (rev 82931)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-17 12:21:39 UTC (rev 82932)
@@ -97,12 +97,6 @@
     # we bring them back into the pool of optimal storages.
     _db = None
 
-    # This flag signals whether any `store` operation should be logged. This
-    # is necessary to support the two-phase recovery process. It is set to
-    # `true` when a recovery starts and set back to `false` when it is
-    # finished.
-    _log_stores = False
-
     # The last transaction that we know of. This is used to keep a global
     # knowledge of the current assumed state and verify storages that might
     # have fallen out of sync. It is also used as a point of reference
@@ -117,8 +111,7 @@
         # Allocate locks
         # The write lock must be acquired when:
         # a) performing write operations on the backends
-        # b) reading or writing log_stores
-        # c) writing _transaction
+        # b) writing _transaction
         self._write_lock = threading.RLock()
         # The commit lock must be acquired when setting _transaction, and
         # released when unsetting _transaction.
@@ -265,26 +258,17 @@
         try:
             self._apply_all_storages('store',
                                      (oid, oldserial, data, '', transaction))
-            if self._log_stores:
-                oids = self._unrecovered_transactions.setdefault(self._tid, [])
-                oids.append(oid)
             return self._tid
         finally:
             self._write_lock.release()
 
-    # XXX
     def tpc_abort(self, transaction):
+        """Abort the two-phase commit."""
         self._write_lock.acquire()
         try:
             if transaction is not self._transaction:
                 return
             try:
-                # XXX Edge cases for the log_store abort ...
-                if self._log_stores:
-                    # We may have logged some stores within that transaction
-                    # which we have to remove again because we aborted it.
-                    if self._tid in self._unrecovered_transactions:
-                        del self._unrecovered_transactions[self._tid]
                 self._apply_all_storages('tpc_abort', (transaction,))
                 self._transaction = None
             finally:
@@ -292,34 +276,25 @@
         finally:
             self._write_lock.release()
 
-    # XXX
     @ensure_writable
     def tpc_begin(self, transaction, tid=None, status=' '):
+        """Begin the two-phase commit process."""
         self._write_lock.acquire()
         try:
             if self._transaction is transaction:
+                # It is valid that tpc_begin is called multiple times with
+                # the same transaction and is silently ignored.
                 return
+
+            # Release and re-acquire to avoid deadlocks. commit_lock is a
+            # long-term lock whereas write_lock is a short-term lock. Acquire
+            # the long-term lock first.
             self._write_lock.release()
             self._commit_lock.acquire()
             self._write_lock.acquire()
 
-            # I don't understand the lock that protects _transaction.  The commit
-            # lock and status will be deduced by the underlying storages.
-
             self._transaction = transaction
 
-            # Remove storages that aren't on the same last tid anymore (this happens 
-            # if a storage disconnects
-            for name in self.storages_optimal:
-                storage = self.storages[name]
-                try:
-                    last_tid = storage.lastTransaction()
-                except ZEO.ClientStorage.ClientDisconnected:
-                    self._degrade_storage(name, fail=False)
-                    continue
-                if last_tid != self._last_tid:
-                    self._degrade_storage(name)
-
             if tid is None:
                 # No TID was given, so we create a new one.
                 tid = self._new_tid(self._last_tid)
@@ -330,16 +305,23 @@
         finally:
             self._write_lock.release()
 
-    # XXX
     def tpc_finish(self, transaction, callback=None):
+        """Finish the transaction, making any transaction changes permanent.
+        """
         self._write_lock.acquire()
         try:
             if transaction is not self._transaction:
                 return
             try:
+                self._apply_all_storages('tpc_finish', (transaction,))
                 if callback is not None:
+                    # This callback is relevant for processing invalidations
+                    # at transaction boundaries.
+                    # XXX It is somewhat unclear whether this should be done
+                    # before or after calling tpc_finish. BaseStorage and
+                    # ClientStorage contradict each other and the documentation
+                    # is non-existent. We trust ClientStorage here.
                     callback(self._tid)
-                self._apply_all_storages('tpc_finish', (transaction,))
                 self._last_tid = self._tid
                 return self._tid
             finally:
@@ -348,8 +330,8 @@
         finally:
             self._write_lock.release()
 
-    # XXX
     def tpc_vote(self, transaction):
+        """Provide a storage with an opportunity to veto a transaction."""
         self._write_lock.acquire()
         try:
             if transaction is not self._transaction:
@@ -358,12 +340,6 @@
         finally:
             self._write_lock.release()
 
-    def cleanup(self):
-        # XXX This is not actually documented, it's not implemented in all
-        # storages, it's not even clear when it should be called. Not
-        # correctly calling storages' cleanup might leave turds.
-        pass
-
     def supportsVersions(self):
         return False
 

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-17 06:18:09 UTC (rev 82931)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-17 12:21:39 UTC (rev 82932)
@@ -11,6 +11,7 @@
 
 import zope.interface.verify
 
+import transaction
 from ZODB.tests import StorageTestBase, BasicStorage, \
              TransactionalUndoStorage, PackableStorage, \
              Synchronization, ConflictResolution, HistoryStorage, \
@@ -569,7 +570,68 @@
                           oid=oid, revid=revid, data='bar')
         self.assertEquals('failed', self._storage.raid_status())
 
+    def test_tpc_begin_degrading(self):
+        self._backend(0).fail('tpc_begin')
+        oid = self._storage.new_oid()
+        self._dostoreNP(oid=oid, data='foo')
+        self.assertEquals('foo', self._backend(0).load(oid)[0])
+        self.assertEquals('foo', self._storage.load(oid)[0])
+        self.assertEquals('degraded', self._storage.raid_status())
 
+        oid = self._storage.new_oid()
+        self._backend(0).fail('tpc_begin')
+        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+                          self._dostoreNP,
+                          oid=oid, data='bar')
+        self.assertEquals('failed', self._storage.raid_status())
+
+    def test_tpc_vote_degrading(self):
+        self._backend(0).fail('tpc_vote')
+        oid = self._storage.new_oid()
+        self._dostoreNP(oid=oid, data='foo')
+        self.assertEquals('foo', self._backend(0).load(oid)[0])
+        self.assertEquals('foo', self._storage.load(oid)[0])
+        self.assertEquals('degraded', self._storage.raid_status())
+
+        oid = self._storage.new_oid()
+        self._backend(0).fail('tpc_vote')
+        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+                          self._dostoreNP,
+                          oid=oid, data='bar')
+        self.assertEquals('failed', self._storage.raid_status())
+
+    def test_tpc_finish_degrading(self):
+        self._backend(0).fail('tpc_finish')
+        oid = self._storage.new_oid()
+        self._dostoreNP(oid=oid, data='foo')
+        self.assertEquals('foo', self._backend(0).load(oid)[0])
+        self.assertEquals('foo', self._storage.load(oid)[0])
+        self.assertEquals('degraded', self._storage.raid_status())
+
+        oid = self._storage.new_oid()
+        self._backend(0).fail('tpc_finish')
+        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+                          self._dostoreNP,
+                          oid=oid, data='bar')
+        self.assertEquals('failed', self._storage.raid_status())
+
+    def test_tpc_abort_not_degrading(self):
+        # tpc_abort (in combination with ClientStorage) will never cause
+        # degradation, even if it raises an exception.
+        # This is because of an asynchronous call made by ClientStorage.
+        # For us this is ok. If there really is something wrong with the
+        # storage, we'll know in the next synchronous call.
+        self._backend(0).fail('tpc_abort')
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.tpc_abort(t)
+        # tpc_abort is asynchronous. We make another synchronous call to make
+        # sure that it was already executed.
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        self.assertEquals('optimal', self._storage.raid_status())
+
+
 class ZEOReplicationStorageTests(ZEOStorageBackendTests,
                                  ReplicationStorageTests,
                                  ThreadTests.ThreadTests):



More information about the Checkins mailing list