[Checkins] SVN: gocept.zeoraid/ Created workspace for and imported existing code of gocept.zeoraid.

Thomas Lotze tl at gocept.com
Mon Jan 7 06:52:56 EST 2008


Log message for revision 82727:
  Created workspace for and imported existing code of gocept.zeoraid.
  

Changed:
  A   gocept.zeoraid/
  A   gocept.zeoraid/branches/
  A   gocept.zeoraid/tags/
  A   gocept.zeoraid/trunk/
  A   gocept.zeoraid/trunk/COPYRIGHT.txt
  A   gocept.zeoraid/trunk/LICENSE.txt
  A   gocept.zeoraid/trunk/buildout.cfg
  A   gocept.zeoraid/trunk/setup.py
  A   gocept.zeoraid/trunk/src/
  A   gocept.zeoraid/trunk/src/gocept/
  A   gocept.zeoraid/trunk/src/gocept/__init__.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/README.txt
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/ROADMAP.txt
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/manage.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/__init__.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py

-=-
Added: gocept.zeoraid/trunk/COPYRIGHT.txt
===================================================================
--- gocept.zeoraid/trunk/COPYRIGHT.txt	                        (rev 0)
+++ gocept.zeoraid/trunk/COPYRIGHT.txt	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,9 @@
+Copyright (c) 2007 gocept gmbh & co. kg and contributors.
+All Rights Reserved.
+
+This software is subject to the provisions of the Zope Public License,
+Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+FOR A PARTICULAR PURPOSE. 


Property changes on: gocept.zeoraid/trunk/COPYRIGHT.txt
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

Added: gocept.zeoraid/trunk/LICENSE.txt
===================================================================
--- gocept.zeoraid/trunk/LICENSE.txt	                        (rev 0)
+++ gocept.zeoraid/trunk/LICENSE.txt	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,44 @@
+Zope Public License (ZPL) Version 2.1
+
+A copyright notice accompanies this license document that identifies the
+copyright holders.
+
+This license has been certified as open source. It has also been designated as
+GPL compatible by the Free Software Foundation (FSF).
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions in source code must retain the accompanying copyright
+notice, this list of conditions, and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the accompanying copyright
+notice, this list of conditions, and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+
+3. Names of the copyright holders must not be used to endorse or promote
+products derived from this software without prior written permission from the
+copyright holders.
+
+4. The right to distribute this software or to use it for any purpose does not
+give you the right to use Servicemarks (sm) or Trademarks (tm) of the copyright
+holders. Use of them is covered by separate agreement with the copyright
+holders.
+
+5. If any files are modified, you must cause the modified files to carry
+prominent notices stating that you changed the files and the date of any
+change.
+
+Disclaimer
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESSED
+OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT HOLDERS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+


Property changes on: gocept.zeoraid/trunk/LICENSE.txt
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

Added: gocept.zeoraid/trunk/buildout.cfg
===================================================================
--- gocept.zeoraid/trunk/buildout.cfg	                        (rev 0)
+++ gocept.zeoraid/trunk/buildout.cfg	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,57 @@
+[buildout]
+develop = . /home/ctheune/Development/ZODB.trunk/
+parts = test client server1 server2 zeoraid
+find-links = http://download.zope.org/distribution/
+
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = gocept.zeoraid
+working-directory = /tmp/testrunner2/
+
+# Some demo parts
+
+[server1]
+recipe = zc.zodbrecipes:server
+zeo.conf = 
+    <zeo>
+    address 8101
+    </zeo>
+    <filestorage 1>
+    path /tmp/Data1.fs
+    </filestorage>
+
+[server2]
+recipe = zc.zodbrecipes:server
+zeo.conf = 
+    <zeo>
+    address 8102
+    </zeo>
+    <filestorage 1>
+    path /tmp/Data2.fs
+    </filestorage>
+
+[zeoraid]
+recipe = zc.zodbrecipes:server
+eggs = gocept.zeoraid
+zeo.conf =
+    <zeo>
+    address 8100
+    </zeo>
+    %import gocept.zeoraid
+    <raidstorage 1>
+        <zeoclient 1>
+            server localhost:8101
+            storage 1
+        </zeoclient>
+        <zeoclient 2>
+            server localhost:8102
+            storage 1
+        </zeoclient>
+    </raidstorage>
+
+[client]
+recipe = zc.recipe.egg
+eggs = ZODB3<3.9dev
+    gocept.zeoraid
+interpreter = client

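For orientation, a minimal hedged sketch of connecting to the demo setup
above: the two server parts provide ZEO backends on ports 8101 and 8102, and
the zeoraid part serves the RAID on port 8100, so an ordinary ZEO client
simply connects there (assuming storage name '1' as configured above):

    from ZEO.ClientStorage import ClientStorage
    from ZODB import DB

    storage = ClientStorage(('localhost', 8100), storage='1')
    db = DB(storage)
    connection = db.open()
    root = connection.root()
    # ... use the database as usual ...
    connection.close()
    db.close()
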
Added: gocept.zeoraid/trunk/setup.py
===================================================================
--- gocept.zeoraid/trunk/setup.py	                        (rev 0)
+++ gocept.zeoraid/trunk/setup.py	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,28 @@
+from setuptools import setup, find_packages
+
+name = "gocept.zeoraid"
+setup(
+    name = name,
+    version = "dev",
+    author = "Christian Theune",
+    author_email = "ct at gocept.com",
+    description = "A ZODB storage for replication using RAID techniques.",
+    long_description = open('src/gocept/zeoraid/README.txt').read(),
+    license = "ZPL 2.1",
+    keywords = "zodb buildout",
+    classifiers = ["Framework :: Buildout"],
+    url='http://launchpad.net/'+name,
+    zip_safe=False,
+    packages = find_packages('src'),
+    include_package_data = True,
+    package_dir = {'':'src'},
+    namespace_packages = ['gocept'],
+    install_requires = ['setuptools', 'ZODB3'],
+    extras_require = {
+        'recipe': ['zc.buildout']
+    },
+    entry_points = {
+        'zc.buildout': [
+            'default = %s.recipe:Recipe [recipe]' % name,
+        ]},
+    )


Property changes on: gocept.zeoraid/trunk/setup.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

Added: gocept.zeoraid/trunk/src/gocept/__init__.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/__init__.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/__init__.py	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,6 @@
+# namespace package boilerplate
+try:
+    __import__('pkg_resources').declare_namespace(__name__)
+except ImportError, e:
+    from pkgutil import extend_path
+    __path__ = extend_path(__path__, __name__)


Property changes on: gocept.zeoraid/trunk/src/gocept/__init__.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/README.txt
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/README.txt	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/README.txt	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,77 @@
+================
+ZEO RAID storage
+================
+
+The ZEO RAID storage is intended to make ZEO installations more reliable by
+applying techniques used in hard-disk RAID solutions.
+
+The implementation is intended to use as much existing infrastructure as
+possible and to provide a seamless, simple experience when setting up a
+reliable ZEO server infrastructure.
+
+Note: We use typical RAID terms to describe the behaviour of this system.
+
+The RAID storage
+================
+
+The ZEO RAID storage is a proxy storage that works like a RAID controller by
+creating a redundant array of ZEO servers. The redundancy is similar to RAID
+level 1: each ZEO server keeps a complete copy of the database.
+
+Therefore, up to N-1 out of N ZEO servers can fail without interrupting
+service.
+
+It is intended that any storage can be used as a backend storage for a RAID
+storage, although typically a ClientStorage will be the direct backend.
+
+The ZEO RAID server
+===================
+
+The RAID storage could (in theory) be used directly from a Zope server.
+However, to achieve real reliability, the RAID has to run as a storage for
+multiple Zope servers, like a normal ZEO setup does.
+
+For this, we leverage the normal ZEO server implementation and simply use a
+RAID storage instead of a FileStorage. The system architecture looks like
+this::
+
+    [ ZEO 1 ]     [ ZEO 2 ]   ...   [ ZEO N ]
+               \       |       /
+                \      |      /
+                 \     |     /
+                  \    |    /
+                   \   |   /
+                    \  |  /
+                  [ ZEO RAID ]
+                    /  |  \
+                   /   |   \
+                  /    |    \
+                 /     |     \
+                /      |      \
+               /       |       \
+    [ Zope 1 ]    [ Zope 2 ]  ...   [ Zope N]
+
+ZEO RAID servers maintain a list of all the optimal, degraded and recovering
+storages and provide an extended ZEO RPC API for querying the RAID status
+and for disabling and recovering storages at runtime.
+
+Making the RAID server reliable
+===============================
+
+The RAID server itself remains the last single point of failure in the
+system. This problem can be solved because the RAID server does not maintain
+any persistent state (except the configuration data: its listening IP and
+port and the list of storages).
+
+The RAID server can be made reliable by providing a hot-spare server using
+existing HA tools (taking over the IP when a host goes down) and the existing
+ZEO ClientStorage behaviour.
+
+The RAID server can derive the status of all storages after startup, so the
+hot-spare server does not have to receive updated information before taking
+over. One drawback here: if all storages become corrupt at the same time,
+the RAID server will happily pick the storage with the newest last
+transaction and use it as the optimal storage.
+
+To avoid this, we'd have to create a well-known OID (or something similar) to
+annotate a storage with its status. This would mean, though, that storages
+would have to be initialized as RAID backends and can't easily be migrated.


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/README.txt
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

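For orientation, here is a minimal hedged sketch of driving the RAID storage
directly from Python, modelled on the DemoOpener fixture from the tests in
this commit (the Opener class below is illustrative; RAIDStorage only needs
objects with a 'name' attribute and an open() method returning a storage):

    from ZODB.FileStorage.FileStorage import FileStorage
    import gocept.zeoraid.storage

    class Opener(object):
        # Minimal stand-in for a configuration opener.
        def __init__(self, name):
            self.name = name
        def open(self):
            return FileStorage(self.name)

    raid = gocept.zeoraid.storage.RAIDStorage(
        'demo', [Opener('a.fs'), Opener('b.fs')])
    print raid.raid_status()  # 'optimal' while both replicas agree
    raid.close()
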
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/ROADMAP.txt
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/ROADMAP.txt	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/ROADMAP.txt	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,63 @@
+====
+TODO
+====
+
+1.0
+===
+
+Stabilization
+-------------
+
+ - Check edge cases for locking on all methods so that degrading a storage
+   works under all circumstances.
+
+ - The second pass of the recovery isn't thread safe. Ensure that only one
+   recovery can run at a time. (This is probably a good idea anyway because of
+   IO load.)
+
+ - Make sure that opening a ZEO client doesn't block forever. (E.g. by using
+   a custom opener that sets 'wait' to True and the timeout to 10 seconds.)
+
+   Workaround: do this by using "wait off" or setting the timeout in
+   the RAID server config.
+
+ - Run some manual tests for weird situations, high load, ...
+
+ - Compatibility with which ZODB clients and which ZEO servers? Best would
+   be to support Zope 2.9 and Zope 3.4.
+
+ - Re-check API usage and definition for ZODB 3.8 as our base.
+
+Feature-completeness
+--------------------
+
+ - Rebuild storage using the copy mechanism in ZODB to get all historic
+   records completely. (Only rebuild completely, not incrementally.)
+
+ - Create a limit for the transaction rate when recovering so that the
+   recovery doesn't clog up the live servers.
+
+ - Support Undo
+
+Cleanup
+-------
+
+ - Remove print statements and provide logging.
+
+ - Make a manager script that works like zopectl and has a buildout recipe
+   that can talk to a specific RAID server.
+
+
+2.0
+===
+
+- Support packing?
+
+- Windows support
+
+- Make writing to multiple storages asynchronous or at least truly parallel.
+
+- Make the read requests come from different backends to optimize caching and
+  distribute IO load.
+
+- Allow adding and removing new backend servers while running.


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/ROADMAP.txt
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1 @@
+import gocept.zeoraid.patches


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,45 @@
+<?xml version="1.0"?>
+
+<!-- RAID storage ZConfig section setup.
+
+To create a RAID storage, add the following to zope.conf:
+
+%import gocept.zeoraid
+<zodb>
+    <raidstorage>
+        <zeoclient>
+            server localhost:8100
+            storage 1
+        </zeoclient>
+        <zeoclient>
+            server localhost:8101
+            storage 1
+        </zeoclient>
+    </raidstorage>
+</zodb>
+
+Alternatively, you can use the raidstorage as a storage on a ZEO server to
+turn this into a stateless RAID server that can easily be made HA
+(hot-standby).
+
+-->
+
+<component prefix="gocept.zeoraid.datatypes">
+
+    <sectiontype
+        name="raidstorage"
+        implements="ZODB.storage"
+        datatype=".Storage">
+
+        <multisection 
+            type="ZODB.storage" 
+            name="+"
+            attribute="storages"
+            required="yes">
+            <description>
+                One or more storages that are combined into a RAID array.
+            </description>
+        </multisection>
+
+    </sectiontype>
+
+</component>

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,8 @@
+import ZODB.config
+import gocept.zeoraid.storage
+
+class Storage(ZODB.config.BaseConfig):
+
+    def open(self):
+        return gocept.zeoraid.storage.RAIDStorage(self.name,
+                                                  self.config.storages)


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/manage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/manage.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/manage.py	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,48 @@
+import sys
+
+import ZEO.zrpc.client
+
+
+class RAIDManager(object):
+
+    def __init__(self):
+        self.manager = ZEO.zrpc.client.ConnectionManager(('127.0.0.1', 8100), self)
+        self.manager.connect(True)
+
+    def testConnection(self, connection):
+        # This is a preferred connection
+        return 1
+
+    def notifyConnected(self, connection):
+        self.connection = connection
+        self.connection.call('register', '1', True)
+
+    def status(self):
+        return self.connection.call('raid_status')
+
+    def recover(self, storage):
+        return self.connection.call('raid_recover', storage)
+
+    def disable(self, storage):
+        return self.connection.call('raid_disable', storage)
+
+    def details(self):
+        return self.connection.call('raid_details')
+
+if __name__ == '__main__':
+    m = RAIDManager()
+
+    if sys.argv[1] == 'status':
+        print m.status()
+    elif sys.argv[1] == 'details':
+        ok, recovering, failed = m.details()
+        print "RAID status:"
+        print "\t", m.status()
+        print "Storage status:"
+        print "\toptimal\t\t", ok
+        print "\trecovering\t", recovering
+        print "\tfailed\t\t", failed
+    elif sys.argv[1] == 'disable':
+        print m.disable(sys.argv[2])
+    elif sys.argv[1] == 'recover':
+        print m.recover(sys.argv[2])


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/manage.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native
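The script above can also be used programmatically. A hedged sketch, assuming
a ZEO RAID server is listening on 127.0.0.1:8100 as hard-coded in
RAIDManager.__init__ (the storage name '2' is illustrative):

    from gocept.zeoraid.manage import RAIDManager

    m = RAIDManager()
    print m.status()  # 'optimal', 'degraded', 'recovering' or 'failed'
    optimal, recovering, failed = m.details()
    m.disable('2')    # degrade backend storage '2'
    m.recover('2')    # start recovering it again
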

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,8 @@
+
+# Helper method to make ZEO play nice. IMHO ZEO does not implement the
+# interface correctly.
+def _zeoraid_lastTransaction(self):
+    return self._server.lastTransaction()
+
+import ZEO.ClientStorage
+ZEO.ClientStorage.ClientStorage._zeoraid_lastTransaction = _zeoraid_lastTransaction


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,585 @@
+import threading
+import time
+
+import ZEO.ClientStorage
+import ZODB.POSException
+import ZODB.utils
+import persistent.TimeStamp
+import transaction
+
+
+def get_serial(storage, oid):
+    if hasattr(storage, 'lastTid'):
+        # This is something like a FileStorage
+        get_serial = storage.lastTid
+    else:
+        get_serial = storage.getTid
+    return get_serial(oid)
+
+
+def get_last_transaction(storage):
+    if hasattr(storage, '_zeoraid_lastTransaction'):
+        last_transaction = storage._zeoraid_lastTransaction()
+    else:
+        last_transaction = storage.lastTransaction()
+    return last_transaction
+
+
+class RAIDError(Exception):
+    pass
+
+
+class RAIDClosedError(RAIDError, ZEO.ClientStorage.ClientStorageError):
+    pass
+
+
+class RAIDStorage(object):
+    """The RAID storage is a drop-in replacement for the client storages that
+    are configured.
+
+    It has few but important tasks: multiplex all communication to the
+    storages, coordinate the transactions between the storages and alert the
+    RAID controller if a storage fails.
+
+    """
+
+    closed = False
+    _transaction = None
+
+    def __init__(self, name, storages, read_only=False):
+        self.__name__ = name
+        self.read_only = read_only
+        self.storages = {}
+        self._log_stores = False
+
+        # Allocate locks
+        l = threading.RLock()
+        self._lock_acquire = l.acquire
+        self._lock_release = l.release
+        l = threading.Lock()
+        self._commit_lock_acquire = l.acquire
+        self._commit_lock_release = l.release
+
+        # Remember the openers for recovering a storage later.
+        self.openers = {}
+        # Open the storages
+        for opener in storages:
+            self.storages[opener.name] = opener.open()
+            self.openers[opener.name] = opener
+
+        self.storages_optimal = []
+        self.storages_degraded = []
+        self.storages_recovering = []
+
+        tids = {}
+        for name, storage in self.storages.items():
+            try:
+                tid = get_last_transaction(storage)
+            except ZEO.ClientStorage.ClientDisconnected:
+                self._degrade_storage(name, fail=False)
+                continue
+            if tid is None:
+                # Not connected yet.
+                # XXX or empty ...
+                self._degrade_storage(name, fail=False)
+                continue
+            s = tids.setdefault(tid, [])
+            s.append(name)
+ 
+        self._unrecovered_transactions = {}
+        self._last_tid = None
+
+        # Activate all optimal storages
+        if tids:
+            self._last_tid = max(tids.keys())
+            self.storages_optimal.extend(tids[self._last_tid])
+            del tids[self._last_tid]
+
+            # Deactive all degraded storages
+            for degraded_storages in tids.values():
+                self.storages_degraded.extend(degraded_storages)
+
+        t = time.time()
+        self.ts = persistent.TimeStamp.TimeStamp(*(time.gmtime(t)[:5] + (t%60,)))
+
+        if not self.storages_optimal:
+            raise RAIDError("Can't start without at least one optimal storage.")
+
+    def _degrade_storage(self, name, fail=True):
+        if name in self.storages_optimal:
+            self.storages_optimal.remove(name)
+        self.storages_degraded.append(name)
+        storage = self.storages[name]
+        t = threading.Thread(target=storage.close)
+        t.start()
+        del self.storages[name]
+        if not self.storages_optimal and fail:
+            raise RAIDError("No storages remain.")
+
+    def _apply_single_storage(self, method_name, *args, **kw):
+        if self.closed:
+            raise RAIDClosedError("Storage has been closed.")
+        storages = self.storages_optimal[:]
+        if not storages:
+            raise RAIDError("RAID storage is failed.")
+
+        while storages:
+            # XXX storage might be degraded by now, need to check.
+            name = self.storages_optimal[0]
+            storage = self.storages[name]
+            try:
+                # Make random/hashed selection of read storage
+                method = getattr(storage, method_name)
+                return method(*args, **kw)
+            except ZEO.ClientStorage.ClientDisconnected:
+                # XXX find other possible exceptions
+                self._degrade_storage(name)
+
+    def _apply_all_storages(self, method_name, *args, **kw):
+        if self.closed:
+            raise RAIDClosedError("Storage has been closed.")
+        results = []
+        storages = self.storages_optimal[:]
+        if not storages:
+            raise RAIDError("RAID storage is failed.")
+
+        for name in self.storages_optimal:
+            storage = self.storages[name]
+            try:
+                method = getattr(storage, method_name)
+                results.append(method(*args, **kw))
+            except ZEO.ClientStorage.ClientDisconnected:
+                self._degrade_storage(name)
+
+        res = results[:]
+        for test1 in res:
+            for test2 in res:
+                assert test1 == test2, "Results not consistent. Asynchronous storage?"
+        return results[0]
+
+    def isReadOnly(self):
+        """
+        XXX Revisit this approach?
+        """
+        return self.read_only
+
+    def getName(self):
+        return self.__name__
+
+    def getSize(self):
+        return self._apply_single_storage('getSize')
+
+    def close(self):
+        if self.closed:
+            # Storage may be closed more than once, e.g. by tear-down methods
+            # of tests.
+            return
+        self._apply_all_storages('close')
+        self.storages_optimal = []
+        self.closed = True
+
+    def cleanup(self):
+        # XXX This is not actually documented, it's not implemented in all
+        # storages, it's not even clear when it should be called. Not
+        # correctly calling storages' cleanup might leave turds.
+        pass
+
+    def load(self, oid, version):
+        return self._apply_single_storage('load', oid, version)
+
+    def loadEx(self, oid, version):
+        return self._apply_single_storage('loadEx', oid, version)
+
+    def store(self, oid, oldserial, data, version, transaction):
+        if self.isReadOnly():
+            raise ZODB.POSException.ReadOnlyError()
+        if transaction is not self._transaction:
+            raise ZODB.POSException.StorageTransactionError(self, transaction)
+
+        self._lock_acquire()
+        try:
+            self._apply_all_storages('store', oid, oldserial, data, version, 
+                                     transaction)
+            if self._log_stores:
+                oids = self._unrecovered_transactions.setdefault(self._tid, [])
+                oids.append(oid)
+            return self._tid
+        finally:
+            self._lock_release()
+
+    def lastTransaction(self):
+        return self._apply_single_storage('lastTransaction')
+
+    def loadSerial(self, oid, serial):
+        return self._apply_single_storage('loadSerial', oid, serial)
+
+    def loadBefore(self, oid, tid):
+        return self._apply_single_storage('loadBefore', oid, tid)
+
+    #def iterator(self):
+    # XXX Dunno
+
+    def history(self, oid, version=None, size=1):
+        return self._apply_single_storage('history', oid, version, size)
+
+    def new_oid(self):
+        # XXX This is not exactly a read operation, but we only need an
+        # answer from one storage.
+        if self.isReadOnly():
+            raise ZODB.POSException.ReadOnlyError()
+        self._lock_acquire()
+        try:
+            return self._apply_single_storage('new_oid')
+        finally:
+            self._lock_release()
+
+    def registerDB(self, db, limit=None):
+        # XXX Is it safe to register a DB with multiple storages or do we
+        # need some kind of wrapper here?
+        self._apply_all_storages('registerDB', db)
+
+    def supportsUndo(self):
+        return True
+
+    def undoLog(self, first=0, last=-20, filter=None):
+        return self._apply_single_storage('undoLog', first, last, filter)
+
+    def undoInfo(self, first=0, last=-20, specification=None):
+        return self._apply_single_storage('undoInfo', first, last,
+                                          specification)
+
+    def undo(self, transaction_id, transaction):
+        if self.isReadOnly():
+            raise ZODB.POSException.ReadOnlyError()
+        self._lock_acquire()
+        try:
+            return self._apply_all_storages('undo', transaction_id, transaction)
+        finally:
+            self._lock_release()
+
+    def supportsTransactionalUndo(self):
+        return True
+
+    def pack(self, t, referencesf):
+        if self.isReadOnly():
+            raise ZODB.POSException.ReadOnlyError()
+        self._apply_all_storages('pack', t, referencesf)
+
+    def supportsVersions(self):
+        return True
+
+    def commitVersion(self, src, dest, transaction):
+        if self.isReadOnly():
+            raise ZODB.POSException.ReadOnlyError()
+        self._lock_acquire()
+        try:
+            return self._apply_all_storages('commitVersion', src, dest, transaction)
+        finally:
+            self._lock_release()
+
+    def abortVersion(self, src, transaction):
+        if self.isReadOnly():
+            raise ZODB.POSException.ReadOnlyError()
+        self._lock_acquire()
+        try:
+            return self._apply_all_storages('abortVersion', src, transaction)
+        finally:
+            self._lock_release()
+
+    def tpc_abort(self, transaction):
+        self._lock_acquire()
+        try:
+            if transaction is not self._transaction:
+                return
+            try:
+                # XXX Edge cases for the log_store abort ...
+                if self._log_stores:
+                    # We may have logged some stores within that transaction
+                    # which we have to remove again because we aborted it.
+                    if self._tid in self._unrecovered_transactions:
+                        del self._unrecovered_transactions[self._tid]
+                self._apply_all_storages('tpc_abort', transaction)
+                self._transaction = None
+            finally:
+                self._commit_lock_release()
+        finally:
+            self._lock_release()
+
+    def tpc_transaction(self):
+        """The current transaction being committed."""
+        return self._transaction
+
+    def tpc_begin(self, transaction, tid=None, status=' '):
+        if self.isReadOnly():
+            raise ZODB.POSException.ReadOnlyError()
+
+        self._lock_acquire()
+        try:
+            if self._transaction is transaction:
+                return
+            self._lock_release()
+            self._commit_lock_acquire()
+            self._lock_acquire()
+
+            # I don't understand the lock that protects _transaction.  The commit
+            # lock and status will be deduced by the underlying storages.
+
+            self._transaction = transaction
+
+            # Remove storages that aren't on the same last tid anymore
+            # (this happens if a storage disconnects).
+            for name in self.storages_optimal:
+                storage = self.storages[name]
+                try:
+                    last_tid = get_last_transaction(storage)
+                except ZEO.ClientStorage.ClientDisconnected:
+                    self._degrade_storage(name, fail=False)
+                    continue
+                if last_tid != self._last_tid:
+                    self._degrade_storage(name)
+
+            # Create a common tid for all storages if we don't have one yet.
+            if tid is None:
+                now = time.time()
+                t = persistent.TimeStamp.TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
+                self.ts = t.laterThan(self.ts)
+                self._tid = repr(self.ts)
+            else:
+                self._ts = persistent.TimeStamp.TimeStamp(tid)
+                self._tid = tid
+
+            self._apply_all_storages('tpc_begin', transaction, self._tid, status)
+        finally:
+            self._lock_release()
+
+    def tpc_vote(self, transaction):
+        self._lock_acquire()
+        try:
+            if transaction is not self._transaction:
+                return
+            self._apply_all_storages('tpc_vote', transaction)
+        finally:
+            self._lock_release()
+
+    def tpc_finish(self, transaction, callback=None):
+        self._lock_acquire()
+        try:
+            if transaction is not self._transaction:
+                return
+            try:
+                if callback is not None:
+                    callback(self._tid)
+                self._apply_all_storages('tpc_finish', transaction)
+                self._last_tid = self._tid
+                return self._tid
+            finally:
+                self._transaction = None
+                self._commit_lock_release()
+        finally:
+            self._lock_release()
+
+    def getSerial(self, oid):
+        self._lock_acquire()
+        try:
+            return self._apply_single_storage('getSerial', oid)
+        finally:
+            self._lock_release()
+
+    def getExtensionMethods(self):
+        # XXX This is very awkward right now.
+        methods = self._apply_single_storage('getExtensionMethods')
+        if methods is None:
+            # Allow management while status is 'failed'
+            methods = {}
+        methods['raid_recover'] = None
+        methods['raid_status'] = None
+        methods['raid_disable'] = None
+        methods['raid_details'] = None
+        return methods
+
+    def __len__(self):
+        return self._apply_single_storage('__len__')
+
+    def versionEmpty(self, version):
+        return self._apply_single_storage('versionEmpty', version)
+
+    def versions(self, max=None):
+        return self._apply_single_storage('versions', max)
+
+    def modifiedInVersion(self, oid):
+        return self._apply_single_storage('modifiedInVersion', oid)
+
+    def getTid(self, oid):
+        return self._apply_single_storage('getTid', oid)
+
+    # Extension methods for RAIDStorage
+    def raid_recover(self, name):
+        if self.closed:
+            raise RAIDClosedError("Storage has been closed.")
+        if name not in self.storages_degraded:
+            return
+        self.storages_degraded.remove(name)
+        self.storages_recovering.append(name)
+        t = threading.Thread(target=self._recover_impl, args=(name,))
+        t.start()
+        return 'recovering %r' % name
+
+    def _recover_impl(self, name):
+        try:
+            # First pass: Transfer all oids without hindering running transactions
+            begin = time.time()
+            self._recover_first(name)
+            end = time.time()
+
+            # Second pass: Start the TPC on a reference storage to block other
+            # transactions so we can catch up. The second pass should be
+            # significantly faster than the first.
+            begin = time.time()
+            self._recover_second(name)
+            end = time.time()
+        except:
+            # *something* went wrong. Put the storage back to degraded.
+            self._degrade_storage(name)
+            raise
+
+    def _recover_second(self, name):
+        storage = self.storages[name]
+        reference_storage = self.storages[self.storages_optimal[0]]
+        # Start a transaction on the reference storage to acquire the commit
+        # lock and prevent others from committing during the second pass.
+        # XXX This needs to be optimized in a way that the second pass
+        # gets re-run as long as possible, only holding the commit lock if
+        # no transactions remain that need to be replayed, and putting the
+        # recovered storage back into the array of optimal storages.
+        while 1:
+            tm = transaction.TransactionManager()
+            t = tm.get()
+            last_transaction = get_last_transaction(storage)
+            reference_storage.tpc_begin(t)
+            unrecovered_transactions = self._unrecovered_transactions
+            if unrecovered_transactions:
+                # We acquired the commit lock and there are transactions that
+                # have been committed and were not yet transferred to the 
+                # recovering storage. We have to try to replay those and then
+                # check again. We can remove the commit lock for now.
+                self._unrecovered_transactions = {}
+                reference_storage.tpc_abort(t)
+
+                # RRR: Refactor into its own method?
+                tm2 = transaction.TransactionManager()
+                t2 = tm2.get()
+
+                # Get the unrecovered transactions in the order they were
+                # recorded.
+                tids = sorted(unrecovered_transactions.keys())
+                for tid in tids:
+                    oids = unrecovered_transactions[tid]
+                    # Replay all oids that belong to one original
+                    # transaction within a single storage transaction.
+                    storage.tpc_begin(t2, tid=tid)
+                    for oid in oids:
+                        data, tid_ = reference_storage.load(oid, '')
+                        if tid_ > tid:
+                            # If the current tid of the object is newer
+                            # than the one we logged, we can ignore it, because
+                            # there will be another entry for this oid in a 
+                            # later transaction.
+                            continue
+                        try:
+                            oldserial = get_serial(storage, oid)
+                        except ZODB.POSException.POSKeyError:
+                            # This means that the object is new and didn't have an
+                            # old transaction yet. 
+                            # XXX Might this also happen with non-undoable storages?
+                            oldserial = ZODB.utils.z64
+                        storage.store(oid, oldserial, data, '', t2)
+                    storage.tpc_vote(t2)
+                    storage.tpc_finish(t2)
+                # /RRR
+            else:
+                # We acquired the commit lock and no committed transactions
+                # are waiting in the log. This means the recovering storage
+                # has caught up by now and we can put it into optimal state
+                # again.
+                self.storages_recovering.remove(name)
+                self.storages_optimal.append(name)
+                # We can also stop logging stores now.
+                self._log_stores = False
+                reference_storage.tpc_abort(t)
+                break
+
+    def _recover_first(self, name):
+        """The inner loop of the recovery code. Does the actual work."""
+        # Re-open storage
+        storage = self.openers[name].open()
+        self.storages[name] = storage
+        # XXX Bring the storage up to the current state. This only copies the
+        # current data, so RAID currently supports neither undo nor versions.
+        next_oid = None
+        tm = transaction.TransactionManager()
+        t = tm.get()
+        # XXX We assume that the last written transaction actually is
+        # consistent. We need a consistency check.
+        last_transaction = get_last_transaction(storage)
+        # This flag starts logging all successful stores; those oids are
+        # updated again in the second pass.
+        max_transaction = get_last_transaction(self.storages[self.storages_optimal[0]])
+        self._unrecovered_transactions = {}
+        self._log_stores = True
+        # The init flag allows us to phrase the break condition of the 
+        # following loop a little bit more elegantly.
+        init = True
+        while 1:
+            if next_oid is None and not init:
+                break
+
+            init = False
+            oid, tid, data, next_oid = self._apply_single_storage('record_iternext', next_oid)
+
+            if tid > max_transaction:
+                continue
+
+            if tid <= last_transaction:
+                try:
+                    old_data = storage.loadSerial(oid, tid)
+                except ZODB.POSException.POSKeyError:
+                    pass
+                else:
+                    if old_data == data:
+                        continue
+
+            # There is a newer version of the object available or the existing
+            # version was incorrect. Overwrite it with the right data.
+            try:
+                oldserial = get_serial(storage, oid)
+            except ZODB.POSException.POSKeyError:
+                oldserial = ZODB.utils.z64
+
+            assert oldserial <= tid, "last_transaction and oldserial are not in-sync"
+
+            storage.tpc_begin(t, tid=tid)
+            storage.store(oid, oldserial, data, '', t)
+            storage.tpc_vote(t)
+            storage.tpc_finish(t)
+
+    def raid_status(self):
+        if self.closed:
+            raise RAIDClosedError("Storage has been closed.")
+        if self.storages_recovering:
+            return 'recovering'
+        if not self.storages_degraded:
+            return 'optimal'
+        if not self.storages_optimal:
+            return 'failed'
+        return 'degraded'
+
+    def raid_details(self):
+        if self.closed:
+            raise RAIDClosedError("Storage has been closed.")
+        return [self.storages_optimal, self.storages_recovering, self.storages_degraded]
+
+    def raid_disable(self, name):
+        if self.closed:
+            raise RAIDClosedError("Storage has been closed.")
+        self._degrade_storage(name, fail=False)
+        return 'disabled %r' % name


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

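A toy illustration (with made-up storage names and tids) of the startup logic
in RAIDStorage.__init__ above: backends are grouped by their last committed
transaction id, the group sharing the newest tid becomes the optimal set, and
all others start out degraded:

    tids = {}
    for name, last_tid in [('s1', 'T2'), ('s2', 'T2'), ('s3', 'T1')]:
        tids.setdefault(last_tid, []).append(name)
    newest = max(tids)
    optimal = tids.pop(newest)         # ['s1', 's2']
    degraded = sum(tids.values(), [])  # ['s3']
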
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/__init__.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/__init__.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/__init__.py	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1 @@
+# make this a package


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/__init__.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,133 @@
+import unittest
+import tempfile
+import os
+
+from ZODB.tests import StorageTestBase, BasicStorage, \
+             TransactionalUndoStorage, VersionStorage, \
+             TransactionalUndoVersionStorage, PackableStorage, \
+             Synchronization, ConflictResolution, HistoryStorage, \
+             Corruption, RevisionStorage, PersistentStorage, \
+             MTStorage, ReadOnlyStorage, RecoveryStorage
+
+import gocept.zeoraid.storage
+
+from ZODB.FileStorage.FileStorage import FileStorage
+
+from ZEO.ClientStorage import ClientStorage
+from ZEO.tests import forker, CommitLockTests, ThreadTests
+from ZEO.tests.testZEO import get_port
+
+
+class DemoOpener(object):
+
+    class_ = FileStorage
+
+    def __init__(self, name, **kwargs):
+        self.name = name
+        self.kwargs = kwargs or {}
+
+    def open(self, **kwargs):
+        return self.class_(self.name, **self.kwargs)
+
+
+class ZEOOpener(DemoOpener):
+
+    class_ = ClientStorage
+
+
+class FileStorageBackendTests(StorageTestBase.StorageTestBase):
+
+    def open(self, **kwargs):
+        # A RAIDStorage requires openers, not storages.
+        s1 = DemoOpener('s1.fs')
+        s2 = DemoOpener('s2.fs')
+
+        self._storage = gocept.zeoraid.storage.RAIDStorage('teststorage',
+                                                           [s1, s2], **kwargs)
+
+    def setUp(self):
+        self.open()
+
+    def tearDown(self):
+        self._storage.close()
+        self._storage.cleanup()
+        os.unlink('s1.fs')
+        os.unlink('s2.fs')
+
+
+class ZEOStorageBackendTests(StorageTestBase.StorageTestBase):
+
+    def open(self, **kwargs):
+        self._storage = gocept.zeoraid.storage.RAIDStorage('teststorage',
+                                                           self._storages, **kwargs)
+
+    def setUp(self):
+        self._server_storage_files = []
+        self._servers = []
+        self._storages = []
+        for i in xrange(5):
+            port = get_port()
+            zconf = forker.ZEOConfig(('', port))
+            zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
+                                                                  zconf, port)
+
+            self._servers.append(adminaddr)
+            self._storages.append(ZEOOpener(zport, storage='1',
+                                            cache_size=2000000,
+                                            min_disconnect_poll=0.5, wait=1,
+                                            wait_timeout=60))
+        self.open()
+
+    def getConfig(self):
+        filename = self.__fs_base = tempfile.mktemp()
+        self._server_storage_files.append(filename)
+        return """\
+        <filestorage 1>
+        path %s
+        </filestorage>
+        """ % filename
+
+    def tearDown(self):
+        self._storage.close()
+        for server in self._servers:
+            forker.shutdown_zeo_server(server)
+        # XXX wait for servers to come down
+        # XXX delete filestorage files
+
+
+class ReplicationStorageTests(BasicStorage.BasicStorage,
+        TransactionalUndoStorage.TransactionalUndoStorage,
+        RevisionStorage.RevisionStorage,
+        VersionStorage.VersionStorage,
+        TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
+        PackableStorage.PackableStorage,
+        PackableStorage.PackableUndoStorage,
+        Synchronization.SynchronizedStorage,
+        ConflictResolution.ConflictResolvingStorage,
+        ConflictResolution.ConflictResolvingTransUndoStorage,
+        HistoryStorage.HistoryStorage,
+        PersistentStorage.PersistentStorage,
+        MTStorage.MTStorage,
+        ReadOnlyStorage.ReadOnlyStorage,
+        ):
+    pass
+
+
+class FSReplicationStorageTests(FileStorageBackendTests,
+                                ReplicationStorageTests):
+    pass
+
+
+class ZEOReplicationStorageTests(ZEOStorageBackendTests,
+                                 ReplicationStorageTests,
+                                 ThreadTests.ThreadTests):
+    pass
+
+
+def test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(FSReplicationStorageTests, "check"))
+    suite.addTest(unittest.makeSuite(ZEOReplicationStorageTests, "check"))
+    return suite
+
+if __name__=='__main__':
+    unittest.main()


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native


