[Zodb-checkins] SVN: ZODB/trunk/ - Merged ctheune-blob-merge-branch to trunk.

Christian Theune ct at gocept.com
Wed Nov 29 10:30:37 EST 2006


Log message for revision 71330:
   - Merged ctheune-blob-merge-branch to trunk. 
  

Changed:
  U   ZODB/trunk/NEWS.txt
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/ServerStub.py
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  A   ZODB/trunk/src/ZODB/Blobs/
  U   ZODB/trunk/src/ZODB/Connection.py
  U   ZODB/trunk/src/ZODB/DB.py
  U   ZODB/trunk/src/ZODB/ExportImport.py
  U   ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/trunk/src/ZODB/component.xml
  U   ZODB/trunk/src/ZODB/config.py
  A   ZODB/trunk/src/ZODB/tests/loggingsupport.py
  U   ZODB/trunk/src/ZODB/tests/testConfig.py
  U   ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt
  U   ZODB/trunk/src/ZODB/utils.py
  _U  ZODB/trunk/src/zope/

-=-
Modified: ZODB/trunk/NEWS.txt
===================================================================
--- ZODB/trunk/NEWS.txt	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/NEWS.txt	2006-11-29 15:30:36 UTC (rev 71330)
@@ -25,7 +25,16 @@
   Clean up weird import dance with ZODB. This is unnecessary since the
   transaction module stopped being imported in ZODB/__init__.py in rev 39622.
 
+Blobs
+-----
 
+- (3.8a1) Added new blob feature. See the ZODB/Blobs directory for
+  documentation.
+
+  ZODB now handles (reasonably) large binary objects efficiently. Useful for
+  objects from a few kilobytes up to at least several hundred megabytes.
+
+
 What's new on ZODB 3.7b2?
 =========================
 

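For reference, a minimal sketch of the new API, based on the Blob class as
exercised by the checkStoreBlob test further down in this diff (persisting the
blob additionally requires a storage that provides IBlobStorage, e.g. a
<blobstorage>-wrapped FileStorage or a ClientStorage opened with blob_dir):

    from ZODB.Blobs.Blob import Blob

    blob = Blob()
    f = blob.open('w')       # file handle for the uncommitted blob data
    f.write('a' * 10)        # any (potentially very large) binary payload
    f.close()
    # Attach the blob to an object graph and commit the transaction to have
    # the data moved into the storage's blob directory.
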
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -27,6 +27,7 @@
 import types
 import logging
 
+from zope.interface import implements
 from ZEO import ServerStub
 from ZEO.cache import ClientCache
 from ZEO.TransactionBuffer import TransactionBuffer
@@ -35,7 +36,10 @@
 from ZEO.zrpc.client import ConnectionManager
 
 from ZODB import POSException
+from ZODB import utils
 from ZODB.loglevels import BLATHER
+from ZODB.Blobs.interfaces import IBlobStorage
+from ZODB.Blobs.Blob import FilesystemHelper
 from persistent.TimeStamp import TimeStamp
 
 logger = logging.getLogger('ZEO.ClientStorage')
@@ -93,6 +97,7 @@
     tpc_begin().
     """
 
+    implements(IBlobStorage)
     # Classes we instantiate.  A subclass might override.
 
     TransactionBufferClass = TransactionBuffer
@@ -106,7 +111,8 @@
                  wait_for_server_on_startup=None, # deprecated alias for wait
                  wait=None, wait_timeout=None,
                  read_only=0, read_only_fallback=0,
-                 username='', password='', realm=None):
+                 username='', password='', realm=None,
+                 blob_dir=None):
         """ClientStorage constructor.
 
         This is typically invoked from a custom_zodb.py file.
@@ -177,6 +183,11 @@
         password -- string with plaintext password to be used
             when authenticated.
 
+        realm -- not documented.
+
+        blob_dir -- directory path for blob data.  'blob data' is data that
+            is retrieved via the loadBlob API.
+
         Note that the authentication protocol is defined by the server
         and is detected by the ClientStorage upon connecting (see
         testConnection() and doAuth() for details).
@@ -303,6 +314,18 @@
         # is executing.
         self._lock = threading.Lock()
 
+        # XXX need to check for POSIX-ness here
+        if blob_dir is not None:
+            self.fshelper = FilesystemHelper(blob_dir)
+            self.fshelper.create()
+            self.fshelper.checkSecure()
+        else:
+            self.fshelper = None
+
+        # Initialize locks
+        self.blob_status_lock = threading.Lock()
+        self.blob_status = {}
+
         # Decide whether to use non-temporary files
         if client is not None:
             dir = var or os.getcwd()
@@ -866,6 +889,118 @@
         self._tbuf.store(oid, version, data)
         return self._check_serials()
 
+    def storeBlob(self, oid, serial, data, blobfilename, version, txn):
+        """Storage API: store a blob object."""
+        serials = self.store(oid, serial, data, version, txn)
+        blobfile = open(blobfilename, "rb")
+        while True:
+            chunk = blobfile.read(1<<16)
+            # even if the blobfile is completely empty, we need to call
+            # storeBlob at least once in order to be able to call
+            # storeBlobEnd successfully.
+            self._server.storeBlob(oid, serial, chunk, version, id(txn))
+            if not chunk:
+                self._server.storeBlobEnd(oid, serial, data, version, id(txn))
+                break
+        blobfile.close()
+        os.unlink(blobfilename)
+        return serials
+
+    def _do_load_blob(self, oid, serial, version):
+        """Do the actual loading from the RPC server."""
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)
+        if self._server is None:
+            raise ClientDisconnected()
+
+        targetpath = self.fshelper.getPathForOID(oid)
+        if not os.path.exists(targetpath):
+            os.makedirs(targetpath, 0700)
+
+        # We write to a temporary file first, so we do not accidentally
+        # allow half-baked copies of this blob to be loaded
+        tempfd, tempfilename = self.fshelper.blob_mkstemp(oid, serial)
+        tempfile = os.fdopen(tempfd, 'wb')
+
+        offset = 0
+        while True:
+            chunk = self._server.loadBlob(oid, serial, version, offset)
+            if not chunk:
+                break
+            offset += len(chunk)
+            tempfile.write(chunk)
+
+        tempfile.close()
+        # XXX will fail on Windows if file is open
+        os.rename(tempfilename, blob_filename)
+        return blob_filename
+
+    def loadBlob(self, oid, serial, version):
+        """Loading a blob has to be aware of the same blob possibly
+           being loaded from another thread at the same time.
+
+            1. Check if the blob has been downloaded already
+            2. Check whether it is currently being downloaded
+            2a. Wait for the other download to finish, then return
+            3. If it is not being downloaded, start the download
+        """
+        if self.fshelper is None:
+            raise POSException.Unsupported("No blob cache directory is "
+                                           "configured.")
+
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)
+        # Case 1: Blob is available already, just use it
+        if os.path.exists(blob_filename):
+            log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
+                utils.tid_repr(serial)), level=BLATHER)
+            return blob_filename
+
+        # Case 2,3: Blob might still be downloading or not there yet
+
+        # Try to get or create a lock for the download of this blob,
+        # identified by its oid and serial
+        lock_key = (oid, serial)
+        
+        # We need to make the check for an existing lock and the possible
+        # creation of a new one atomic, so there is another lock:
+        self.blob_status_lock.acquire()
+        try:
+            if not self.blob_status.has_key(lock_key):
+                self.blob_status[lock_key] = self.getBlobLock()
+            lock = self.blob_status[lock_key]
+        finally:
+            self.blob_status_lock.release()
+
+        # We acquire the lock to either start downloading, or wait
+        # for another download to finish
+        lock.acquire()
+        try:
+            # If there was another download that is finished by now,
+            # we just take the result.
+            if os.path.exists(blob_filename):
+                log2("Found blob %s/%s in cache after it was downloaded "
+                     "from another thread." % (utils.oid_repr(oid),
+                     utils.tid_repr(serial)), level=BLATHER)
+                return blob_filename
+
+            # Otherwise we download and use that
+            return self._do_load_blob(oid, serial, version)
+        finally:
+            # When done we remove the download lock ...
+            lock.release()
+
+            # ... and the status information isn't needed anymore either,
+            # but we have to use the second lock here as well, so that
+            # removal of the status entry stays atomic (see above)
+            self.blob_status_lock.acquire()
+            try:
+                del self.blob_status[lock_key]
+            finally:
+                self.blob_status_lock.release()
+
+    def getBlobLock(self):
+        # indirection to support unit testing
+        return Lock()
+
     def tpc_vote(self, txn):
         """Storage API: vote on a transaction."""
         if txn is not self._transaction:

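The loadBlob() changes above serialize concurrent downloads of the same blob
with one lock per (oid, serial) pair, guarded by a second lock that keeps the
creation and removal of those per-key locks atomic. A stripped-down sketch of
that pattern (names are illustrative, not part of this checkin):

    import threading

    registry_lock = threading.Lock()
    download_locks = {}                   # (oid, serial) -> Lock

    def get_download_lock(key):
        # Creation of per-key locks must itself be atomic.
        registry_lock.acquire()
        try:
            if key not in download_locks:
                download_locks[key] = threading.Lock()
            return download_locks[key]
        finally:
            registry_lock.release()

    def download_once(key, fetch):
        lock = get_download_lock(key)
        lock.acquire()                    # blocks while another thread downloads
        try:
            return fetch(key)
        finally:
            lock.release()
            registry_lock.acquire()
            try:
                download_locks.pop(key, None)
            finally:
                registry_lock.release()
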
Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZEO/ServerStub.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -220,6 +220,12 @@
     def storea(self, oid, serial, data, version, id):
         self.rpc.callAsync('storea', oid, serial, data, version, id)
 
+    def storeBlobEnd(self, oid, serial, data, version, id):
+        self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
+
+    def storeBlob(self, oid, serial, chunk, version, id):
+        self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
+
     ##
     # Start two-phase commit for a transaction
     # @param id id used by client to identify current transaction.  The
@@ -262,6 +268,9 @@
     def load(self, oid, version):
         return self.rpc.call('load', oid, version)
 
+    def loadBlob(self, oid, serial, version, offset):
+        return self.rpc.call('loadBlob', oid, serial, version, offset)
+
     def getSerial(self, oid):
         return self.rpc.call('getSerial', oid)
 

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -42,20 +42,24 @@
 from ZODB.POSException import StorageError, StorageTransactionError
 from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
 from ZODB.serialize import referencesf
-from ZODB.utils import u64, oid_repr
+from ZODB.utils import u64, oid_repr, mktemp
 from ZODB.loglevels import BLATHER
 
+
 logger = logging.getLogger('ZEO.StorageServer')
 
+
 # TODO:  This used to say "ZSS", which is now implied in the logger name.
 # Can this be either set to str(os.getpid()) (if that makes sense) or removed?
 _label = "" # default label used for logging.
 
+
 def set_label():
     """Internal helper to reset the logging label (e.g. after fork())."""
     global _label
     _label = "%s" % os.getpid()
 
+
 def log(message, level=logging.INFO, label=None, exc_info=False):
     """Internal helper to log a message."""
     label = label or _label
@@ -63,9 +67,11 @@
         message = "(%s) %s" % (label, message)
     logger.log(level, message, exc_info=exc_info)
 
+
 class StorageServerError(StorageError):
     """Error reported when an unpicklable exception is raised."""
 
+
 class ZEOStorage:
     """Proxy to underlying storage for a single remote client."""
 
@@ -93,6 +99,9 @@
         self.log_label = _label
         self.authenticated = 0
         self.auth_realm = auth_realm
+        self.blob_transfer = {}
+        self.blob_log = []
+        self.blob_loads = {}
         # The authentication protocol may define extra methods.
         self._extensions = {}
         for func in self.extensions:
@@ -154,8 +163,7 @@
         record_iternext = getattr(self.storage, 'record_iternext', None)
         if record_iternext is not None:
             self.record_iternext = record_iternext
-            
-            
+
         try:
             fn = self.storage.getExtensionMethods
         except AttributeError:
@@ -460,6 +468,38 @@
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data, version)
 
+    def storeBlobEnd(self, oid, serial, data, version, id):
+        key = (oid, id)
+        if key not in self.blob_transfer:
+            raise Exception, "Can't finish a non-started Blob"
+        tempname, tempfile = self.blob_transfer.pop(key)
+        tempfile.close()
+        self.blob_log.append((oid, serial, data, tempname, version))
+
+    def storeBlob(self, oid, serial, chunk, version, id):
+        # XXX check that underlying storage supports blobs
+        key = (oid, id)
+        if key not in self.blob_transfer:
+            tempname = mktemp()
+            tempfile = open(tempname, "wb")
+            self.blob_transfer[key] = (tempname, tempfile)   # XXX Force close and remove them when Storage closes
+        else:
+            tempname, tempfile = self.blob_transfer[key]
+
+        tempfile.write(chunk)
+ 
+    def loadBlob(self, oid, serial, version, offset):
+        key = (oid, serial)
+        if key not in self.blob_loads:
+            self.blob_loads[key] = \
+                    open(self.storage.loadBlob(oid, serial, version))
+        blobdata = self.blob_loads[key]
+        blobdata.seek(offset)
+        chunk = blobdata.read(4096)
+        if not chunk:
+            del self.blob_loads[key]
+        return chunk
+
     # The following four methods return values, so they must acquire
     # the storage lock and begin the transaction before returning.
 
@@ -602,6 +642,13 @@
             # load oid, serial, data, version
             if not self._store(*loader.load()):
                 break
+
+        # Blob support
+        while self.blob_log:
+            oid, oldserial, data, blobfilename, version = self.blob_log.pop()
+            self.storage.storeBlob(oid, oldserial, data, blobfilename, 
+                                   version, self.transaction,)
+
         resp = self._thunk()
         if delay is not None:
             delay.reply(resp)
@@ -919,6 +966,7 @@
             if conn.obj in cl:
                 cl.remove(conn.obj)
 
+
 class StubTimeoutThread:
 
     def begin(self, client):
@@ -987,11 +1035,13 @@
             else:
                 time.sleep(howlong)
 
+
 def run_in_thread(method, *args):
     t = SlowMethodThread(method, args)
     t.start()
     return t.delay
 
+
 class SlowMethodThread(threading.Thread):
     """Thread to run potentially slow storage methods.
 

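Blob data travels over the ZEO connection in chunks: storeBlob() pushes 64K
chunks that the server appends to a temporary file, and loadBlob() data is
pulled in 4096-byte chunks addressed by offset until an empty chunk comes
back. A minimal client-side sketch of the pull loop (load_chunk stands in for
the ServerStub.loadBlob call):

    def pull_blob(load_chunk, outfile):
        """Fetch a remote blob chunk by chunk and write it to outfile."""
        offset = 0
        while True:
            chunk = load_chunk(offset)    # e.g. loadBlob(oid, serial, version, offset)
            if not chunk:
                break                     # empty chunk signals end of data
            outfile.write(chunk)
            offset += len(chunk)
        return offset                     # total bytes transferred
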
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -23,6 +23,7 @@
 import tempfile
 import time
 import unittest
+import shutil
 
 # ZODB test support
 import ZODB
@@ -141,14 +142,16 @@
         self._pids = [pid]
         self._servers = [adminaddr]
         self._conf_path = path
+        self.blob_cache_dir = tempfile.mkdtemp()  # This is the blob cache for ClientStorage
         self._storage = ClientStorage(zport, '1', cache_size=20000000,
                                       min_disconnect_poll=0.5, wait=1,
-                                      wait_timeout=60)
+                                      wait_timeout=60, blob_dir=self.blob_cache_dir)
         self._storage.registerDB(DummyDB(), None)
 
     def tearDown(self):
         self._storage.close()
         os.remove(self._conf_path)
+        shutil.rmtree(self.blob_cache_dir)
         for server in self._servers:
             forker.shutdown_zeo_server(server)
         if hasattr(os, 'waitpid'):
@@ -210,7 +213,6 @@
     def getConfig(self):
         return """<mappingstorage 1/>"""
 
-
 class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
     """Make sure a heartbeat is being sent and that it does no harm
 
@@ -395,6 +397,139 @@
                 ConnectionInvalidationOnReconnect,
                ]
 
+class BlobAdaptedFileStorageTests(GenericTests):
+    """ZEO backed by a BlobStorage-adapted FileStorage."""
+    def setUp(self):
+        self.blobdir = tempfile.mkdtemp()  # This is the blob directory on the ZEO server
+        self.filestorage = tempfile.mktemp()
+        super(BlobAdaptedFileStorageTests, self).setUp()
+
+    def tearDown(self):
+        super(BlobAdaptedFileStorageTests, self).tearDown()
+        shutil.rmtree(self.blobdir)
+
+    def getConfig(self):
+        return """
+        <blobstorage 1>
+          blob-dir %s
+          <filestorage 2>
+            path %s
+          </filestorage>
+        </blobstorage>
+        """ % (self.blobdir, self.filestorage)
+
+    def checkStoreBlob(self):
+        from ZODB.utils import oid_repr, tid_repr
+        from ZODB.Blobs.Blob import Blob
+        from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
+        from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
+             handle_serials
+        import transaction
+
+        somedata = 'a' * 10
+
+        blob = Blob()
+        bd_fh = blob.open('w')
+        bd_fh.write(somedata)
+        bd_fh.close()
+        tfname = bd_fh.name
+        oid = self._storage.new_oid()
+        data = zodb_pickle(blob)
+        self.assert_(os.path.exists(tfname))
+
+        t = transaction.Transaction()
+        try:
+            self._storage.tpc_begin(t)
+            r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
+            r2 = self._storage.tpc_vote(t)
+            revid = handle_serials(oid, r1, r2)
+            self._storage.tpc_finish(t)
+        except:
+            self._storage.tpc_abort(t)
+            raise
+        self.assert_(not os.path.exists(tfname))
+        filename = os.path.join(self.blobdir, oid_repr(oid),
+                                tid_repr(revid) + BLOB_SUFFIX)
+        self.assert_(os.path.exists(filename))
+        self.assertEqual(somedata, open(filename).read())
+        
+    def checkLoadBlob(self):
+        from ZODB.Blobs.Blob import Blob
+        from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
+             handle_serials
+        import transaction
+
+        version = ''
+        somedata = 'a' * 10
+
+        blob = Blob()
+        bd_fh = blob.open('w')
+        bd_fh.write(somedata)
+        bd_fh.close()
+        tfname = bd_fh.name
+        oid = self._storage.new_oid()
+        data = zodb_pickle(blob)
+
+        t = transaction.Transaction()
+        try:
+            self._storage.tpc_begin(t)
+            r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
+            r2 = self._storage.tpc_vote(t)
+            serial = handle_serials(oid, r1, r2)
+            self._storage.tpc_finish(t)
+        except:
+            self._storage.tpc_abort(t)
+            raise
+
+
+        class Dummy:
+            def __init__(self):
+                self.acquired = 0
+                self.released = 0
+            def acquire(self):
+                self.acquired += 1
+            def release(self):
+                self.released += 1
+
+        class statusdict(dict):
+            def __init__(self):
+                self.added = []
+                self.removed = []
+                
+            def __setitem__(self, k, v):
+                self.added.append(k)
+                super(statusdict, self).__setitem__(k, v)
+
+            def __delitem__(self, k):
+                self.removed.append(k)
+                super(statusdict, self).__delitem__(k)
+
+        # ensure that we do locking properly
+        filename = self._storage.fshelper.getBlobFilename(oid, serial)
+        thestatuslock = self._storage.blob_status_lock = Dummy()
+        thebloblock = Dummy()
+
+        def getBlobLock():
+            return thebloblock
+
+        # override getBlobLock to test that locking is performed
+        self._storage.getBlobLock = getBlobLock
+        thestatusdict = self._storage.blob_status = statusdict()
+
+        filename = self._storage.loadBlob(oid, serial, version)
+
+        self.assertEqual(thestatuslock.acquired, 2)
+        self.assertEqual(thestatuslock.released, 2)
+        
+        self.assertEqual(thebloblock.acquired, 1)
+        self.assertEqual(thebloblock.released, 1)
+
+        self.assertEqual(thestatusdict.added, [(oid, serial)])
+        self.assertEqual(thestatusdict.removed, [(oid, serial)])
+
+test_classes = [FileStorageTests, MappingStorageTests,
+                BlobAdaptedFileStorageTests]
+
 def test_suite():
     suite = unittest.TestSuite()
     for klass in test_classes:

Copied: ZODB/trunk/src/ZODB/Blobs (from rev 71329, ZODB/branches/blob-merge-branch/src/ZODB/Blobs)

Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/Connection.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -20,6 +20,8 @@
 import tempfile
 import threading
 import warnings
+import os
+import shutil
 from time import time
 
 from persistent import PickleCache
@@ -27,6 +29,8 @@
 # interfaces
 from persistent.interfaces import IPersistentDataManager
 from ZODB.interfaces import IConnection
+from ZODB.Blobs.interfaces import IBlob, IBlobStorage
+from ZODB.Blobs.BlobStorage import BlobStorage
 from transaction.interfaces import ISavepointDataManager
 from transaction.interfaces import IDataManagerSavepoint
 from transaction.interfaces import ISynchronizer
@@ -39,8 +43,11 @@
 from ZODB import POSException
 from ZODB.POSException import InvalidObjectReference, ConnectionStateError
 from ZODB.POSException import ConflictError, ReadConflictError
+from ZODB.POSException import Unsupported
+from ZODB.POSException import POSKeyError
 from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
 from ZODB.utils import p64, u64, z64, oid_repr, positive_id
+from ZODB import utils
 
 global_reset_counter = 0
 
@@ -591,7 +598,29 @@
                     raise ConflictError(object=obj)
                 self._modified.append(oid)
             p = writer.serialize(obj)  # This calls __getstate__ of obj
-            s = self._storage.store(oid, serial, p, self._version, transaction)
+
+            # This is a workaround for calling IBlob.providedBy(obj). Calling
+            # Interface.providedBy on an object to be stored can inadvertently
+            # set the '__providedBy__' and '__implemented__' attributes on the
+            # object. This interferes with storing the object by requesting
+            # that the values of these attributes also be stored with the ZODB.
+            providedBy = getattr(obj, '__providedBy__', None)
+            if providedBy is not None and IBlob in providedBy:
+                if not IBlobStorage.providedBy(self._storage):
+                    raise Unsupported(
+                        "Storing Blobs in %s is not supported." % 
+                        repr(self._storage))
+                s = self._storage.storeBlob(oid, serial, p,
+                                            obj._p_blob_uncommitted,
+                                            self._version, transaction)
+                # we invalidate the object here in order to ensure
+                # that the next attribute access of its name
+                # unghostifies it, which will cause its blob data
+                # to be reattached "cleanly"
+                obj._p_invalidate()
+            else:
+                s = self._storage.store(oid, serial, p, self._version,
+                                        transaction)
             self._store_count += 1
             # Put the object in the cache before handling the
             # response, just in case the response contains the
@@ -801,7 +830,7 @@
 
 
         if self._invalidatedCache:
-            raise ReadConflictError()            
+            raise ReadConflictError()
 
         if (obj._p_oid in self._invalidated and
                 not myhasattr(obj, "_p_independent")):
@@ -830,6 +859,13 @@
         self._reader.setGhostState(obj, p)
         obj._p_serial = serial
 
+        # Blob support
+        providedBy = getattr(obj, '__providedBy__', None)
+        if providedBy is not None and IBlob in providedBy:
+            obj._p_blob_uncommitted = None
+            obj._p_blob_data = \
+                    self._storage.loadBlob(obj._p_oid, serial, self._version)
+
     def _load_before_or_conflict(self, obj):
         """Load non-current state for obj or raise ReadConflictError."""
         if not (self._mvcc and self._setstate_noncurrent(obj)):
@@ -1049,8 +1085,9 @@
 
     def savepoint(self):
         if self._savepoint_storage is None:
-            self._savepoint_storage = TmpStore(self._version,
-                                               self._normal_storage)
+            # XXX what to do about IBlobStorages?
+            tmpstore = TmpStore(self._version, self._normal_storage)
+            self._savepoint_storage = tmpstore
             self._storage = self._savepoint_storage
 
         self._creating.clear()
@@ -1082,7 +1119,7 @@
         self._storage = self._normal_storage
         self._savepoint_storage = None
 
-        self._log.debug("Commiting savepoints of size %s", src.getSize())
+        self._log.debug("Committing savepoints of size %s", src.getSize())
         oids = src.index.keys()
 
         # Copy invalidating and creating info from temporary storage:
@@ -1091,10 +1128,20 @@
 
         for oid in oids:
             data, serial = src.load(oid, src)
-            s = self._storage.store(oid, serial, data,
-                                    self._version, transaction)
+            try:
+                blobfilename = src.loadBlob(oid, serial, self._version)
+            except POSKeyError:
+                s = self._storage.store(oid, serial, data,
+                                        self._version, transaction)
+            else:
+                s = self._storage.storeBlob(oid, serial, data, blobfilename,
+                                            self._version, transaction)
+                # we invalidate the object here in order to ensure
+                # that the next attribute access of its name
+                # unghostifies it, which will cause its blob data
+                # to be reattached "cleanly"
+                self.invalidate(s, {oid:True})
             self._handle_serial(s, oid, change=False)
-
         src.close()
 
     def _abort_savepoint(self):
@@ -1137,9 +1184,14 @@
     def rollback(self):
         self.datamanager._rollback(self.state)
 
+BLOB_SUFFIX = ".blob"
+BLOB_DIRTY = "store"
+
 class TmpStore:
     """A storage-like thing to support savepoints."""
 
+    implements(IBlobStorage)
+
     def __init__(self, base_version, storage):
         self._storage = storage
         for method in (
@@ -1149,6 +1201,10 @@
             setattr(self, method, getattr(storage, method))
 
         self._base_version = base_version
+        tmpdir = os.environ.get('ZODB_BLOB_TEMPDIR')
+        if tmpdir is None:
+            tmpdir = tempfile.mkdtemp()
+        self._blobdir = tmpdir
         self._file = tempfile.TemporaryFile()
         # position: current file position
         # _tpos: file position at last commit point
@@ -1162,6 +1218,7 @@
 
     def close(self):
         self._file.close()
+        shutil.rmtree(self._blobdir)
 
     def load(self, oid, version):
         pos = self.index.get(oid)
@@ -1193,6 +1250,37 @@
         self.position += l + len(header)
         return serial
 
+    def storeBlob(self, oid, serial, data, blobfilename, version,
+                  transaction):
+        serial = self.store(oid, serial, data, version, transaction)
+        assert isinstance(serial, str) # XXX in theory serials could be 
+                                       # something else
+
+        targetpath = self._getBlobPath(oid)
+        if not os.path.exists(targetpath):
+            os.makedirs(targetpath, 0700)
+
+        targetname = self._getCleanFilename(oid, serial)
+        os.rename(blobfilename, targetname)
+
+    def loadBlob(self, oid, serial, version):
+        """Return the filename where the blob file can be found.
+        """
+        filename = self._getCleanFilename(oid, serial)
+        if not os.path.exists(filename):
+            raise POSKeyError, "Not an existing blob."
+        return filename
+
+    def _getBlobPath(self, oid):
+        return os.path.join(self._blobdir,
+                            utils.oid_repr(oid)
+                            )
+
+    def _getCleanFilename(self, oid, tid):
+        return os.path.join(self._getBlobPath(oid),
+                            "%s%s" % (utils.tid_repr(tid), 
+                                      BLOB_SUFFIX,)
+                            )
     def reset(self, position, index):
         self._file.truncate(position)
         self.position = position
@@ -1206,3 +1294,4 @@
         # a copy of the index here.  An alternative would be to ensure that
         # all callers pass copies.  As is, our callers do not make copies.
         self.index = index.copy()
+

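TmpStore lays out its temporary blob files the same way BlobStorage does on
disk: one directory per oid, one file per serial. A quick illustration of the
naming scheme (directory and identifiers below are made up):

    import os
    from ZODB.utils import p64, oid_repr, tid_repr

    BLOB_SUFFIX = ".blob"
    blobdir = "/tmp/savepoint-blobs"            # assumed temporary directory
    oid, tid = p64(42), p64(0x1122334455667788) # made-up identifiers

    path = os.path.join(blobdir, oid_repr(oid),
                        "%s%s" % (tid_repr(tid), BLOB_SUFFIX))
    # -> /tmp/savepoint-blobs/0x2a/0x1122334455667788.blob
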
Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/DB.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -232,10 +232,10 @@
         # Setup storage
         self._storage=storage
         storage.registerDB(self, None)
-        if not hasattr(storage,'tpc_vote'):
+        if not hasattr(storage, 'tpc_vote'):
             storage.tpc_vote = lambda *args: None
         try:
-            storage.load(z64,'')
+            storage.load(z64, '')
         except KeyError:
             # Create the database's root in the storage if it doesn't exist
             from persistent.mapping import PersistentMapping

Modified: ZODB/trunk/src/ZODB/ExportImport.py
===================================================================
--- ZODB/trunk/src/ZODB/ExportImport.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/ExportImport.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -13,13 +13,16 @@
 ##############################################################################
 """Support for database export and import."""
 
+import os
+
 from cStringIO import StringIO
 from cPickle import Pickler, Unpickler
 from tempfile import TemporaryFile
 import logging
 
-from ZODB.POSException import ExportError
-from ZODB.utils import p64, u64
+from ZODB.POSException import ExportError, POSKeyError
+from ZODB.utils import p64, u64, cp, mktemp
+from ZODB.Blobs.interfaces import IBlobStorage
 from ZODB.serialize import referencesf
 
 logger = logging.getLogger('ZODB.ExportImport')
@@ -49,6 +52,21 @@
             else:
                 referencesf(p, oids)
                 f.writelines([oid, p64(len(p)), p])
+            # Blob support
+            if not IBlobStorage.providedBy(self._storage):
+                continue
+            try:
+                blobfilename = self._storage.loadBlob(oid, 
+                                                      serial, self._version)
+            except POSKeyError: # Looks like this is not a blob
+                continue
+
+            f.write(blob_begin_marker)
+            f.write(p64(os.stat(blobfilename).st_size))
+            blobdata = open(blobfilename, "rb")
+            cp(blobdata, f)
+            blobdata.close()
+            
         f.write(export_end_marker)
         return f
 
@@ -113,17 +131,20 @@
         version = self._version
 
         while 1:
-            h = f.read(16)
-            if h == export_end_marker:
+            header = f.read(16)
+            if header == export_end_marker:
                 break
-            if len(h) != 16:
+            if len(header) != 16:
                 raise ExportError("Truncated export file")
-            l = u64(h[8:16])
-            p = f.read(l)
-            if len(p) != l:
+
+            # Extract header information
+            ooid = header[:8]
+            length = u64(header[8:16])
+            data = f.read(length)
+
+            if len(data) != length:
                 raise ExportError("Truncated export file")
 
-            ooid = h[:8]
             if oids:
                 oid = oids[ooid]
                 if isinstance(oid, tuple):
@@ -132,7 +153,21 @@
                 oids[ooid] = oid = self._storage.new_oid()
                 return_oid_list.append(oid)
 
-            pfile = StringIO(p)
+            # Blob support
+            blob_begin = f.read(len(blob_begin_marker))
+            if blob_begin == blob_begin_marker:
+                # Copy the blob data to a temporary file
+                # and remember the name
+                blob_len = u64(f.read(8))
+                blob_filename = mktemp()
+                blob_file = open(blob_filename, "wb")
+                cp(f, blob_file, blob_len)
+                blob_file.close()
+            else:
+                f.seek(-len(blob_begin_marker),1)
+                blob_filename = None
+
+            pfile = StringIO(data)
             unpickler = Unpickler(pfile)
             unpickler.persistent_load = persistent_load
 
@@ -142,12 +177,17 @@
 
             pickler.dump(unpickler.load())
             pickler.dump(unpickler.load())
-            p = newp.getvalue()
+            data = newp.getvalue()
 
-            self._storage.store(oid, None, p, version, transaction)
+            if blob_filename is not None:
+                self._storage.storeBlob(oid, None, data, blob_filename, 
+                                        version, transaction)
+            else:
+                self._storage.store(oid, None, data, version, transaction)
 
 
 export_end_marker = '\377'*16
+blob_begin_marker = '\000BLOBSTART'
 
 class Ghost(object):
     __slots__ = ("oid",)

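For reference, the extended export format written above: each record is an
8-byte oid, an 8-byte big-endian length and the pickle data, optionally
followed by blob_begin_marker, an 8-byte blob size and the raw blob bytes;
the file ends with export_end_marker. A small reader sketch (not part of the
checkin) that mirrors the parsing logic in importFile:

    from ZODB.utils import u64

    export_end_marker = '\377' * 16
    blob_begin_marker = '\000BLOBSTART'

    def iter_export(f):
        """Yield (oid, data, blob_bytes_or_None) records from an export file."""
        f.read(4)                  # skip the leading magic that importFile checks
        while True:
            header = f.read(16)
            if header == export_end_marker:
                break
            oid, length = header[:8], u64(header[8:16])
            data = f.read(length)
            marker = f.read(len(blob_begin_marker))
            if marker == blob_begin_marker:
                blob = f.read(u64(f.read(8)))
            else:
                f.seek(-len(blob_begin_marker), 1)   # not a blob record
                blob = None
            yield oid, data, blob
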
Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -628,7 +628,7 @@
         finally:
             self._lock_release()
 
-    def store(self, oid, serial, data, version, transaction):
+    def store(self, oid, oldserial, data, version, transaction):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
         if transaction is not self._transaction:
@@ -651,12 +651,12 @@
                         pnv = h.pnv
                     cached_tid = h.tid
 
-                if serial != cached_tid:
+                if oldserial != cached_tid:
                     rdata = self.tryToResolveConflict(oid, cached_tid,
-                                                     serial, data)
+                                                     oldserial, data)
                     if rdata is None:
                         raise POSException.ConflictError(
-                            oid=oid, serials=(cached_tid, serial), data=data)
+                            oid=oid, serials=(cached_tid, oldserial), data=data)
                     else:
                         data = rdata
 
@@ -686,7 +686,7 @@
                 raise FileStorageQuotaError(
                     "The storage quota has been exceeded.")
 
-            if old and serial != cached_tid:
+            if old and oldserial != cached_tid:
                 return ConflictResolution.ResolvedSerial
             else:
                 return self._tid

Modified: ZODB/trunk/src/ZODB/component.xml
===================================================================
--- ZODB/trunk/src/ZODB/component.xml	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/component.xml	2006-11-29 15:30:36 UTC (rev 71330)
@@ -65,6 +65,11 @@
   <sectiontype name="zeoclient" datatype=".ZEOClient"
                implements="ZODB.storage">
     <multikey name="server" datatype="socket-connection-address" required="yes"/>
+    <key name="blob-dir" required="no">
+      <description>
+        Path name to the blob storage directory.
+      </description>
+    </key>
     <key name="storage" default="1">
       <description>
         The name of the storage that the client wants to use.  If the
@@ -189,4 +194,18 @@
       </description>
   </sectiontype>
 
+  <sectiontype name="blobstorage" datatype=".BlobStorage"
+    implements="ZODB.storage">
+    <key name="blob-dir" required="yes">
+      <description>
+        Path name to the blob storage directory.
+      </description>
+    </key>
+    <section type="ZODB.storage" name="*" attribute="base"/>
+  </sectiontype>
+
+
+    
+
+
 </component>

Modified: ZODB/trunk/src/ZODB/config.py
===================================================================
--- ZODB/trunk/src/ZODB/config.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/config.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -86,7 +86,7 @@
         self.config = config
         self.name = config.getSectionName()
 
-    def open(self):
+    def open(self, database_name='unnamed', databases=None):
         """Open and return the storage object."""
         raise NotImplementedError
 
@@ -134,6 +134,14 @@
                            read_only=self.config.read_only,
                            quota=self.config.quota)
 
+class BlobStorage(BaseConfig):
+
+    def open(self):
+        from ZODB.Blobs.BlobStorage import BlobStorage
+        base = self.config.base.open()
+        return BlobStorage(self.config.blob_dir, base)
+
+        
 class ZEOClient(BaseConfig):
 
     def open(self):
@@ -143,6 +151,7 @@
         L = [server.address for server in self.config.server]
         return ClientStorage(
             L,
+            blob_dir=self.config.blob_dir,
             storage=self.config.storage,
             cache_size=self.config.cache_size,
             name=self.config.name,

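The new <blobstorage> section handler simply wraps the storage it contains;
done by hand (outside ZConfig) the equivalent is roughly this (paths are
assumptions):

    from ZODB.FileStorage import FileStorage
    from ZODB.Blobs.BlobStorage import BlobStorage

    base = FileStorage('Data.fs')
    storage = BlobStorage('/path/to/blob-dir', base)
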
Copied: ZODB/trunk/src/ZODB/tests/loggingsupport.py (from rev 71329, ZODB/branches/blob-merge-branch/src/ZODB/tests/loggingsupport.py)

Modified: ZODB/trunk/src/ZODB/tests/testConfig.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testConfig.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/tests/testConfig.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -102,6 +102,9 @@
         # an elaborate comment explaining this instead.  Go ahead,
         # grep for 9.
         from ZEO.ClientStorage import ClientDisconnected
+        import ZConfig
+        from ZODB.config import getDbSchema
+        from StringIO import StringIO
         cfg = """
         <zodb>
           <zeoclient>
@@ -110,9 +113,26 @@
           </zeoclient>
         </zodb>
         """
+        config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg))
+        self.assertEqual(config.database.config.storage.config.blob_dir,
+                         None)
         self.assertRaises(ClientDisconnected, self._test, cfg)
 
+        cfg = """
+        <zodb>
+          <zeoclient>
+            blob-dir /tmp
+            server localhost:56897
+            wait false
+          </zeoclient>
+        </zodb>
+        """
+        config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg))
+        self.assertEqual(config.database.config.storage.config.blob_dir,
+                         '/tmp')
+        self.assertRaises(ClientDisconnected, self._test, cfg)
 
+
 def test_suite():
     suite = unittest.TestSuite()
     suite.addTest(unittest.makeSuite(ZODBConfigTest))

Modified: ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt
===================================================================
--- ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt	2006-11-29 15:30:36 UTC (rev 71330)
@@ -193,3 +193,4 @@
     InvalidSavepointRollbackError
 
     >>> transaction.abort()
+

Modified: ZODB/trunk/src/ZODB/utils.py
===================================================================
--- ZODB/trunk/src/ZODB/utils.py	2006-11-29 14:54:23 UTC (rev 71329)
+++ ZODB/trunk/src/ZODB/utils.py	2006-11-29 15:30:36 UTC (rev 71330)
@@ -16,11 +16,13 @@
 import time
 import struct
 from struct import pack, unpack
-from binascii import hexlify
+from binascii import hexlify, unhexlify
 import cPickle as pickle
 from cStringIO import StringIO
 import weakref
 import warnings
+from tempfile import mkstemp
+import os
 
 from persistent.TimeStamp import TimeStamp
 
@@ -82,21 +84,34 @@
 
 U64 = u64
 
-def cp(f1, f2, l):
+def cp(f1, f2, length=None):
+    """Copy all data from one file to another.
+
+    Data is read from the current position of the input file (f1) and
+    written at the current position of the output file (f2).
+
+    At most 'length' bytes are copied. If 'length' isn't given, data is
+    copied until the end of the input file.
+    """
     read = f1.read
     write = f2.write
     n = 8192
 
-    while l > 0:
-        if n > l:
-            n = l
-        d = read(n)
-        if not d:
+    if length is None:
+        old_pos = f1.tell()
+        f1.seek(0,2)
+        length = f1.tell()
+        f1.seek(old_pos)
+    
+    while length > 0:
+        if n > length:
+            n = length
+        data = read(n)
+        if not data:
             break
-        write(d)
-        l = l - len(d)
+        write(data)
+        length -= len(data)
 
-
 def newTimeStamp(old=None,
                  TimeStamp=TimeStamp,
                  time=time.time, gmtime=time.gmtime):
@@ -120,6 +135,13 @@
     else:
         return repr(oid)
 
+def repr_to_oid(repr):
+    if repr.startswith("0x"):
+        repr = repr[2:]
+    as_bin = unhexlify(repr)
+    as_bin = "\x00"*(8-len(as_bin)) + as_bin
+    return as_bin
+
 serial_repr = oid_repr
 tid_repr = serial_repr
 
@@ -265,3 +287,12 @@
         # We're cheating by breaking into the internals of Python's
         # WeakValueDictionary here (accessing its .data attribute).
         return self.data.data.values()
+
+
+def mktemp(dir=None):
+    """Create a temp file, known by name, in a semi-secure manner."""
+    handle, filename = mkstemp(dir=dir)
+    os.close(handle)
+    return filename
+
+


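A small usage sketch for the generalized cp() (length is now optional) and
the new mktemp() helper (the source file name is an assumption):

    from ZODB.utils import cp, mktemp

    tmpname = mktemp()          # named temporary file; the handle is already closed
    src = open('source.dat', 'rb')
    dst = open(tmpname, 'wb')
    cp(src, dst)                # no length given: copy up to the end of src
    # cp(src, dst, 1024) would copy at most 1024 bytes from src's position
    dst.close()
    src.close()
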
Property changes on: ZODB/trunk/src/zope
___________________________________________________________________
Name: svn:externals
   - interface  svn://svn.zope.org/repos/main/Zope3/tags/Zope-3.2.0/src/zope/interface
proxy      svn://svn.zope.org/repos/main/Zope3/tags/Zope-3.2.0/src/zope/proxy
testing    svn://svn.zope.org/repos/main/zope.testing/trunk/src/zope/testing

   + interface  svn://svn.zope.org/repos/main/Zope3/trunk/src/zope/interface
proxy      svn://svn.zope.org/repos/main/Zope3/trunk/src/zope/proxy
testing    svn://svn.zope.org/repos/main/zope.testing/trunk/src/zope/testing



