[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/ - Removed
`log_store` functionality for now. Recovery will be rewritten soon.
Christian Theune
ct at gocept.com
Thu Jan 17 07:21:39 EST 2008
Log message for revision 82932:
- Removed `log_store` functionality for now. Recovery will be rewritten soon.
- tested and cleaned up tpc_* methods
- removed unnecessary cleanup method
Changed:
U gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-17 06:18:09 UTC (rev 82931)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-17 12:21:39 UTC (rev 82932)
@@ -97,12 +97,6 @@
# we bring them back into the pool of optimal storages.
_db = None
- # This flag signals whether any `store` operation should be logged. This
- # is necessary to support the two-phase recovery process. It is set to
- # `true` when a recovery starts and set back to `false` when it is
- # finished.
- _log_stores = False
-
# The last transaction that we know of. This is used to keep a global
# knowledge of the current assumed state and verify storages that might
# have fallen out of sync. It is also used as a point of reference
@@ -117,8 +111,7 @@
# Allocate locks
# The write lock must be acquired when:
# a) performing write operations on the backends
- # b) reading or writing log_stores
- # c) writing _transaction
+ # b) writing _transaction
self._write_lock = threading.RLock()
# The commit lock must be acquired when setting _transaction, and
# released when unsetting _transaction.
@@ -265,26 +258,17 @@
try:
self._apply_all_storages('store',
(oid, oldserial, data, '', transaction))
- if self._log_stores:
- oids = self._unrecovered_transactions.setdefault(self._tid, [])
- oids.append(oid)
return self._tid
finally:
self._write_lock.release()
- # XXX
def tpc_abort(self, transaction):
+ """Abort the two-phase commit."""
self._write_lock.acquire()
try:
if transaction is not self._transaction:
return
try:
- # XXX Edge cases for the log_store abort ...
- if self._log_stores:
- # We may have logged some stores within that transaction
- # which we have to remove again because we aborted it.
- if self._tid in self._unrecovered_transactions:
- del self._unrecovered_transactions[self._tid]
self._apply_all_storages('tpc_abort', (transaction,))
self._transaction = None
finally:
@@ -292,34 +276,25 @@
finally:
self._write_lock.release()
- # XXX
@ensure_writable
def tpc_begin(self, transaction, tid=None, status=' '):
+ """Begin the two-phase commit process."""
self._write_lock.acquire()
try:
if self._transaction is transaction:
+ # It is valid for tpc_begin to be called multiple times with the
+ # same transaction; repeated calls are silently ignored.
return
+
+ # Release and re-acquire to avoid deadlocks. commit_lock is a
+ # long-term lock whereas write_lock is a short-term lock, so the
+ # long-term lock must always be acquired first.
self._write_lock.release()
self._commit_lock.acquire()
self._write_lock.acquire()
- # I don't understand the lock that protects _transaction. The commit
- # lock and status will be deduced by the underlying storages.
-
self._transaction = transaction
- # Remove storages that aren't on the same last tid anymore (this happens
- # if a storage disconnects
- for name in self.storages_optimal:
- storage = self.storages[name]
- try:
- last_tid = storage.lastTransaction()
- except ZEO.ClientStorage.ClientDisconnected:
- self._degrade_storage(name, fail=False)
- continue
- if last_tid != self._last_tid:
- self._degrade_storage(name)
-
if tid is None:
# No TID was given, so we create a new one.
tid = self._new_tid(self._last_tid)
@@ -330,16 +305,23 @@
finally:
self._write_lock.release()
- # XXX
def tpc_finish(self, transaction, callback=None):
+ """Finish the transaction, making any transaction changes permanent.
+ """
self._write_lock.acquire()
try:
if transaction is not self._transaction:
return
try:
+ self._apply_all_storages('tpc_finish', (transaction,))
if callback is not None:
+ # This callback is relevant for processing invalidations
+ # at transaction boundaries.
+ # XXX It is somewhat unclear whether this should be done
+ # before or after calling tpc_finish. BaseStorage and
+ # ClientStorage contradict each other and the documentation
+ # is non-existent. We trust ClientStorage here.
callback(self._tid)
- self._apply_all_storages('tpc_finish', (transaction,))
self._last_tid = self._tid
return self._tid
finally:
@@ -348,8 +330,8 @@
finally:
self._write_lock.release()
- # XXX
def tpc_vote(self, transaction):
+ """Provide a storage with an opportunity to veto a transaction."""
self._write_lock.acquire()
try:
if transaction is not self._transaction:
@@ -358,12 +340,6 @@
finally:
self._write_lock.release()
- def cleanup(self):
- # XXX This is not actually documented, it's not implemented in all
- # storages, it's not even clear when it should be called. Not
- # correctly calling storages' cleanup might leave turds.
- pass
-
def supportsVersions(self):
return False
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-01-17 06:18:09 UTC (rev 82931)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-01-17 12:21:39 UTC (rev 82932)
@@ -11,6 +11,7 @@
import zope.interface.verify
+import transaction
from ZODB.tests import StorageTestBase, BasicStorage, \
TransactionalUndoStorage, PackableStorage, \
Synchronization, ConflictResolution, HistoryStorage, \
@@ -569,7 +570,68 @@
oid=oid, revid=revid, data='bar')
self.assertEquals('failed', self._storage.raid_status())
+ def test_tpc_begin_degrading(self):
+ self._backend(0).fail('tpc_begin')
+ oid = self._storage.new_oid()
+ self._dostoreNP(oid=oid, data='foo')
+ self.assertEquals('foo', self._backend(0).load(oid)[0])
+ self.assertEquals('foo', self._storage.load(oid)[0])
+ self.assertEquals('degraded', self._storage.raid_status())
+ oid = self._storage.new_oid()
+ self._backend(0).fail('tpc_begin')
+ self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+ self._dostoreNP,
+ oid=oid, data='bar')
+ self.assertEquals('failed', self._storage.raid_status())
+
+ def test_tpc_vote_degrading(self):
+ self._backend(0).fail('tpc_vote')
+ oid = self._storage.new_oid()
+ self._dostoreNP(oid=oid, data='foo')
+ self.assertEquals('foo', self._backend(0).load(oid)[0])
+ self.assertEquals('foo', self._storage.load(oid)[0])
+ self.assertEquals('degraded', self._storage.raid_status())
+
+ oid = self._storage.new_oid()
+ self._backend(0).fail('tpc_vote')
+ self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+ self._dostoreNP,
+ oid=oid, data='bar')
+ self.assertEquals('failed', self._storage.raid_status())
+
+ def test_tpc_finish_degrading(self):
+ self._backend(0).fail('tpc_finish')
+ oid = self._storage.new_oid()
+ self._dostoreNP(oid=oid, data='foo')
+ self.assertEquals('foo', self._backend(0).load(oid)[0])
+ self.assertEquals('foo', self._storage.load(oid)[0])
+ self.assertEquals('degraded', self._storage.raid_status())
+
+ oid = self._storage.new_oid()
+ self._backend(0).fail('tpc_finish')
+ self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+ self._dostoreNP,
+ oid=oid, data='bar')
+ self.assertEquals('failed', self._storage.raid_status())
+
+ def test_tpc_abort_not_degrading(self):
+ # tpc_abort (in combination with ClientStorage) will never cause
+ # degradation, even if it raises an exception.
+ # This is because of an asynchronous call made by ClientStorage.
+ # For us this is ok. If there really is something wrong with the
+ # storage, we'll know in the next synchronous call.
+ self._backend(0).fail('tpc_abort')
+ t = transaction.Transaction()
+ self._storage.tpc_begin(t)
+ self._storage.tpc_abort(t)
+ # tpc_abort is asynchronous. We make another synchronous call to make
+ # sure that it was already executed.
+ t = transaction.Transaction()
+ self._storage.tpc_begin(t)
+ self.assertEquals('optimal', self._storage.raid_status())
+
+
class ZEOReplicationStorageTests(ZEOStorageBackendTests,
ReplicationStorageTests,
ThreadTests.ThreadTests):
More information about the Checkins
mailing list