[Zope-Checkins] CVS: StandaloneZODB/ZODB/tests - testDB.py:1.1 testCache.py:1.2 testTransaction.py:1.7

Jeremy Hylton jeremy@zope.com
Mon, 15 Apr 2002 14:55:12 -0400


Update of /cvs-repository/StandaloneZODB/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv3778

Modified Files:
	testCache.py testTransaction.py 
Added Files:
	testDB.py 
Log Message:
Add a bunch more tests of the LRU cache mechanism and of DB methods.

Also, reformat a test in testTransaction.




=== Added File StandaloneZODB/ZODB/tests/testDB.py ===
import time
import unittest

import ZODB
import ZODB.MappingStorage

from ZODB.tests.MinPO import MinPO

class DBTests(unittest.TestCase):

    def setUp(self):
        store = ZODB.MappingStorage.MappingStorage()
        self.db = ZODB.DB(store)

    def tearDown(self):
        self.db.close()

    def dowork(self, version=''):
        c = self.db.open(version)
        r = c.root()
        o = r[time.time()] = MinPO(0)
        get_transaction().commit()
        for i in range(25):
            o.value = MinPO(i)
            get_transaction().commit()
            o = o.value
        print r.items()
        c.close()

    # make sure the basic methods are callable

    def testSets(self):
        # test set methods that have non-trivial implementations
        self.db.setCacheDeactivateAfter(12) # deprecated
        self.db.setCacheSize(15)
        self.db.setVersionCacheDeactivateAfter(12) # deprecated
        self.db.setVersionCacheSize(15)

def test_suite():
    """Build and return the unittest suite made from DBTests."""
    suite = unittest.makeSuite(DBTests)
    return suite



=== StandaloneZODB/ZODB/tests/testCache.py 1.1 => 1.2 ===
 objects in memory under the assumption that they may be used again.
 """
+from __future__ import nested_scopes
 
 import random
 import time
@@ -12,17 +13,20 @@
 
 import ZODB
 import ZODB.MappingStorage
+from ZODB.cPickleCache import PickleCache
+from ZODB.POSException import ConflictError
 from ZODB.PersistentMapping import PersistentMapping
 from ZODB.tests.MinPO import MinPO
 from ZODB.utils import p64
 
+from Persistence import Persistent
+
 class CacheTestBase(unittest.TestCase):
 
     def setUp(self):
         store = ZODB.MappingStorage.MappingStorage()
         self.db = ZODB.DB(store,
-                          cache_size = self.CACHE_SIZE,
-                          cache_deactivate_after = self.CACHE_DEACTIVATE_AFTER)
+                          cache_size = self.CACHE_SIZE)
         self.conns = []
 
     def tearDown(self):
@@ -33,12 +37,12 @@
     NUM_COLLECTIONS = 10
     MAX_OBJECTS = 100
     CACHE_SIZE = 20
-    CACHE_DEACTIVATE_AFTER = 5
 
     def noodle_new_connection(self):
         """Do some reads and writes on a new connection."""
 
         c = self.db.open()
+        self.conns.append(c)
         self.noodle_connection(c)
 
     def noodle_connection(self, c):
@@ -106,5 +110,180 @@
 
     # XXX same for the get and invalidate methods
 
+    def checkLRUitems(self):
+        # get a cache
+        c = self.conns[0]._cache
+        c.lru_items()
+
+    def checkClassItems(self):
+        c = self.conns[0]._cache
+        c.klass_items()
+
+class LRUCacheTests(CacheTestBase):
+    
+    def checkLRU(self):
+        # verify the LRU behavior of the cache
+        CACHE_SIZE = 5
+        self.db.setCacheSize(CACHE_SIZE)
+        c = self.db.open()
+        r = c.root()
+        l = [None] * 10
+        for i in range(10):
+            l[i] = r[i] = MinPO(i)
+        # the root is the only thing in the cache, because all the
+        # other objects are new
+        self.assertEqual(len(c._cache), 1)
+        get_transaction().commit()
+        # commit() will register the objects, placing them in the cache.
+        # at the end of commit, the cache will be reduced down to CACHE_SIZE
+        # items
+        self.assertEqual(c._cache.ringlen(), CACHE_SIZE)
+        x = c._cache.get(p64(0), None)
+        self.assertEqual(x._p_changed, None) # the root is ghosted
+        for i in range(len(l)):
+            if i < CACHE_SIZE:
+                self.assertEqual(l[i]._p_changed, None)
+            else:
+                self.assertEqual(l[i]._p_changed, 0)
+
+    def checkSize(self):
+        self.assertEqual(self.db.cacheSize(), 0)
+        self.assertEqual(self.db.cacheDetailSize(), [])
+
+        CACHE_SIZE = 10
+        self.db.setCacheSize(CACHE_SIZE)
+
+        CONNS = 3
+        for i in range(CONNS):
+            self.noodle_new_connection()
+        
+        self.assertEquals(self.db.cacheSize(), CACHE_SIZE * CONNS)
+        details = self.db.cacheDetailSize()
+        self.assertEquals(len(details), CONNS)
+        for d in details:
+            self.assertEquals(d['ngsize'], CACHE_SIZE)
+            # the root is also in the cache as ghost, because
+            # the connection holds a reference to it
+            self.assertEquals(d['size'], CACHE_SIZE + 1)
+
+    def checkDetail(self):
+        CACHE_SIZE = 10
+        self.db.setCacheSize(CACHE_SIZE)
+
+        CONNS = 3
+        for i in range(CONNS):
+            self.noodle_new_connection()
+        
+        for klass, count in self.db.cacheDetail():
+            if klass.endswith('PersistentMapping'):
+                # one root per connection
+                self.assertEqual(count, CONNS)
+            if klass.endswith('MinPO'):
+                self.assertEqual(count, CONNS * CACHE_SIZE)
+
+        for details in self.db.cacheExtremeDetail():
+            # one dict per object.  keys:
+            if details['klass'].endswith('PersistentMapping'):
+                self.assertEqual(details['state'], None)
+            else:
+                self.assert_(details['klass'].endswith('MinPO'))
+                self.assertEqual(details['state'], 0)
+
+class StubDataManager:
+    def setklassstate(self, object):
+        pass
+
+class StubObject(Persistent):
+    pass
+
+class CacheErrors(unittest.TestCase):
+
+    def setUp(self):
+        self.jar = StubDataManager()
+        self.cache = PickleCache(self.jar)
+
+    def checkGetBogusKey(self):
+        self.assertRaises(KeyError, self.cache.get, p64(0))
+        try:
+            self.cache[12]
+        except KeyError:
+            pass
+        else:
+            self.fail("expected KeyError")
+        try:
+            self.cache[12] = 12
+        except TypeError:
+            pass
+        else:
+            self.fail("expected TypeError")
+        try:
+            del self.cache[12]
+        except TypeError:
+            pass
+        else:
+            self.fail("expected TypeError")
+
+    def checkBogusObject(self):
+        def add(key, obj):
+            self.cache[key] = obj
+        
+        key = p64(2)
+        # value isn't persistent
+        self.assertRaises(TypeError, add, key, 12)
+
+        o = StubObject()
+        # o._p_oid == None
+        self.assertRaises(ValueError, add, key, o)
+
+        o._p_oid = key
+        # o._p_jar == None
+        self.assertRaises(Exception, add, key, o)
+
+        o._p_jar = self.jar
+        self.cache[key] = o
+        # make sure it can be added multiple times
+        self.cache[key] = o
+
+        # same object, different keys
+        self.assertRaises(ValueError, add, p64(0), o)
+
+    def checkTwoCaches(self):
+        jar2 = StubDataManager()
+        cache2 = PickleCache(jar2)
+
+        o = StubObject()
+        key = o._p_oid = p64(1)
+        o._p_jar = jar2
+
+        cache2[key] = o
+
+        try:
+            self.cache[key] = o
+        except ValueError:
+            pass
+        else:
+            self.fail("expected ValueError because object already in cache")
+
+    def checkReadOnlyAttrsWhenCached(self):
+        o = StubObject()
+        key = o._p_oid = p64(1)
+        o._p_jar = self.jar
+        self.cache[key] = o
+        try:
+            o._p_oid = p64(2)
+        except ValueError:
+            pass
+        else:
+            self.fail("expect that you can't change oid of cached object")
+        try:
+            del o._p_jar
+        except ValueError:
+            pass
+        else:
+            self.fail("expect that you can't delete jar of cached object")
+
 def test_suite():
-    return unittest.makeSuite(DBMethods, 'check')
+    s = unittest.makeSuite(DBMethods, 'check')
+    s.addTest(unittest.makeSuite(LRUCacheTests, 'check'))
+    s.addTest(unittest.makeSuite(CacheErrors, 'check'))
+    return s


=== StandaloneZODB/ZODB/tests/testTransaction.py 1.6 => 1.7 ===
 
     def modify(self, nojar=0, tracing=0):
-
         if not nojar:
-            
             if self.nost:
                 self._p_jar = NoSubTransactionJar(tracing=tracing)
-                
             else:
                 self._p_jar = SubTransactionJar(tracing=tracing)
-                
-        else: pass
-
         get_transaction().register(self)
 
-
-
-class TestTxnException(Exception): pass
+class TestTxnException(Exception):
+    pass
 
 class BasicJar: