[Zope3-checkins] CVS: Zope3/src/persistence/tests - __init__.py:1.1.2.1 test_cache.py:1.1.2.1 test_list.py:1.1.2.1 test_persistence.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:30:42 -0500


Update of /cvs-repository/Zope3/src/persistence/tests
In directory cvs.zope.org:/tmp/cvs-serv19908/persistence/tests

Added Files:
      Tag: NameGeddon-branch
	__init__.py test_cache.py test_list.py test_persistence.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/persistence/tests/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
""" Persistence unit tests."""


=== Added File Zope3/src/persistence/tests/test_cache.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
import unittest, sys, time

class Test(unittest.TestCase):
    """Exercise Cache bookkeeping through a stub data manager.

    Each step manipulates one P instance registered under oid 1 and then
    compares dm.cache.statistics() with the expected ghost/active counts.
    P, Cache and DM are defined or imported further down in this module.
    """

    def testBasicLife(self):
        # Walk one object through registration, release, invalidation,
        # reactivation and minimize(), checking the statistics each time.
        dm=DM(Cache())
        p1=P()
        p1._p_oid=1
        p1._p_jar=dm
        dm.cache[1]=p1
        # A freshly stored object is counted as active.
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 0, 'active': 1},
                         )
        # With the only local reference gone, the entry is expected to
        # vanish from the statistics (presumably the cache does not keep
        # the object alive on its own -- confirm against persistence.cache).
        del p1
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 0, 'active': 0},
                         )
        # Register a second object under the same oid.
        p1=P()
        p1._p_oid=1
        p1._p_jar=dm
        dm.cache[1]=p1
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 0, 'active': 1},
                         )
        # Fetch the object back through the cache, then invalidate it by
        # oid; it is expected to be counted as a ghost afterwards.
        p=dm.cache[1]
        dm.cache.invalidate(1)
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 1, 'active': 0},
                         )
        # XXX deal with current cPersistence implementation
        # (a _p_changed value of 3 is tolerated; anything else must be None)
        if p._p_changed != 3:
            self.assertEqual(p._p_changed, None)

        # Setting an attribute and clearing the changed flag is expected
        # to leave the object counted as active again.
        p.a=1
        p._p_changed=0
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 0, 'active': 1},
                         )
        # Same check, this time invalidating through a list of oids.
        dm.cache.invalidateMany([1])
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 1, 'active': 0},
                         )
        # XXX deal with current cPersistence implementation
        if p._p_changed != 3:
            self.assertEqual(p._p_changed, None)

        p.a=1
        p._p_changed=0
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 0, 'active': 1},
                         )
        # Passing None instead of a list of oids is also expected to
        # turn the object into a ghost.
        dm.cache.invalidateMany(None)
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 1, 'active': 0},
                         )
        # XXX deal with current cPersistence implementation
        if p._p_changed != 3:
            self.assertEqual(p._p_changed, None)



        # Modify the object without clearing the changed flag.
        p.a=1
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 0, 'active': 1},
                         )
        # Not changed because p is modified:
        dm.cache.minimize()
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 0, 'active': 1},
                         )
        p._p_changed=0
        # Now the minimize should work
        dm.cache.minimize()
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 1, 'active': 0},
                         )
        # Both local names refer to the same object; once both are gone
        # the statistics are expected to be empty again.
        del p
        del p1
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 0, 'active': 0},
                         )

    def testGC(self):
        # Check that incrgc() and full_sweep() turn an unmodified object
        # into a ghost but leave a modified one active.
        dm=DM(Cache(inactive=1, size=2))
        p1=P()
        p1._p_oid=1
        p1._p_jar=dm
        dm.cache[1]=p1
        p1.a=1
        # Backdate the access time by 5000 seconds, wrapped to the number
        # of seconds in a day (presumably so the object looks idle to the
        # collector -- confirm against the Cache implementation).
        p1._p_atime=int(time.time()-5000)%86400
        # The object is still modified, so it is expected to stay active.
        dm.cache.incrgc()
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 0, 'active': 1},
                         )
        # After clearing the changed flag the same call should ghost it.
        p1._p_changed=0
        dm.cache.incrgc()
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 1, 'active': 0},
                         )

        # Reactivate, backdate and clear the flag again, then repeat the
        # check with full_sweep().
        p1.a=1
        p1._p_atime=int(time.time()-5000)%86400
        p1._p_changed=0
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 0, 'active': 1},
                         )
        dm.cache.full_sweep()
        self.assertEqual(dm.cache.statistics(),
                         {'ghosts': 1, 'active': 0},
                         )
        
        
        
    

from persistence import Persistent

class P(Persistent):
    """Bare Persistent subclass used as the cached object in these tests."""

from persistence.cache import Cache

class DM:
    """Stand-in data manager that counts registrations and owns a cache."""

    def __init__(self, cache):
        # Keep the cache supplied by the test and start the counter at zero.
        self.cache = cache
        self.called = 0

    def register(self, ob):
        """Count one more registration; ``ob`` itself is not inspected."""
        self.called = self.called + 1

    def setstate(self, ob):
        """Load a fixed state into ``ob`` and then notify the cache."""
        canned_state = {'x': 42}
        ob.__setstate__(canned_state)
        self.cache.setstate(ob._p_oid, ob)


def test_suite():
    """Return a suite holding every test method found on ``Test``."""
    return unittest.TestLoader().loadTestsFromTestCase(Test)

if __name__ == '__main__':
    # Run the suite with the plain-text runner when executed as a script.
    runner = unittest.TextTestRunner()
    runner.run(test_suite())
    


=== Added File Zope3/src/persistence/tests/test_list.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Test the list interface to PersistentList
"""

import unittest
from persistence.list import PersistentList

# Plain-list fixtures of length 0, 1 and 2; the tests build
# PersistentList instances from them and compare against them.
l0, l1, l2 = [], [0], [0, 1]

class TestPList(unittest.TestCase):
    """Check that PersistentList offers the same interface as a list.

    The single test method uses the ``check`` prefix, so it is only
    collected by a loader whose testMethodPrefix is set to ``check``.
    """

    def checkTheWorld(self):
        # One long walk through the list protocol: construction, repr,
        # comparison, item and slice access, operators and the mutating
        # methods.  Results are compared against the plain lists l0/l1/l2.

        # Test constructors
        u = PersistentList()
        u0 = PersistentList(l0)
        u1 = PersistentList(l1)
        u2 = PersistentList(l2)

        uu = PersistentList(u)
        uu0 = PersistentList(u0)
        uu1 = PersistentList(u1)
        uu2 = PersistentList(u2)

        # The remaining constructions only have to succeed; their results
        # are not inspected further.
        v = PersistentList(tuple(u))
        class OtherList:
            # Minimal sequence offering only __len__ and __getitem__.
            def __init__(self, initlist):
                self.__data = initlist
            def __len__(self):
                return len(self.__data)
            def __getitem__(self, i):
                return self.__data[i]
        v0 = PersistentList(OtherList(u0))
        vv = PersistentList("this is also a sequence")

        # Test __repr__
        eq = self.assertEqual

        eq(str(u0), str(l0), "str(u0) == str(l0)")
        eq(repr(u1), repr(l1), "repr(u1) == repr(l1)")
        # repr() replaces the backtick shorthand; both produce the same
        # string.  The message text is kept as it was.
        eq(repr(u2), repr(l2), "`u2` == `l2`")

        # Test __cmp__ and __len__

        def mycmp(a, b):
            # Normalize cmp() to exactly -1, 0 or 1.
            r = cmp(a, b)
            if r < 0: return -1
            if r > 0: return 1
            return r

        # Every sample holds the first len() integers, so comparing two
        # samples must agree with comparing their lengths.
        sequences = [l0, l1, l2, u, u0, u1, u2, uu, uu0, uu1, uu2]
        for a in sequences:
            for b in sequences:
                eq(mycmp(a, b), mycmp(len(a), len(b)),
                      "mycmp(a, b) == mycmp(len(a), len(b))")

        # Test __getitem__

        for i in range(len(u2)):
            eq(u2[i], i, "u2[i] == i")

        # Test __setitem__

        uu2[0] = 0
        uu2[1] = 100
        try:
            uu2[2] = 200
        except IndexError:
            pass
        else:
            # Report a regular test failure; the name TestFailed used
            # here before is not defined anywhere in this module.
            self.fail("uu2[2] shouldn't be assignable")

        # Test __delitem__

        del uu2[1]
        del uu2[0]
        try:
            del uu2[0]
        except IndexError:
            pass
        else:
            self.fail("uu2[0] shouldn't be deletable")

        # Test __getslice__

        for i in range(-3, 4):
            eq(u2[:i], l2[:i], "u2[:i] == l2[:i]")
            eq(u2[i:], l2[i:], "u2[i:] == l2[i:]")
            for j in range(-3, 4):
                eq(u2[i:j], l2[i:j], "u2[i:j] == l2[i:j]")

        # Test __setslice__

        for i in range(-3, 4):
            u2[:i] = l2[:i]
            eq(u2, l2, "u2 == l2")
            u2[i:] = l2[i:]
            eq(u2, l2, "u2 == l2")
            for j in range(-3, 4):
                u2[i:j] = l2[i:j]
                eq(u2, l2, "u2 == l2")

        uu2 = u2[:]
        uu2[:0] = [-2, -1]
        eq(uu2, [-2, -1, 0, 1], "uu2 == [-2, -1, 0, 1]")
        uu2[0:] = []
        eq(uu2, [], "uu2 == []")

        # Test __contains__
        for i in u2:
            self.failUnless(i in u2, "i in u2")
        for i in min(u2)-1, max(u2)+1:
            self.failUnless(i not in u2, "i not in u2")

        # Test __delslice__

        uu2 = u2[:]
        del uu2[1:2]
        del uu2[0:1]
        eq(uu2, [], "uu2 == []")

        uu2 = u2[:]
        del uu2[1:]
        del uu2[:1]
        eq(uu2, [], "uu2 == []")

        # Test __add__, __radd__, __mul__ and __rmul__

        #self.failUnless(u1 + [] == [] + u1 == u1, "u1 + [] == [] + u1 == u1")
        self.failUnless(u1 + [1] == u2, "u1 + [1] == u2")
        #self.failUnless([-1] + u1 == [-1, 0], "[-1] + u1 == [-1, 0]")
        self.failUnless(u2 == u2*1 == 1*u2, "u2 == u2*1 == 1*u2")
        self.failUnless(u2+u2 == u2*2 == 2*u2, "u2+u2 == u2*2 == 2*u2")
        self.failUnless(u2+u2+u2 == u2*3 == 3*u2, "u2+u2+u2 == u2*3 == 3*u2")

        # Test append

        u = u1[:]
        u.append(1)
        eq(u, u2, "u == u2")

        # Test insert

        u = u2[:]
        u.insert(0, -1)
        eq(u, [-1, 0, 1], "u == [-1, 0, 1]")

        # Test pop

        u = PersistentList([0, -1, 1])
        u.pop()
        eq(u, [0, -1], "u == [0, -1]")
        u.pop(0)
        eq(u, [-1], "u == [-1]")

        # Test remove

        u = u2[:]
        u.remove(1)
        eq(u, u1, "u == u1")

        # Test count
        u = u2*3
        eq(u.count(0), 3, "u.count(0) == 3")
        eq(u.count(1), 3, "u.count(1) == 3")
        eq(u.count(2), 0, "u.count(2) == 0")


        # Test index

        eq(u2.index(0), 0, "u2.index(0) == 0")
        eq(u2.index(1), 1, "u2.index(1) == 1")
        try:
            u2.index(2)
        except ValueError:
            pass
        else:
            self.fail("expected ValueError")

        # Test reverse

        u = u2[:]
        u.reverse()
        eq(u, [1, 0], "u == [1, 0]")
        u.reverse()
        eq(u, u2, "u == u2")

        # Test sort

        u = PersistentList([1, 0])
        u.sort()
        eq(u, u2, "u == u2")

        # Test extend

        u = u1[:]
        u.extend(u2)
        eq(u, u1 + u2, "u == u1 + u2")


def test_suite():
    """Return a suite of the ``check``-prefixed methods of ``TestPList``."""
    check_loader = unittest.TestLoader()
    check_loader.testMethodPrefix = 'check'
    return check_loader.loadTestsFromTestCase(TestPList)

if __name__ == "__main__":
    # The test methods start with "check" rather than "test", so hand
    # unittest.main() a loader that looks for that prefix.
    check_loader = unittest.TestLoader()
    check_loader.testMethodPrefix = "check"
    unittest.main(testLoader=check_loader)


=== Added File Zope3/src/persistence/tests/test_persistence.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
import unittest

from persistence import Persistent
from persistence.interfaces import IPersistent

class Test(unittest.TestCase):
    """Shared checks for Persistent subclasses.

    Subclasses set ``klass`` to the class under test and ``has_dict`` to
    say whether its instances are expected to have a ``__dict__``.  The
    helper classes P and DM are defined further down in this module.
    """

    klass = None # override in subclass
    # has_dict is read by testUnsaved and testState but is not defined
    # here; the concrete subclasses in this module provide it.

    def testSaved(self):
        # Follow _p_changed and the number of register() calls on the
        # data manager for an object that has both an oid and a jar.
        p = self.klass()
        p._p_oid = '\0\0\0\0\0\0hi'
        dm = DM()
        p._p_jar = dm
        self.assertEqual(p._p_changed, 0)
        self.assertEqual(dm.called, 0)
        # The first modification is expected to register with the jar.
        p.inc()
        self.assertEqual(p._p_changed, 1)
        self.assertEqual(dm.called, 1)
        # A further modification must not register a second time.
        p.inc()
        self.assertEqual(p._p_changed, 1)
        self.assertEqual(dm.called, 1)
        # While the object is modified, neither assigning None nor
        # _p_deactivate() is expected to discard the change.
        p._p_changed = None
        self.assertEqual(p._p_changed, 1)
        self.assertEqual(dm.called, 1)
        p._p_deactivate()
        self.assertEqual(p._p_changed, 1)
        self.assertEqual(dm.called, 1)
        # Deleting the attribute is expected to reset the state even
        # though the object was modified.
        del p._p_changed
        # XXX deal with current cPersistence implementation
        # (a _p_changed value of 3 is tolerated; anything else must be None)
        if p._p_changed != 3:
            self.assertEqual(p._p_changed, None)
        self.assertEqual(dm.called, 1)
        # DM.setstate() in this module loads x = 42, so one increment is
        # expected to give 43 and to register again.
        p.inc()
        self.assertEqual(p.x, 43)
        self.assertEqual(p._p_changed, 1)
        self.assertEqual(dm.called, 2)
        # Clearing the flag keeps the data; the next change registers again.
        p._p_changed = 0
        self.assertEqual(p._p_changed, 0)
        self.assertEqual(dm.called, 2)
        self.assertEqual(p.x, 43)
        p.inc()
        self.assertEqual(p._p_changed, 1)
        self.assertEqual(dm.called, 3)

    def testUnsaved(self):
        # An object without jar or oid is expected to report
        # _p_changed == 0 no matter what is done to it.
        p = self.klass()

        self.assertEqual(p.x, 0)
        self.assertEqual(p._p_changed, 0)
        self.assertEqual(p._p_jar, None)
        self.assertEqual(p._p_oid, None)
        self.assertEqual(p._p_serial, None)
        p.inc()
        p.inc()
        self.assertEqual(p.x, 2)
        self.assertEqual(p._p_changed, 0)

        # None of the following attempts may alter the reported state.
        p._p_deactivate()
        self.assertEqual(p._p_changed, 0)
        p._p_changed = 1
        self.assertEqual(p._p_changed, 0)
        p._p_changed = None
        self.assertEqual(p._p_changed, 0)
        del p._p_changed
        self.assertEqual(p._p_changed, 0)
        # The data must still be there afterwards.
        if self.has_dict:
            self.failUnless(p.__dict__)
        self.assertEqual(p.x, 2)

    def testState(self):
        # __getstate__/__setstate__ round trip; neither call is expected
        # to mark the object as changed.
        p = self.klass()
        self.assertEqual(p.__getstate__(), {'x': 0})
        self.assertEqual(p._p_changed, 0)
        p.__setstate__({'x':5})
        self.assertEqual(p._p_changed, 0)
        # The _v_foo attribute set here must not show up in the state
        # checked below.
        if self.has_dict:
            p._v_foo = 2
        self.assertEqual(p.__getstate__(), {'x': 5})
        self.assertEqual(p._p_changed, 0)

    def testDirectChanged(self):
        # Setting _p_changed directly is expected to register the object
        # with its data manager once.
        p = self.klass()
        p._p_oid = 1
        dm = DM()
        p._p_jar = dm
        self.assertEqual(p._p_changed, 0)
        self.assertEqual(dm.called, 0)
        p._p_changed = 1
        self.assertEqual(dm.called, 1)

    def testActivate(self):
        # After deactivation and activation the object is expected to
        # carry the state supplied by DM.setstate() (x = 42).
        p = self.klass()
        dm = DM()
        p._p_oid = 1
        p._p_jar = dm
        p._p_changed = 0
        p._p_deactivate()
        # XXX does this really test the activate method?
        p._p_activate()
        self.assertEqual(p.x, 42)

    def testInterface(self):
        # Persistent, P and instances of both must be reported by
        # IPersistent as implementing the interface.
        self.assert_(IPersistent.isImplementedByInstancesOf(Persistent),
                     "%s does not implement IPersistent" % Persistent)
        p = Persistent()
        self.assert_(IPersistent.isImplementedBy(p),
                     "%s does not implement IPersistent" % p)

        self.assert_(IPersistent.isImplementedByInstancesOf(P),
                     "%s does not implement IPersistent" % P)
        p = self.klass()
        self.assert_(IPersistent.isImplementedBy(p),
                     "%s does not implement IPersistent" % p)

    def testDataManagerAndAttributes(self):
        # Test to cover an odd bug where the instance __dict__ was
        # set at the same location as the data manager in the C type.
        p = P()
        p.inc()
        p.inc()
        self.assert_('x' in p.__dict__)
        self.assert_(p._p_jar is None)

    def testMultipleInheritance(self):
        # make sure it is possible to inherit from two different
        # subclasses of persistent.
        # There are no assertions: the test passes when the class
        # statements below can be executed without an error.
        class A(Persistent):
            pass
        class B(Persistent):
            pass
        class C(A, B):
            pass
        class D(object):
            pass
        class E(D, B):
            pass

    def testBasicTypeStructure(self):
        # test that a persistent class has a sane C type structure
        # use P (defined below) as simplest example
        # Persistent itself is expected to have neither a __dict__ nor a
        # weakref slot; the Python subclass P is expected to have both.
        self.assertEqual(Persistent.__dictoffset__, 0)
        self.assertEqual(Persistent.__weakrefoffset__, 0)
        self.assert_(Persistent.__basicsize__ > object.__basicsize__)
        self.assert_(P.__dictoffset__)
        self.assert_(P.__weakrefoffset__)
        self.assert_(P.__dictoffset__ < P.__weakrefoffset__)
        self.assert_(P.__basicsize__ > Persistent.__basicsize__)

class P(Persistent):
    """Persistent object with a single counter attribute ``x``."""

    def __init__(self):
        # Start the counter at zero.
        self.x = 0

    def inc(self):
        """Increase the counter by one."""
        self.x = self.x + 1

class P2(P):
    """Variant of P whose state is a fixed value instead of a dict."""

    def __getstate__(self):
        """Return the constant that stands in for this object's state."""
        state = 42
        return state

    def __setstate__(self, v):
        """Remember the state handed back as attribute ``v``."""
        self.v = v

class B(Persistent):
    """Counter object that declares ``x`` through ``__slots__``."""

    __slots__ = ["x"]

    def __init__(self):
        # Start the counter at zero.
        self.x = 0

    def inc(self):
        """Increase the counter by one."""
        self.x = self.x + 1

    def __getstate__(self):
        """Return the counter wrapped in a one-key dict."""
        snapshot = {'x': self.x}
        return snapshot

    def __setstate__(self, state):
        """Restore the counter from the ``'x'`` entry of ``state``."""
        self.x = state['x']

class DM:
    """Stand-in data manager that counts ``register`` calls."""

    def __init__(self):
        # No registrations have been seen yet.
        self.called = 0

    def register(self, ob):
        """Count the registration; ``ob`` itself is not inspected."""
        self.called = self.called + 1

    def setstate(self, ob):
        """Load a fixed state (``x`` set to 42) into ``ob``."""
        canned_state = {'x': 42}
        ob.__setstate__(canned_state)

class PersistentTest(Test):
    """Run the shared tests against P and add pickling checks."""

    has_dict = 1
    klass = P

    def testPicklable(self):
        # A pickle round trip must give back the same class and an
        # equal instance dictionary.
        import pickle

        original = self.klass()
        original.inc()
        payload = pickle.dumps(original)
        clone = pickle.loads(payload)
        self.assertEqual(clone.__class__, self.klass)
        self.assertEqual(clone.__dict__, original.__dict__)

    def testPicklableWCustomState(self):
        # P2 supplies its own state (42), which its __setstate__ stores
        # under the attribute ``v``.
        import pickle

        original = P2()
        payload = pickle.dumps(original)
        clone = pickle.loads(payload)
        self.assertEqual(clone.__class__, P2)
        self.assertEqual(clone.__dict__, {'v': 42})

class BasePersistentTest(Test):
    """Run the shared tests against the slot-based class B."""

    has_dict = 0
    klass = B

def test_suite():
    """Return one suite combining PersistentTest and BasePersistentTest."""
    case_classes = (PersistentTest, BasePersistentTest)
    return unittest.TestSuite([unittest.makeSuite(c) for c in case_classes])

if __name__ == '__main__':
    # Run the combined suite with the plain-text runner when this
    # module is executed as a script.
    runner = unittest.TextTestRunner()
    runner.run(test_suite())