[Zope3-checkins] CVS: Zope3/src/zodb/code/tests - __init__.py:1.1.2.1 _pmtest.py:1.1.2.1 atestmodule.py:1.1.2.1 test_module.py:1.1.2.1 test_patch.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:30:50 -0500


Update of /cvs-repository/Zope3/src/zodb/code/tests
In directory cvs.zope.org:/tmp/cvs-serv19908/zodb/code/tests

Added Files:
      Tag: NameGeddon-branch
	__init__.py _pmtest.py atestmodule.py test_module.py 
	test_patch.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zodb/code/tests/__init__.py ===
#
# This file is necessary to make this directory a package.


=== Added File Zope3/src/zodb/code/tests/_pmtest.py ===
"""A simple module"""

# XXX why aren't modules pickleable?
# import os
# from xml import sax

a = 1
b = 2
c = 3

def f(x):
    return a * x ** 2 + b * x + c



=== Added File Zope3/src/zodb/code/tests/atestmodule.py ===
"""A module used to test persistent module patching."""

def aFunc():
    def nestedFunc():
        return aFunc
    return 1

class Foo(object):
    def meth(self):
        return 0

    class Nested(object):
        def bar(self):
            return 1

class Bar:
    def bar(self, x):
        return 2 * x

    static = staticmethod(aFunc)
    alias = aFunc

    classbar = classmethod(bar)

def anotherFunc():
    class NotFound:
        pass


=== Added File Zope3/src/zodb/code/tests/test_module.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
import os
import unittest

from zodb.storage.mapping import DB, MappingStorage
from zodb.utils import u64
import zodb.db

from persistence.dict import PersistentDict
from zodb.code.module import \
     PersistentModuleManager, PersistentModuleRegistry, \
     PersistentModuleImporter, PersistentPackage
from persistence import tests

from transaction import get_transaction

# snippets of source code used by testModules
foo_src = """\
import string
x = 1
def f(y):
    return x + y
"""
quux_src = """\
from foo import x
def f(y):
    return x + y
"""
side_effect_src = """\
x = 1
def inc():
    global x
    x += 1
    return x
"""
builtin_src = """\
x = 1, 2, 3
def f():
    return len(x)
"""
nested_src = """\
def f(x):
    def g(y):
        def z(z):
            return x + y + z
        return x + y
    return g

g = f(3)
"""
closure_src = """\
def f(x):
    def g(y):
        return x + y
    return g

inc = f(1)
"""

class TestPersistentModuleImporter(PersistentModuleImporter):

    def __init__(self, registry):
        self._registry = registry
        self._registry._p_activate()

    def __import__(self, name, globals={}, locals={}, fromlist=[]):
        mod = self._import(self._registry, name, self._get_parent(globals),
                           fromlist)
        if mod is not None:
            return mod
        return self._saved_import(name, globals, locals, fromlist)

class TestModule(unittest.TestCase):

    def setUp(self):
        self.db = DB()
        self.root = self.db.open().root()
        self.registry = PersistentModuleRegistry()
        self.importer = TestPersistentModuleImporter(self.registry)
        self.importer.install()
        self.root["registry"] = self.registry
        get_transaction().commit()
        _dir, _file = os.path.split(tests.__file__)
        self._pmtest = os.path.join(_dir, "_pmtest.py")

    def tearDown(self):
        self.importer.uninstall()

    def testModule(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("pmtest", open(self._pmtest).read())
        get_transaction().commit()
        self.assert_(self.registry.findModule("pmtest"))
        import pmtest
        pmtest._p_deactivate()
        self.assertEqual(pmtest.a, 1)
        pmtest.f(4)

    def testUpdateFunction(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("pmtest", "def f(x): return x")
        get_transaction().commit()
        import pmtest
        self.assertEqual(pmtest.f(3), 3)
        copy = pmtest.f
        mgr.update("def f(x): return x + 1")
        get_transaction().commit()
        pmtest._p_deactivate()
        self.assertEqual(pmtest.f(3), 4)
        self.assertEqual(copy(3), 4)

    def testUpdateClass(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("pmtest", src)
        get_transaction().commit()
        import pmtest
        inst = pmtest.Foo()
        v0 = inst.x
        v1 = inst.m()
        v2 = inst.n()
        self.assertEqual(v1 - 1, v2)
        self.assertEqual(v0 + 1, v1)
        mgr.update(src2)
        get_transaction().commit()
        self.assertRaises(AttributeError, getattr, inst, "n")

    def testModules(self):
        foomgr = PersistentModuleManager(self.registry)
        foomgr.new("foo", foo_src)
        # quux has a copy of foo.x
        quuxmgr = PersistentModuleManager(self.registry)
        quuxmgr.new("quux", quux_src)
        # bar has a reference to foo
        barmgr = PersistentModuleManager(self.registry)
        barmgr.new("bar", "import foo")
        # baz has reference to f and copy of x,
        # remember the the global x in f is looked up in foo
        bazmgr = PersistentModuleManager(self.registry)
        bazmgr.new("baz", "from foo import *")
        import foo, bar, baz, quux
        self.assert_(foo._p_oid is None)
        get_transaction().commit()
        self.assert_(foo._p_oid)
        self.assert_(bar._p_oid)
        self.assert_(baz._p_oid)
        self.assert_(quux._p_oid)
        self.assertEqual(foo.f(4), 5)
        self.assertEqual(bar.foo.f(4), 5)
        self.assertEqual(baz.f(4), 5)
        self.assertEqual(quux.f(4), 5)
        self.assert_(foo.f is bar.foo.f)
        self.assert_(foo.f is baz.f)
        foo.x = 42
        self.assertEqual(quux.f(4), 5)
        get_transaction().commit()
        self.assertEqual(quux.f(4), 5)
        foo._p_deactivate()
        # foo is deactivated, which means its dict is empty when f()
        # is activated, how do we guarantee that foo is also
        # activated?
        self.assertEqual(baz.f(4), 46)
        self.assertEqual(bar.foo.f(4), 46)
        self.assertEqual(foo.f(4), 46)

    def testFunctionAttrs(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("foo", foo_src)
        import foo
        A = foo.f.attr = "attr"
        self.assertEqual(foo.f.attr, A)
        get_transaction().commit()
        self.assertEqual(foo.f.attr, A)
        foo.f._p_deactivate()
        self.assertEqual(foo.f.attr, A)
        del foo.f.attr
        self.assertRaises(AttributeError, getattr, foo.f, "attr")
        foo.f.func_code

    def testFunctionSideEffects(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("effect", side_effect_src)
        import effect
        effect.inc()
        get_transaction().commit()
        effect.inc()
        self.assert_(effect._p_changed)

    def testBuiltins(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("test", builtin_src)
        get_transaction().commit()
        import test
        self.assertEqual(test.f(), len(test.x))
        test._p_deactivate()
        self.assertEqual(test.f(), len(test.x))

    def testNested(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("nested", nested_src)
        get_transaction().commit()
        import nested
        self.assertEqual(nested.g(5), 8)

    def testLambda(self):
        mgr = PersistentModuleManager(self.registry)
        # test a lambda that contains another lambda as a default
        src = "f = lambda x, y = lambda: 1: x + y()"
        mgr.new("test", src)
        get_transaction().commit()
        import test
        self.assertEqual(test.f(1), 2)

##    def testClosure(self):
##        # This test causes a seg fault because ???
##        self.importer.module_from_source("closure", closure_src)
##        get_transaction().commit()
##        import closure
##        self.assertEqual(closure.inc(5), 6)
##        closure._p_deactivate()
##        self.assertEqual(closure.inc(5), 6)

    def testClass(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("foo", src)
        get_transaction().commit()
        import foo
        obj = foo.Foo()
        obj.m()
        self.root["m"] = obj
        get_transaction().commit()
        foo._p_deactivate()
        o = foo.Foo()
        i = o.m()
        j = o.m()
        self.assertEqual(i + 1, j)

    def testPackage(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("A.B.C", "def f(x): return x")
        get_transaction().commit()

        import A.B.C
        self.assert_(isinstance(A, PersistentPackage))
        self.assertEqual(A.B.C.f("A"), "A")

        mgr = PersistentModuleManager(self.registry)
        self.assertRaises(ValueError,
                          mgr.new, "A.B", "def f(x): return x + 1")

        mgr = PersistentModuleManager(self.registry)
        mgr.new("A.B.D", "def f(x): return x")
        get_transaction().commit()

        from A.B import D
        self.assert_(hasattr(A.B.D, "f"))

    def testPackageInit(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("A.B.C", "def f(x): return x")
        get_transaction().commit()

        import A.B.C

        mgr = PersistentModuleManager(self.registry)
        mgr.new("A.B.__init__", "x = 2")
        get_transaction().commit()

        import A.B
        self.assert_(hasattr(A.B, "C"))
        self.assertEqual(A.B.x, 2)

        mgr = PersistentModuleManager(self.registry)
        self.assertRaises(ValueError,
                          mgr.new, "A.__init__.D", "x = 2")

    def testPackageRelativeImport(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("A.B.C", "def f(x): return x")
        get_transaction().commit()

        mgr = PersistentModuleManager(self.registry)
        mgr.new("A.Q", "from B.C import f")
        get_transaction().commit()

        import A.Q
        self.assertEqual(A.B.C.f, A.Q.f)

        mgr.update("import B.C")
        get_transaction().commit()

        self.assertEqual(A.B.C.f, A.Q.B.C.f)

        try:
            import A.B.Q
        except ImportError:
            pass

    def testImportAll(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("A.B.C", """__all__ = ["a", "b"]; a, b, c = 1, 2, 3""")
        get_transaction().commit()

        d = {}
        exec "from A.B.C import *" in d
        self.assertEqual(d['a'], 1)
        self.assertEqual(d['b'], 2)
        self.assertRaises(KeyError, d.__getitem__, "c")

        mgr = PersistentModuleManager(self.registry)
        mgr.new("A.B.D", "from C import *")
        get_transaction().commit()

        import A.B.D
        self.assert_(hasattr(A.B.D, "a"))
        self.assert_(hasattr(A.B.D, "b"))
        self.assert_(not hasattr(A.B.D, "c"))

        mgr = PersistentModuleManager(self.registry)
        mgr.new("A.__init__", """__all__ = ["B", "F"]""")
        get_transaction().commit()

        mgr = PersistentModuleManager(self.registry)
        mgr.new("A.F", "spam = 1")
        get_transaction().commit()

        import A
        self.assertEqual(A.F.spam, 1)

class TestModuleReload(unittest.TestCase):
    """Test reloading of modules"""

    def setUp(self):
        self.storage = MappingStorage()
        self.open()
        _dir, _file = os.path.split(tests.__file__)
        self._pmtest = os.path.join(_dir, "_pmtest.py")

    def open(self):
        # open a new db and importer from the storage
        self.db = ZODB.DB.DB(self.storage)
        self.root = self.db.open().root()
        self.registry = self.root.get("registry")
        if self.registry is None:
            self.root["registry"] = self.registry = PersistentModuleRegistry()
        self.importer = TestPersistentModuleImporter(self.registry)
        self.importer.install()
        get_transaction().commit()

    def close(self):
        self.importer.uninstall()
        self.db.close()

    def testModuleReload(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("pmtest", open(self._pmtest).read())
        get_transaction().commit()
        import pmtest
        pmtest._p_deactivate()
        self.assertEqual(pmtest.a, 1)
        pmtest.f(4)
        self.close()
        pmtest._p_deactivate()
        self.importer.uninstall()
        self.open()
        del pmtest
        import pmtest

    def testClassReload(self):
        mgr = PersistentModuleManager(self.registry)
        mgr.new("foo", src)
        get_transaction().commit()
        import foo
        obj = foo.Foo()
        obj.m()
        self.root["d"] = d = PersistentDict()
        d["m"] = obj
        get_transaction().commit()
        self.close()
        foo._p_deactivate()
        self.importer.uninstall()
        self.open()
        del foo
        import foo

def test_suite():
    s = unittest.TestSuite()
    for klass in TestModule, TestModuleReload:
        s.addTest(unittest.makeSuite(klass))
    return s

src = """\
class Foo(object):
    def __init__(self):
        self.x = id(self)
    def m(self):
        self.x += 1
        return self.x
    def n(self):
        self.x -= 1
        return self.x
"""

src2 = """\
class Foo(object):
    def __init__(self):
        self.x = 0
    def m(self):
        self.x += 10
        return self.x
"""


=== Added File Zope3/src/zodb/code/tests/test_patch.py ===
from zodb.code.patch import NameFinder, convert
from persistence.tests import ATestModule

import unittest

class TestNameFinder(unittest.TestCase):

    def testNameFinder(self):
        nf = NameFinder(ATestModule)
        names = nf.names()
        for name in ("Foo", "Bar", "aFunc", "anotherFunc",
                     "Foo.meth", "Foo.Nested", "Bar.bar",
                     "Foo.Nested.bar"):
            self.assert_(name in names)
        for name in ("aFunc.nestedFunc", "anotherFunc.NotFound"):
            self.assert_(name not in names)

class TestPatch(unittest.TestCase):

    def testPatch(self):
        moddict = ATestModule.__dict__
        convert(ATestModule, {})
        newdict = ATestModule.__dict__
        
        L1 = moddict.keys()
        L2 = newdict.keys()
        L1.sort()
        L2.sort()
        self.assertEqual(L1, L2)

        self.assertEqual(ATestModule.__dict__, ATestModule.aFunc.func_globals)

def test_suite():
    s = unittest.TestSuite()
    for c in TestNameFinder, TestPatch:
        s.addTest(unittest.makeSuite(c))
    return s