[Zodb-checkins] SVN: ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py - better locking strategy

Christian Theune ct at gocept.com
Sat Feb 25 17:26:17 EST 2006


Log message for revision 65482:
   - better locking strategy
  

Changed:
  U   ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py

-=-
Modified: ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py	2006-02-25 22:25:43 UTC (rev 65481)
+++ ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py	2006-02-25 22:26:17 UTC (rev 65482)
@@ -112,7 +112,7 @@
                  wait=None, wait_timeout=None,
                  read_only=0, read_only_fallback=0,
                  username='', password='', realm=None,
-                 blob_dir=tempfile.gettempdir()):
+                 blob_dir=None):
         """ClientStorage constructor.
 
         This is typically invoked from a custom_zodb.py file.
@@ -316,6 +316,10 @@
 
         self.blob_dir = blob_dir
 
+        # Initialize locks
+        self.blob_status_lock = threading.Lock()
+        self.blob_status = {}
+
         # Decide whether to use non-temporary files
         if client is not None:
             dir = var or os.getcwd()
@@ -759,6 +763,7 @@
         return self.loadEx(oid, version)[:2]
 
     def loadEx(self, oid, version):
+        print "LOAD"
         self._lock.acquire()    # for atomic processing of invalidations
         try:
             t = self._cache.load(oid, version)
@@ -899,6 +904,7 @@
         return self._check_serials()
 
     def storeBlob(self, oid, serial, data, blobfilename, version, txn):
+        """Storage API: store a blob object."""
         serials = self.store(oid, serial, data, version, txn)
         blobfile = open(blobfilename, "rb")
         while True:
@@ -925,33 +931,87 @@
                                          BLOB_SUFFIX,)
                             )
 
+    def _do_load_blob(self, oid, serial):
+        """Do the actual loading from the RPC server."""
+        blob_filename = self._getCleanFilename(oid, serial)
+        if self._server is None:
+            raise ClientDisconnected()
+
+        # We write to a temporary file first, so we do not accidentally
+        # allow half-baked copies of this blob to be loaded
+        tempfilename = self._getDirtyFilename(oid, serial)
+        tempfile = open(tempfilename, "wb")
+        
+        offset = 0
+        while True:
+            chunk = self._server.loadBlob(oid, serial, version, offset)
+            if not chunk:
+                break
+            offset += len(chunk)
+            tempfile.write(chunk)
+
+        tempfile.close()
+        utils.best_rename(tempfilename, blob_filename)
+        return blob_filename
+
     def loadBlob(self, oid, serial, version):
+        """Loading a blob has to know about loading the same blob
+           from another thread at the same time.
+
+            1. Check if the blob is downloaded already
+            2. Check whether it is currently being downloaded
+            2a. Wait for other download to finish, return
+            3. If not being downloaded, start download
+        """
+        if self.blob_dir is None:
+            raise POSException.Unsupported("No blob cache directory is configured. Can not load blob.")
+
         blob_filename = self._getCleanFilename(oid, serial)
-        if os.path.exists(blob_filename):    # XXX see race condition below
-            return blob_filename
+        # Case 1: Blob is available already, just use it
+        if os.path.exists(blob_filename):
+                return blob_filename
 
-        self._load_lock.acquire()
+        # Case 2,3: Blob might still be downloading or not there yet
+
+        # Try to get or create a lock for the downloading of this blob,
+        # identified by its oid and serial
+        lock_key = (oid, serial)
+        
+        # We need to make the check for an existing lock and the possible
+        # creation of a new one atomic, so there is another lock:
+        self.blob_status_lock.acquire()
         try:
-            if self._server is None:
-                raise ClientDisconnected()
+            if not self.blob_status.has_key(oid):
+                self.blob_status[lock_key] = Lock()
+            lock = self.blob_status[lock_key]
+        finally:
+            self.blob_status_lock.release()
 
-            tempfilename = self._getDirtyFilename(oid, serial)
-            tempfile = open(tempfilename, "wb")
-            
-            offset = 0
-            while True:
-                chunk = self._server.loadBlob(oid, serial, version, offset)
-                if not chunk:
-                    break
-                offset += len(chunk)
-                tempfile.write(chunk)
+        # We acquire the lock to either start downloading, or wait
+        # for another download to finish
+        lock.acquire()
+        try:
+            # If there was another download that is finished by now,
+            # we just take the result.
+            if os.path.exists(blob_filename):
+                return blob_filename
 
-            tempfile.close()
-            utils.best_rename(tempfilename, blob_filename)
-            return blob_filename
+            # Otherwise we download and use that
+            return self._do_load_blob(oid, serial)
         finally:
-            self._load_lock.release()
+            # When done we remove the download lock ...
+            lock.release()
 
+            # The status information isn't needed any more either, but we
+            # have to take the second lock here as well, to avoid making
+            # the creation of this status lock non-atomic (see above)
+            self.blob_status_lock.acquire()
+            try:
+                del self.blob_status_lock[lock_key]
+            finally:
+                self.blob_status_lock.release()
+        
+
     def tpc_vote(self, txn):
         """Storage API: vote on a transaction."""
         if txn is not self._transaction:



More information about the Zodb-checkins mailing list