[Checkins] SVN: ZODB/trunk/src/ZEO/ Fixed a bug in the logic to reduce the blob cache size.

Jim Fulton jim at zope.com
Thu Dec 11 11:31:32 EST 2008


Log message for revision 93912:
  Fixed a bug in the logic to reduce the blob cache size.
  
  Changed the default blob-cache-size-check to 10%.
  
  Changed the algorithm for deciding the target for blob cache
  reduction. Now the target is
  
  blob-cache-size * (100 - blob-cache-size-check)/100
  
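  For example (an editorial illustration, not part of the commit):
  with blob-cache-size=4000, as in the tests below, and the new
  default check of 10%, the reduction target works out to 3600 bytes:

      # Hypothetical helper mirroring the formula above; the names
      # follow the ClientStorage options, but this function does not
      # exist in ZEO itself.
      def reduction_target(blob_cache_size, blob_cache_size_check=10):
          return blob_cache_size * (100 - blob_cache_size_check) // 100

      print(reduction_target(4000))   # 3600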

Changed:
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test

-=-
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2008-12-11 16:30:08 UTC (rev 93911)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2008-12-11 16:31:31 UTC (rev 93912)
@@ -122,7 +122,7 @@
                  drop_cache_rather_verify=False,
                  username='', password='', realm=None,
                  blob_dir=None, shared_blob_dir=False,
-                 blob_cache_size=None, blob_cache_size_check=100,
+                 blob_cache_size=None, blob_cache_size_check=10,
                  ):
         """ClientStorage constructor.
 
@@ -231,7 +231,7 @@
         blob_cache_size_check
             ZEO check size as percent of blob_cache_size.  The ZEO
             cache size will be checked when this many bytes have been
-            loaded into the cache. Defaults to 100% of the blob cache
+            loaded into the cache. Defaults to 10% of the blob cache
             size.   This option is ignored if shared_blob_dir is true.
 
         Note that the authentication protocol is defined by the server
@@ -472,6 +472,9 @@
             return
         
         self._blob_data_bytes_loaded = 0
+
+        target = max(self._blob_cache_size - self._blob_cache_size_check, 0)
+        
         check_blob_size_thread = threading.Thread(
             target=_check_blob_cache_size,
-            args=(self.blob_dir, self._blob_cache_size),
+            args=(self.blob_dir, target),
@@ -1610,9 +1613,13 @@
 cache_file_name = re.compile(r'\d+$').match
 def _check_blob_cache_size(blob_dir, target):
 
+    logger = logging.getLogger(__name__+'.check_blob_cache')
+    logger.info("Checking blob cache size")
+    
     layout = open(os.path.join(blob_dir, ZODB.blob.LAYOUT_MARKER)
                   ).read().strip()
     if not layout == 'zeocache':
+        logger.critical("Invalid blob directory layout %s", layout)
         raise ValueError("Invalid blob directory layout", layout)
 
     try:
@@ -1620,51 +1627,59 @@
             os.path.join(blob_dir, 'check_size.lock'))
     except zc.lockfile.LockError:
         # Someone is already cleaning up, so don't bother
+        logger.info("Another thread is checking the blob cache size")
         return
     
     try:
-       size = 0
-       blob_suffix = ZODB.blob.BLOB_SUFFIX
-       files_by_atime = BTrees.IOBTree.BTree()
+        size = 0
+        blob_suffix = ZODB.blob.BLOB_SUFFIX
+        files_by_atime = BTrees.IOBTree.BTree()
 
-       for dirname in os.listdir(blob_dir):
-           if not cache_file_name(dirname):
-               continue
-           base = os.path.join(blob_dir, dirname)
-           if not os.path.isdir(base):
-               continue
-           for file_name in os.listdir(base):
-               if not file_name.endswith(blob_suffix):
-                   continue
-               file_name = os.path.join(base, file_name)
-               if not os.path.isfile(file_name):
-                   continue
-               stat = os.stat(file_name)
-               size += stat.st_size
-               t = int(stat.st_atime)
-               if t not in files_by_atime:
-                   files_by_atime[t] = []
-               files_by_atime[t].append(file_name)
+        for dirname in os.listdir(blob_dir):
+            if not cache_file_name(dirname):
+                continue
+            base = os.path.join(blob_dir, dirname)
+            if not os.path.isdir(base):
+                continue
+            for file_name in os.listdir(base):
+                if not file_name.endswith(blob_suffix):
+                    continue
+                file_name = os.path.join(base, file_name)
+                if not os.path.isfile(file_name):
+                    continue
+                stat = os.stat(file_name)
+                size += stat.st_size
+                t = int(stat.st_atime)
+                if t not in files_by_atime:
+                    files_by_atime[t] = []
+                files_by_atime[t].append(file_name)
 
-       while size > target and files_by_atime:
-           for file_name in files_by_atime.pop(files_by_atime.minKey()):
-               lockfilename = os.path.join(os.path.dirname(file_name),
-                                           '.lock')
-               try:
-                   lock = zc.lockfile.LockFile(lockfilename)
-               except zc.lockfile.LockError:
-                   continue  # In use, skip
+        logger.info("blob cache size: %s", size)
 
-               try:
-                   size = os.stat(file_name).st_size
-                   try:
-                       ZODB.blob.remove_committed(file_name)
-                   except OSError, v:
-                       pass # probably open on windows
-                   else:
-                       size -= size
-               finally:
-                   lock.close()
+        while size > target and files_by_atime:
+            for file_name in files_by_atime.pop(files_by_atime.minKey()):
+                lockfilename = os.path.join(os.path.dirname(file_name),
+                                            '.lock')
+                try:
+                    lock = zc.lockfile.LockFile(lockfilename)
+                except zc.lockfile.LockError:
+                    logger.info("Skipping locked %s",
+                                os.path.basename(file_name))
+                    continue  # In use, skip
+
+                try:
+                    fsize = os.stat(file_name).st_size
+                    try:
+                        ZODB.blob.remove_committed(file_name)
+                    except OSError, v:
+                        pass # probably open on windows
+                    else:
+                        size -= fsize
+                finally:
+                    lock.close()
+
+        logger.info("reduced blob cache size: %s", size)
+
     finally:
         check_lock.close()
 
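For readers skimming the diff: the rewritten cleanup pass walks the
cache directory, groups blob files by integer access time, and then
deletes oldest-first until the total size drops to the target.  A
minimal standalone sketch of the same idea (an editorial illustration
only: it uses a sorted list instead of BTrees.IOBTree and omits the
zc.lockfile coordination and remove_committed call of the real code):

    import os

    def shrink_blob_cache(blob_dir, target):
        # Gather (atime, size, path) for every cached blob file.
        entries = []
        for base, dirs, files in os.walk(blob_dir):
            for name in files:
                if not name.endswith('.blob'):
                    continue
                path = os.path.join(base, name)
                st = os.stat(path)
                entries.append((int(st.st_atime), st.st_size, path))
        size = sum(s for (a, s, p) in entries)
        # Evict least-recently-accessed files first until at or
        # below the target.
        for atime, fsize, path in sorted(entries):
            if size <= target:
                break
            os.remove(path)
            size -= fsize
        return size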

Modified: ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test	2008-12-11 16:30:08 UTC (rev 93911)
+++ ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test	2008-12-11 16:31:31 UTC (rev 93912)
@@ -22,8 +22,7 @@
 We'll also create a client.
 
     >>> import ZEO
-    >>> db = ZEO.DB(addr, blob_dir='blobs',
-    ...             blob_cache_size=4000, blob_cache_size_check=10)
+    >>> db = ZEO.DB(addr, blob_dir='blobs', blob_cache_size=4000)
 
 Here, we passed a blob_cache_size parameter, which specifies a target
 blob cache size.  This is not a hard limit, but rather a target.  It
@@ -66,7 +65,7 @@
     
     >>> db.storage._check_blob_size_thread.join()
 
-    >>> cache_size('blobs') < 6000
+    >>> cache_size('blobs') < 5000
     True
 
 If we read all of the blobs, data will be downloaded again, as
@@ -80,7 +79,7 @@
 
     >>> db.storage._check_blob_size_thread.join()
 
-    >>> cache_size('blobs') < 6000
+    >>> cache_size('blobs') < 5000
     True
 
     >>> for i in range(1, 101):
@@ -97,7 +96,7 @@
 
     >>> db.storage._check_blob_size_thread.join()
 
-    >>> cache_size('blobs') < 6000
+    >>> cache_size('blobs') < 5000
     True
 
     >>> for i in range(1, 101):
@@ -107,7 +106,7 @@
 
     >>> db.storage._check_blob_size_thread.join()
 
-    >>> cache_size('blobs') < 6000
+    >>> cache_size('blobs') < 5000
     True
 
 Now let's see if we can stress things a bit.  We'll create many clients
@@ -116,8 +115,7 @@
 
     >>> import threading, random
     >>> def run():
-    ...     db = ZEO.DB(addr, blob_dir='blobs',
-    ...                 blob_cache_size=4000, blob_cache_size_check=10)
+    ...     db = ZEO.DB(addr, blob_dir='blobs', blob_cache_size=4000)
     ...     conn = db.open()
     ...     for i in range(300):
     ...         time.sleep(0)
@@ -140,7 +138,7 @@
     >>> for thread in threads:
     ...     thread.join()
 
-    >>> cache_size('blobs') < 6000
+    >>> cache_size('blobs') < 5000
     True
 
 .. cleanup

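A note on the tightened assertions above (editorial, not part of the
commit): with the check size now defaulting to 10%, the cleanup thread
aims for a target of 4000 * (100 - 10)/100 = 3600 bytes rather than
the full 4000, so the post-cleanup cache can be expected to stay under
the tighter 5000-byte bound.  The cache_size helper is defined earlier
in the test file and is not shown in this diff; a plausible stand-in
simply sums the sizes of the cached blob files:

    import os

    def cache_size(d):
        # Total bytes of blob files under the cache directory; an
        # assumed equivalent of the doctest's (unshown) helper.
        size = 0
        for base, dirs, files in os.walk(d):
            for f in files:
                if f.endswith('.blob'):
                    size += os.stat(os.path.join(base, f)).st_size
        return size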

