[Checkins] SVN: ZODB/trunk/src/ZEO/ Made caches thread safe. In theory, caches are protected by ZEO

Jim Fulton jim at zope.com
Fri Aug 29 09:59:31 EDT 2008


Log message for revision 90595:
  Made caches thread safe.  In theory, caches are protected by ZEO
  clients, but ZEO clients haven't provided very good protection,
  leading to cache corruption.  We'll hopefully fix these client issues,
  which cause other problems besides cache corruption, but it seems
  prudent to provide low-level cache protection.
  

Changed:
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/cache.py
  U   ZODB/trunk/src/ZEO/tests/test_cache.py

-=-
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2008-08-29 13:55:50 UTC (rev 90594)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2008-08-29 13:59:29 UTC (rev 90595)
@@ -341,8 +341,6 @@
         else:
             cache_path = None
         self._cache = self.ClientCacheClass(cache_path, size=cache_size)
-        # TODO:  maybe there's a better time to open the cache?  Unclear.
-        self._cache.open()
 
         self._rpc_mgr = self.ConnectionManagerClass(addr, self,
                                                     tmin=min_disconnect_poll,

Modified: ZODB/trunk/src/ZEO/cache.py
===================================================================
--- ZODB/trunk/src/ZEO/cache.py	2008-08-29 13:55:50 UTC (rev 90594)
+++ ZODB/trunk/src/ZEO/cache.py	2008-08-29 13:59:29 UTC (rev 90595)
@@ -30,6 +30,7 @@
 import logging
 import os
 import tempfile
+import threading
 import time
 
 import ZODB.fsIndex
@@ -119,7 +120,22 @@
 # to the end of the file that the new object can't fit in one
 # contiguous chunk, currentofs is reset to ZEC_HEADER_SIZE first.
 
+class locked(object):
 
+    def __init__(self, func):
+        self.func = func
+
+    def __get__(self, inst, class_):
+        if inst is None:
+            return self
+        def call(*args, **kw):
+            inst._lock.acquire()
+            try:
+                return self.func(inst, *args, **kw)
+            finally:
+                inst._lock.release()
+        return call
+
 class ClientCache(object):
     """A simple in-memory cache."""
 
@@ -197,6 +213,10 @@

         self._setup_trace(path)

+        # Create the lock before opening, in case open() uses a locked method.
+        self._lock = threading.RLock()
+        self.open()
+
     # Backward compatibility. Client code used to have to use the fc
     # attr to get to the file cache to get cache stats.
     @property
@@ -351,6 +371,7 @@
     # instance, and also written out near the start of the cache file.  The
     # new tid must be strictly greater than our current idea of the most
     # recent tid.
+    @locked
     def setLastTid(self, tid):
         if self.tid is not None and tid <= self.tid:
             raise ValueError("new last tid (%s) must be greater than "
@@ -367,10 +388,11 @@
     # @return a transaction id
     # @defreturn string, or None if no transaction is yet known
     def getLastTid(self):
-        if self.tid == z64:
+        tid = self.tid
+        if tid == z64:
             return None
         else:
-            return self.tid
+            return tid
 
     ##
     # Return the current data record for oid.
@@ -379,6 +401,7 @@
     #         in the cache
     # @defreturn 3-tuple: (string, string, string)
 
+    @locked
     def load(self, oid):
         ofs = self.current.get(oid)
         if ofs is None:
@@ -406,6 +429,7 @@
     # @return data record, serial number, start tid, and end tid
     # @defreturn 4-tuple: (string, string, string, string)
 
+    @locked
     def loadBefore(self, oid, before_tid):
         noncurrent_for_oid = self.noncurrent.get(u64(oid))
         if noncurrent_for_oid is None:
@@ -447,6 +471,7 @@
     #                current.
     # @param data the actual data
 
+    @locked
     def store(self, oid, start_tid, end_tid, data):
         seek = self.f.seek
         if end_tid is None:
@@ -533,6 +558,7 @@
     # @param oid object id
     # @param tid the id of the transaction that wrote a new revision of oid,
     #        or None to forget all cached info about oid.
+    @locked
     def invalidate(self, oid, tid):
         if tid > self.tid and tid is not None:
             self.setLastTid(tid)
@@ -572,13 +598,36 @@
         seek = self.f.seek
         read = self.f.read
-        for oid, ofs in self.current.iteritems():
-            seek(ofs)
-            assert read(1) == 'a', (ofs, self.f.tell(), oid)
-            size, saved_oid, tid, end_tid = unpack(">I8s8s8s", read(28))
-            assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
-            assert end_tid == z64, (ofs, self.f.tell(), oid)
-            yield oid, tid
+        # Snapshot the index while holding the lock.  Other threads may
+        # store, invalidate or evict objects while the caller consumes
+        # this generator, and self.current must not change while it is
+        # being iterated over.
+        self._lock.acquire()
+        try:
+            items = list(self.current.iteritems())
+        finally:
+            self._lock.release()
+
+        for oid, ofs in items:
+            result = None
+            self._lock.acquire()
+            try:
+                # Skip records evicted or replaced after the snapshot was
+                # taken; their snapshot offsets may be stale.
+                if self.current.get(oid) == ofs:
+                    seek(ofs)
+                    assert read(1) == 'a', (ofs, self.f.tell(), oid)
+                    size, saved_oid, tid, end_tid = unpack(
+                        ">I8s8s8s", read(28))
+                    assert saved_oid == oid, (
+                        ofs, self.f.tell(), oid, saved_oid)
+                    assert end_tid == z64, (ofs, self.f.tell(), oid)
+                    result = oid, tid
+            finally:
+                self._lock.release()
+
+            if result is not None:
+                yield result

     def dump(self):
         from ZODB.utils import oid_repr
         print "cache size", len(self)

Modified: ZODB/trunk/src/ZEO/tests/test_cache.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/test_cache.py	2008-08-29 13:55:50 UTC (rev 90594)
+++ ZODB/trunk/src/ZEO/tests/test_cache.py	2008-08-29 13:59:29 UTC (rev 90595)
@@ -67,7 +67,6 @@
         # testSerialization reads the entire file into a string, it's not
         # good to leave it that big.
         self.cache = ZEO.cache.ClientCache(size=1024**2)
-        self.cache.open()
 
     def tearDown(self):
         if self.cache.path:
@@ -154,7 +153,6 @@
         dst.write(src.read(self.cache.maxsize))
         dst.close()
         copy = ZEO.cache.ClientCache(path)
-        copy.open()
 
         # Verify that internals of both objects are the same.
         # Could also test that external API produces the same results.
@@ -170,7 +168,6 @@
         if self.cache.path:
             os.remove(self.cache.path)
         self.cache = ZEO.cache.ClientCache(size=50)
-        self.cache.open()
 
         # We store an object that is a bit larger than the cache can handle.
         self.cache.store(n1, n2, None, "x"*64)
@@ -186,7 +183,6 @@
         if self.cache.path:
             os.remove(self.cache.path)
         cache = ZEO.cache.ClientCache(size=50)
-        cache.open()
 
         # We store an object that is a bit larger than the cache can handle.
         cache.store(n1, n2, n3, "x"*64)
@@ -231,7 +227,6 @@
     ...     _ = os.spawnl(os.P_WAIT, sys.executable, sys.executable, 't')
     ...     if os.path.exists('cache'):
     ...         cache = ZEO.cache.ClientCache('cache')
-    ...         cache.open()
     ...         cache.close()
     ...         os.remove('cache')
     ...         os.remove('cache.lock')
@@ -251,7 +246,6 @@
     >>> cache.store(p64(1), p64(1), None, data)
     >>> cache.close()
     >>> cache = ZEO.cache.ClientCache('cache', 1000)
-    >>> cache.open()
     >>> cache.store(p64(2), p64(2), None, 'XXX')
 
     >>> cache.close()
@@ -268,6 +262,57 @@

     >>> cache.close()
     """,
+
+    thread_safe =
+    r"""
+
+    >>> import ZEO.cache, ZODB.utils
+    >>> cache = ZEO.cache.ClientCache('cache', 1000000)
+
+    >>> for i in range(100):
+    ...     cache.store(ZODB.utils.p64(i), ZODB.utils.p64(1), None, '0')
+
+    >>> import random, sys, threading
+    >>> random = random.Random(0)
+    >>> stop = False
+    >>> read_failure = None
+
+    >>> def read_thread():
+    ...     global read_failure
+    ...     def pick_oid():
+    ...         return ZODB.utils.p64(random.randint(0,99))
+    ...
+    ...     try:
+    ...         while not stop:
+    ...             cache.load(pick_oid())
+    ...             cache.loadBefore(pick_oid(), ZODB.utils.p64(2))
+    ...     except:
+    ...         read_failure = sys.exc_info()
+
+    >>> thread = threading.Thread(target=read_thread)
+    >>> thread.start()
+
+    >>> for tid in range(2,10):
+    ...     for oid in range(100):
+    ...         oid = ZODB.utils.p64(oid)
+    ...         cache.invalidate(oid, ZODB.utils.p64(tid))
+    ...         cache.store(oid, ZODB.utils.p64(tid), None, str(tid))
+
+    >>> stop = True
+    >>> thread.join()
+    >>> if read_failure:
+    ...    print 'Read failure:'
+    ...    import traceback
+    ...    traceback.print_exception(*read_failure)
+
+    >>> expected = '9', ZODB.utils.p64(9)
+    >>> for oid in range(100):
+    ...     loaded = cache.load(ZODB.utils.p64(oid))
+    ...     if loaded != expected:
+    ...         print oid, loaded
+
+    >>> cache.close()
+    """,
     )

 def test_suite():



More information about the Checkins mailing list