[Checkins] SVN: gocept.zeoraid/trunk/ checkpoint: refactoring to introduce cluster modes

Thomas Lotze tl at gocept.com
Tue Sep 28 09:56:10 EDT 2010


Log message for revision 117004:
  checkpoint: refactoring to introduce cluster modes

Changed:
  U   gocept.zeoraid/trunk/setup.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
  U   gocept.zeoraid/trunk/versions.cfg

-=-
Modified: gocept.zeoraid/trunk/setup.py
===================================================================
--- gocept.zeoraid/trunk/setup.py	2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/setup.py	2010-09-28 13:56:10 UTC (rev 117004)
@@ -33,9 +33,12 @@
     include_package_data=True,
     package_dir={'': 'src'},
     namespace_packages=['gocept'],
-    install_requires=['setuptools',
-                      'zc.zodbrecipes',
-                      'ZODB3>=3.9dev'],
+    install_requires=[
+        'mock',
+        'setuptools',
+        'zc.zodbrecipes',
+        'ZODB3>=3.9dev',
+        ],
     entry_points="""
         [zc.buildout]
         server = gocept.zeoraid.scripts.recipe:ZEORAIDServer

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml	2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml	2010-09-28 13:56:10 UTC (rev 117004)
@@ -54,6 +54,15 @@
             </description>
         </key>
 
+        <key name="cluster-mode" required="no"
+             datatype=".cluster_mode" default="coop">
+          <description>
+            Whether the RAID server expects to be the only RAID server in the
+            set-up ("single") or expects other RAID servers to use the same
+            back-end storages ("coop").
+          </description>
+        </key>
+
         <multisection 
             type="ZODB.storage" 
             name="+"

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py	2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py	2010-09-28 13:56:10 UTC (rev 117004)
@@ -20,6 +20,13 @@
 import gocept.zeoraid.storage
 
 
+def cluster_mode(value):
+    if value not in ('single', 'coop'):
+        raise ValueError(
+            "Only valid cluster modes: 'single', 'coop', found %r" % value)
+    return value
+
+
 class Storage(ZODB.config.BaseConfig):
 
     def open(self):
@@ -35,5 +42,6 @@
             self.config.storages,
             blob_dir=self.config.blob_dir,
             read_only=self.config.read_only,
+            cluster_mode=self.config.cluster_mode,
             shared_blob_dir=self.config.shared_blob_dir,
             zeo=zeo)

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt	2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt	2010-09-28 13:56:10 UTC (rev 117004)
@@ -146,6 +146,7 @@
       '/sample-pyN.N.egg',
       '/sample-pyN.N.egg',
       '/sample-pyN.N.egg',
+      '/sample-pyN.N.egg',
       ]
     <BLANKLINE>
     import gocept.zeoraid.scripts.controller
@@ -173,6 +174,7 @@
       '/sample-pyN.N.egg',
       '/sample-pyN.N.egg',
       '/sample-pyN.N.egg',
+      '/sample-pyN.N.egg',
       ]
     <BLANKLINE>
     import gocept.zeoraid.scripts.controller
@@ -273,6 +275,7 @@
       '/sample-pyN.N.egg',
       '/sample-pyN.N.egg',
       '/sample-pyN.N.egg',
+      '/sample-pyN.N.egg',
       ]
     <BLANKLINE>
     import gocept.zeoraid.scripts.controller

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py	2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py	2010-09-28 13:56:10 UTC (rev 117004)
@@ -22,6 +22,7 @@
 def setUp(test):
     zc.buildout.testing.buildoutSetUp(test)
     zc.buildout.testing.install_develop('gocept.zeoraid', test)
+    zc.buildout.testing.install('mock', test)
     zc.buildout.testing.install('zc.lockfile', test)
     zc.buildout.testing.install('zc.zodbrecipes', test)
     zc.buildout.testing.install('zope.event', test)

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2010-09-28 13:56:10 UTC (rev 117004)
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2007-2008 Zope Foundation and Contributors.
+# Copyright (c) 2007-2010 Zope Foundation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -72,11 +72,11 @@
         self.args = args
         self.kw = kw
         self.expect_connected = expect_connected
-        self.__apply_storage = apply_storage
+        self._apply_storage = apply_storage
 
     def run(self):
         try:
-            self.reliable, self.result = self.__apply_storage(
+            self.reliable, self.result = self._apply_storage(
                 self.storage_name, self.method_name, self.args,
                 self.kw, self.expect_connected)
         except Exception, e:
@@ -111,12 +111,13 @@
     _db = None
 
     # Timeout for threaded/parallel operations on backend storages.
-    timeout = 60
+    timeout = 6000
 
-    def __init__(self, name, openers, read_only=False, blob_dir=None,
-                 shared_blob_dir=False, zeo=None):
+    def __init__(self, name, openers, read_only=False, cluster_mode='coop',
+                 blob_dir=None, shared_blob_dir=False, zeo=None):
         self.__name__ = name
         self.read_only = read_only
+        self.cluster_mode = cluster_mode
         self.shared_blob_dir = shared_blob_dir
         self.zeo = zeo
         self.storages = {}
@@ -190,7 +191,7 @@
             return
         try:
             try:
-                self._apply_all_storages('close', expect_connected=False)
+                AllStoragesOperation(self, expect_connected=False).close()
             except Exception, e:
                 if not zeoraid_exception(e):
                     raise e
@@ -207,7 +208,7 @@
     def getSize(self):
         """An approximate size of the database, in bytes."""
         try:
-            return self._apply_single_storage('getSize')[0]
+            return self._reader.getSize()
         except Exception, e:
             if zeoraid_exception(e):
                 return 0
@@ -216,7 +217,7 @@
     def history(self, oid, version='', size=1):
         """Return a sequence of history information dictionaries."""
         assert version is ''
-        return self._apply_single_storage('history', (oid, size))[0]
+        return self._reader.history(oid, size)
 
     def isReadOnly(self):
         """Test whether a storage allows committing new transactions."""
@@ -226,12 +227,14 @@
         """Return the id of the last committed transaction."""
         if self.raid_status() == 'failed':
             raise gocept.zeoraid.interfaces.RAIDError('RAID is failed.')
-        return self._apply_all_storages('lastTransaction')
+        # Although this is a read operation we apply it to all storages as a
+        # safety belt to ensure consistency.
+        return AllStoragesOperation(self).lastTransaction()
 
     def __len__(self):
         """The approximate number of objects in the storage."""
         try:
-            return self._apply_single_storage('__len__')[0]
+            return self._reader.__len__()
         except RuntimeError, e:
             if zeoraid_exception(e):
                 return 0
@@ -240,15 +243,15 @@
     def load(self, oid, version=''):
         """Load data for an object id and version."""
         assert version is ''
-        return self._apply_single_storage('load', (oid,))[0]
+        return self._reader.load(oid)
 
     def loadBefore(self, oid, tid):
         """Load the object data written before a transaction id."""
-        return self._apply_single_storage('loadBefore', (oid, tid))[0]
+        return self._reader.loadBefore(oid, tid)
 
     def loadSerial(self, oid, serial):
         """Load the object record for the give transaction id."""
-        return self._apply_single_storage('loadSerial', (oid, serial))[0]
+        return self._reader.loadSerial(oid, serial)
 
     @ensure_writable
     def new_oid(self):
@@ -263,7 +266,7 @@
         # Not write-lock protected implementation of new_oid
         oids = []
         for storage in self.storages_optimal[:]:
-            reliable, oid = self.__apply_storage(storage, 'new_oid')
+            reliable, oid = self._apply_storage(storage, 'new_oid')
             if reliable:
                 oids.append((oid, storage))
         if not oids:
@@ -293,7 +296,7 @@
         #    through the list.
         # This is a simplified implementation of a way to prioritize the list
         # of optimal storages.
-        self._apply_all_storages('pack', (t, referencesf))
+        self._writer.pack(t, referencesf)
 
     def registerDB(self, db, limit=None):
         """Register an IStorageDB."""
@@ -302,7 +305,7 @@
         # coordination by the StorageServer and set semantics in ZODB's
         # Connection class make this correct and cheap.
         self._db = db
-        self._apply_all_storages('registerDB', (db,))
+        self._writer.registerDB(db)
 
     def sortKey(self):
         """Sort key used to order distributed transactions."""
@@ -315,8 +318,8 @@
             raise ZODB.POSException.StorageTransactionError(self, transaction)
         self._write_lock.acquire()
         try:
-            self._apply_all_storages(
-                'store', (oid, oldserial, data, version, transaction))
+            self._writer.store(
+                oid, oldserial, data, version, transaction)
             return self._tid
         finally:
             self._write_lock.release()
@@ -328,7 +331,7 @@
             if transaction is not self._transaction:
                 return
             try:
-                self._apply_all_storages('tpc_abort', (transaction,))
+                self._writer.tpc_abort(transaction)
                 self._transaction = None
             finally:
                 self._tpc_cleanup()
@@ -361,8 +364,7 @@
                 tid = self._new_tid(self.lastTransaction())
             self._tid = tid
 
-            self._apply_all_storages('tpc_begin',
-                                     (transaction, self._tid, status))
+            self._writer.tpc_begin(transaction, self._tid, status)
         finally:
             self._write_lock.release()
 
@@ -375,7 +377,7 @@
             if transaction is not self._transaction:
                 return
             try:
-                self._apply_all_storages('tpc_finish', (transaction,))
+                self._writer.tpc_finish(transaction)
                 if callback is not None:
                     # This callback is relevant for processing invalidations
                     # at transaction boundaries.
@@ -400,8 +402,9 @@
         try:
             if transaction is not self._transaction:
                 return
-            self._apply_all_storages(
-                'tpc_vote', (transaction,), filter_results=unique_serials)
+            tpc_vote = AllStoragesOperation(
+                self, filter_results=unique_serials).tpc_vote
+            tpc_vote(transaction)
         finally:
             self._write_lock.release()
 
@@ -437,18 +440,21 @@
         self._write_lock.acquire()
         try:
             if self.shared_blob_dir:
-                result, storage = self._apply_single_storage(
-                    'storeBlob',
-                    (oid, oldserial, data, blob, version, transaction))
-                self._apply_all_storages(
-                    'store', (oid, oldserial, data, version, transaction),
-                    exclude=(storage,), ignore_noop=True)
+                op = SingleStorageOperation(self)
+                result = op.storeBlob(
+                    oid, oldserial, data, blob, version, transaction)
+                AllStoragesOperation(self, exclude=(op.storage,),
+                                     ignore_noop=True).store(
+                    oid, oldserial, data, version, transaction)
             else:
                 # The back end storages receive links to the blob file and
                 # take care of them appropriately. We have to remove the
                 # original link to the blob file ourselves.
                 self.tmp_paths.append(blob)
-                self._apply_all_storages('storeBlob', get_blob_data)
+                # We'd like to say _writer here but cannot because of the
+                # method signature cleverness applied in AllStoragesOperation
+                # in order to consume the generator.
+                AllStoragesOperation(self)('storeBlob', get_blob_data)
             return self._tid
         finally:
             self._write_lock.release()
@@ -465,8 +471,9 @@
             # it's not anywhere.
             raise ZODB.POSException.POSKeyError("No blob file", oid, serial)
 
-        backend_filename = self._apply_single_storage(
-            'loadBlob', (oid, serial))[0]
+        reader = self._reader
+        reader.filter_results = relative_blob_path
+        backend_filename = reader.loadBlob(oid, serial)
         lock_filename = blob_filename + '.lock'
         self.blob_fshelper.createPathForOID(oid)
         try:
@@ -524,25 +531,23 @@
         """Undo a transaction identified by id."""
         self._write_lock.acquire()
         try:
-            return self._apply_all_storages('undo',
-                                            (transaction_id, transaction))
+            return self._writer.undo(transaction_id, transaction)
         finally:
             self._write_lock.release()
 
     def undoLog(self, first=0, last=-20, filter=None):
         """Return a sequence of descriptions for undoable transactions."""
-        return self._apply_single_storage('undoLog', (first, last, filter))[0]
+        return self._reader.undoLog(first, last, filter)
 
     def undoInfo(self, first=0, last=-20, specification=None):
         """Return a sequence of descriptions for undoable transactions."""
-        return self._apply_single_storage(
-            'undoInfo', (first, last, specification))[0]
+        return self._reader.undoInfo(first, last, specification)
 
     # IStorageCurrentRecordIteration
 
     def record_iternext(self, next=None):
         """Iterate over the records in a storage."""
-        return self._apply_single_storage('record_iternext', (next,))[0]
+        return self._reader.record_iternext(next)
 
     # IStorageIteration
 
@@ -550,14 +555,16 @@
         """Return an IStorageTransactionInformation iterator."""
         # XXX This should really include fail-over for iterators over storages
         # that degrade or recover while this iterator is running.
-        return self._apply_single_storage('iterator', (start, stop))[0]
+        # XXX This is also a threat to consistency when running in cooperation
+        # with other RAID servers.
+        return SingleStorageOperation(self).iterator(start, stop)
 
     # IServeable
 
     # Note: We opt to not implement lastInvalidations until ClientStorage does.
     # def lastInvalidations(self, size):
     #    """Get recent transaction invalidations."""
-    #    return self._apply_single_storage('lastInvalidations', (size,))[0]
+    #    return self._reader.lastInvalidations(size)
 
     def tpc_transaction(self):
         """The current transaction being committed."""
@@ -565,7 +572,7 @@
 
     def getTid(self, oid):
         """The last transaction to change an object."""
-        return self._apply_single_storage('getTid', (oid,))[0]
+        return self._reader.getTid(oid)
 
     def getExtensionMethods(self):
         # This method isn't officially part of the interface but
@@ -731,7 +738,7 @@
         if not self.storages_optimal and fail:
             raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
 
-    def __apply_storage(self, storage_name, method_name, args=(), kw={},
+    def _apply_storage(self, storage_name, method_name, args=(), kw={},
                         expect_connected=True):
         """Calls a method on a given backend storage.
 
@@ -773,120 +780,31 @@
             self._degrade_storage(storage_name, reason=reason)
         return (reliable, result)
 
-    @ensure_open_storage
-    def _apply_single_storage(self, method_name, args=(), kw={}):
-        """Calls the given method on a random optimal storage."""
-        # Try to find a storage that we can talk to. Stop after we found a
-        # reliable result.
-        storages = self.storages_optimal[:]
-        reliable = False
-        while not reliable:
-            if not storages:
-                break
-            name = random.choice(storages)
-            storages.remove(name)
-            reliable, result = self.__apply_storage(
-                name, method_name, args, kw)
-            if reliable:
-                return result, name
+    @property
+    def _reader(self):
+        """Calls the given method on the back-end storages with a strategy
+        appropriate for reading.
 
-        # We could not determine a result from any storage.
-        raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
-
-    @ensure_open_storage
-    def _apply_all_storages(self, method_name, args=(), kw={},
-                            expect_connected=True, exclude=(),
-                            ignore_noop=False, filter_results=lambda x: x):
-        """Calls the given method on all optimal backend storages in order.
-
-        `args` can be given as an n-tuple with the positional arguments that
-        should be passed to each storage.
-
-        Alternatively `args` can be a callable that returns an iterable. The
-        N-th item of the iterable is expected to be a tuple, passed to the
-        N-th storage.
-
         """
-        if callable(args):
-            argument_iterable = args()
+        if self.cluster_mode == 'single':
+            # When run as a single server, we can choose to optimise read
+            # operations.
+            return SingleStorageOperation(self)
         else:
-            # Provide a fallback if `args` is given as a simple tuple.
-            static_arguments = args
+            # When run in cooperation with other servers, we need to be
+            # prepared for the event that foreign write operations have
+            # reached only some of the back-ends. Reading from all back-ends
+            # ensures consistency.
+            return AllStoragesOperation(self)
 
-            def dummy_generator():
-                while True:
-                    yield static_arguments
-            argument_iterable = dummy_generator()
+    @property
+    def _writer(self):
+        """Calls the given method on the back-end storages with a strategy
+        appropriate for writing.
 
-        applicable_storages = self.storages_optimal[:]
-        applicable_storages = [storage for storage in applicable_storages
-                               if storage not in exclude]
+        """
+        return AllStoragesOperation(self)
 
-        # Run __apply_storage on all applicable storages in parallel.
-        threads = []
-        for storage_name in applicable_storages:
-            args = argument_iterable.next()
-            t = ThreadedApplyStorage(storage_name, method_name, args, kw,
-                                     expect_connected, self.__apply_storage)
-            threads.append(t)
-            t.start()
-
-        # Wait for threads to finish and pick up results.
-        results = {}
-        exceptions = []
-        for thread in threads:
-            # XXX The timeout should be calculated such that the total time
-            # spent in this loop doesn't grow with the number of storages.
-            thread.join(self.timeout)
-            if thread.isAlive():
-                # Storage timed out.
-                self._degrade_storage(
-                    thread.storage_name,
-                    reason='no response within %s seconds' %
-                        self.timeout)
-                self._threads.add(thread)
-                continue
-            if thread.exception:
-                exceptions.append(thread.exception)
-            elif thread.reliable:
-                results[thread.storage_name] = thread.result
-
-        # Analyse result consistency.
-        consistent = True
-        if exceptions and results:
-            consistent = False
-        elif exceptions:
-            # Since we can only get one kind of exceptions at the moment, they
-            # must be consistent anyway.
-            pass
-        elif results:
-            results = dict((storage, filter_results(result))
-                           for storage, result in results.items())
-            ref = results.values()[0]
-            for test in results.values()[1:]:
-                if test != ref:
-                    logger.debug(
-                        'Got inconsistent results for method %s: %r' %
-                        (method_name, results))
-                    consistent = False
-                    break
-        if not consistent:
-            self.close()
-            raise gocept.zeoraid.interfaces.RAIDError(
-                "RAID is inconsistent and was closed.")
-
-        # Select result.
-        if exceptions:
-            raise exceptions[0]
-        if results:
-            return results.values()[0]
-
-        # We did not get any reliable result, making this call effectively a
-        # no-op.
-        if ignore_noop:
-            return
-        raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
-
     def _recover_impl(self, name):
         try:
             target = self.openers[name].open()
@@ -1004,6 +922,150 @@
             x.join(timeout)
 
 
+class StorageOperation(object):
+
+    def __init__(self, raid):
+        self.raid = raid
+
+    def __getattr__(self, name):
+        return lambda *args, **kw: self(name, args, kw)
+
+    @property
+    def closed(self):
+        return self.raid.closed
+
+
+class SingleStorageOperation(StorageOperation):
+
+    name = None
+
+    @ensure_open_storage
+    def __call__(self, method_name, args=(), kw={}):
+        """Calls the given method on a random optimal storage."""
+        # Try to find a storage that we can talk to. Stop after we found a
+        # reliable result.
+        storages = self.raid.storages_optimal[:]
+        reliable = False
+        while not reliable:
+            if not storages:
+                break
+            name = random.choice(storages)
+            storages.remove(name)
+            reliable, result = self.raid._apply_storage(
+                name, method_name, args, kw)
+            if reliable:
+                self.storage = name
+                return result
+
+        # We could not determine a result from any storage.
+        raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+
+
+class AllStoragesOperation(StorageOperation):
+
+    def __init__(self, raid, expect_connected=True, exclude=(),
+                 ignore_noop=False, filter_results=lambda x, y: x):
+        super(AllStoragesOperation, self).__init__(raid)
+        self.expect_connected = expect_connected
+        self.exclude = exclude
+        self.ignore_noop = ignore_noop
+        self.filter_results = filter_results
+
+    @ensure_open_storage
+    def __call__(self, method_name, args=(), kw={}):
+        """Calls the given method on all optimal backend storages in order.
+
+        `args` can be given as an n-tuple with the positional arguments that
+        should be passed to each storage.
+
+        Alternatively `args` can be a callable that returns an iterable. The
+        N-th item of the iterable is expected to be a tuple, passed to the
+        N-th storage.
+
+        """
+        if callable(args):
+            argument_iterable = args()
+        else:
+            # Provide a fallback if `args` is given as a simple tuple.
+            static_arguments = args
+
+            def dummy_generator():
+                while True:
+                    yield static_arguments
+            argument_iterable = dummy_generator()
+
+        applicable_storages = self.raid.storages_optimal[:]
+        applicable_storages = [storage for storage in applicable_storages
+                               if storage not in self.exclude]
+
+        # Run _apply_storage on all applicable storages in parallel.
+        threads = []
+        for storage_name in applicable_storages:
+            args = argument_iterable.next()
+            t = ThreadedApplyStorage(
+                storage_name, method_name, args, kw,
+                self.expect_connected, self.raid._apply_storage)
+            threads.append(t)
+            t.start()
+
+        # Wait for threads to finish and pick up results.
+        results = {}
+        exceptions = []
+        for thread in threads:
+            # XXX The timeout should be calculated such that the total time
+            # spent in this loop doesn't grow with the number of storages.
+            thread.join(self.raid.timeout)
+            if thread.isAlive():
+                # Storage timed out.
+                self.raid._degrade_storage(
+                    thread.storage_name,
+                    reason='no response within %s seconds' %
+                        self.raid.timeout)
+                self.raid._threads.add(thread)
+                continue
+            if thread.exception:
+                exceptions.append(thread.exception)
+            elif thread.reliable:
+                results[thread.storage_name] = thread.result
+
+        # Analyse result consistency.
+        consistent = True
+        if exceptions and results:
+            consistent = False
+        elif exceptions:
+            # Since we can only get one kind of exceptions at the moment, they
+            # must be consistent anyway.
+            pass
+        elif results:
+            filtered_results = [
+                self.filter_results(result, self.raid.storages[storage])
+                for storage, result in results.items()]
+            ref = filtered_results[0]
+            for test in filtered_results[1:]:
+                if test != ref:
+                    logger.debug(
+                        'Got inconsistent results for method %s: %r' %
+                        (method_name, results))
+                    consistent = False
+                    break
+        if not consistent:
+            self.raid.close()
+            raise gocept.zeoraid.interfaces.RAIDError(
+                "RAID is inconsistent and was closed.")
+
+        # Select result.
+        if exceptions:
+            raise exceptions[0]
+        if results:
+            return results.values()[0]
+
+        # We did not get any reliable result, making this call effectively a
+        # no-op.
+        if self.ignore_noop:
+            return
+        raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+
+
 def optimistic_copy(source, target):
     """Try creating a hard link to source at target. Fall back to copying the
     file.
@@ -1021,7 +1083,7 @@
             file2.close()
 
 
-def unique_serials(serials):
+def unique_serials(serials, storage):
     """Filter a sequence of oid/serial pairs and remove late duplicates."""
     if serials is None:
         return
@@ -1037,6 +1099,14 @@
     return result
 
 
+def relative_blob_path(path, storage):
+    """Normalise a path to a blob by getting rid of the tmp dir part."""
+    if path.startswith(storage.fshelper.temp_dir):
+        return path.replace(storage.fshelper.temp_dir, 'TMP:', 1)
+    if path.startswith(storage.fshelper.base_dir):
+        return path.replace(storage.fshelper.base_dir, '', 1)
+
+
 def zeoraid_exception(e):
     """Determine whether the given exception is a RAID error.
 

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2010-09-28 13:56:10 UTC (rev 117004)
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2007-2008 Zope Foundation and Contributors.
+# Copyright (c) 2007-2010 Zope Foundation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -22,10 +22,12 @@
 from ZODB.tests import TransactionalUndoStorage, PackableStorage
 from gocept.zeoraid.tests.loggingstorage import LoggingStorage
 import ZEO.runzeo
+import ZODB.MappingStorage
 import ZODB.config
 import ZODB.interfaces
 import gocept.zeoraid.storage
 import gocept.zeoraid.tests.test_recovery
+import mock
 import os
 import random
 import shutil
@@ -39,6 +41,11 @@
 import zope.interface.verify
 
 
+# import logging
+# logging.getLogger().setLevel(0)
+# logging.getLogger().addHandler(logging.StreamHandler())
+
+
 def fail(obj, name):
     old_method = getattr(obj, name)
 
@@ -48,6 +55,27 @@
     setattr(obj, name, failing_method)
 
 
+class MockStorage(ZODB.MappingStorage.MappingStorage):
+
+    def __init__(self, undo):
+        super(MockStorage, self).__init__()
+        self.undo = undo
+
+    def supportsUndo(self):
+        return self.undo
+
+
+class Opener(object):
+
+    name = 'foo'
+
+    def __init__(self, undo=True):
+        self.undo = undo
+
+    def open(self):
+        return MockStorage(self.undo)
+
+
 class ZEOStorageBackendTests(StorageTestBase.StorageTestBase):
 
     def open(self, **kwargs):
@@ -214,7 +242,7 @@
         backend_storage = self._storage.storages[backend_name]
         backend_storage.close()
 
-        reliable, oid = self._storage._RAIDStorage__apply_storage(
+        reliable, oid = self._storage._apply_storage(
             backend_name, 'new_oid')
         self.assertEquals(False, reliable)
         self.assertEquals([backend_name], self._storage.storages_degraded)
@@ -934,16 +962,9 @@
         storage.close()
 
     def test_supportsUndo_required(self):
-
-        class Opener(object):
-            name = 'foo'
-
-            def open(self):
-                return ZODB.MappingStorage.MappingStorage()
-
         self.assertRaises(RuntimeError,
                           gocept.zeoraid.storage.RAIDStorage,
-                          'name', [Opener()])
+                          'name', [Opener(undo=False)])
 
     def test_supportsUndo(self):
         self.assertEquals(True, self._storage.supportsUndo())
@@ -1415,41 +1436,26 @@
         return LoggingStorage(self.name, self.file_name)
 
 
-class LoggingStorageDistributedTests(StorageTestBase.StorageTestBase):
+class LoggingStorageDistributedTests(unittest.TestCase):
 
-    # The backend and call counts have been chosen such that the probability
-    # of all calls being served by the same backend is about 1:10^6.
-    backend_count = 10
-    call_count = 6
-
-    def _backend(self, index):
-        return self._storage.storages[
-            self._storage.storages_optimal[index]]
-
-    def setUp(self):
-        self._storages = []
-        for i in xrange(self.backend_count):
-            self._storages.append(LoggingStorageOpener(str(i)))
-        self._storage = gocept.zeoraid.storage.RAIDStorage(
-            'teststorage', self._storages)
-
-    def tearDown(self):
-        self._storage.close()
-
     def test_distributed_single_calls(self):
-        for i in xrange(self.call_count):
-            self._storage.getSize()
+        raid = mock.Mock()
+        raid.closed = False
+        raid.storages_optimal = ['1', '2']
+        raid._apply_storage = mock.Mock(return_value=(True, 5))
+        op = gocept.zeoraid.storage.SingleStorageOperation(raid)
+        for i in xrange(20):
+            op.getSize()
+        self.assertEqual(20, raid._apply_storage.call_count)
+        counts = {}
+        for item in raid._apply_storage.call_args_list:
+            storage = item[0][0]
+            counts[storage] = counts.get(storage, 0) + 1
+        self.assertEqual(['1', '2'], sorted(counts.keys()))
+        self.assert_(counts['1'] > 2)
+        self.assert_(counts['2'] > 2)
 
-        # assert that at least two storages gets called at least one time
-        storages_called = [x for x in xrange(self.backend_count)
-                           if len(self._backend(x)._log) >= 1]
-        self.assertEquals(storages_called >= 2, True)
 
-        # assert that six calls were made
-        self.assertEquals(6, sum([len(self._backend(x)._log)
-                                  for x in xrange(self.backend_count)]))
-
-
 class ExtensionMethodsTests(ZEOStorageBackendTests):
 
     def open(self):
@@ -1653,6 +1659,26 @@
                           self._storage.raid_details())
 
 
+class ClusterModeTests(unittest.TestCase):
+
+    def setUp(self):
+        self.storage = gocept.zeoraid.storage.RAIDStorage(
+            'test', [Opener('1'), Opener('2')])
+        self.storage._apply_storage = mock.Mock()
+
+    def test_single_mode_read(self):
+        self.storage.cluster_mode = 'single'
+        self.assert_(
+            isinstance(self.storage._reader,
+                       gocept.zeoraid.storage.SingleStorageOperation))
+
+    def test_coop_mode_read(self):
+        self.storage.cluster_mode = 'coop'
+        self.assert_(
+            isinstance(self.storage._reader,
+                       gocept.zeoraid.storage.AllStoragesOperation))
+
+
 def test_suite():
 
     suite = unittest.TestSuite()
@@ -1661,4 +1687,5 @@
     suite.addTest(unittest.makeSuite(FailingStorageSharedBlobTests))
     suite.addTest(unittest.makeSuite(LoggingStorageDistributedTests))
     suite.addTest(unittest.makeSuite(ExtensionMethodsTests))
+    suite.addTest(unittest.makeSuite(ClusterModeTests))
     return suite

Modified: gocept.zeoraid/trunk/versions.cfg
===================================================================
--- gocept.zeoraid/trunk/versions.cfg	2010-09-28 12:36:52 UTC (rev 117003)
+++ gocept.zeoraid/trunk/versions.cfg	2010-09-28 13:56:10 UTC (rev 117004)
@@ -1,6 +1,7 @@
 [versions]
 ZConfig = 2.7.1
 ZODB3 = 3.9.6
+mock = 0.7.0b2
 setuptools = 0.6c11
 transaction = 1.1.1
 zc.buildout = 1.4.2



More information about the checkins mailing list