[Checkins] SVN: gocept.zeoraid/ Created workspace for and imported
existing code of gocept.zeoraid.
Thomas Lotze
tl at gocept.com
Mon Jan 7 06:52:56 EST 2008
Log message for revision 82727:
Created workspace for and imported existing code of gocept.zeoraid.
Changed:
A gocept.zeoraid/
A gocept.zeoraid/branches/
A gocept.zeoraid/tags/
A gocept.zeoraid/trunk/
A gocept.zeoraid/trunk/COPYRIGHT.txt
A gocept.zeoraid/trunk/LICENSE.txt
A gocept.zeoraid/trunk/buildout.cfg
A gocept.zeoraid/trunk/setup.py
A gocept.zeoraid/trunk/src/
A gocept.zeoraid/trunk/src/gocept/
A gocept.zeoraid/trunk/src/gocept/__init__.py
A gocept.zeoraid/trunk/src/gocept/zeoraid/
A gocept.zeoraid/trunk/src/gocept/zeoraid/README.txt
A gocept.zeoraid/trunk/src/gocept/zeoraid/ROADMAP.txt
A gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py
A gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml
A gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
A gocept.zeoraid/trunk/src/gocept/zeoraid/manage.py
A gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py
A gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
A gocept.zeoraid/trunk/src/gocept/zeoraid/tests/
A gocept.zeoraid/trunk/src/gocept/zeoraid/tests/__init__.py
A gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
-=-
Added: gocept.zeoraid/trunk/COPYRIGHT.txt
===================================================================
--- gocept.zeoraid/trunk/COPYRIGHT.txt (rev 0)
+++ gocept.zeoraid/trunk/COPYRIGHT.txt 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,9 @@
+Copyright (c) 2007 gocept gmbh & co. kg and contributors.
+All Rights Reserved.
+
+This software is subject to the provisions of the Zope Public License,
+Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+FOR A PARTICULAR PURPOSE.
Property changes on: gocept.zeoraid/trunk/COPYRIGHT.txt
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/LICENSE.txt
===================================================================
--- gocept.zeoraid/trunk/LICENSE.txt (rev 0)
+++ gocept.zeoraid/trunk/LICENSE.txt 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,44 @@
+Zope Public License (ZPL) Version 2.1
+
+A copyright notice accompanies this license document that identifies the
+copyright holders.
+
+This license has been certified as open source. It has also been designated as
+GPL compatible by the Free Software Foundation (FSF).
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions in source code must retain the accompanying copyright
+notice, this list of conditions, and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the accompanying copyright
+notice, this list of conditions, and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+
+3. Names of the copyright holders must not be used to endorse or promote
+products derived from this software without prior written permission from the
+copyright holders.
+
+4. The right to distribute this software or to use it for any purpose does not
+give you the right to use Servicemarks (sm) or Trademarks (tm) of the copyright
+holders. Use of them is covered by separate agreement with the copyright
+holders.
+
+5. If any files are modified, you must cause the modified files to carry
+prominent notices stating that you changed the files and the date of any
+change.
+
+Disclaimer
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESSED
+OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT HOLDERS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
Property changes on: gocept.zeoraid/trunk/LICENSE.txt
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/buildout.cfg
===================================================================
--- gocept.zeoraid/trunk/buildout.cfg (rev 0)
+++ gocept.zeoraid/trunk/buildout.cfg 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,57 @@
+[buildout]
+develop = . /home/ctheune/Development/ZODB.trunk/
+parts = test client server1 server2 zeoraid
+find-links = http://download.zope.org/distribution/
+
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = gocept.zeoraid
+working-directory = /tmp/testrunner2/
+
+# Some demo parts
+
+[server1]
+recipe = zc.zodbrecipes:server
+zeo.conf =
+ <zeo>
+ address 8101
+ </zeo>
+ <filestorage 1>
+ path /tmp/Data1.fs
+ </filestorage>
+
+[server2]
+recipe = zc.zodbrecipes:server
+zeo.conf =
+ <zeo>
+ address 8102
+ </zeo>
+ <filestorage 1>
+ path /tmp/Data2.fs
+ </filestorage>
+
+[zeoraid]
+recipe = zc.zodbrecipes:server
+eggs = gocept.zeoraid
+zeo.conf =
+ <zeo>
+ address 8100
+ </zeo>
+ %import gocept.zeoraid
+ <raidstorage 1>
+ <zeoclient 1>
+ server localhost:8101
+ storage 1
+ </zeoclient>
+ <zeoclient 2>
+ server localhost:8102
+ storage 1
+ </zeoclient>
+ </raidstorage>
+
+[client]
+recipe = zc.recipe.egg
+eggs = ZODB3<3.9dev
+ gocept.zeoraid
+interpreter = client
Added: gocept.zeoraid/trunk/setup.py
===================================================================
--- gocept.zeoraid/trunk/setup.py (rev 0)
+++ gocept.zeoraid/trunk/setup.py 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,28 @@
+from setuptools import setup, find_packages
+
+name = "gocept.zeoraid"
+setup(
+ name = name,
+ version = "dev",
+ author = "Christian Theune",
+ author_email = "ct at gocept.com",
+ description = "A ZODB storage for replication using RAID techniques.",
+ long_description = open('src/gocept/zeoraid/README.txt').read(),
+ license = "ZPL 2.1",
+ keywords = "zodb buildout",
+ classifiers = ["Framework :: Buildout"],
+ url='http://launchpad.net/'+name,
+ zip_safe=False,
+ packages = find_packages('src'),
+ include_package_data = True,
+ package_dir = {'':'src'},
+ namespace_packages = ['gocept'],
+ install_requires = ['setuptools', 'ZODB3'],
+ extras_require = {
+ 'recipe': ['zc.buildout']
+ },
+ entry_points = {
+ 'zc.buildout': [
+ 'default = %s.recipe:Recipe [recipe]' % name,
+ ]},
+ )
Property changes on: gocept.zeoraid/trunk/setup.py
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/src/gocept/__init__.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/__init__.py (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/__init__.py 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,6 @@
+# namespace package boilerplate
+try:
+ __import__('pkg_resources').declare_namespace(__name__)
+except ImportError, e:
+ from pkgutil import extend_path
+ __path__ = extend_path(__path__, __name__)
Property changes on: gocept.zeoraid/trunk/src/gocept/__init__.py
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/README.txt
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/README.txt (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/README.txt 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,77 @@
+================
+ZEO RAID storage
+================
+
+The ZEO RAID storage is a storage intended to make ZEO installations more
+reliable by applying techniques as used in harddisk RAID solutions.
+
+The implementation is intended to make use of as much existing infrastructure
+as possible and provide a seamless and simple experience on setting up a
+reliable ZEO server infrastructure.
+
+Note: We use typical RAID terms to describe the behaviour of this system.
+
+The RAID storage
+================
+
+The ZEO RAID storage is a proxy storage that works like a RAID controller by
+creating a redundant array of ZEO servers. The redundancy is similar to RAID
+level 1 except that each ZEO server keeps a complete copy of the database.
+
+Therefore, up to N-1 out of N ZEO servers can fail without interrupting service.
+
+It is intended that any storage can be used as a backend storage for a RAID
+storage, although typically a ClientStorage will be the direct backend.
+
+The ZEO RAID server
+===================
+
+The RAID storage could (in theory) be used directly from a Zope server.
+However, to achieve real reliability, the RAID has to run as a storage for
+multiple Zope servers, like a normal ZEO setup does.
+
+For this, we leverage the normal ZEO server implementation and simply use a
+RAID storage instead of a FileStorage. The system architecture looks like
+this::
+
+ [ ZEO 1 ] [ ZEO 2 ] ... [ ZEO N ]
+ \ | /
+ \ | /
+ \ | /
+ \ | /
+ \ | /
+ \ | /
+ [ ZEO RAID ]
+ / | \
+ / | \
+ / | \
+ / | \
+ / | \
+ / | \
+ [ Zope 1 ] [ Zope 2 ] ... [ Zope N]
+
+ZEO RAID servers maintain a list of all the optimal, degraded and recovering
+storages and provide an extended ZEO rpc API to allow querying the RAID status
+and disabling and recovering storages at runtime.
+
+Making the RAID server reliable
+===============================
+
+The RAID server itself remains the last single point of failure in the system.
+This problem is solved as the RAID server does not maintain any persistent
+state (except the configuration data: its listening IP and port and the list
+of storages).
+
+The RAID server can be made reliable by providing a hot-spare server using
+existing HA tools (taking over the IP when a host goes down) and the existing
+ZEO ClientStorage behaviour.
+
+The RAID server is capable of deriving the status of all storages after
+startup so the hot-spare server does not have to get updated information
+before switching on. One drawback here: if all storages become corrupt at the
+same time, the RAID server will happily pick up the storage with the newest
+last transaction and use it as the optimal storage.
+
+To avoid this, we'd have to create a well known OID (or something similar) to
+annotate a storage with its status. This would mean that storages would have
+to be initialized as a RAID backend though and can't be easily migrated.
Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/README.txt
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/ROADMAP.txt
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/ROADMAP.txt (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/ROADMAP.txt 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,63 @@
+====
+TODO
+====
+
+1.0
+===
+
+Stabilization
+-------------
+
+ - Check edge cases for locking on all methods so that degrading a storage
+ works under all circumstances.
+
+ - The second pass of the recovery isn't thread safe. Ensure that only one
+ recovery can run at a time. (This is probably a good idea anyway because of
+ IO load.)
+
+ - Make sure that opening a ZEO client doesn't block forever. (E.g. by using a
+ custom opener that sets 'wait' to True and timeout to 10 seconds.)
+
+ Workaround: do this by using "wait off" or setting the timeout in
+ the RAID server config.
+
+ - Run some manual tests for weird situations, high load, ...
+
+ - Compatibility to which ZODB clients and which ZEO servers? Best would be to
+ support Zope 2.9 and Zope 3.4.
+
+ - Re-check API usage and definition for ZODB 3.8 as our base.
+
+Feature-completeness
+--------------------
+
+ - Rebuild storage using the copy mechanism in ZODB to get all historic
+ records completely. (Only rebuild completely, not incrementally)
+
+ - Create a limit for the transaction rate when recovering so that the
+ recovery doesn't clog up the live servers.
+
+ - Support Undo
+
+Cleanup
+-------
+
+ - Remove print statements and provide logging.
+
+ - Make manager script that works like zopectl and has a buildout recipe that
+ can talk to a specific RAID server.
+
+
+2.0
+===
+
+- Support packing?
+
+- Windows support
+
+- make writing to multiple storages asynchronous or at least truly parallel
+
+- Make the read requests come from different backends to optimize caching and
+ distribute IO load.
+
+- Allow adding and removing new backend servers while running.
Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/ROADMAP.txt
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1 @@
+import gocept.zeoraid.patches
Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/component.xml 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,45 @@
+<?xml version="1.0"?>
+
+<!-- RAID storage ZConfig section setup.
+
+To create a RAID storage, add the following to zope.conf:
+
+%import gocept.zeoraid
+<zodb>
+ <raidstorage>
+ <zeoclient>
+ server localhost:8100
+ storage 1
+ </zeoclient>
+ <zeoclient>
+ server localhost:8101
+ storage 1
+ </zeoclient>
+ </raidstorage>
+</zodb>
+
+Alternatively you can use the raidstorage as a storage on a ZEO server to turn
+this into a stateless RAID server that can be made HA easily (hot-standby).
+
+-->
+
+<component prefix="gocept.zeoraid.datatypes">
+
+ <sectiontype
+ name="raidstorage"
+ implements="ZODB.storage"
+ datatype=".Storage">
+
+ <multisection
+ type="ZODB.storage"
+ name="+"
+ attribute="storages"
+ required="yes">
+ <description>
+ One or more storages that are combined into a RAID array.
+ </description>
+ </multisection>
+
+ </sectiontype>
+
+</component>
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,8 @@
+import ZODB.config
+import gocept.zeoraid.storage
+
+class Storage(ZODB.config.BaseConfig):
+
+ def open(self):
+ return gocept.zeoraid.storage.RAIDStorage(self.name,
+ self.config.storages)
Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/manage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/manage.py (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/manage.py 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,48 @@
+import sys
+
+import ZEO.zrpc.client
+
+
+class RAIDManager(object):
+
+ def __init__(self):
+ self.manager = ZEO.zrpc.client.ConnectionManager(('127.0.0.1', 8100), self)
+ self.manager.connect(True)
+
+ def testConnection(self, connection):
+ # This is a preferred connection
+ return 1
+
+ def notifyConnected(self, connection):
+ self.connection = connection
+ self.connection.call('register', '1', True)
+
+ def status(self):
+ return self.connection.call('raid_status')
+
+ def recover(self, storage):
+ return self.connection.call('raid_recover', storage)
+
+ def disable(self, storage):
+ return self.connection.call('raid_disable', storage)
+
+ def details(self):
+ return self.connection.call('raid_details')
+
+if __name__ == '__main__':
+ m = RAIDManager()
+
+ if sys.argv[1] == 'status':
+ print m.status()
+ elif sys.argv[1] == 'details':
+ ok, recovering, failed = m.details()
+ print "RAID status:"
+ print "\t", m.status()
+ print "Storage status:"
+ print "\toptimal\t\t", ok
+ print "\trecovering\t", recovering
+ print "\tfailed\t\t", failed
+ elif sys.argv[1] == 'disable':
+ print m.disable(sys.argv[2])
+ elif sys.argv[1] == 'recover':
+ print m.recover(sys.argv[2])
Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/manage.py
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,8 @@
+
+# Helper method to make ZEO play nice. IMHO ZEO does not implement the
+# interface correctly.
+def _zeoraid_lastTransaction(self):
+ return self._server.lastTransaction()
+
+import ZEO.ClientStorage
+ZEO.ClientStorage.ClientStorage._zeoraid_lastTransaction = _zeoraid_lastTransaction
Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,585 @@
+import threading
+import time
+
+import ZEO.ClientStorage
+import ZODB.POSException
+import ZODB.utils
+import persistent.TimeStamp
+import transaction
+
+
+def get_serial(storage, oid):
+ if hasattr(storage, 'lastTid'):
+ # This is something like a FileStorage
+ get_serial = storage.lastTid
+ else:
+ get_serial = storage.getTid
+ return get_serial(oid)
+
+
+def get_last_transaction(storage):
+ if hasattr(storage, '_zeoraid_lastTransaction'):
+ last_transaction = storage._zeoraid_lastTransaction()
+ else:
+ last_transaction = storage.lastTransaction()
+ return last_transaction
+
+
+class RAIDError(Exception):
+ pass
+
+
+class RAIDClosedError(RAIDError, ZEO.ClientStorage.ClientStorageError):
+ pass
+
+
+class RAIDStorage(object):
+ """The RAID storage is a drop-in replacement for the client storages that
+ are configured.
+
+ It has few but important tasks: multiplex all communication to the
+ storages, coordinate the transactions between the storages and alert the
+ RAID controller if a storage fails.
+
+ """
+
+ closed = False
+ _transaction = None
+
+ def __init__(self, name, storages, read_only=False):
+ self.__name__ = name
+ self.read_only = read_only
+ self.storages = {}
+ self._log_stores = False
+
+ # Allocate locks
+ l = threading.RLock()
+ self._lock_acquire = l.acquire
+ self._lock_release = l.release
+ l = threading.Lock()
+ self._commit_lock_acquire = l.acquire
+ self._commit_lock_release = l.release
+
+ # Remember the openers for recovering a storage later
+ self.openers = {}
+ # Open the storages
+ for opener in storages:
+ self.storages[opener.name] = opener.open()
+ self.openers[opener.name] = opener
+
+ self.storages_optimal = []
+ self.storages_degraded = []
+ self.storages_recovering = []
+
+ tids = {}
+ for name, storage in self.storages.items():
+ try:
+ tid = get_last_transaction(storage)
+ except ZEO.ClientStorage.ClientDisconnected:
+ self._degrade_storage(name, fail=False)
+ continue
+ if tid is None:
+ # Not connected yet.
+ # XXX or empty ...
+ self._degrade_storage(name, fail=False)
+ continue
+ s = tids.setdefault(tid, [])
+ s.append(name)
+
+ self._unrecovered_transactions = {}
+ self._last_tid = None
+
+ # Activate all optimal storages
+ if tids:
+ self._last_tid = max(tids.keys())
+ self.storages_optimal.extend(tids[self._last_tid])
+ del tids[self._last_tid]
+
+ # Deactivate all degraded storages
+ for degraded_storages in tids.values():
+ self.storages_degraded.extend(degraded_storages)
+
+ t = time.time()
+ self.ts = persistent.TimeStamp.TimeStamp(*(time.gmtime(t)[:5] + (t%60,)))
+
+ if not self.storages_optimal:
+ raise RAIDError("Can't start without at least one optimal storage.")
+
+ def _degrade_storage(self, name, fail=True):
+ if name in self.storages_optimal:
+ self.storages_optimal.remove(name)
+ self.storages_degraded.append(name)
+ storage = self.storages[name]
+ t = threading.Thread(target=storage.close)
+ t.start()
+ del self.storages[name]
+ if not self.storages_optimal and fail:
+ raise RAIDError("No storages remain.")
+
+ def _apply_single_storage(self, method_name, *args, **kw):
+ if self.closed:
+ raise RAIDClosedError("Storage has been closed.")
+ storages = self.storages_optimal[:]
+ if not storages:
+ raise RAIDError("RAID storage is failed.")
+
+ while storages:
+ # XXX storage might be degraded by now, need to check.
+ name = self.storages_optimal[0]
+ storage = self.storages[name]
+ try:
+ # Make random/hashed selection of read storage
+ method = getattr(storage, method_name)
+ return method(*args, **kw)
+ except ZEO.ClientStorage.ClientDisconnected:
+ # XXX find other possible exceptions
+ self._degrade_storage(name)
+
+ def _apply_all_storages(self, method_name, *args, **kw):
+ if self.closed:
+ raise RAIDClosedError("Storage has been closed.")
+ results = []
+ storages = self.storages_optimal[:]
+ if not storages:
+ raise RAIDError("RAID storage is failed.")
+
+ for name in self.storages_optimal:
+ storage = self.storages[name]
+ try:
+ method = getattr(storage, method_name)
+ results.append(method(*args, **kw))
+ except ZEO.ClientStorage.ClientDisconnected:
+ self._degrade_storage(name)
+
+ res = results[:]
+ for test1 in res:
+ for test2 in res:
+ assert test1 == test2, "Results not consistent. Asynchronous storage?"
+ return results[0]
+
+ def isReadOnly(self):
+ """
+ XXX Revisit this approach?
+ """
+ return self.read_only
+
+ def getName(self):
+ return self.__name__
+
+ def getSize(self):
+ return self._apply_single_storage('getSize')
+
+ def close(self):
+ if self.closed:
+ # Storage may be closed more than once, e.g. by tear-down methods
+ # of tests.
+ return
+ self._apply_all_storages('close')
+ self.storages_optimal = []
+ self.closed = True
+
+ def cleanup(self):
+ # XXX This is not actually documented, it's not implemented in all
+ # storages, it's not even clear when it should be called. Not
+ # correctly calling storages' cleanup might leave turds.
+ pass
+
+ def load(self, oid, version):
+ return self._apply_single_storage('load', oid, version)
+
+ def loadEx(self, oid, version):
+ return self._apply_single_storage('loadEx', oid, version)
+
+ def store(self, oid, oldserial, data, version, transaction):
+ if self.isReadOnly():
+ raise ZODB.POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise ZODB.POSException.StorageTransactionError(self, transaction)
+
+ self._lock_acquire()
+ try:
+ self._apply_all_storages('store', oid, oldserial, data, version,
+ transaction)
+ if self._log_stores:
+ oids = self._unrecovered_transactions.setdefault(self._tid, [])
+ oids.append(oid)
+ return self._tid
+ finally:
+ self._lock_release()
+
+ def lastTransaction(self):
+ return self._apply_single_storage('lastTransaction')
+
+ def loadSerial(self, oid, serial):
+ return self._apply_single_storage('loadSerial', oid, serial)
+
+ def loadBefore(self, oid, tid):
+ return self._apply_single_storage('loadBefore', oid, tid)
+
+ #def iterator(self):
+ # XXX Dunno
+
+ def history(self, oid, version=None, size=1):
+ return self._apply_single_storage('history', oid, version, size)
+
+ def new_oid(self):
+ # XXX This is not exactly a read operation, but we only need an answer from one storage
+ if self.isReadOnly():
+ raise ZODB.POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ return self._apply_single_storage('new_oid')
+ finally:
+ self._lock_release()
+
+ def registerDB(self, db, limit=None):
+ # XXX Is it safe to register a DB with multiple storages or do we need some kind
+ # of wrapper here?
+ self._apply_all_storages('registerDB', db)
+
+ def supportsUndo(self):
+ return True
+
+ def undoLog(self, first=0, last=-20, filter=None):
+ return self._apply_single_storage('undoLog', first, last, filter)
+
+ def undoInfo(self, first=0, last=-20, specification=None):
+ return self._apply_single_storage('undoInfo', first, last,
+ specification)
+
+ def undo(self, transaction_id, transaction):
+ if self.isReadOnly():
+ raise ZODB.POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ return self._apply_all_storages('undo', transaction_id, transaction)
+ finally:
+ self._lock_release()
+
+ def supportsTransactionalUndo(self):
+ return True
+
+ def pack(self, t, referencesf):
+ if self.isReadOnly():
+ raise ZODB.POSException.ReadOnlyError()
+ self._apply_all_storages('pack', t, referencesf)
+
+ def supportsVersions(self):
+ return True
+
+ def commitVersion(self, src, dest, transaction):
+ if self.isReadOnly():
+ raise ZODB.POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ return self._apply_all_storages('commitVersion', src, dest, transaction)
+ finally:
+ self._lock_release()
+
+ def abortVersion(self, src, transaction):
+ if self.isReadOnly():
+ raise ZODB.POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ return self._apply_all_storages('abortVersion', src, transaction)
+ finally:
+ self._lock_release()
+
+ def tpc_abort(self, transaction):
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ # XXX Edge cases for the log_store abort ...
+ if self._log_stores:
+ # We may have logged some stores within that transaction
+ # which we have to remove again because we aborted it.
+ if self._tid in self._unrecovered_transactions:
+ del self._unrecovered_transactions[self._tid]
+ self._apply_all_storages('tpc_abort', transaction)
+ self._transaction = None
+ finally:
+ self._commit_lock_release()
+ finally:
+ self._lock_release()
+
+ def tpc_transaction(self):
+ """The current transaction being committed."""
+ return self._transaction
+
+ def tpc_begin(self, transaction, tid=None, status=' '):
+ if self.isReadOnly():
+ raise ZODB.POSException.ReadOnlyError()
+
+ self._lock_acquire()
+ try:
+ if self._transaction is transaction:
+ return
+ self._lock_release()
+ self._commit_lock_acquire()
+ self._lock_acquire()
+
+ # I don't understand the lock that protects _transaction. The commit
+ # lock and status will be deduced by the underlying storages.
+
+ self._transaction = transaction
+
+ # Remove storages that aren't on the same last tid anymore (this happens
+ # if a storage disconnects).
+ for name in self.storages_optimal:
+ storage = self.storages[name]
+ try:
+ last_tid = get_last_transaction(storage)
+ except ZEO.ClientStorage.ClientDisconnected:
+ self._degrade_storage(name, fail=False)
+ continue
+ if last_tid != self._last_tid:
+ self._degrade_storage(name)
+
+ # Create a common tid for all storages if we don't have one yet.
+ if tid is None:
+ now = time.time()
+ t = persistent.TimeStamp.TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
+ self.ts = t.laterThan(self.ts)
+ self._tid = repr(self.ts)
+ else:
+ self._ts = persistent.TimeStamp.TimeStamp(tid)
+ self._tid = tid
+
+ self._apply_all_storages('tpc_begin', transaction, self._tid, status)
+ finally:
+ self._lock_release()
+
+ def tpc_vote(self, transaction):
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ self._apply_all_storages('tpc_vote', transaction)
+ finally:
+ self._lock_release()
+
+ def tpc_finish(self, transaction, callback=None):
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ if callback is not None:
+ callback(self._tid)
+ self._apply_all_storages('tpc_finish', transaction)
+ self._last_tid = self._tid
+ return self._tid
+ finally:
+ self._transaction = None
+ self._commit_lock_release()
+ finally:
+ self._lock_release()
+
+ def getSerial(self, oid):
+ self._lock_acquire()
+ try:
+ return self._apply_single_storage('getSerial', oid)
+ finally:
+ self._lock_release()
+
+ def getExtensionMethods(self):
+ # XXX This is very awkward right now.
+ methods = self._apply_single_storage('getExtensionMethods')
+ if methods is None:
+ # Allow management while status is 'failed'
+ methods = {}
+ methods['raid_recover'] = None
+ methods['raid_status'] = None
+ methods['raid_disable'] = None
+ methods['raid_details'] = None
+ return methods
+
+ def __len__(self):
+ return self._apply_single_storage('__len__')
+
+ def versionEmpty(self, version):
+ return self._apply_single_storage('versionEmpty', version)
+
+ def versions(self, max=None):
+ return self._apply_single_storage('versions', max)
+
+ def modifiedInVersion(self, oid):
+ return self._apply_single_storage('modifiedInVersion', oid)
+
+ def getTid(self, oid):
+ return self._apply_single_storage('getTid', oid)
+
+ # Extension methods for RAIDStorage
+ def raid_recover(self, name):
+ if self.closed:
+ raise RAIDClosedError("Storage has been closed.")
+ if name not in self.storages_degraded:
+ return
+ self.storages_degraded.remove(name)
+ self.storages_recovering.append(name)
+ t = threading.Thread(target=self._recover_impl, args=(name,))
+ t.start()
+ return 'recovering %r' % name
+
+ def _recover_impl(self, name):
+ try:
+ # First pass: Transfer all oids without hindering running transactions
+ begin = time.time()
+ self._recover_first(name)
+ end = time.time()
+
+ # Second pass: Start the TPC on a reference storage to block other
+ # transactions so we can catch up. The second pass should be
+ # significantly faster than the first.
+ begin = time.time()
+ self._recover_second(name)
+ end = time.time()
+ except:
+ # *something* went wrong. Put the storage back to degraded.
+ self._degrade_storage(name)
+ raise
+
+ def _recover_second(self, name):
+ storage = self.storages[name]
+ reference_storage = self.storages[self.storages_optimal[0]]
+ # Start a transaction on the reference storage to acquire the
+ # commit log and prevent other people from committing in the second phase.
+ # XXX This needs to be optimized in a way that the second phase
+ # gets re-run as long as possible, only holding the commit lock if
+ # no transactions remain that need to be replayed and putting the
+ # recovered storage back into the array of optimal storages.
+ while 1:
+ tm = transaction.TransactionManager()
+ t = tm.get()
+ last_transaction = get_last_transaction(storage)
+ reference_storage.tpc_begin(t)
+ unrecovered_transactions = self._unrecovered_transactions
+ if unrecovered_transactions:
+ # We acquired the commit lock and there are transactions that
+ # have been committed and were not yet transferred to the
+ # recovering storage. We have to try to replay those and then
+ # check again. We can remove the commit lock for now.
+ self._unrecovered_transactions = {}
+ reference_storage.tpc_abort(t)
+
+ # RRR: Refactor into its own method?
+ tm2 = transaction.TransactionManager()
+ t2 = tm2.get()
+
+ # Get the unrecovered transactions in the order they were
+ # recorded.
+ tids = sorted(unrecovered_transactions.keys())
+ for tid in tids:
+ oids = unrecovered_transactions[tid]
+ # We create one transaction for all oids that belong to one
+ # transaction.
+ storage.tpc_begin(t2, tid=tid)
+ for oid in oids:
+ data, tid_ = reference_storage.load(oid, '')
+ if tid_ > tid:
+ # If the current tid of the object is newer
+ # than the one we logged, we can ignore it, because
+ # there will be another entry for this oid in a
+ # later transaction.
+ continue
+ try:
+ oldserial = get_serial(storage, oid)
+ except ZODB.POSException.POSKeyError:
+ # This means that the object is new and didn't have an
+ # old transaction yet.
+ # XXX Might this also happen with non-undoable storages?
+ oldserial = ZODB.utils.z64
+ storage.store(oid, oldserial, data, '', t2)
+ storage.tpc_vote(t2)
+ storage.tpc_finish(t2)
+ # /RRR
+ else:
+ # We acquired the commit lock and no committed transactions
+ # are waiting in the log. This means the recovering storage
+ # has caught up by now and we can put it into optimal state
+ # again.
+ self.storages_recovering.remove(name)
+ self.storages_optimal.append(name)
+ # We can also stop logging stores now.
+ self._log_stores = False
+ reference_storage.tpc_abort(t)
+ break
+
    def _recover_first(self, name):
        """First recovery pass: copy the current state of all objects.

        Re-opens the backend storage `name` and iterates over all current
        records of an optimal storage, writing any record that is missing
        or differs onto the recovering storage, without blocking running
        transactions.
        """
        # Re-open storage
        storage = self.openers[name].open()
        self.storages[name] = storage
        # XXX Bring the storage to the current state. This only copies the
        # current data, so RAID currently supports neither undo nor versions.
        next_oid = None
        tm = transaction.TransactionManager()
        t = tm.get()
        # XXX we assume that the last written transaction actually is
        # consistent. We need a consistency check.
        last_transaction = get_last_transaction(storage)
        # Records newer than `max_transaction` are skipped in this pass;
        # the `_log_stores` flag (set below) makes all successful stores
        # get logged so that the second pass can replay them.
        max_transaction = get_last_transaction(self.storages[self.storages_optimal[0]])
        self._unrecovered_transactions = {}
        self._log_stores = True
        # The init flag allows us to phrase the break condition of the
        # following loop a little bit more elegantly.
        init = True
        while 1:
            if next_oid is None and not init:
                break

            init = False
            oid, tid, data, next_oid = self._apply_single_storage('record_iternext', next_oid)

            # Committed after this pass started -- left to the second pass.
            if tid > max_transaction:
                continue

            if tid <= last_transaction:
                # The recovering storage may already hold this revision.
                try:
                    old_data = storage.loadSerial(oid, tid)
                except ZODB.POSException.POSKeyError:
                    pass
                else:
                    if old_data == data:
                        continue

            # There is a newer version of the object available or the existing
            # version was incorrect. Overwrite it with the right data.
            try:
                oldserial = get_serial(storage, oid)
            except ZODB.POSException.POSKeyError:
                # The object does not exist on the recovering storage yet.
                oldserial = ZODB.utils.z64


            assert oldserial <= tid, "last_transaction and oldserial are not in-sync"

            storage.tpc_begin(t, tid=tid)
            storage.store(oid, oldserial, data, '', t)
            storage.tpc_vote(t)
            storage.tpc_finish(t)
+
+ def raid_status(self):
+ if self.closed:
+ raise RAIDClosedError("Storage has been closed.")
+ if self.storages_recovering:
+ return 'recovering'
+ if not self.storages_degraded:
+ return 'optimal'
+ if not self.storages_optimal:
+ return 'failed'
+ return 'degraded'
+
+ def raid_details(self):
+ if self.closed:
+ raise RAIDClosedError("Storage has been closed.")
+ return [self.storages_optimal, self.storages_recovering, self.storages_degraded]
+
+ def raid_disable(self, name):
+ if self.closed:
+ raise RAIDClosedError("Storage has been closed.")
+ self._degrade_storage(name, fail=False)
+ return 'disabled %r' % name
Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/__init__.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/__init__.py (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/__init__.py 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1 @@
+#make this a package
Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/__init__.py
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
Added: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py 2008-01-07 11:52:54 UTC (rev 82727)
@@ -0,0 +1,133 @@
+import unittest
+import tempfile
+import os
+
+from ZODB.tests import StorageTestBase, BasicStorage, \
+ TransactionalUndoStorage, VersionStorage, \
+ TransactionalUndoVersionStorage, PackableStorage, \
+ Synchronization, ConflictResolution, HistoryStorage, \
+ Corruption, RevisionStorage, PersistentStorage, \
+ MTStorage, ReadOnlyStorage, RecoveryStorage
+
+import gocept.zeoraid.storage
+
+from ZODB.FileStorage.FileStorage import FileStorage
+
+from ZEO.ClientStorage import ClientStorage
+from ZEO.tests import forker, CommitLockTests, ThreadTests
+from ZEO.tests.testZEO import get_port
+
+
class DemoOpener(object):
    """Factory that creates a FileStorage on demand.

    RAIDStorage expects openers (factories) rather than already-open
    storages; `open` instantiates `class_` with the arguments recorded
    at construction time.
    """

    class_ = FileStorage

    def __init__(self, name, **kwargs):
        self.name = name
        # An absent/empty kwargs dict is normalized to {}.
        self.kwargs = kwargs or {}

    def open(self, **kwargs):
        # NOTE(review): the kwargs passed to open() are ignored; only the
        # ones stored at construction time are applied.
        factory = self.class_
        return factory(self.name, **self.kwargs)
+
+
class ZEOOpener(DemoOpener):
    """Opener variant that creates a ClientStorage instead of a
    FileStorage; construction arguments are passed through unchanged."""

    class_ = ClientStorage
+
+
class FileStorageBackendTests(StorageTestBase.StorageTestBase):
    """Set up a RAIDStorage backed by two local FileStorages."""

    def open(self, **kwargs):
        # A RAIDStorage requires openers, not storages.
        s1 = DemoOpener('s1.fs')
        s2 = DemoOpener('s2.fs')

        self._storage = gocept.zeoraid.storage.RAIDStorage('teststorage',
            [s1, s2], **kwargs)

    def setUp(self):
        self.open()

    def tearDown(self):
        self._storage.close()
        self._storage.cleanup()
        # Remove the backend files including the auxiliary files
        # FileStorage creates next to them; tolerate files that are
        # already gone so a failed test doesn't mask its real error
        # with an OSError from unlink.
        for base in ('s1.fs', 's2.fs'):
            for path in (base, base + '.index', base + '.lock',
                         base + '.tmp', base + '.old'):
                if os.path.exists(path):
                    os.unlink(path)
+
+
class ZEOStorageBackendTests(StorageTestBase.StorageTestBase):
    """Set up a RAIDStorage backed by five freshly forked ZEO servers."""

    def open(self, **kwargs):
        self._storage = gocept.zeoraid.storage.RAIDStorage('teststorage',
            self._storages, **kwargs)

    def setUp(self):
        self._server_storage_files = []
        self._servers = []
        self._storages = []
        for i in xrange(5):
            port = get_port()
            zconf = forker.ZEOConfig(('', port))
            zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
                                                                  zconf, port)

            self._servers.append(adminaddr)
            self._storages.append(ZEOOpener(zport, storage='1',
                                            cache_size=2000000,
                                            min_disconnect_poll=0.5, wait=1,
                                            wait_timeout=60))
        self.open()

    def getConfig(self):
        # Each server gets its own temporary FileStorage; remember the
        # path so tearDown can remove it again.
        filename = self.__fs_base = tempfile.mktemp()
        self._server_storage_files.append(filename)
        return """\
        <filestorage 1>
        path %s
        </filestorage>
        """ % filename

    def tearDown(self):
        self._storage.close()
        for server in self._servers:
            forker.shutdown_zeo_server(server)
        # XXX wait for servers to come down
        # Delete the FileStorage files (and their auxiliary files) that
        # backed the ZEO servers.
        for base in self._server_storage_files:
            for path in (base, base + '.index', base + '.lock',
                         base + '.tmp', base + '.old'):
                if os.path.exists(path):
                    os.unlink(path)
+
class ReplicationStorageTests(
        BasicStorage.BasicStorage,
        TransactionalUndoStorage.TransactionalUndoStorage,
        RevisionStorage.RevisionStorage,
        VersionStorage.VersionStorage,
        TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
        PackableStorage.PackableStorage,
        PackableStorage.PackableUndoStorage,
        Synchronization.SynchronizedStorage,
        ConflictResolution.ConflictResolvingStorage,
        ConflictResolution.ConflictResolvingTransUndoStorage,
        HistoryStorage.HistoryStorage,
        PersistentStorage.PersistentStorage,
        MTStorage.MTStorage,
        ReadOnlyStorage.ReadOnlyStorage,
        ):
    """Mixin bundling the generic ZODB storage API test suites.

    Combined with a backend-specific setup class below to exercise
    RAIDStorage through the standard storage tests.
    """
+
+
class FSReplicationStorageTests(FileStorageBackendTests,
                                ReplicationStorageTests):
    """Run the generic storage tests against a FileStorage-backed RAID."""
+
+
class ZEOReplicationStorageTests(ZEOStorageBackendTests,
                                 ReplicationStorageTests,
                                 ThreadTests.ThreadTests):
    """Run the generic storage tests plus ZEO thread tests against a
    ZEO-backed RAID."""
+
+
def test_suite():
    """Collect all RAID test cases (test methods are named check*)."""
    suite = unittest.TestSuite()
    for case in (FSReplicationStorageTests, ZEOReplicationStorageTests):
        suite.addTest(unittest.makeSuite(case, "check"))
    return suite
+
if __name__ == '__main__':
    unittest.main()
Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
___________________________________________________________________
Name: svn:keywords
+ Id Rev Date
Name: svn:eol-style
+ native
More information about the Checkins
mailing list