[Checkins] SVN: ZODB/branches/jim-zeo-blob-cache/ checkpoint

Jim Fulton jim at zope.com
Mon Dec 1 19:57:02 EST 2008


Log message for revision 93525:
  checkpoint

Changed:
  U   ZODB/branches/jim-zeo-blob-cache/buildout.cfg
  U   ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
  U   ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/forker.py
  U   ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/testZEO.py

-=-
Modified: ZODB/branches/jim-zeo-blob-cache/buildout.cfg
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/buildout.cfg	2008-12-02 00:11:40 UTC (rev 93524)
+++ ZODB/branches/jim-zeo-blob-cache/buildout.cfg	2008-12-02 00:57:01 UTC (rev 93525)
@@ -15,4 +15,5 @@
 [scripts]
 recipe = zc.recipe.egg
 eggs = ZODB3
+       lockfile
 interpreter = py

Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py	2008-12-02 00:11:40 UTC (rev 93524)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py	2008-12-02 00:57:01 UTC (rev 93525)
@@ -30,7 +30,7 @@
 from ZODB import POSException
 from ZODB import utils
 from ZODB.loglevels import BLATHER
-import BTrees.IOBTree()
+import BTrees.IOBTree
 import cPickle
 import logging
 import os
@@ -121,7 +121,7 @@
                  drop_cache_rather_verify=False,
                  username='', password='', realm=None,
                  blob_dir=None, shared_blob_dir=False,
-                 blob_cache_size=1<<30, blob_cache_size_check=50,
+                 blob_cache_size=1<<62, blob_cache_size_check=100,
                  ):
         """ClientStorage constructor.
 
@@ -247,11 +247,6 @@
             read_only_fallback and "fallback" or "normal",
             storage,
             )
-        
-        if debug:
-            logger.warning(
-                "%s ClientStorage(): debug argument is no longer used",
-                self.__name__)
 
         self._drop_cache_rather_verify = drop_cache_rather_verify
 
@@ -459,36 +454,53 @@
         if self.shared_blob_dir or not self.blob_dir:
             return
 
-        def check():
-            target = self._blob_cache_size
-            files_by_atime = BTree.IOBTree.BTree()
-            for base, dirs, files in os.walk(self.blob_dir):
-                if base = self.temporaryDirectory():
-                    continue
-                for file_name in files:
-                    file_name = os.path.join(base, file_name)
-                    stat = os.stat(file_name).st_size
-                    target -= stat.st_size
-                    t = max(stat.st_atime, stat.st_mtime)
-                    if t not in files_by_atime:
-                        files_by_atime[t] = []
-                    files_by_atime[t] = file_name
-
-            while target <= 0 and files_by_atime:
-                for file_name in files_by_atime.pop(files_by_atime.minKey()):
-                    size = os.stat(file_name).st_size
-                    try:
-                        os.remove(file_name)
-                    except OSError:
-                        pass
-                    else:
-                        target -= size
-            
-        check_blob_size_thread = threading.Thread(target=check)
+        check_blob_size_thread = threading.Thread(
+            target=self._check_blob_size_method)
         check_blob_size_thread.setDaemon(True)
         check_blob_size_thread.start()
         self._check_blob_size_thread = check_blob_size_thread
 
+    def _check_blob_size_method(self):
+        try:
+            lock = zc.lockfile.LockFile(
+                os.path.join(self.blob_dir, 'cache.lock'))
+        except zc.lockfile.LockError:
+            # Someone is already cleaning up, so don't bother
+            return
+
+        try:
+           target = self._blob_cache_size
+           tmp = self.temporaryDirectory()
+           blob_suffix = ZODB.blob.BLOB_SUFFIX
+           files_by_atime = BTrees.IOBTree.BTree()
+           for base, dirs, files in os.walk(self.blob_dir):
+               if base == tmp:
+                   del dirs[:]
+                   continue
+               for file_name in files:
+                   if not file_name.endswith(blob_suffix):
+                       continue
+                   file_name = os.path.join(base, file_name)
+                   stat = os.stat(file_name)
+                   target -= stat.st_size
+                   t = max(stat.st_atime, stat.st_mtime)
+                   if t not in files_by_atime:
+                       files_by_atime[t] = []
+                   files_by_atime[t].append(file_name)
+
+           while target <= 0 and files_by_atime:
+               for file_name in files_by_atime.pop(files_by_atime.minKey()):
+                   size = os.stat(file_name).st_size
+                   try:
+                       os.remove(file_name)
+                   except OSError:
+                       raise
+                   else:
+                       target -= size
+        finally:
+            lock.close()
+
+
     def registerDB(self, db):
         """Storage API: register a database for invalidation messages.
 
@@ -950,13 +962,6 @@
         self._server.storeBlobShared(
             oid, serial, data, os.path.basename(target), id(txn))
 
-    def _have_blob(self, blob_filename, oid, serial):
-        if os.path.exists(blob_filename):
-            logger.debug("%s Found blob %r/%r in cache.",
-                         self.__name__, oid, serial)
-            return True
-        return False
-
     def receiveBlobStart(self, oid, serial):
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
         assert not os.path.exists(blob_filename)
@@ -993,7 +998,7 @@
 
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
         # Case 1: Blob is available already, just use it
-        if self._have_blob(blob_filename, oid, serial):
+        if os.path.exists(blob_filename):
             return blob_filename
 
         if self.shared_blob_dir:
@@ -1001,14 +1006,21 @@
             # here, it's not anywhere.
             raise POSException.POSKeyError("No blob file", oid, serial)
 
+        self._blob_download_name = 
+
+
         # First, we'll create the directory for this oid, if it doesn't exist. 
-        self.fshelper.createPathForOID(oid)
+#        self.fshelper.createPathForOID(oid)
 
         # OK, it's not here and we (or someone) needs to get it.  We
         # want to avoid getting it multiple times.  We want to avoid
         # getting it multiple times even accross separate client
         # processes on the same machine. We'll use file locking.
 
+        lockfilename = os.path.join(
+            self.blob_dir, (oid+serial).encode(hex)+'.lock')
+
+
         lockfilename = blob_filename+'.lock'
         try:
             lock = zc.lockfile.LockFile(lockfilename)
@@ -1033,7 +1045,7 @@
                         pass
                     break
 
-            if self._have_blob(blob_filename, oid, serial):
+            if os.path.exists(blob_filename):
                 return blob_filename
 
             return None
@@ -1043,7 +1055,7 @@
             # we'll double check that someone didn't download it while we
             # were getting the lock:
 
-            if self._have_blob(blob_filename, oid, serial):
+            if os.path.exists(blob_filename):
                 return blob_filename
 
             # Ask the server to send it to us.  When this function
@@ -1052,7 +1064,7 @@
 
             self._server.sendBlob(oid, serial)
 
-            if self._have_blob(blob_filename, oid, serial):
+            if os.path.exists(blob_filename):
                 return blob_filename
 
             raise POSException.POSKeyError("No blob file", oid, serial)
@@ -1204,12 +1216,12 @@
             blobs = self._tbuf.blobs
             while blobs:
                 oid, blobfilename = blobs.pop()
-                self._blob_cache_size += os.stat(blobfilename).st_size
+                self._blob_data_bytes_loaded += os.stat(blobfilename).st_size
                 targetpath = self.fshelper.getPathForOID(oid, create=True)
                 rename_or_copy_blob(blobfilename,
                                     self.fshelper.getBlobFilename(oid, tid),
                                     )
-                if self._blob_cache_size > self._blob_cache_size_check:
+                if self._blob_data_bytes_loaded > self._blob_cache_size_check:
                     self._check_blob_size()
 
         self._tbuf.clear()

Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/forker.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/forker.py	2008-12-02 00:11:40 UTC (rev 93524)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/forker.py	2008-12-02 00:57:01 UTC (rev 93525)
@@ -285,7 +285,7 @@
     servers = {}
 
     def start_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
-                     addr=None, path='Data.fs', protocol=None):
+                     addr=None, path='Data.fs', protocol=None, blob_dir=None):
         """Start a ZEO server.
 
         Return the server and admin addresses.
@@ -298,7 +298,7 @@
         elif addr is not None:
             raise TypeError("Can't specify port and addr")
         addr, adminaddr, pid, config_path = start_zeo_server(
-            storage_conf, zeo_conf, port, keep, path, protocol)
+            storage_conf, zeo_conf, port, keep, path, protocol, blob_dir)
         os.remove(config_path)
         servers[adminaddr] = pid
         return addr, adminaddr

Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/testZEO.py	2008-12-02 00:11:40 UTC (rev 93524)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/testZEO.py	2008-12-02 00:57:01 UTC (rev 93525)
@@ -1168,7 +1168,7 @@
         doctest.DocFileSuite(
             'zeo-fan-out.test', 'zdoptions.test',
             'drop_cache_rather_than_verify.txt',
-            'protocols.test',
+            'protocols.test', 'zeo_blob_cache.test',
             setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
             ),
         )



More information about the Checkins mailing list