[Checkins] SVN: gocept.zeoraid/trunk/ checkpoint: refactoring to introduce cluster modes
Thomas Lotze
tl at gocept.com
Tue Sep 28 09:56:10 EDT 2010
Log message for revision 117004:
checkpoint: refactoring to introduce cluster modes
Changed:
U gocept.zeoraid/trunk/setup.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml
U gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt
U gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
U gocept.zeoraid/trunk/versions.cfg
-=-
Modified: gocept.zeoraid/trunk/setup.py
===================================================================
--- gocept.zeoraid/trunk/setup.py 2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/setup.py 2010-09-28 13:56:10 UTC (rev 117004)
@@ -33,9 +33,12 @@
include_package_data=True,
package_dir={'': 'src'},
namespace_packages=['gocept'],
- install_requires=['setuptools',
- 'zc.zodbrecipes',
- 'ZODB3>=3.9dev'],
+ install_requires=[
+ 'mock',
+ 'setuptools',
+ 'zc.zodbrecipes',
+ 'ZODB3>=3.9dev',
+ ],
entry_points="""
[zc.buildout]
server = gocept.zeoraid.scripts.recipe:ZEORAIDServer
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml 2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml 2010-09-28 13:56:10 UTC (rev 117004)
@@ -54,6 +54,15 @@
</description>
</key>
+ <key name="cluster-mode" required="no"
+ datatype=".cluster_mode" default="coop">
+ <description>
Whether the RAID server expects to be the only RAID server in the
set-up ("single") or expects other RAID servers to use the same
back-end storages ("coop").
+ </description>
+ </key>
+
<multisection
type="ZODB.storage"
name="+"
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py 2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py 2010-09-28 13:56:10 UTC (rev 117004)
@@ -20,6 +20,13 @@
import gocept.zeoraid.storage
+def cluster_mode(value):
+ if value not in ('single', 'coop'):
+ raise ValueError(
+ "Only valid cluster modes: 'single', 'coop', found %r" % value)
+ return value
+
+
class Storage(ZODB.config.BaseConfig):
def open(self):
@@ -35,5 +42,6 @@
self.config.storages,
blob_dir=self.config.blob_dir,
read_only=self.config.read_only,
+ cluster_mode=self.config.cluster_mode,
shared_blob_dir=self.config.shared_blob_dir,
zeo=zeo)
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt 2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt 2010-09-28 13:56:10 UTC (rev 117004)
@@ -146,6 +146,7 @@
'/sample-pyN.N.egg',
'/sample-pyN.N.egg',
'/sample-pyN.N.egg',
+ '/sample-pyN.N.egg',
]
<BLANKLINE>
import gocept.zeoraid.scripts.controller
@@ -173,6 +174,7 @@
'/sample-pyN.N.egg',
'/sample-pyN.N.egg',
'/sample-pyN.N.egg',
+ '/sample-pyN.N.egg',
]
<BLANKLINE>
import gocept.zeoraid.scripts.controller
@@ -273,6 +275,7 @@
'/sample-pyN.N.egg',
'/sample-pyN.N.egg',
'/sample-pyN.N.egg',
+ '/sample-pyN.N.egg',
]
<BLANKLINE>
import gocept.zeoraid.scripts.controller
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py 2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py 2010-09-28 13:56:10 UTC (rev 117004)
@@ -22,6 +22,7 @@
def setUp(test):
zc.buildout.testing.buildoutSetUp(test)
zc.buildout.testing.install_develop('gocept.zeoraid', test)
+ zc.buildout.testing.install('mock', test)
zc.buildout.testing.install('zc.lockfile', test)
zc.buildout.testing.install('zc.zodbrecipes', test)
zc.buildout.testing.install('zope.event', test)
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2010-09-28 13:56:10 UTC (rev 117004)
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2007-2008 Zope Foundation and Contributors.
+# Copyright (c) 2007-2010 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -72,11 +72,11 @@
self.args = args
self.kw = kw
self.expect_connected = expect_connected
- self.__apply_storage = apply_storage
+ self._apply_storage = apply_storage
def run(self):
try:
- self.reliable, self.result = self.__apply_storage(
+ self.reliable, self.result = self._apply_storage(
self.storage_name, self.method_name, self.args,
self.kw, self.expect_connected)
except Exception, e:
@@ -111,12 +111,13 @@
_db = None
# Timeout for threaded/parallel operations on backend storages.
- timeout = 60
+ timeout = 6000
- def __init__(self, name, openers, read_only=False, blob_dir=None,
- shared_blob_dir=False, zeo=None):
+ def __init__(self, name, openers, read_only=False, cluster_mode='coop',
+ blob_dir=None, shared_blob_dir=False, zeo=None):
self.__name__ = name
self.read_only = read_only
+ self.cluster_mode = cluster_mode
self.shared_blob_dir = shared_blob_dir
self.zeo = zeo
self.storages = {}
@@ -190,7 +191,7 @@
return
try:
try:
- self._apply_all_storages('close', expect_connected=False)
+ AllStoragesOperation(self, expect_connected=False).close()
except Exception, e:
if not zeoraid_exception(e):
raise e
@@ -207,7 +208,7 @@
def getSize(self):
"""An approximate size of the database, in bytes."""
try:
- return self._apply_single_storage('getSize')[0]
+ return self._reader.getSize()
except Exception, e:
if zeoraid_exception(e):
return 0
@@ -216,7 +217,7 @@
def history(self, oid, version='', size=1):
"""Return a sequence of history information dictionaries."""
assert version is ''
- return self._apply_single_storage('history', (oid, size))[0]
+ return self._reader.history(oid, size)
def isReadOnly(self):
"""Test whether a storage allows committing new transactions."""
@@ -226,12 +227,14 @@
"""Return the id of the last committed transaction."""
if self.raid_status() == 'failed':
raise gocept.zeoraid.interfaces.RAIDError('RAID is failed.')
- return self._apply_all_storages('lastTransaction')
+ # Although this is a read operation we apply it to all storages as a
+ # safety belt to ensure consistency.
+ return AllStoragesOperation(self).lastTransaction()
def __len__(self):
"""The approximate number of objects in the storage."""
try:
- return self._apply_single_storage('__len__')[0]
+ return self._reader.__len__()
except RuntimeError, e:
if zeoraid_exception(e):
return 0
@@ -240,15 +243,15 @@
def load(self, oid, version=''):
"""Load data for an object id and version."""
assert version is ''
- return self._apply_single_storage('load', (oid,))[0]
+ return self._reader.load(oid)
def loadBefore(self, oid, tid):
"""Load the object data written before a transaction id."""
- return self._apply_single_storage('loadBefore', (oid, tid))[0]
+ return self._reader.loadBefore(oid, tid)
def loadSerial(self, oid, serial):
"""Load the object record for the give transaction id."""
- return self._apply_single_storage('loadSerial', (oid, serial))[0]
+ return self._reader.loadSerial(oid, serial)
@ensure_writable
def new_oid(self):
@@ -263,7 +266,7 @@
# Not write-lock protected implementation of new_oid
oids = []
for storage in self.storages_optimal[:]:
- reliable, oid = self.__apply_storage(storage, 'new_oid')
+ reliable, oid = self._apply_storage(storage, 'new_oid')
if reliable:
oids.append((oid, storage))
if not oids:
@@ -293,7 +296,7 @@
# through the list.
# This is a simplified implementation of a way to prioritize the list
# of optimal storages.
- self._apply_all_storages('pack', (t, referencesf))
+ self._writer.pack(t, referencesf)
def registerDB(self, db, limit=None):
"""Register an IStorageDB."""
@@ -302,7 +305,7 @@
# coordination by the StorageServer and set semantics in ZODB's
# Connection class make this correct and cheap.
self._db = db
- self._apply_all_storages('registerDB', (db,))
+ self._writer.registerDB(db)
def sortKey(self):
"""Sort key used to order distributed transactions."""
@@ -315,8 +318,8 @@
raise ZODB.POSException.StorageTransactionError(self, transaction)
self._write_lock.acquire()
try:
- self._apply_all_storages(
- 'store', (oid, oldserial, data, version, transaction))
+ self._writer.store(
+ oid, oldserial, data, version, transaction)
return self._tid
finally:
self._write_lock.release()
@@ -328,7 +331,7 @@
if transaction is not self._transaction:
return
try:
- self._apply_all_storages('tpc_abort', (transaction,))
+ self._writer.tpc_abort(transaction)
self._transaction = None
finally:
self._tpc_cleanup()
@@ -361,8 +364,7 @@
tid = self._new_tid(self.lastTransaction())
self._tid = tid
- self._apply_all_storages('tpc_begin',
- (transaction, self._tid, status))
+ self._writer.tpc_begin(transaction, self._tid, status)
finally:
self._write_lock.release()
@@ -375,7 +377,7 @@
if transaction is not self._transaction:
return
try:
- self._apply_all_storages('tpc_finish', (transaction,))
+ self._writer.tpc_finish(transaction)
if callback is not None:
# This callback is relevant for processing invalidations
# at transaction boundaries.
@@ -400,8 +402,9 @@
try:
if transaction is not self._transaction:
return
- self._apply_all_storages(
- 'tpc_vote', (transaction,), filter_results=unique_serials)
+ tpc_vote = AllStoragesOperation(
+ self, filter_results=unique_serials).tpc_vote
+ tpc_vote(transaction)
finally:
self._write_lock.release()
@@ -437,18 +440,21 @@
self._write_lock.acquire()
try:
if self.shared_blob_dir:
- result, storage = self._apply_single_storage(
- 'storeBlob',
- (oid, oldserial, data, blob, version, transaction))
- self._apply_all_storages(
- 'store', (oid, oldserial, data, version, transaction),
- exclude=(storage,), ignore_noop=True)
+ op = SingleStorageOperation(self)
+ result = op.storeBlob(
+ oid, oldserial, data, blob, version, transaction)
+ AllStoragesOperation(self, exclude=(op.storage,),
+ ignore_noop=True).store(
+ oid, oldserial, data, version, transaction)
else:
# The back end storages receive links to the blob file and
# take care of them appropriately. We have to remove the
# original link to the blob file ourselves.
self.tmp_paths.append(blob)
- self._apply_all_storages('storeBlob', get_blob_data)
+ # We'd like to say _writer here but cannot because of the
+ # method signature cleverness applied in AllStoragesOperation
+ # in order to consume the generator.
+ AllStoragesOperation(self)('storeBlob', get_blob_data)
return self._tid
finally:
self._write_lock.release()
@@ -465,8 +471,9 @@
# it's not anywhere.
raise ZODB.POSException.POSKeyError("No blob file", oid, serial)
- backend_filename = self._apply_single_storage(
- 'loadBlob', (oid, serial))[0]
+ reader = self._reader
+ reader.filter_results = relative_blob_path
+ backend_filename = reader.loadBlob(oid, serial)
lock_filename = blob_filename + '.lock'
self.blob_fshelper.createPathForOID(oid)
try:
@@ -524,25 +531,23 @@
"""Undo a transaction identified by id."""
self._write_lock.acquire()
try:
- return self._apply_all_storages('undo',
- (transaction_id, transaction))
+ return self._writer.undo(transaction_id, transaction)
finally:
self._write_lock.release()
def undoLog(self, first=0, last=-20, filter=None):
"""Return a sequence of descriptions for undoable transactions."""
- return self._apply_single_storage('undoLog', (first, last, filter))[0]
+ return self._reader.undoLog(first, last, filter)
def undoInfo(self, first=0, last=-20, specification=None):
"""Return a sequence of descriptions for undoable transactions."""
- return self._apply_single_storage(
- 'undoInfo', (first, last, specification))[0]
+ return self._reader.undoInfo(first, last, specification)
# IStorageCurrentRecordIteration
def record_iternext(self, next=None):
"""Iterate over the records in a storage."""
- return self._apply_single_storage('record_iternext', (next,))[0]
+ return self._reader.record_iternext(next)
# IStorageIteration
@@ -550,14 +555,16 @@
"""Return an IStorageTransactionInformation iterator."""
# XXX This should really include fail-over for iterators over storages
# that degrade or recover while this iterator is running.
- return self._apply_single_storage('iterator', (start, stop))[0]
+ # XXX This is also a threat to consistency when running in cooperation
+ # with other RAID servers.
+ return SingleStorageOperation(self).iterator(start, stop)
# IServeable
# Note: We opt to not implement lastInvalidations until ClientStorage does.
# def lastInvalidations(self, size):
# """Get recent transaction invalidations."""
- # return self._apply_single_storage('lastInvalidations', (size,))[0]
+ # return self._reader.lastInvalidations(size)
def tpc_transaction(self):
"""The current transaction being committed."""
@@ -565,7 +572,7 @@
def getTid(self, oid):
"""The last transaction to change an object."""
- return self._apply_single_storage('getTid', (oid,))[0]
+ return self._reader.getTid(oid)
def getExtensionMethods(self):
# This method isn't officially part of the interface but
@@ -731,7 +738,7 @@
if not self.storages_optimal and fail:
raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
- def __apply_storage(self, storage_name, method_name, args=(), kw={},
+ def _apply_storage(self, storage_name, method_name, args=(), kw={},
expect_connected=True):
"""Calls a method on a given backend storage.
@@ -773,120 +780,31 @@
self._degrade_storage(storage_name, reason=reason)
return (reliable, result)
- @ensure_open_storage
- def _apply_single_storage(self, method_name, args=(), kw={}):
- """Calls the given method on a random optimal storage."""
- # Try to find a storage that we can talk to. Stop after we found a
- # reliable result.
- storages = self.storages_optimal[:]
- reliable = False
- while not reliable:
- if not storages:
- break
- name = random.choice(storages)
- storages.remove(name)
- reliable, result = self.__apply_storage(
- name, method_name, args, kw)
- if reliable:
- return result, name
+ @property
+ def _reader(self):
+ """Calls the given method on the back-end storages with a strategy
+ appropriate for reading.
- # We could not determine a result from any storage.
- raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
-
- @ensure_open_storage
- def _apply_all_storages(self, method_name, args=(), kw={},
- expect_connected=True, exclude=(),
- ignore_noop=False, filter_results=lambda x: x):
- """Calls the given method on all optimal backend storages in order.
-
- `args` can be given as an n-tuple with the positional arguments that
- should be passed to each storage.
-
- Alternatively `args` can be a callable that returns an iterable. The
- N-th item of the iterable is expected to be a tuple, passed to the
- N-th storage.
-
"""
- if callable(args):
- argument_iterable = args()
+ if self.cluster_mode == 'single':
+ # When run as a single server, we can choose to optimise read
+ # operations.
+ return SingleStorageOperation(self)
else:
- # Provide a fallback if `args` is given as a simple tuple.
- static_arguments = args
+ # When run in cooperation with other servers, we need to be
+ # prepared for the event that foreign write operations have
+ # reached only some of the back-ends. Reading from all back-ends
+ # ensures consistency.
+ return AllStoragesOperation(self)
- def dummy_generator():
- while True:
- yield static_arguments
- argument_iterable = dummy_generator()
+ @property
+ def _writer(self):
+ """Calls the given method on the back-end storages with a strategy
+ appropriate for writing.
- applicable_storages = self.storages_optimal[:]
- applicable_storages = [storage for storage in applicable_storages
- if storage not in exclude]
+ """
+ return AllStoragesOperation(self)
- # Run __apply_storage on all applicable storages in parallel.
- threads = []
- for storage_name in applicable_storages:
- args = argument_iterable.next()
- t = ThreadedApplyStorage(storage_name, method_name, args, kw,
- expect_connected, self.__apply_storage)
- threads.append(t)
- t.start()
-
- # Wait for threads to finish and pick up results.
- results = {}
- exceptions = []
- for thread in threads:
- # XXX The timeout should be calculated such that the total time
- # spent in this loop doesn't grow with the number of storages.
- thread.join(self.timeout)
- if thread.isAlive():
- # Storage timed out.
- self._degrade_storage(
- thread.storage_name,
- reason='no response within %s seconds' %
- self.timeout)
- self._threads.add(thread)
- continue
- if thread.exception:
- exceptions.append(thread.exception)
- elif thread.reliable:
- results[thread.storage_name] = thread.result
-
- # Analyse result consistency.
- consistent = True
- if exceptions and results:
- consistent = False
- elif exceptions:
- # Since we can only get one kind of exceptions at the moment, they
- # must be consistent anyway.
- pass
- elif results:
- results = dict((storage, filter_results(result))
- for storage, result in results.items())
- ref = results.values()[0]
- for test in results.values()[1:]:
- if test != ref:
- logger.debug(
- 'Got inconsistent results for method %s: %r' %
- (method_name, results))
- consistent = False
- break
- if not consistent:
- self.close()
- raise gocept.zeoraid.interfaces.RAIDError(
- "RAID is inconsistent and was closed.")
-
- # Select result.
- if exceptions:
- raise exceptions[0]
- if results:
- return results.values()[0]
-
- # We did not get any reliable result, making this call effectively a
- # no-op.
- if ignore_noop:
- return
- raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
-
def _recover_impl(self, name):
try:
target = self.openers[name].open()
@@ -1004,6 +922,150 @@
x.join(timeout)
+class StorageOperation(object):
+
+ def __init__(self, raid):
+ self.raid = raid
+
+ def __getattr__(self, name):
+ return lambda *args, **kw: self(name, args, kw)
+
+ @property
+ def closed(self):
+ return self.raid.closed
+
+
+class SingleStorageOperation(StorageOperation):
+
+ name = None
+
+ @ensure_open_storage
+ def __call__(self, method_name, args=(), kw={}):
+ """Calls the given method on a random optimal storage."""
+ # Try to find a storage that we can talk to. Stop after we found a
+ # reliable result.
+ storages = self.raid.storages_optimal[:]
+ reliable = False
+ while not reliable:
+ if not storages:
+ break
+ name = random.choice(storages)
+ storages.remove(name)
+ reliable, result = self.raid._apply_storage(
+ name, method_name, args, kw)
+ if reliable:
+ self.storage = name
+ return result
+
+ # We could not determine a result from any storage.
+ raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+
+
+class AllStoragesOperation(StorageOperation):
+
+ def __init__(self, raid, expect_connected=True, exclude=(),
+ ignore_noop=False, filter_results=lambda x, y: x):
+ super(AllStoragesOperation, self).__init__(raid)
+ self.expect_connected = expect_connected
+ self.exclude = exclude
+ self.ignore_noop = ignore_noop
+ self.filter_results = filter_results
+
+ @ensure_open_storage
+ def __call__(self, method_name, args=(), kw={}):
+ """Calls the given method on all optimal backend storages in order.
+
+ `args` can be given as an n-tuple with the positional arguments that
+ should be passed to each storage.
+
+ Alternatively `args` can be a callable that returns an iterable. The
+ N-th item of the iterable is expected to be a tuple, passed to the
+ N-th storage.
+
+ """
+ if callable(args):
+ argument_iterable = args()
+ else:
+ # Provide a fallback if `args` is given as a simple tuple.
+ static_arguments = args
+
+ def dummy_generator():
+ while True:
+ yield static_arguments
+ argument_iterable = dummy_generator()
+
+ applicable_storages = self.raid.storages_optimal[:]
+ applicable_storages = [storage for storage in applicable_storages
+ if storage not in self.exclude]
+
+ # Run _apply_storage on all applicable storages in parallel.
+ threads = []
+ for storage_name in applicable_storages:
+ args = argument_iterable.next()
+ t = ThreadedApplyStorage(
+ storage_name, method_name, args, kw,
+ self.expect_connected, self.raid._apply_storage)
+ threads.append(t)
+ t.start()
+
+ # Wait for threads to finish and pick up results.
+ results = {}
+ exceptions = []
+ for thread in threads:
+ # XXX The timeout should be calculated such that the total time
+ # spent in this loop doesn't grow with the number of storages.
+ thread.join(self.raid.timeout)
+ if thread.isAlive():
+ # Storage timed out.
+ self.raid._degrade_storage(
+ thread.storage_name,
+ reason='no response within %s seconds' %
+ self.raid.timeout)
+ self.raid._threads.add(thread)
+ continue
+ if thread.exception:
+ exceptions.append(thread.exception)
+ elif thread.reliable:
+ results[thread.storage_name] = thread.result
+
+ # Analyse result consistency.
+ consistent = True
+ if exceptions and results:
+ consistent = False
+ elif exceptions:
+ # Since we can only get one kind of exceptions at the moment, they
+ # must be consistent anyway.
+ pass
+ elif results:
+ filtered_results = [
+ self.filter_results(result, self.raid.storages[storage])
+ for storage, result in results.items()]
+ ref = filtered_results[0]
+ for test in filtered_results[1:]:
+ if test != ref:
+ logger.debug(
+ 'Got inconsistent results for method %s: %r' %
+ (method_name, results))
+ consistent = False
+ break
+ if not consistent:
+ self.raid.close()
+ raise gocept.zeoraid.interfaces.RAIDError(
+ "RAID is inconsistent and was closed.")
+
+ # Select result.
+ if exceptions:
+ raise exceptions[0]
+ if results:
+ return results.values()[0]
+
+ # We did not get any reliable result, making this call effectively a
+ # no-op.
+ if self.ignore_noop:
+ return
+ raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+
+
def optimistic_copy(source, target):
"""Try creating a hard link to source at target. Fall back to copying the
file.
@@ -1021,7 +1083,7 @@
file2.close()
-def unique_serials(serials):
+def unique_serials(serials, storage):
"""Filter a sequence of oid/serial pairs and remove late duplicates."""
if serials is None:
return
@@ -1037,6 +1099,14 @@
return result
+def relative_blob_path(path, storage):
+ """Normalise a path to a blob by getting rid of the tmp dir part."""
+ if path.startswith(storage.fshelper.temp_dir):
+ return path.replace(storage.fshelper.temp_dir, 'TMP:', 1)
+ if path.startswith(storage.fshelper.base_dir):
+ return path.replace(storage.fshelper.base_dir, '', 1)
+
+
def zeoraid_exception(e):
"""Determine whether the given exception is a RAID error.
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2010-09-28 13:56:10 UTC (rev 117004)
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2007-2008 Zope Foundation and Contributors.
+# Copyright (c) 2007-2010 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -22,10 +22,12 @@
from ZODB.tests import TransactionalUndoStorage, PackableStorage
from gocept.zeoraid.tests.loggingstorage import LoggingStorage
import ZEO.runzeo
+import ZODB.MappingStorage
import ZODB.config
import ZODB.interfaces
import gocept.zeoraid.storage
import gocept.zeoraid.tests.test_recovery
+import mock
import os
import random
import shutil
@@ -39,6 +41,11 @@
import zope.interface.verify
+# import logging
+# logging.getLogger().setLevel(0)
+# logging.getLogger().addHandler(logging.StreamHandler())
+
+
def fail(obj, name):
old_method = getattr(obj, name)
@@ -48,6 +55,27 @@
setattr(obj, name, failing_method)
+class MockStorage(ZODB.MappingStorage.MappingStorage):
+
+ def __init__(self, undo):
+ super(MockStorage, self).__init__()
+ self.undo = undo
+
+ def supportsUndo(self):
+ return self.undo
+
+
+class Opener(object):
+
+ name = 'foo'
+
+ def __init__(self, undo=True):
+ self.undo = undo
+
+ def open(self):
+ return MockStorage(self.undo)
+
+
class ZEOStorageBackendTests(StorageTestBase.StorageTestBase):
def open(self, **kwargs):
@@ -214,7 +242,7 @@
backend_storage = self._storage.storages[backend_name]
backend_storage.close()
- reliable, oid = self._storage._RAIDStorage__apply_storage(
+ reliable, oid = self._storage._apply_storage(
backend_name, 'new_oid')
self.assertEquals(False, reliable)
self.assertEquals([backend_name], self._storage.storages_degraded)
@@ -934,16 +962,9 @@
storage.close()
def test_supportsUndo_required(self):
-
- class Opener(object):
- name = 'foo'
-
- def open(self):
- return ZODB.MappingStorage.MappingStorage()
-
self.assertRaises(RuntimeError,
gocept.zeoraid.storage.RAIDStorage,
- 'name', [Opener()])
+ 'name', [Opener(undo=False)])
def test_supportsUndo(self):
self.assertEquals(True, self._storage.supportsUndo())
@@ -1415,41 +1436,26 @@
return LoggingStorage(self.name, self.file_name)
-class LoggingStorageDistributedTests(StorageTestBase.StorageTestBase):
+class LoggingStorageDistributedTests(unittest.TestCase):
- # The backend and call counts have been chosen such that the probability
- # of all calls being served by the same backend is about 1:10^6.
- backend_count = 10
- call_count = 6
-
- def _backend(self, index):
- return self._storage.storages[
- self._storage.storages_optimal[index]]
-
- def setUp(self):
- self._storages = []
- for i in xrange(self.backend_count):
- self._storages.append(LoggingStorageOpener(str(i)))
- self._storage = gocept.zeoraid.storage.RAIDStorage(
- 'teststorage', self._storages)
-
- def tearDown(self):
- self._storage.close()
-
def test_distributed_single_calls(self):
- for i in xrange(self.call_count):
- self._storage.getSize()
+ raid = mock.Mock()
+ raid.closed = False
+ raid.storages_optimal = ['1', '2']
+ raid._apply_storage = mock.Mock(return_value=(True, 5))
+ op = gocept.zeoraid.storage.SingleStorageOperation(raid)
+ for i in xrange(20):
+ op.getSize()
+ self.assertEqual(20, raid._apply_storage.call_count)
+ counts = {}
+ for item in raid._apply_storage.call_args_list:
+ storage = item[0][0]
+ counts[storage] = counts.get(storage, 0) + 1
+ self.assertEqual(['1', '2'], sorted(counts.keys()))
+ self.assert_(counts['1'] > 2)
+ self.assert_(counts['2'] > 2)
- # assert that at least two storages gets called at least one time
- storages_called = [x for x in xrange(self.backend_count)
- if len(self._backend(x)._log) >= 1]
- self.assertEquals(storages_called >= 2, True)
- # assert that six calls were made
- self.assertEquals(6, sum([len(self._backend(x)._log)
- for x in xrange(self.backend_count)]))
-
-
class ExtensionMethodsTests(ZEOStorageBackendTests):
def open(self):
@@ -1653,6 +1659,26 @@
self._storage.raid_details())
+class ClusterModeTests(unittest.TestCase):
+
+ def setUp(self):
+ self.storage = gocept.zeoraid.storage.RAIDStorage(
+ 'test', [Opener('1'), Opener('2')])
+ self.storage._apply_storage = mock.Mock()
+
+ def test_single_mode_read(self):
+ self.storage.cluster_mode = 'single'
+ self.assert_(
+ isinstance(self.storage._reader,
+ gocept.zeoraid.storage.SingleStorageOperation))
+
+ def test_coop_mode_read(self):
+ self.storage.cluster_mode = 'coop'
+ self.assert_(
+ isinstance(self.storage._reader,
+ gocept.zeoraid.storage.AllStoragesOperation))
+
+
def test_suite():
suite = unittest.TestSuite()
@@ -1661,4 +1687,5 @@
suite.addTest(unittest.makeSuite(FailingStorageSharedBlobTests))
suite.addTest(unittest.makeSuite(LoggingStorageDistributedTests))
suite.addTest(unittest.makeSuite(ExtensionMethodsTests))
+ suite.addTest(unittest.makeSuite(ClusterModeTests))
return suite
Modified: gocept.zeoraid/trunk/versions.cfg
===================================================================
--- gocept.zeoraid/trunk/versions.cfg 2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/versions.cfg 2010-09-28 13:56:10 UTC (rev 117004)
@@ -1,6 +1,7 @@
[versions]
ZConfig = 2.7.1
ZODB3 = 3.9.6
+mock = 0.7.0b2
setuptools = 0.6c11
transaction = 1.1.1
zc.buildout = 1.4.2
More information about the checkins
mailing list