[Checkins] SVN: gocept.zeoraid/trunk/ - Always format TIDs in recovery status.

Christian Theune ct at gocept.com
Fri Nov 13 08:07:23 EST 2009


Log message for revision 105600:
  - Always format TIDs in recovery status.
  
  - Remove use of custom exceptions: RAIDErrors were pickled and sent to the
    client which didn't have ZEORaid installed causing unpickling errors.
  
  - Fix issue when storing blobs in shared mode: tpc_vote returned the serials
    of the first store twice causing spurious RAID inconsistency errors.
  
  - Close degraded storages when registering them as degraded in __init__ by
    calling the appropriate degradation method.
  
  - Make recovery more robust against storages that fail to open (a storage
    failing to open caused the ZEORaid server recovery to hang).
  
  - Hack for processing ZEO server's waiting list which doesn't expect storages
    to be used by someone else in the same process (covered with test). This
    caused clients to stochastically hang indefinitely when committing while a
    recovery was in progress.
  
  - Refactor some sloppy tests.
  
  - Some code cleanups.
  
  
  

Changed:
  U   gocept.zeoraid/trunk/CHANGES.txt
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/interfaces.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/stresstest.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/testing.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py

-=-
Modified: gocept.zeoraid/trunk/CHANGES.txt
===================================================================
--- gocept.zeoraid/trunk/CHANGES.txt	2009-11-13 13:05:20 UTC (rev 105599)
+++ gocept.zeoraid/trunk/CHANGES.txt	2009-11-13 13:07:22 UTC (rev 105600)
@@ -5,8 +5,29 @@
 1.0b4 (unreleased)
 ------------------
 
-- ...
+- Always format TIDs in recovery status.
 
+- Remove use of custom exceptions: RAIDErrors were pickled and send to the
+  client which didn't have ZEORaid installed causing unpickling errors.
+
+- Fix issue when storing blogs in shared mode: tpc_vote returned the serials
+  of the first store twice causing spurious RAID inconsistency errors.
+
+- Close degraded storages when registering them as degraded in __init__ by
+  calling the appropriate degradation method.
+
+- Make recovery more robust against storages that fail to open (a storage
+  failing to open caused the ZEORaid server recovery to hang).
+
+- Hack for processing ZEO server's waiting list which doesn't expect storages
+  to be used by someone else in the same process (covered with test). This
+  caused clients to stochastically hang indefinitely when committing while a
+  recovery was in progress.
+
+- Refactor some sloppy tests.
+
+- Some code cleanups.
+
 1.0b3 (2009-07-19)
 ------------------
 

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/interfaces.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/interfaces.py	2009-11-13 13:05:20 UTC (rev 105599)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/interfaces.py	2009-11-13 13:07:22 UTC (rev 105600)
@@ -13,20 +13,22 @@
 ##############################################################################
 """Interface descriptions"""
 
-
+import ZEO.ClientStorage
 import zope.interface
 
-import ZEO.ClientStorage
 
+def RAIDError(message):
+    e = RuntimeError(message)
+    e.created_by_zeoraid = True
+    return e
 
-class RAIDError(Exception):
-    pass
 
+def RAIDClosedError(message):
+    e = ZEO.ClientStorage.ClientStorageError(message)
+    e.created_by_zeoraid = True
+    return e
 
-class RAIDClosedError(RAIDError, ZEO.ClientStorage.ClientStorageError):
-    pass
 
-
 class IRAIDStorage(zope.interface.Interface):
     """A ZODB storage providing simple RAID capabilities."""
 

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py	2009-11-13 13:05:20 UTC (rev 105599)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/controller.py	2009-11-13 13:07:22 UTC (rev 105600)
@@ -37,12 +37,9 @@
 
 """
 
-import optparse
-import sys
-
 import ZEO.ClientStorage
-
 import logging
+import optparse
 
 
 class RAIDManager(object):
@@ -76,11 +73,13 @@
     def cmd_reload(self):
         self.raid.raid_reload()
 
+
 def main(host="127.0.0.1", port=8100, storage="1"):
     usage = "usage: %prog [options] command [command-options]"
-    description = ("Connect to a RAIDStorage on a ZEO server and perform "
-                   "maintenance tasks. Available commands: status, details, "
-                   "recover <STORAGE>, disable <STORAGE>, reload </PATH/TO/ZEO.CONF>")
+    description = (
+        "Connect to a RAIDStorage on a ZEO server and perform "
+        "maintenance tasks. Available commands: status, details, "
+        "recover <STORAGE>, disable <STORAGE>, reload </PATH/TO/ZEO.CONF>")
 
     parser = optparse.OptionParser(usage=usage, description=description)
     parser.add_option("-S", "--storage", default=storage,

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt	2009-11-13 13:05:20 UTC (rev 105599)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/recipe.txt	2009-11-13 13:07:22 UTC (rev 105600)
@@ -125,6 +125,7 @@
       '/sample-pyN.N.egg',
       '/sample-pyN.N.egg',
       '/sample-pyN.N.egg',
+      '/sample-pyN.N.egg',
       ]
     <BLANKLINE>
     import gocept.zeoraid.scripts.controller
@@ -223,6 +224,7 @@
       '/sample-pyN.N.egg',
       '/sample-pyN.N.egg',
       '/sample-pyN.N.egg',
+      '/sample-pyN.N.egg',
       ]
     <BLANKLINE>
     import gocept.zeoraid.scripts.controller

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/stresstest.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/stresstest.py	2009-11-13 13:05:20 UTC (rev 105599)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/stresstest.py	2009-11-13 13:07:22 UTC (rev 105600)
@@ -5,7 +5,6 @@
 import ZODB.POSException
 import ZEO.zrpc.error
 import ZEO.Exceptions
-import ZODB.utils
 import logging
 logging.getLogger().addHandler(logging.StreamHandler())
 logging.getLogger().setLevel(0)

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py	2009-11-13 13:05:20 UTC (rev 105599)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/scripts/tests.py	2009-11-13 13:07:22 UTC (rev 105600)
@@ -27,6 +27,7 @@
     zc.buildout.testing.install('zope.event', test)
     zc.buildout.testing.install('zope.testing', test)
     zc.buildout.testing.install('zc.recipe.egg', test)
+    zc.buildout.testing.install('zope.exceptions', test)
     zc.buildout.testing.install('zdaemon', test)
     zc.buildout.testing.install('ZConfig', test)
     zc.buildout.testing.install('ZODB3', test)
@@ -40,20 +41,19 @@
     (re.compile(
     "Couldn't find index page for '[a-zA-Z0-9.]+' "
     "\(maybe misspelled\?\)"
-    "\n"
-    ), ''),
-    (re.compile('#![^\n]+\n'), ''),                
+    "\n"), ''),
+    (re.compile('#![^\n]+\n'), ''),
     (re.compile('-\S+-py\d[.]\d(-\S+)?.egg'),
      '-pyN.N.egg',
     ),
     ])
 
+
 def test_suite():
     return unittest.TestSuite((
         doctest.DocFileSuite(
             'recipe.txt',
             setUp=setUp, tearDown=zc.buildout.testing.buildoutTearDown,
             checker=checker,
-            optionflags=doctest.REPORT_NDIFF|doctest.ELLIPSIS
-            ),
+            optionflags=doctest.REPORT_NDIFF | doctest.ELLIPSIS),
         ))

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2009-11-13 13:05:20 UTC (rev 105599)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2009-11-13 13:07:22 UTC (rev 105600)
@@ -13,39 +13,37 @@
 ##############################################################################
 """ZEORaid storage implementation."""
 
-import threading
-import time
-import logging
-import tempfile
-import os
-import os.path
-import shutil
-import random
-
-import zc.lockfile
-import zope.interface
-
+from ZEO.runzeo import ZEOOptions
 import ZEO.ClientStorage
 import ZEO.interfaces
 import ZODB.POSException
+import ZODB.blob
 import ZODB.interfaces
 import ZODB.utils
+import gocept.zeoraid.interfaces
+import gocept.zeoraid.recovery
+import logging
+import os
+import os.path
 import persistent.TimeStamp
+import random
+import shutil
+import tempfile
+import threading
+import time
 import transaction
 import transaction.interfaces
-import ZODB.blob
-from ZEO.runzeo import ZEOOptions
+import zc.lockfile
+import zope.interface
 
-import gocept.zeoraid.interfaces
-import gocept.zeoraid.recovery
-
 logger = logging.getLogger('gocept.zeoraid')
 
 
 def ensure_open_storage(method):
     def check_open(self, *args, **kw):
         if self.closed:
-            raise gocept.zeoraid.interfaces.RAIDClosedError("Storage has been closed.")
+            raise gocept.zeoraid.interfaces.RAIDClosedError(
+                "Storage has been closed.")
         return method(self, *args, **kw)
     return check_open
 
@@ -168,8 +166,9 @@
 
         # Set up list of degraded storages
         self.storages_degraded = []
-        for degraded_storages in tids.values():
-            self.storages_degraded.extend(degraded_storages)
+        # Degrade all remaining (non-optimal) storages
+        for name in reduce(lambda x,y:x+y, tids.values(), []):
+            self._degrade_storage(name)
 
         # No storage is recovering initially
         self.storage_recovering = None
@@ -186,8 +185,9 @@
         try:
             try:
                 self._apply_all_storages('close', expect_connected=False)
-            except gocept.zeoraid.interfaces.RAIDError:
-                pass
+            except Exception, e:
+                if not zeoraid_exception(e):
+                    raise e
         finally:
             self.closed = True
             del self.storages_optimal[:]
@@ -205,8 +205,10 @@
         """An approximate size of the database, in bytes."""
         try:
             return self._apply_single_storage('getSize')[0]
-        except gocept.zeoraid.interfaces.RAIDError:
-            return 0
+        except Exception, e:
+            if zeoraid_exception(e):
+                return 0
+            raise e
 
     def history(self, oid, version='', size=1):
         """Return a sequence of history information dictionaries."""
@@ -227,8 +229,10 @@
         """The approximate number of objects in the storage."""
         try:
             return self._apply_single_storage('__len__')[0]
-        except gocept.zeoraid.interfaces.RAIDError:
-            return 0
+        except RuntimeError, e:
+            if zeoraid_exception(e):
+                return 0
+            raise e
 
     def load(self, oid, version=''):
         """Load data for an object id and version."""
@@ -304,8 +308,8 @@
             raise ZODB.POSException.StorageTransactionError(self, transaction)
         self._write_lock.acquire()
         try:
-            self._apply_all_storages('store',
-                                     (oid, oldserial, data, version, transaction))
+            self._apply_all_storages(
+                'store', (oid, oldserial, data, version, transaction))
             return self._tid
         finally:
             self._write_lock.release()
@@ -324,6 +328,7 @@
                 self._commit_lock.release()
         finally:
             self._write_lock.release()
+        self._process_zeo_waiting()
 
     @ensure_writable
     def tpc_begin(self, transaction, tid=None, status=' '):
@@ -357,6 +362,7 @@
     def tpc_finish(self, transaction, callback=None):
         """Finish the transaction, making any transaction changes permanent.
         """
+        result = None
         self._write_lock.acquire()
         try:
             if transaction is not self._transaction:
@@ -371,13 +377,15 @@
                     # ClientStorage contradict each other and the documentation
                     # is non-existent. We trust ClientStorage here.
                     callback(self._tid)
-                return self._tid
+                result = self._tid
             finally:
                 self._transaction = None
                 self._tpc_cleanup()
                 self._commit_lock.release()
         finally:
             self._write_lock.release()
+        self._process_zeo_waiting()
+        return result
 
     def tpc_vote(self, transaction):
         """Provide a storage with an opportunity to veto a transaction."""
@@ -385,7 +393,8 @@
         try:
             if transaction is not self._transaction:
                 return
-            self._apply_all_storages('tpc_vote', (transaction,))
+            self._apply_all_storages(
+                'tpc_vote', (transaction,), filter_results=unique_serials)
         finally:
             self._write_lock.release()
 
@@ -545,19 +554,15 @@
 
     def tpc_transaction(self):
         """The current transaction being committed."""
-        # XXX Awful hack against ZEO: always return None so the transaction
-        # waiting list of ZEO is never triggered but rather we allow everybody
-        # to block in here. We need to resolve this via a discussion on
-        # zodb-dev.
-        # return self._transaction
-        return
+        return self._transaction
 
     def getTid(self, oid):
         """The last transaction to change an object."""
         return self._apply_single_storage('getTid', (oid,))[0]
 
     def getExtensionMethods(self):
-        # This method isn't officially part of the interface but it is supported.
+        # This method isn't officially part of the interface but
+        # it is supported.
         methods = dict.fromkeys(
             ['raid_recover', 'raid_status', 'raid_disable', 'raid_details',
             'raid_reload'])
@@ -705,7 +710,7 @@
     @ensure_open_storage
     def _apply_all_storages(self, method_name, args=(), kw={},
                             expect_connected=True, exclude=(),
-                            ignore_noop=False):
+                            ignore_noop=False, filter_results=lambda x: x):
         """Calls the given method on all optimal backend storages in order.
 
         `args` can be given as an n-tuple with the positional arguments that
@@ -765,8 +770,10 @@
             # must be consistent anyway.
             pass
         elif results:
+            results = dict((storage, filter_results(result))
+                           for storage, result in results.items())
             ref = results.values()[0]
-            for test in results.values():
+            for test in results.values()[1:]:
                 if test != ref:
                     consistent = False
                     break
@@ -790,10 +797,15 @@
     def _recover_impl(self, name):
         self.storages_degraded.remove(name)
         self.storage_recovering = name
-        storage = self.openers[name].open()
-        self.storages[name] = storage
+        try:
+            target = self.openers[name].open()
+        except Exception:
+            self.storage_recovering = None
+            self.storages_degraded.append(name)
+            raise
+        self.storages[name] = target
         recovery = gocept.zeoraid.recovery.Recovery(
-            self, storage, self._finalize_recovery,
+            self, target, self._finalize_recovery,
             recover_blobs=(self.blob_fshelper and not self.shared_blob_dir))
         for msg in recovery():
             self.recovery_status = msg
@@ -851,7 +863,36 @@
             except OSError:
                 pass
 
+    def _process_zeo_waiting(self):
+        if not hasattr(self, '_waiting'):
+            return
+        # XXX This is a hack because ZEO's StorageServer stores private data
+        # on us and doesn't expect us to lock/unlock ourselves without it
+        # noticing. Due to that it can happen that transactions get blocked
+        # but never restarted. We have to implement the restart dance
+        # ourselves here. We hope that ZODB will grow a mechanism to do this
+        # cleanly in the future.
+        #
+        # Restart any client waiting for the storage lock.
+        while self._waiting:
+            delay, zeo_storage = self._waiting.pop(0)
+            try:
+                zeo_storage._restart(delay)
+            except:
+                zeo_storage.log(
+                    "Unexpected error handling waiting transaction",
+                    level=logging.WARNING, exc_info=True)
+                zeo_storage.connection.close()
+                continue
 
+            if self._waiting:
+                n = len(self._waiting)
+                zeo_storage.log("Blocked transaction restarted.  "
+                         "Clients waiting: %d" % n)
+            else:
+                zeo_storage.log("Blocked transaction restarted.")
+
+
 def optimistic_copy(source, target):
     """Try creating a hard link to source at target. Fall back to copying the
     file.
@@ -867,3 +908,33 @@
         finally:
             file1.close()
             file2.close()
+
+
+def unique_serials(serials):
+    """Filter a sequence of oid/serial pairs and remove late duplicates."""
+    if serials is None:
+        return
+    # I keep both a list and a set: the list ensures order, the set ensures
+    # containment test performance.
+    seen = set()
+    result = []
+    for pair in serials:
+        if pair in result:
+            continue
+        result.append(pair)
+        seen.add(pair)
+    return result
+
+
+def zeoraid_exception(e):
+    """Determine whether the given exception is a RAID error.
+
+    Unfortunately creating custom exceptions breaks ZEO clients as the
+    exceptions might get pickled but ZEORaid code isn't installed on the
+    clients.
+
+    We thus default to raising simple exceptions that we annotate with a
+    special attribute.
+
+    """
+    return bool(getattr(e, 'created_by_zeoraid', False))

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/testing.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/testing.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/testing.py	2009-11-13 13:07:22 UTC (rev 105600)
@@ -0,0 +1,32 @@
+##############################################################################
+#
+# Copyright (c) 2007-2008 Zope Foundation and contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test harness for gocept.zeoraid."""
+
+## Uncomment this to get helpful logging from the ZEO servers on the console
+#import logging
+#logging.getLogger().addHandler(logging.StreamHandler())
+#logging.getLogger().setLevel(0)
+
+import ZEO.ClientStorage
+
+
+class ZEOOpener(object):
+
+    def __init__(self, name, addr, **kwargs):
+        self.name = name
+        self.addr = addr
+        self.kwargs = kwargs or {}
+
+    def open(self):
+        return ZEO.ClientStorage.ClientStorage(self.addr, **self.kwargs)


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/testing.py
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py	2009-11-13 13:05:20 UTC (rev 105599)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py	2009-11-13 13:07:22 UTC (rev 105600)
@@ -13,12 +13,9 @@
 ##############################################################################
 """Unit test support."""
 
-import tempfile
-
-import ZODB.utils
-import ZODB.config
 import ZODB.FileStorage
-
+import ZODB.config
+import tempfile
 import zope.proxy
 
 

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2009-11-13 13:05:20 UTC (rev 105599)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2009-11-13 13:07:22 UTC (rev 105600)
@@ -13,47 +13,32 @@
 ##############################################################################
 """Test harness for gocept.zeoraid."""
 
-import random
-import unittest
-import tempfile
+from ZEO.tests import forker, ThreadTests
+from ZEO.tests.testZEO import get_port
+from ZODB.tests import RevisionStorage, PersistentStorage
+from ZODB.tests import MTStorage, ReadOnlyStorage, RecoveryStorage
+from ZODB.tests import StorageTestBase, BasicStorage
+from ZODB.tests import Synchronization, ConflictResolution, HistoryStorage
+from ZODB.tests import TransactionalUndoStorage, PackableStorage
+from gocept.zeoraid.tests.loggingstorage import LoggingStorage
+import ZEO.runzeo
+import ZODB.config
+import ZODB.interfaces
+import gocept.zeoraid.storage
+import gocept.zeoraid.tests.test_recovery
 import os
-import time
+import random
 import shutil
 import stat
+import tempfile
 import threading
-import sys
-
+import time
+import transaction
+import unittest
 import zc.lockfile
 import zope.interface.verify
 
-import persistent.dict
-import transaction
 
-from ZODB.tests import StorageTestBase, BasicStorage, \
-             TransactionalUndoStorage, PackableStorage, \
-             Synchronization, ConflictResolution, HistoryStorage, \
-             Corruption, RevisionStorage, PersistentStorage, \
-             MTStorage, ReadOnlyStorage, RecoveryStorage
-
-import gocept.zeoraid.storage
-import gocept.zeoraid.tests.test_recovery
-from gocept.zeoraid.tests.loggingstorage import LoggingStorage
-
-from ZEO.ClientStorage import ClientStorage
-from ZEO.tests import forker, CommitLockTests, ThreadTests
-from ZEO.tests.testZEO import get_port
-import ZEO.runzeo
-
-import ZODB.interfaces
-import ZEO.interfaces
-import ZODB.config
-
-# Uncomment this to get helpful logging from the ZEO servers on the console
-#import logging
-#logging.getLogger().addHandler(logging.StreamHandler())
-#logging.getLogger().setLevel(0)
-
-
 def fail(obj, name):
     old_method = getattr(obj, name)
     def failing_method(*args, **kw):
@@ -62,22 +47,11 @@
     setattr(obj, name, failing_method)
 
 
-class ZEOOpener(object):
-
-    def __init__(self, name, addr, **kwargs):
-        self.name = name
-        self.addr = addr
-        self.kwargs = kwargs or {}
-
-    def open(self):
-        return ClientStorage(self.addr, **self.kwargs)
-
-
 class ZEOStorageBackendTests(StorageTestBase.StorageTestBase):
 
     def open(self, **kwargs):
-        self._storage = gocept.zeoraid.storage.RAIDStorage('teststorage',
-                                                           self._storages, **kwargs)
+        self._storage = gocept.zeoraid.storage.RAIDStorage(
+            'teststorage', self._storages, **kwargs)
 
     def setUp(self):
         self._server_storage_files = []
@@ -87,13 +61,13 @@
         for i in xrange(5):
             port = get_port()
             zconf = forker.ZEOConfig(('', port))
-            zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
-                                                                  zconf, port)
+            zport, adminaddr, pid, path = forker.start_zeo_server(
+                self.getConfig(), zconf, port)
             self._pids.append(pid)
             self._servers.append(adminaddr)
-            self._storages.append(ZEOOpener(str(i), zport, storage='1',
-                                            min_disconnect_poll=0.5, wait=1,
-                                            wait_timeout=60))
+            self._storages.append(gocept.zeoraid.testing.ZEOOpener(
+                str(i), zport, storage='1',
+                min_disconnect_poll=0.5, wait=1, wait_timeout=60))
         self.open()
 
     def getConfig(self):
@@ -115,6 +89,7 @@
             if os.path.exists(file):
                 os.unlink(file)
 
+
 class ReplicationStorageTests(BasicStorage.BasicStorage,
         TransactionalUndoStorage.TransactionalUndoStorage,
         RevisionStorage.RevisionStorage,
@@ -174,11 +149,10 @@
             blob_dir = tempfile.mkdtemp()
             self.temp_paths.append(blob_dir)
             self._servers.append(adminaddr)
-            self._storages.append(ZEOOpener(str(i), zport, storage='1',
-                                            cache_size=12,
-                                            blob_dir=blob_dir,
-                                            min_disconnect_poll=0.5, wait=1,
-                                            wait_timeout=60))
+            self._storages.append(gocept.zeoraid.testing.ZEOOpener(
+                str(i), zport, storage='1', cache_size=12,
+                blob_dir=blob_dir, min_disconnect_poll=0.5, wait=1,
+                wait_timeout=60))
         blob_dir = tempfile.mkdtemp()
         self.temp_paths.append(blob_dir)
         self._storage = gocept.zeoraid.storage.RAIDStorage(
@@ -218,12 +192,10 @@
                 zconf, port)
             self._pids.append(pid)
             self._servers.append(adminaddr)
-            self._storages.append(ZEOOpener(str(i), zport, storage='1',
-                                            cache_size=12,
-                                            blob_dir=blob_dir,
-                                            shared_blob_dir=True,
-                                            min_disconnect_poll=0.5, wait=1,
-                                            wait_timeout=60))
+            self._storages.append(gocept.zeoraid.testing.ZEOOpener(
+                str(i), zport, storage='1', cache_size=12, blob_dir=blob_dir,
+                shared_blob_dir=True, min_disconnect_poll=0.5, wait=1,
+                wait_timeout=60))
         self._storage = gocept.zeoraid.storage.RAIDStorage(
             'teststorage', self._storages, blob_dir=blob_dir,
             shared_blob_dir=True)
@@ -328,8 +300,7 @@
         self.assertEquals(1, len(self._storage.history(oid, '')))
 
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.history, oid, '')
+        self.assertRaises(RuntimeError, self._storage.history, oid, '')
 
     def test_history_degrading(self):
         oid = self._storage.new_oid()
@@ -344,8 +315,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('history')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.history, oid, '')
+        self.assertRaises(RuntimeError, self._storage.history, oid, '')
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_lastTransaction(self):
@@ -363,8 +333,7 @@
         self.assertEquals(None, self._storage.lastTransaction())
         self._disable_storage(0)
         self.assertEquals('failed', self._storage.raid_status())
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.lastTransaction)
+        self.assertRaises(RuntimeError, self._storage.lastTransaction)
 
     def test_len_degrading(self):
         # Brrrr. ClientStorage doesn't seem to implement __len__ correctly.
@@ -400,8 +369,9 @@
 
         self._dostore(oid=oid, revid='\x00\x00\x00\x00\x00\x00\x00\x01')
         data_record, serial = self._storage.load(oid)
-        self.assertEquals('((U\x10ZODB.tests.MinPOq\x01U\x05MinPOq\x02tq\x03Nt.}q\x04U\x05valueq\x05K\x07s.',
-                          data_record)
+        self.assertEquals(
+            '((U\x10ZODB.tests.MinPOq\x01U\x05MinPOq\x02tq\x03Nt.}'
+            'q\x04U\x05valueq\x05K\x07s.', data_record)
         self.assertEquals(self._storage.lastTransaction(), serial)
         self.assertEquals((data_record, serial), self._backend(0).load(oid))
         self.assertEquals((data_record, serial), self._backend(1).load(oid))
@@ -417,8 +387,7 @@
         self.assertEquals((data_record, serial), self._backend(0).load(oid))
 
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.load, oid)
+        self.assertRaises(RuntimeError, self._storage.load, oid)
 
     def test_load_can_be_failed(self):
         # ClientStorage does not directly call `load` but
@@ -438,15 +407,15 @@
         self._dostore(oid=oid, revid='\x00\x00\x00\x00\x00\x00\x00\x01')
         self._backend(0).fail('load')
         data_record, serial = self._storage.load(oid)
-        self.assertEquals('((U\x10ZODB.tests.MinPOq\x01U\x05MinPOq\x02tq\x03Nt.}q\x04U\x05valueq\x05K\x07s.',
-                          data_record)
+        self.assertEquals(
+            '((U\x10ZODB.tests.MinPOq\x01U\x05MinPOq\x02tq\x03Nt.}'
+            'q\x04U\x05valueq\x05K\x07s.', data_record)
         self.assertEquals(self._storage.lastTransaction(), serial)
         self.assertEquals((data_record, serial), self._backend(0).load(oid))
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('load')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.load, oid)
+        self.assertRaises(RuntimeError, self._storage.load, oid)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_loadBefore_degrading1(self):
@@ -480,8 +449,7 @@
                           self._backend(0).loadBefore(oid, revid2))
 
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.loadBefore, oid, revid2)
+        self.assertRaises(RuntimeError, self._storage.loadBefore, oid, revid2)
 
     def test_loadBefore_degrading2(self):
         oid = self._storage.new_oid()
@@ -502,8 +470,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('loadBefore')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.loadBefore, oid, revid2)
+        self.assertRaises(RuntimeError, self._storage.loadBefore, oid, revid2)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_loadSerial_degrading1(self):
@@ -539,8 +506,7 @@
                           self._backend(0).loadSerial(oid, revid))
 
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.loadSerial, oid, revid)
+        self.assertRaises(RuntimeError, self._storage.loadSerial, oid, revid)
 
     def test_loadSerial_degrading2(self):
         oid = self._storage.new_oid()
@@ -563,8 +529,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('loadSerial')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.loadSerial, oid, revid)
+        self.assertRaises(RuntimeError, self._storage.loadSerial, oid, revid)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_new_oid_degrading1(self):
@@ -572,8 +537,7 @@
         self._disable_storage(0)
         self.assertEquals(8, len(self._storage.new_oid()))
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.new_oid)
+        self.assertRaises(RuntimeError, self._storage.new_oid)
 
     def test_new_oid_degrading2(self):
         self.assertEquals(8, len(self._storage.new_oid()))
@@ -586,8 +550,7 @@
 
         self._backend(0)._oids = None
         self._backend(0).fail('new_oid')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.new_oid)
+        self.assertRaises(RuntimeError, self._storage.new_oid)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_new_oid_unsynchronised_degrading(self):
@@ -629,7 +592,7 @@
         self._dostore(oid=oid, revid=revid3, data=4)
         self.assertEquals(256, self._storage.getSize())
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+        self.assertRaises(RuntimeError,
                           self._storage.pack,
                           time.time(), ZODB.serialize.referencesf)
 
@@ -654,7 +617,7 @@
         self.assertEquals(256, self._storage.getSize())
 
         self._backend(0).fail('pack')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+        self.assertRaises(RuntimeError,
                           self._storage.pack,
                           time.time(), ZODB.serialize.referencesf)
         self.assertEquals('failed', self._storage.raid_status())
@@ -669,7 +632,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('store')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+        self.assertRaises(RuntimeError,
                           self._dostoreNP,
                           oid=oid, revid=revid, data='bar')
         self.assertEquals('failed', self._storage.raid_status())
@@ -684,9 +647,7 @@
 
         oid = self._storage.new_oid()
         self._backend(0).fail('tpc_begin')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._dostoreNP,
-                          oid=oid, data='bar')
+        self.assertRaises(RuntimeError, self._dostoreNP, oid=oid, data='bar')
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_tpc_vote_degrading(self):
@@ -699,9 +660,7 @@
 
         oid = self._storage.new_oid()
         self._backend(0).fail('tpc_vote')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._dostoreNP,
-                          oid=oid, data='bar')
+        self.assertRaises(RuntimeError, self._dostoreNP, oid=oid, data='bar')
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_tpc_finish_degrading(self):
@@ -714,9 +673,7 @@
 
         oid = self._storage.new_oid()
         self._backend(0).fail('tpc_finish')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._dostoreNP,
-                          oid=oid, data='bar')
+        self.assertRaises(RuntimeError, self._dostoreNP, oid=oid, data='bar')
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_tpc_abort_not_degrading(self):
@@ -788,7 +745,7 @@
         self._storage.tpc_begin(t)
         self._disable_storage(0)
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+        self.assertRaises(RuntimeError,
                           self._storage.storeBlob,
                           oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
 
@@ -824,7 +781,7 @@
         # when tpc_vote ist called.
         self._storage.storeBlob(
             oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+        self.assertRaises(RuntimeError,
                           self._storage.tpc_vote, t)
 
     def test_storeBlob_degrading3(self):
@@ -856,7 +813,7 @@
             raise Exception()
         self._backend(0).storeBlob = fail
         self._backend(1).storeBlob = fail
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+        self.assertRaises(RuntimeError,
                           self._storage.storeBlob,
                           oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
 
@@ -885,7 +842,7 @@
         os.remove(stored_file_name)
 
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+        self.assertRaises(RuntimeError,
                           self._storage.loadBlob, oid, last_transaction)
 
     def test_loadBlob_degrading2(self):
@@ -918,7 +875,7 @@
         os.unlink(b0_filename)
 
         self._backend(0).fail('loadBlob')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+        self.assertRaises(RuntimeError,
                           self._storage.loadBlob, oid, last_transaction)
         self.assertEquals('failed', self._storage.raid_status())
 
@@ -977,6 +934,7 @@
     def test_supportsUndo_required(self):
         class Opener(object):
             name = 'foo'
+
             def open(self):
                 return ZODB.MappingStorage.MappingStorage()
 
@@ -1010,9 +968,7 @@
         t = transaction.Transaction()
         self._storage.tpc_begin(t)
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.undo,
-                          info[2]['id'], t)
+        self.assertRaises(RuntimeError, self._storage.undo, info[2]['id'], t)
 
     def test_undo_degrading2(self):
         oid = self._storage.new_oid()
@@ -1038,9 +994,7 @@
         t = transaction.Transaction()
         self._storage.tpc_begin(t)
         self._backend(0).fail('undo')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.undo,
-                          info[2]['id'], t)
+        self.assertRaises(RuntimeError, self._storage.undo, info[2]['id'], t)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_undoLog_degrading1(self):
@@ -1056,8 +1010,7 @@
         self.assertEquals(2, len(info))
 
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.undoLog)
+        self.assertRaises(RuntimeError, self._storage.undoLog)
 
     def test_undoLog_degrading2(self):
         oid = self._storage.new_oid()
@@ -1073,8 +1026,7 @@
         self.assertEquals(2, len(info))
 
         self._backend(0).fail('undoLog')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.undoLog)
+        self.assertRaises(RuntimeError, self._storage.undoLog)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_undoInfo_degrading1(self):
@@ -1090,8 +1042,7 @@
         self.assertEquals(2, len(info))
 
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.undoInfo)
+        self.assertRaises(RuntimeError, self._storage.undoInfo)
 
     def test_undoInfo_degrading2(self):
         oid = self._storage.new_oid()
@@ -1107,8 +1058,7 @@
         self.assertEquals(2, len(info))
 
         self._backend(0).fail('undoInfo')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.undoInfo)
+        self.assertRaises(RuntimeError, self._storage.undoInfo)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_record_iternext(self):
@@ -1138,8 +1088,7 @@
         self.assertEquals('0', data)
 
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.record_iternext, next)
+        self.assertRaises(RuntimeError, self._storage.record_iternext, next)
 
     def test_record_iternext_degrading2(self):
         for x in range(5):
@@ -1152,8 +1101,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('record_iternext')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.record_iternext, next)
+        self.assertRaises(RuntimeError, self._storage.record_iternext, next)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_gettid_degrading1(self):
@@ -1165,8 +1113,7 @@
         self.assertEquals(revid, tid)
 
         self._disable_storage(0)
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.getTid, oid)
+        self.assertRaises(RuntimeError, self._storage.getTid, oid)
 
     def test_gettid_degrading2(self):
         oid = self._storage.new_oid()
@@ -1178,8 +1125,7 @@
         self.assertEquals('degraded', self._storage.raid_status())
 
         self._backend(0).fail('getTid')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
-                          self._storage.getTid, oid)
+        self.assertRaises(RuntimeError, self._storage.getTid, oid)
         self.assertEquals('failed', self._storage.raid_status())
 
     def test_tpc_transaction_finishing(self):
@@ -1395,7 +1341,7 @@
         self._storage.tpc_begin(t)
         fail(self._backend(0), 'storeBlob')
         fail(self._backend(1), 'storeBlob')
-        self.assertRaises(gocept.zeoraid.interfaces.RAIDError,
+        self.assertRaises(RuntimeError,
                           self._storage.storeBlob,
                           oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
 
@@ -1512,9 +1458,9 @@
                                                               zconf, port)
         self._pids.append(pid)
         self._servers.append(adminaddr)
-        self._storages.append(ZEOOpener('5', zport, storage='1',
-                                        min_disconnect_poll=0.5, wait=1,
-                                        wait_timeout=60))
+        self._storages.append(gocept.zeoraid.testing.ZEOOpener(
+            '5', zport, storage='1', min_disconnect_poll=0.5, wait=1,
+            wait_timeout=60))
 
         # configure the RAID to use the new backend
         self.update_config()

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py	2009-11-13 13:05:20 UTC (rev 105599)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py	2009-11-13 13:07:22 UTC (rev 105600)
@@ -32,6 +32,13 @@
 import gocept.zeoraid.recovery
 
 
+class Opener(object):
+
+    def __init__(self, name, open):
+        self.name = name
+        self.open = open
+
+
 def compare(test, source, target):
     recovery = gocept.zeoraid.recovery.Recovery(
         source, target, lambda target: None)
@@ -170,8 +177,14 @@
 
     def setUp(self):
         self.temp_paths = []
-        self.source = ZODB.FileStorage.FileStorage(tempfile.mktemp())
-        self.target = ZODB.FileStorage.FileStorage(tempfile.mktemp())
+        source_path = tempfile.mktemp()
+        target_path = tempfile.mktemp()
+        self.source_opener = Opener(
+            'source', lambda: ZODB.FileStorage.FileStorage(source_path))
+        self.target_opener = Opener(
+            'target', lambda: ZODB.FileStorage.FileStorage(target_path))
+        self.source = self.source_opener.open()
+        self.target = self.target_opener.open()
         self.recovery = gocept.zeoraid.recovery.Recovery(
             self.source, self.target, lambda target: None)
 
@@ -190,17 +203,9 @@
         else:
             blob_dir = None
 
-        class Opener(object):
-            def __init__(self, name, storage):
-                self.name = name
-                self.storage = storage
-
-            def open(self):
-                return self.storage
-
         return gocept.zeoraid.storage.RAIDStorage(
             'raid',
-            [Opener('source', self.source), Opener('target', self.target)],
+            [self.source_opener, self.target_opener],
             blob_dir=blob_dir, shared_blob_dir=self.shared)
 
     def test_verify_both_empty(self):
@@ -343,6 +348,8 @@
     def test_recover_raid_storage(self):
         self.store([self.source, self.target])
         self.store([self.source])
+        self.source.close()
+        self.target.close()
         raid = self.setup_raid()
         self.assertEquals('degraded', raid.raid_status())
         raid.raid_recover('target')
@@ -352,6 +359,9 @@
                 break
         else:
             self.fail('Timeout while RAID recovers.')
+        raid.close()
+        self.source = self.source_opener.open()
+        self.target = self.target_opener.open()
         self.compare(self.source, self.target)
 
 
@@ -365,12 +375,19 @@
         else:
             target_blob_dir = tempfile.mkdtemp()
             self.temp_paths.append(target_blob_dir)
-        self.source = ZODB.blob.BlobStorage(
-            source_blob_dir,
-            ZODB.FileStorage.FileStorage(tempfile.mktemp()))
-        self.target = ZODB.blob.BlobStorage(
-            target_blob_dir,
-            ZODB.FileStorage.FileStorage(tempfile.mktemp()))
+
+        source_path = tempfile.mktemp()
+        target_path = tempfile.mktemp()
+
+        self.source_opener = Opener('source', lambda:
+            ZODB.blob.BlobStorage(source_blob_dir,
+                ZODB.FileStorage.FileStorage(source_path)))
+        self.target_opener = Opener('target', lambda:
+            ZODB.blob.BlobStorage(target_blob_dir,
+                ZODB.FileStorage.FileStorage(target_path)))
+
+        self.source = self.source_opener.open()
+        self.target = self.target_opener.open()
         self.recovery = gocept.zeoraid.recovery.Recovery(
             self.source, self.target, lambda target: None)
 

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py	2009-11-13 13:07:22 UTC (rev 105600)
@@ -0,0 +1,122 @@
+##############################################################################
+#
+# Copyright (c) 2007-2008 Zope Foundation and contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Tests for ZEORaid being served via ZEO."""
+
+from ZEO.tests import forker
+from ZEO.tests.testZEO import get_port
+from ZODB.tests import StorageTestBase
+import gocept.zeoraid.testing
+import os
+import tempfile
+import transaction
+import ZODB.FileStorage
+import time
+
+
+class ZEOServedTests(StorageTestBase.StorageTestBase):
+
+    def setUp(self):
+        self._server_storage_files = []
+        self._servers = []
+        self._pids = []
+        self._storages = []
+
+    def tearDown(self):
+        for s in self._storages:
+            s.close()
+        for server in self._servers:
+            forker.shutdown_zeo_server(server)
+        for pid in self._pids:
+            os.waitpid(pid, 0)
+        for file in self._server_storage_files:
+            if os.path.exists(file):
+                os.unlink(file)
+
+    def test_committing_during_recovery(self):
+        # Step 1: Create a ZODB with a good bit of content so that recovery
+        # will take a while. The content also has to be spread over multiple
+        # largeish transactions.
+        filename1 = tempfile.mktemp()
+        self._server_storage_files.append(filename1)
+        storage = ZODB.FileStorage.FileStorage(filename1)
+        self._storages.append(storage)
+        for txn in range(100):
+            t = transaction.Transaction()
+            storage.tpc_begin(t)
+            for record in range(100):
+                storage.store(storage.new_oid(), '\0' * 8, 'fdsafsdafdsa' * 10,
+                              '', t)
+            storage.tpc_vote(t)
+            storage.tpc_finish(t)
+        storage.close()
+        self._storages.pop()
+
+        # Step 2: Fire up a ZEO server that serves a RAID storage backed by two
+        # file storages: the prepared one and an empty one.
+        port = get_port()
+        zconf = forker.ZEOConfig(('', port))
+        self._server_storage_files.append(tempfile.mktemp())
+        config = """\
+        %%import gocept.zeoraid
+        <raidstorage 1>
+            <filestorage 1>
+            path %s
+            </filestorage>
+            <filestorage 2>
+            path %s
+            </filestorage>
+        </raidstorage>
+        """ % tuple(self._server_storage_files)
+        zport, adminaddr, pid, path = forker.start_zeo_server(
+            config, zconf, port)
+        self._pids.append(pid)
+        self._servers.append(adminaddr)
+
+        raid = gocept.zeoraid.testing.ZEOOpener(
+            '1', zport, storage='1', min_disconnect_poll=0.5, wait=1,
+            wait_timeout=60).open()
+        self._storages.append(raid)
+
+        self.assertEquals([['1'], None, ['2'], ''], raid.raid_details())
+        self.assertEquals('degraded', raid.raid_status())
+
+        raid.raid_recover('2')
+
+        # Wait until the storage starts to recover
+        status = ''
+        while  status != 'recover':
+            status = raid.raid_details()[-1]
+            if isinstance(status, tuple):
+                status = status[0]
+            else:
+                status = ''
+            time.sleep(0.5)
+
+        # Now, hammer the storage with more transactions with a high chance of
+        # triggering the waiting list.
+        # Caveat: if this test fails, it hangs instead of reporting a failure.
+        for txn in range(100):
+            t = transaction.Transaction()
+            raid.tpc_begin(t)
+            for record in range(100):
+                raid.store(raid.new_oid(), '\0' * 8, 'fdsafsdafdsa' * 10,
+                           '', t)
+            raid.tpc_vote(t)
+            raid.tpc_finish(t)
+
+        # Wait for the recovery to finish
+        while raid.raid_status() == 'recovering':
+            time.sleep(1)
+
+        self.assertEquals('optimal', raid.raid_status())


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_zeo.py
___________________________________________________________________
Added: svn:eol-style
   + native



More information about the checkins mailing list