[Checkins] SVN: gocept.zeoraid/trunk/ Started work on blob support. Extended FailingStorage to support blobs.

Christian Theune ct at gocept.com
Mon Jan 28 09:19:33 EST 2008


Log message for revision 83282:
  Started work on blob support. Extended FailingStorage to support blobs.
  Implemented basic test for blob functionality.
  

Changed:
  U   gocept.zeoraid/trunk/ROADMAP.txt
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py

-=-
Modified: gocept.zeoraid/trunk/ROADMAP.txt
===================================================================
--- gocept.zeoraid/trunk/ROADMAP.txt	2008-01-28 13:22:08 UTC (rev 83281)
+++ gocept.zeoraid/trunk/ROADMAP.txt	2008-01-28 14:19:32 UTC (rev 83282)
@@ -28,12 +28,19 @@
 
  - Re-check API usage and definition for ZODB 3.8 as our base.
 
+ - Ensure that blob-caching parameters are equal for all clientstorages
+
+ - Provide RAID-aware blob storage implementation that ignores requests on a
+   shared file system that were handled already and are consistent.
+
 Feature-completeness
 --------------------
 
  - Rebuild storage using the copy mechanism in ZODB to get all historic
    records completely. (Only rebuild completely, not incrementally)
 
+ - Rebuild/recover with blobs!
+
  - Create a limit for the transaction rate when recovering so that the
    recovery doesn't clog up the live servers.
 
@@ -49,6 +56,9 @@
 
  - XXX pack may never be run while a storage is recovering.
 
+ - XXX Blobs: Hard links created for the multiple backend storages need to be tracked
+   and cleaned up.
+
 Future
 ======
 

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-28 13:22:08 UTC (rev 83281)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/storage.py	2008-01-28 14:19:32 UTC (rev 83282)
@@ -7,6 +7,9 @@
 import threading
 import time
 import logging
+import tempfile
+import os
+import os.path
 
 import zope.interface
 
@@ -348,15 +351,40 @@
 
     # IBlobStorage
 
+    # XXX degradation tests
     @storeBlob_38_compatible
     @ensure_writable
     def storeBlob(self, oid, oldserial, data, blob, transaction):
         """Stores data that has a BLOB attached."""
-        # XXX
+        if transaction is not self._transaction:
+            raise ZODB.POSException.StorageTransactionError(self, transaction)
 
+        def get_blob_data():
+            # Client storages expect to be the only ones operating on the blob
+            # file. We need to create individual appearances of the original
+            # file so that they can move the file to their cache location.
+            yield (oid, oldserial, data, blob, '', transaction)
+            base_dir = tempfile.mkdtemp(dir=os.path.dirname(blob))
+            copies = 0
+            while True:
+                # We need to create a new directory to make sure that
+                # atomicity of file creation is preserved.
+                copies += 1
+                new_blob = os.path.join(base_dir, '%i.blob' % copies)
+                os.link(blob, new_blob)
+                yield (oid, oldserial, data, new_blob, '', transaction)
+
+        self._write_lock.acquire()
+        try:
+            self._apply_all_storages('storeBlob', get_blob_data)
+            return self._tid
+        finally:
+            self._write_lock.release()
+
+    # XXX degradation tests
     def loadBlob(self, oid, serial):
         """Return the filename of the Blob data for this OID and serial."""
-        # XXX
+        return self._apply_single_storage('loadBlob', (oid, serial))
 
     def temporaryDirectory(self):
         """Return a directory that should be used for uncommitted blob data.
@@ -489,6 +517,7 @@
             # Handle StorageErrors first, otherwise they would be swallowed
             # when POSErrors are.
             reliable = False
+            raise
         except (ZODB.POSException.POSError,
                 transaction.interfaces.TransactionError), e:
             # These exceptions are valid answers from the storage. They don't
@@ -520,15 +549,37 @@
     @ensure_open_storage
     def _apply_all_storages(self, method_name, args=(), kw={},
                             expect_connected=True):
-        """Calls the given method on all optimal backend storages in order."""
+        """Calls the given method on all optimal backend storages in order.
+
+        `args` can be given as an n-tupel with the positional arguments that
+        should be passed to each storage.
+
+        Alternatively `args` can be a callable that returns an iterable. The
+        N-th item of the iterable is expected to be a tuple, passed to the
+        N-th storage.
+
+        """
         results = []
         exceptions = []
+
+        if callable(args):
+            argument_iterable = args()
+        else:
+            # Provide a fallback if `args` is given as a simple tuple.
+            static_arguments = args
+            def dummy_generator():
+                while True:
+                    yield static_arguments
+            argument_iterable = dummy_generator()
+
         for name in self.storages_optimal[:]:
             try:
+                args = argument_iterable.next()
                 reliable, result = self.__apply_storage(
                     name, method_name, args, kw, expect_connected)
             except Exception, e:
                 exceptions.append(e)
+                raise
             else:
                 if reliable:
                     results.append(result)

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py	2008-01-28 13:22:08 UTC (rev 83281)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/failingstorage.py	2008-01-28 14:19:32 UTC (rev 83282)
@@ -10,43 +10,65 @@
 import ZODB.config
 import ZODB.FileStorage
 
+import zope.proxy
 
+
 class Opener(ZODB.config.BaseConfig):
 
     def open(self):
-        return FailingStorage(self.name)
+        blob_dir = tempfile.mkdtemp()
+        file_handle, file_name = tempfile.mkstemp()
+        fs = ZODB.FileStorage.FileStorage(file_name)
+        return FailingStorage(blob_dir, fs)
 
 
 def failing_method(name):
     """Produces a method that can be made to fail."""
     def fail(self, *args, **kw):
         if name == self._fail:
+            self._fail = None
             raise Exception()
-        return getattr(ZODB.FileStorage.FileStorage, name)(self, *args, **kw)
+        if hasattr(ZODB.blob.BlobStorage, name):
+            original_method = getattr(ZODB.blob.BlobStorage, name).fget(self)
+        else:
+            original_method = getattr(zope.proxy.getProxiedObject(self), name)
+        return original_method(*args, **kw)
     return fail
 
 
-class FailingStorage(ZODB.FileStorage.FileStorage):
+class FailingStorage(ZODB.blob.BlobStorage):
 
-    _fail = None
+    __slots__ = ('_fail',) + ZODB.blob.BlobStorage.__slots__
 
-    def __init__(self, name):
-        self.name = name
-        file_handle, file_name = tempfile.mkstemp()
-        ZODB.FileStorage.FileStorage.__init__(self, file_name)
+    def __init__(self, base_directory, storage):
+        ZODB.blob.BlobStorage.__init__(
+            self, base_directory, storage)
+        self._fail = None
 
+    @zope.proxy.non_overridable
     def close(self):
-        ZODB.FileStorage.FileStorage.close(self)
-        self.cleanup()
+        if self._fail == 'open':
+            self._fail = None
+            raise Exception()
+        zope.proxy.getProxiedObject(self).close()
+        zope.proxy.getProxiedObject(self).cleanup()
+        # XXX rmtree blobdir
 
+    @zope.proxy.non_overridable
     def getExtensionMethods(self):
         return dict(fail=None)
 
-    history = failing_method('history')
-    loadSerial = failing_method('loadSerial')
+    # Create a set of stub methods that have to be made to fail but are set as
+    # non-data descriptors on the proxy object.
+    __stub_methods__ = ['history', 'loadSerial', 'close', 'getSize',
+                        'pack', 'tpc_abort', 'tpc_finish']
+    for name in __stub_methods__:
+        method = zope.proxy.non_overridable(failing_method(name))
+        locals()[name] = method
 
+    @zope.proxy.non_overridable
     def fail(self, method_name):
-        if method_name in ['history', 'loadSerial']:
+        if method_name in self.__stub_methods__:
             # Those methods are copied/references by the server code, we can't
             # rebind them here.
             self._fail = method_name

Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-28 13:22:08 UTC (rev 83281)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_basics.py	2008-01-28 14:19:32 UTC (rev 83282)
@@ -59,7 +59,6 @@
             zconf = forker.ZEOConfig(('', port))
             zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
                                                                   zconf, port)
-
             self._servers.append(adminaddr)
             self._storages.append(ZEOOpener(zport, storage='1',
                                             min_disconnect_poll=0.5, wait=1,
@@ -124,6 +123,7 @@
         # Ensure compatibility
         gocept.zeoraid.compatibility.setup()
 
+        self._blob_dirs = []
         self._servers = []
         self._storages = []
         for i in xrange(self.backend_count):
@@ -134,9 +134,12 @@
                 <failingstorage 1>
                 </failingstorage>""",
                 zconf, port)
+            blob_dir = tempfile.mkdtemp()
+            self._blob_dirs.append(blob_dir)
             self._servers.append(adminaddr)
             self._storages.append(ZEOOpener(zport, storage='1',
                                             cache_size=12,
+                                            blob_dir=blob_dir,
                                             min_disconnect_poll=0.5, wait=1,
                                             wait_timeout=60))
         self._storage = gocept.zeoraid.storage.RAIDStorage('teststorage',
@@ -631,7 +634,22 @@
         self._storage.tpc_begin(t)
         self.assertEquals('optimal', self._storage.raid_status())
 
+    def test_blob_usage(self):
+        oid = self._storage.new_oid()
+        handle, blob_file_name = tempfile.mkstemp()
+        open(blob_file_name, 'w').write('I am a happy blob.')
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.storeBlob(
+          oid, ZODB.utils.z64, 'foo', blob_file_name, '', t)
+        self._storage.tpc_vote(t)
+        self._storage.tpc_finish(t)
+        stored_file_name = self._storage.loadBlob(
+            oid, self._storage.lastTransaction())
+        self.assertEquals('I am a happy blob.',
+                          open(stored_file_name, 'r').read())
 
+
 class ZEOReplicationStorageTests(ZEOStorageBackendTests,
                                  ReplicationStorageTests,
                                  ThreadTests.ThreadTests):



More information about the Checkins mailing list