[Checkins] SVN: gocept.zeoraid/trunk/ - Cleanup.

Christian Theune ct at gocept.com
Wed Jan 9 10:28:08 EST 2008


Log message for revision 82772:
  - Cleanup.
  - Working through the APIs by sorting them. 
  - Moved patches to a more flexible approach of compatibility layers for
    storages.
  - Removed support for direct FileStorages for now, we'll stick to
    ClientStorage as the backend.
  

Changed:
  U   gocept.zeoraid/trunk/ROADMAP.txt
  U   gocept.zeoraid/trunk/setup.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/compatibility.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/interfaces.py
  D   gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
  A   gocept.zeoraid/trunk/src/gocept/zeoraid/utils.py

-=-
Modified: gocept.zeoraid/trunk/ROADMAP.txt
===================================================================
--- gocept.zeoraid/trunk/ROADMAP.txt	2008-01-09 11:05:41 UTC (rev 82771)
+++ gocept.zeoraid/trunk/ROADMAP.txt	2008-01-09 15:28:07 UTC (rev 82772)
@@ -67,3 +67,5 @@
 - Asynchronuous write to off-site servers
 
 - Better performance for reading (distribute read load)
+
+- Cleaner compatibility setup

Modified: gocept.zeoraid/trunk/setup.py
===================================================================
--- gocept.zeoraid/trunk/setup.py	2008-01-09 11:05:41 UTC (rev 82771)
+++ gocept.zeoraid/trunk/setup.py	2008-01-09 15:28:07 UTC (rev 82772)
@@ -17,7 +17,9 @@
     include_package_data = True,
     package_dir = {'':'src'},
     namespace_packages = ['gocept'],
-    install_requires = ['setuptools', 'ZODB3<3.9dev'],
+    install_requires = ['setuptools',
+                        'ZODB3<3.9dev',
+                        'zope.component'],
     extras_require = {
         'recipe': ['zc.buildout']
     },

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py	2008-01-09 11:05:41 UTC (rev 82771)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/__init__.py	2008-01-09 15:28:07 UTC (rev 82772)
@@ -1 +1,5 @@
-import gocept.zeoraid.patches
+# vim:fileencoding=utf-8
+# Copyright (c) 2007 gocept gmbh & co. kg
+# See also LICENSE.txt
+# $Id$
+"""ZEORaid storage implementation."""

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/compatibility.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/compatibility.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/compatibility.py	2008-01-09 15:28:07 UTC (rev 82772)
@@ -0,0 +1,55 @@
+# vim:fileencoding=utf-8
+# Copyright (c) 2007 gocept gmbh & co. kg
+# See also LICENSE.txt
+# $Id$
+"""Compatibility layers."""
+
+import zope.interface
+import zope.component
+import zope.proxy
+import zope.proxy.decorator
+
+import ZEO.ClientStorage
+
+import gocept.zeoraid.interfaces
+import gocept.zeoraid.utils
+
+
+class ClientStorage38(zope.proxy.decorator.SpecificationDecoratorBase):
+    """Compatibility layer for the ClientStorage of ZODB 3.8.
+
+    Includes fixes for:
+
+        - lastTransaction: uses the _cache incorrectly to determine the last
+          transaction.
+
+    """
+
+    zope.component.adapts(
+        ZEO.ClientStorage.ClientStorage)
+    zope.interface.implements(
+        gocept.zeoraid.interfaces.IRAIDCompatibleStorage)
+
+    @zope.proxy.non_overridable
+    def lastTransaction(self):
+        return zope.proxy.getProxiedObject(self)._server.lastTransaction()
+
+
+compatibility_matrix = {
+    '3.8': ClientStorage38
+}
+
+
+compatibility_initialized = False
+
+
+def setup():
+    global compatibility_initialized
+    if compatibility_initialized:
+        return
+    zodb_version = gocept.zeoraid.utils.guess_zodb_version()
+    gocept.zeoraid.utils.logger.info(
+        'Setting up compatibility layer for ZODB %s.' % zodb_version)
+    storage_adapter = compatibility_matrix[zodb_version]
+    zope.component.provideAdapter(storage_adapter)
+    compatibility_initialized = True


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/compatibility.py
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py	2008-01-09 11:05:41 UTC (rev 82771)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/datatypes.py	2008-01-09 15:28:07 UTC (rev 82772)
@@ -4,5 +4,7 @@
 class Storage(ZODB.config.BaseConfig):
 
     def open(self):
+        # Ensure that compatibility is set up.
+        gocept.zeoraid.compatibility.setup()
         return gocept.zeoraid.storage.RAIDStorage(self.name,
                                                   self.config.storages)

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/interfaces.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/interfaces.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/interfaces.py	2008-01-09 15:28:07 UTC (rev 82772)
@@ -0,0 +1,41 @@
+# vim:fileencoding=utf-8
+# Copyright (c) 2007 gocept gmbh & co. kg
+# See also LICENSE.txt
+# $Id$
+"""Interface descriptions"""
+
+
+import zope.interface
+
+import ZEO.ClientStorage
+
+
+class RAIDError(Exception):
+    pass
+
+
+class RAIDClosedError(RAIDError, ZEO.ClientStorage.ClientStorageError):
+    pass
+
+
+class IRAIDStorage(zope.interface.Interface):
+    """A ZODB storage providing simple RAID capabilities."""
+
+    def raid_status():
+        pass
+
+    def raid_details():
+        pass
+
+    def raid_disable(name):
+        pass
+
+    def raid_recover(name):
+        pass
+
+
+class IRAIDCompatibleStorage(zope.interface.Interface):
+    """A compatibility layer that every (backend) storage is adapted to before
+    a RAID storage works with it.
+    """
+    # XXX Document the implemented interfaces


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/interfaces.py
___________________________________________________________________
Name: svn:eol-style
   + native

Deleted: gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py	2008-01-09 11:05:41 UTC (rev 82771)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/patches.py	2008-01-09 15:28:07 UTC (rev 82772)
@@ -1,8 +0,0 @@
-
-# Helper method to make ZEO play nice. IMHO ZEO does not implement the
-# interface correctly.
-def _zeoraid_lastTransaction(self):
-    return self._server.lastTransaction()
-
-import ZEO.ClientStorage
-ZEO.ClientStorage.ClientStorage._zeoraid_lastTransaction = _zeoraid_lastTransaction

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-09 11:05:41 UTC (rev 82771)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-09 15:28:07 UTC (rev 82772)
@@ -1,3 +1,9 @@
+# vim:fileencoding=utf-8
+# Copyright (c) 2007 gocept gmbh & co. kg
+# See also LICENSE.txt
+# $Id$
+"""ZEORaid storage implementation."""
+
 import threading
 import time
 
@@ -3,40 +9,16 @@
 import zope.interface
 
-import ZODB.interfaces
-import ZEO.interfaces
 import ZEO.ClientStorage
+import ZEO.interfaces
 import ZODB.POSException
+import ZODB.interfaces
 import ZODB.utils
 import persistent.TimeStamp
 import transaction
 
+import gocept.zeoraid.interfaces
+import gocept.zeoraid.compatibility
 
-# XXX
-def get_serial(storage, oid):
-    if hasattr(storage, 'lastTid'):
-        # This is something like a FileStorage
-        get_serial = storage.lastTid
-    else:
-        get_serial = storage.getTid
-    return get_serial(oid)
 
-
-# XXX
-def get_last_transaction(storage):
-    if hasattr(storage, '_zeoraid_lastTransaction'):
-        last_transaction = storage._zeoraid_lastTransaction()
-    else:
-        last_transaction = storage.lastTransaction()
-    return last_transaction
-
-
-class RAIDError(Exception):
-    pass
-
-
-class RAIDClosedError(RAIDError, ZEO.ClientStorage.ClientStorageError):
-    pass
-
-
 class RAIDStorage(object):
     """The RAID storage is a drop-in replacement for the client storages that
@@ -59,7 +41,7 @@
     closed = False
     _transaction = None
 
-    def __init__(self, name, storages, read_only=False):
+    def __init__(self, name, openers, read_only=False):
         self.__name__ = name
         self.read_only = read_only
         self.storages = {}
@@ -76,9 +58,9 @@
         # Remember the openers to for recovering a storage later
         self.openers = {}
         # Open the storages
-        for opener in storages:
-            self.storages[opener.name] = opener.open()
+        for opener in openers:
             self.openers[opener.name] = opener
+            self._open_storage(opener.name)
 
         self.storages_optimal = []
         self.storages_degraded = []
@@ -87,7 +69,7 @@
         tids = {}
         for name, storage in self.storages.items():
             try:
-                tid = get_last_transaction(storage)
+                tid = storage.lastTransaction()
             except ZEO.ClientStorage.ClientDisconnected:
                 self._degrade_storage(name, fail=False)
                 continue
@@ -116,77 +98,11 @@
         self.ts = persistent.TimeStamp.TimeStamp(*(time.gmtime(t)[:5] + (t%60,)))
 
         if not self.storages_optimal:
-            raise RAIDError("Can't start without at least one optimal storage.")
+            raise gocept.zeoraid.interfaces.RAIDError("Can't start without at least one optimal storage.")
 
-    def _degrade_storage(self, name, fail=True):
-        if name in self.storages_optimal:
-            self.storages_optimal.remove(name)
-        self.storages_degraded.append(name)
-        storage = self.storages[name]
-        t = threading.Thread(target=storage.close)
-        t.start()
-        del self.storages[name]
-        if not self.storages_optimal and fail:
-            raise RAIDError("No storages remain.")
-
-    def _apply_single_storage(self, method_name, *args, **kw):
-        if self.closed:
-            raise RAIDClosedError("Storage has been closed.")
-        storages = self.storages_optimal[:]
-        if not storages:
-            raise RAIDError("RAID storage is failed.")
-
-        while storages:
-            # XXX storage might be degraded by now, need to check.
-            name = self.storages_optimal[0]
-            storage = self.storages[name]
-            try:
-                # Make random/hashed selection of read storage
-                method = getattr(storage, method_name)
-                return method(*args, **kw)
-            except ZEO.ClientStorage.ClientDisconnected:
-                # XXX find other possible exceptions
-                self._degrade_storage(name)
-
-    def _apply_all_storages(self, method_name, *args, **kw):
-        if self.closed:
-            raise RAIDClosedError("Storage has been closed.")
-        results = []
-        storages = self.storages_optimal[:]
-        if not storages:
-            raise RAIDError("RAID storage is failed.")
-
-        for name in self.storages_optimal:
-            storage = self.storages[name]
-            try:
-                method = getattr(storage, method_name)
-                results.append(method(*args, **kw))
-            except ZEO.ClientStorage.ClientDisconnected:
-                self._degrade_storage(name)
-
-        res = results[:]
-        for test1 in res:
-            for test2 in res:
-                assert test1 == test2, "Results not consistent. Asynchronous storage?"
-        return results[0]
-
     # IStorage
 
-    def sortKey(self):
-        return id(self)
-
-    def isReadOnly(self):
-        """
-        XXX Revisit this approach?
-        """
-        return self.read_only
-
-    def getName(self):
-        return self.__name__
-
-    def getSize(self):
-        return self._apply_single_storage('getSize')
-
+    # XXX
     def close(self):
         if self.closed:
             # Storage may be closed more than once, e.g. by tear-down methods
@@ -196,50 +112,46 @@
         self.storages_optimal = []
         self.closed = True
 
-    def cleanup(self):
-        # XXX This is not actually documented, it's not implemented in all
-        # storages, it's not even clear when it should be called. Not
-        # correctly calling storages' cleanup might leave turds.
-        pass
+    # XXX
+    def getName(self):
+        return self.__name__
 
-    def load(self, oid, version):
-        return self._apply_single_storage('load', oid, version)
+    # XXX
+    def getSize(self):
+        return self._apply_single_storage('getSize')
 
-    def loadEx(self, oid, version):
-        return self._apply_single_storage('loadEx', oid, version)
+    # XXX
+    def history(self, oid, version=None, size=1):
+        return self._apply_single_storage('history', oid, version, size)
 
-    def store(self, oid, oldserial, data, version, transaction):
-        if self.isReadOnly():
-            raise ZODB.POSException.ReadOnlyError()
-        if transaction is not self._transaction:
-            raise ZODB.POSException.StorageTransactionError(self, transaction)
+    # XXX
+    def isReadOnly(self):
+        """
+        XXX Revisit this approach?
+        """
+        return self.read_only
 
-        self._lock_acquire()
-        try:
-            self._apply_all_storages('store', oid, oldserial, data, version, 
-                                     transaction)
-            if self._log_stores:
-                oids = self._unrecovered_transactions.setdefault(self._tid, [])
-                oids.append(oid)
-            return self._tid
-        finally:
-            self._lock_release()
-
+    # XXX
     def lastTransaction(self):
         return self._apply_single_storage('lastTransaction')
 
-    def loadSerial(self, oid, serial):
-        return self._apply_single_storage('loadSerial', oid, serial)
+    # XXX
+    def __len__(self):
+        return self._apply_single_storage('__len__')
 
+    # XXX
+    def load(self, oid, version):
+        return self._apply_single_storage('load', oid, version)
+
+    # XXX
     def loadBefore(self, oid, tid):
         return self._apply_single_storage('loadBefore', oid, tid)
 
-    #def iterator(self):
-    # XXX Dunno
+    # XXX
+    def loadSerial(self, oid, serial):
+        return self._apply_single_storage('loadSerial', oid, serial)
 
-    def history(self, oid, version=None, size=1):
-        return self._apply_single_storage('history', oid, version, size)
-
+    # XXX
     def new_oid(self):
         # XXX This is not exactly a read operation, but we only need an answer from one storage
         if self.isReadOnly():
@@ -250,59 +162,41 @@
         finally:
             self._lock_release()
 
+    # XXX
+    def pack(self, t, referencesf):
+        if self.isReadOnly():
+            raise ZODB.POSException.ReadOnlyError()
+        self._apply_all_storages('pack', t, referencesf)
+
+    # XXX
     def registerDB(self, db, limit=None):
         # XXX Is it safe to register a DB with multiple storages or do we need some kind
         # of wrapper here?
         self._apply_all_storages('registerDB', db)
 
-    def supportsUndo(self):
-        return True
+    # XXX
+    def sortKey(self):
+        return id(self)
 
-    def undoLog(self, first=0, last=-20, filter=None):
-        return self._apply_single_storage('undoLog', first, last, filter)
-
-    def undoInfo(self, first=0, last=-20, specification=None):
-        return self._apply_single_storage('undoInfo', first, last,
-                                          specification)
-
-    def undo(self, transaction_id, transaction):
+    # XXX
+    def store(self, oid, oldserial, data, version, transaction):
         if self.isReadOnly():
             raise ZODB.POSException.ReadOnlyError()
-        self._lock_acquire()
-        try:
-            return self._apply_all_storages('undo', transaction_id, transaction)
-        finally:
-            self._lock_release()
+        if transaction is not self._transaction:
+            raise ZODB.POSException.StorageTransactionError(self, transaction)
 
-    def supportsTransactionalUndo(self):
-        return True
-
-    def pack(self, t, referencesf):
-        if self.isReadOnly():
-            raise ZODB.POSException.ReadOnlyError()
-        self._apply_all_storages('pack', t, referencesf)
-
-    def supportsVersions(self):
-        return True
-
-    def commitVersion(self, src, dest, transaction):
-        if self.isReadOnly():
-            raise ZODB.POSException.ReadOnlyError()
         self._lock_acquire()
         try:
-            return self._apply_all_storages('commitVersion', src, dest, transaction)
+            self._apply_all_storages('store', oid, oldserial, data, version, 
+                                     transaction)
+            if self._log_stores:
+                oids = self._unrecovered_transactions.setdefault(self._tid, [])
+                oids.append(oid)
+            return self._tid
         finally:
             self._lock_release()
 
-    def abortVersion(self, src, transaction):
-        if self.isReadOnly():
-            raise ZODB.POSException.ReadOnlyError()
-        self._lock_acquire()
-        try:
-            return self._apply_all_storages('abortVersion', src, transaction)
-        finally:
-            self._lock_release()
-
+    # XXX
     def tpc_abort(self, transaction):
         self._lock_acquire()
         try:
@@ -322,10 +216,7 @@
         finally:
             self._lock_release()
 
-    def tpc_transaction(self):
-        """The current transaction being committed."""
-        return self._transaction
-
+    # XXX
     def tpc_begin(self, transaction, tid=None, status=' '):
         if self.isReadOnly():
             raise ZODB.POSException.ReadOnlyError()
@@ -348,7 +239,7 @@
             for name in self.storages_optimal:
                 storage = self.storages[name]
                 try:
-                    last_tid = get_last_transaction(storage)
+                    last_tid = storage.lastTransaction()
                 except ZEO.ClientStorage.ClientDisconnected:
                     self._degrade_storage(name, fail=False)
                     continue
@@ -369,15 +260,7 @@
         finally:
             self._lock_release()
 
-    def tpc_vote(self, transaction):
-        self._lock_acquire()
-        try:
-            if transaction is not self._transaction:
-                return
-            self._apply_all_storages('tpc_vote', transaction)
-        finally:
-            self._lock_release()
-
+    # XXX
     def tpc_finish(self, transaction, callback=None):
         self._lock_acquire()
         try:
@@ -395,15 +278,91 @@
         finally:
             self._lock_release()
 
-    def getSerial(self, oid):
+    # XXX
+    def tpc_vote(self, transaction):
         self._lock_acquire()
         try:
-            return self._apply_single_storage('getSerial', oid)
+            if transaction is not self._transaction:
+                return
+            self._apply_all_storages('tpc_vote', transaction)
         finally:
             self._lock_release()
 
+    def cleanup(self):
+        # XXX This is not actually documented, it's not implemented in all
+        # storages, it's not even clear when it should be called. Not
+        # correctly calling storages' cleanup might leave turds.
+        pass
+
+    def supportsVersions(self):
+        return False
+
+    def modifiedInVersion(self, oid):
+        return ''
+
+    # IBlobStorage
+
+    def storeBlob(self, oid, oldserial, data, blob, version, transaction):
+        """Stores data that has a BLOB attached."""
+        # XXX
+
+    def loadBlob(self, oid, serial):
+        """Return the filename of the Blob data for this OID and serial."""
+        # XXX
+
+    def temporaryDirectory(self):
+        """Return a directory that should be used for uncommitted blob data.
+        """
+        # XXX
+
+    # IStorageUndoable
+
+    # XXX
+    def supportsUndo(self):
+        return True
+
+    # XXX
+    def undo(self, transaction_id, transaction):
+        if self.isReadOnly():
+            raise ZODB.POSException.ReadOnlyError()
+        self._lock_acquire()
+        try:
+            return self._apply_all_storages('undo', transaction_id, transaction)
+        finally:
+            self._lock_release()
+
+    # XXX
+    def undoLog(self, first=0, last=-20, filter=None):
+        return self._apply_single_storage('undoLog', first, last, filter)
+
+    # XXX
+    def undoInfo(self, first=0, last=-20, specification=None):
+        return self._apply_single_storage('undoInfo', first, last,
+                                          specification)
+
+    # IStorageCurrentRecordIteration
+
+    # XXX
+    def record_iternext(self, next=None):
+        """Iterate over the records in a storage."""
+
+    # IServeable
+
+    # XXX
+    def lastInvalidations(self, size):
+        """Get recent transaction invalidations."""
+
+    # XXX
+    def tpc_transaction(self):
+        """The current transaction being committed."""
+        return self._transaction
+
+    # XXX
+    def getTid(self, oid):
+        return self._apply_single_storage('getTid', oid)
+
+    # XXX
     def getExtensionMethods(self):
-        # XXX This is very awkward right now.
         methods = self._apply_single_storage('getExtensionMethods')
         if methods is None:
             # Allow management while status is 'failed'
@@ -414,25 +373,41 @@
         methods['raid_details'] = None
         return methods
 
-    def __len__(self):
-        return self._apply_single_storage('__len__')
+    # IRAIDStorage
 
-    def versionEmpty(self, version):
-        return self._apply_single_storage('versionEmpty', version)
+    # XXX
+    def raid_status(self):
+        if self.closed:
+            raise gocept.zeoraid.interfaces.RAIDClosedError(
+                "Storage has been closed.")
+        if self.storages_recovering:
+            return 'recovering'
+        if not self.storages_degraded:
+            return 'optimal'
+        if not self.storages_optimal:
+            return 'failed'
+        return 'degraded'
 
-    def versions(self, max=None):
-        return self._apply_single_storage('versions', max)
+    # XXX
+    def raid_details(self):
+        if self.closed:
+            raise gocept.zeoraid.interfaces.RAIDClosedError(
+                "Storage has been closed.")
+        return [self.storages_optimal, self.storages_recovering, self.storages_degraded]
 
-    def modifiedInVersion(self, oid):
-        return self._apply_single_storage('modifiedInVersion', oid)
+    # XXX
+    def raid_disable(self, name):
+        if self.closed:
+            # XXX refactor into decorator
+            raise gocept.zeoraid.interfaces.RAIDClosedError(
+                "Storage has been closed.")
+        self._degrade_storage(name, fail=False)
+        return 'disabled %r' % name
 
-    def getTid(self, oid):
-        return self._apply_single_storage('getTid', oid)
-
-    # Extension methods for RAIDStorage
+    # XXX
     def raid_recover(self, name):
         if self.closed:
-            raise RAIDClosedError("Storage has been closed.")
+            raise gocept.zeoraid.interfaces.RAIDClosedError("Storage has been closed.")
         if name not in self.storages_degraded:
             return
         self.storages_degraded.remove(name)
@@ -441,6 +416,66 @@
         t.start()
         return 'recovering %r' % name
 
+    # internal
+
+    def _open_storage(self, name):
+        assert name not in self.storages, "Storage %s already opened" % name
+        storage = self.openers[name].open()
+        storage = gocept.zeoraid.interfaces.IRAIDCompatibleStorage(storage)
+        self.storages[name] = storage
+
+    def _degrade_storage(self, name, fail=True):
+        if name in self.storages_optimal:
+            self.storages_optimal.remove(name)
+        self.storages_degraded.append(name)
+        storage = self.storages[name]
+        t = threading.Thread(target=storage.close)
+        t.start()
+        del self.storages[name]
+        if not self.storages_optimal and fail:
+            raise gocept.zeoraid.interfaces.RAIDError("No storages remain.")
+
+    def _apply_single_storage(self, method_name, *args, **kw):
+        if self.closed:
+            raise gocept.zeoraid.interfaces.RAIDClosedError("Storage has been closed.")
+        storages = self.storages_optimal[:]
+        if not storages:
+            raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+
+        while storages:
+            # XXX storage might be degraded by now, need to check.
+            name = self.storages_optimal[0]
+            storage = self.storages[name]
+            try:
+                # Make random/hashed selection of read storage
+                method = getattr(storage, method_name)
+                return method(*args, **kw)
+            except ZEO.ClientStorage.ClientDisconnected:
+                # XXX find other possible exceptions
+                self._degrade_storage(name)
+
+    def _apply_all_storages(self, method_name, *args, **kw):
+        if self.closed:
+            raise gocept.zeoraid.interfaces.RAIDClosedError("Storage has been closed.")
+        results = []
+        storages = self.storages_optimal[:]
+        if not storages:
+            raise gocept.zeoraid.interfaces.RAIDError("RAID storage is failed.")
+
+        for name in self.storages_optimal:
+            storage = self.storages[name]
+            try:
+                method = getattr(storage, method_name)
+                results.append(method(*args, **kw))
+            except ZEO.ClientStorage.ClientDisconnected:
+                self._degrade_storage(name)
+
+        res = results[:]
+        for test1 in res:
+            for test2 in res:
+                assert test1 == test2, "Results not consistent. Asynchronous storage?"
+        return results[0]
+
     def _recover_impl(self, name):
         try:
             # First pass: Transfer all oids without hindering running transactions
@@ -471,7 +506,7 @@
         while 1:
             tm = transaction.TransactionManager()
             t = tm.get()
-            last_transaction = get_last_transaction(storage)
+            last_transaction = storage.lastTransaction()
             reference_storage.tpc_begin(t)
             unrecovered_transactions = self._unrecovered_transactions
             if unrecovered_transactions:
@@ -503,7 +538,7 @@
                             # later transaction.
                             continue
                         try:
-                            oldserial = get_serial(storage, oid)
+                            oldserial = storage.getTid(oid)
                         except ZODB.POSException.POSKeyError:
                             # This means that the object is new and didn't have an
                             # old transaction yet. 
@@ -537,10 +572,10 @@
         t = tm.get()
         # XXX we assume that the last written transaction actually is consistent. We need
         # a consistency check.
-        last_transaction = get_last_transaction(storage)
+        last_transaction = storage.lastTransaction()
         # This flag starts logging all succcessfull stores and updates those oids
         # in the second pass again.
-        max_transaction = get_last_transaction(self.storages[self.storages_optimal[0]])
+        max_transaction = self.storages[self.storages_optimal[0]].lastTransaction()
         self._unrecovered_transactions = {}
         self._log_stores = True
         # The init flag allows us to phrase the break condition of the 
@@ -568,7 +603,7 @@
             # There is a newer version of the object available or the existing
             # version was incorrect. Overwrite it with the right data.
             try:
-                oldserial = get_serial(storage, oid)
+                oldserial = storage.getTid(oid)
             except ZODB.POSException.POSKeyError:
                 oldserial = ZODB.utils.z64
 
@@ -579,50 +614,3 @@
             storage.store(oid, oldserial, data, '', t)
             storage.tpc_vote(t)
             storage.tpc_finish(t)
-
-    def raid_status(self):
-        if self.closed:
-            raise RAIDClosedError("Storage has been closed.")
-        if self.storages_recovering:
-            return 'recovering'
-        if not self.storages_degraded:
-            return 'optimal'
-        if not self.storages_optimal:
-            return 'failed'
-        return 'degraded'
-
-    def raid_details(self):
-        if self.closed:
-            raise RAIDClosedError("Storage has been closed.")
-        return [self.storages_optimal, self.storages_recovering, self.storages_degraded]
-
-    def raid_disable(self, name):
-        if self.closed:
-            raise RAIDClosedError("Storage has been closed.")
-        self._degrade_storage(name, fail=False)
-        return 'disabled %r' % name
-
-    # IBlobStorage
-
-    def storeBlob(self, oid, oldserial, data, blob, version, transaction):
-        """Stores data that has a BLOB attached."""
-        # XXX
-
-    def loadBlob(self, oid, serial):
-        """Return the filename of the Blob data for this OID and serial."""
-        # XXX
-
-    def temporaryDirectory(self):
-        """Return a directory that should be used for uncommitted blob data.
-        """
-        # XXX
-
-    # IStorageCurrentRecordIteration
-
-    def record_iternext(self, next=None):
-        """Iterate over the records in a storage."""
-
-    # IServeable
-
-    def lastInvalidations(self, size):
-        """Get recent transaction invalidations."""

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-09 11:05:41 UTC (rev 82771)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-09 15:28:07 UTC (rev 82772)
@@ -5,16 +5,13 @@
 import zope.interface.verify
 
 from ZODB.tests import StorageTestBase, BasicStorage, \
-             TransactionalUndoStorage, VersionStorage, \
-             TransactionalUndoVersionStorage, PackableStorage, \
+             TransactionalUndoStorage, PackableStorage, \
              Synchronization, ConflictResolution, HistoryStorage, \
              Corruption, RevisionStorage, PersistentStorage, \
              MTStorage, ReadOnlyStorage, RecoveryStorage
 
 import gocept.zeoraid.storage
 
-from ZODB.FileStorage.FileStorage import FileStorage
-
 from ZEO.ClientStorage import ClientStorage
 from ZEO.tests import forker, CommitLockTests, ThreadTests
 from ZEO.tests.testZEO import get_port
@@ -23,43 +20,16 @@
 import ZEO.interfaces
 
 
-class DemoOpener(object):
+class ZEOOpener(object):
 
-    class_ = FileStorage
-
     def __init__(self, name, **kwargs):
         self.name = name
         self.kwargs = kwargs or {}
 
     def open(self, **kwargs):
-        return self.class_(self.name, **self.kwargs)
+        return ClientStorage(self.name, **self.kwargs)
 
 
-class ZEOOpener(DemoOpener):
-
-    class_ = ClientStorage
-
-
-class FileStorageBackendTests(StorageTestBase.StorageTestBase):
-
-    def open(self, **kwargs):
-        # A RAIDStorage requires openers, not storages.
-        s1 = DemoOpener('s1.fs')
-        s2 = DemoOpener('s2.fs')
-
-        self._storage = gocept.zeoraid.storage.RAIDStorage('teststorage',
-                                                           [s1, s2], **kwargs)
-
-    def setUp(self):
-        self.open()
-
-    def tearDown(self):
-        self._storage.close()
-        self._storage.cleanup()
-        os.unlink('s1.fs')
-        os.unlink('s2.fs')
-
-
 class ZEOStorageBackendTests(StorageTestBase.StorageTestBase):
 
     def open(self, **kwargs):
@@ -67,6 +37,8 @@
                                                            self._storages, **kwargs)
 
     def setUp(self):
+        # Ensure compatibility
+        gocept.zeoraid.compatibility.setup()
         self._server_storage_files = []
         self._servers = []
         self._storages = []
@@ -102,8 +74,6 @@
 class ReplicationStorageTests(BasicStorage.BasicStorage,
         TransactionalUndoStorage.TransactionalUndoStorage,
         RevisionStorage.RevisionStorage,
-        VersionStorage.VersionStorage,
-        TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
         PackableStorage.PackableStorage,
         PackableStorage.PackableUndoStorage,
         Synchronization.SynchronizedStorage,
@@ -126,11 +96,6 @@
                                                             self._storage))
 
 
-class FSReplicationStorageTests(FileStorageBackendTests,
-                                ReplicationStorageTests):
-    pass
-
-
 class ZEOReplicationStorageTests(ZEOStorageBackendTests,
                                  ReplicationStorageTests,
                                  ThreadTests.ThreadTests):
@@ -139,7 +104,6 @@
 
 def test_suite():
     suite = unittest.TestSuite()
-    suite.addTest(unittest.makeSuite(FSReplicationStorageTests, "check"))
     suite.addTest(unittest.makeSuite(ZEOReplicationStorageTests, "check"))
     return suite
 

Added: gocept.zeoraid/trunk/src/gocept/zeoraid/utils.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/utils.py	                        (rev 0)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/utils.py	2008-01-09 15:28:07 UTC (rev 82772)
@@ -0,0 +1,32 @@
+# vim:fileencoding=utf-8
+# Copyright (c) 2007 gocept gmbh & co. kg
+# See also LICENSE.txt
+# $Id$
+"""Utilities."""
+
+import logging
+logger = logging.getLogger('gocept.zeoraid')
+
+
+def guess_zodb_version():
+    # Determine the version of ZODB used. Unfortunately ZODB doesn't have a
+    # reliable version number accessible, so we need to do some guesstimation:
+
+    # If there are no versions, we have a ZODB 3.9 (or later)
+    import ZODB.DemoStorage
+    if not hasattr(ZODB.DemoStorage.DemoStorage, 'supportsVersions'):
+        return '3.9+'
+
+    # If there are blobs, we have a ZODB 3.8
+    try:
+        import ZODB.blob
+    except ImportError:
+        pass
+    else:
+        return '3.8'
+
+    # if ... we have a ZODB 3.7
+
+    # if ... we have a ZODB 3.6
+
+    # if ... we have a ZODB < 3.6 (unsupported)


Property changes on: gocept.zeoraid/trunk/src/gocept/zeoraid/utils.py
___________________________________________________________________
Name: svn:eol-style
   + native



More information about the Checkins mailing list