[Zope3-checkins] CVS: Zope3/src/zodb/tests - __init__.py:1.1.2.1 test_connection.py:1.1.2.1 test_timestamp.py:1.1.2.1 test_txn.py:1.1.2.1 test_utils.py:1.1.2.1 test_zodb.py:1.1.2.1 undo.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:30:53 -0500


Update of /cvs-repository/Zope3/src/zodb/tests
In directory cvs.zope.org:/tmp/cvs-serv19908/zodb/tests

Added Files:
      Tag: NameGeddon-branch
	__init__.py test_connection.py test_timestamp.py test_txn.py 
	test_utils.py test_zodb.py undo.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zodb/tests/__init__.py ===
#
# This file is necessary to make this directory a package.


=== Added File Zope3/src/zodb/tests/test_connection.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
import unittest

from persistence import Persistent
from transaction.tests.abstestIDataManager import IDataManagerTests

from zodb.storage.mapping import DB
from zodb.ztransaction import Transaction

class P(Persistent):
    """Trivial persistent class; ConnectionTests.setUp stores one as self.obj."""

class ConnectionTests(IDataManagerTests):
    """Run the inherited IDataManagerTests against a database connection.

    The connection is opened from the DB factory imported from
    zodb.storage.mapping.
    """

    def setUp(self):
        # NOTE(review): datamgr, obj and txn_factory are presumably the
        # fixture attributes the inherited tests read -- confirm against
        # IDataManagerTests.
        database = DB()
        self.db = database
        self.datamgr = database.open()
        self.obj = P()
        self.txn_factory = Transaction

    def tearDown(self):
        # Close the connection before the database it was opened from.
        self.datamgr.close()
        self.db.close()

    def get_transaction(self):
        """Return the base class's transaction with a user and a note set."""
        txn = super(ConnectionTests, self).get_transaction()
        txn.setUser('IDataManagerTests')
        txn.note('dummy note')
        return txn

    def test_cacheGC(self):
        """cacheGC() on the open connection must not raise."""
        self.datamgr.cacheGC()

def test_suite():
    """Return a suite with the tests collected from ConnectionTests."""
    suite = unittest.makeSuite(ConnectionTests)
    return suite


=== Added File Zope3/src/zodb/tests/test_timestamp.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Test the TimeStamp utility type"""

import time
import unittest

from zodb.timestamp import TimeStamp

# Tolerance for floating-point comparisons in the tests below.
EPSILON = 0.000001

class TimeStampTests(unittest.TestCase):
    """Tests for TimeStamp: field accessors, raw form, ordering, hashing.

    Test methods use the ``check`` prefix rather than ``test``.
    """

    def checkYMDTimeStamp(self):
        """A date-only TimeStamp reports that date and a zero time of day."""
        self._check_ymd(2001, 6, 3)

    def _check_ymd(self, yr, mo, dy):
        """Assert that TimeStamp(yr, mo, dy) reports back the same date."""
        ts = TimeStamp(yr, mo, dy)
        self.assertEqual(ts.year(), yr)
        self.assertEqual(ts.month(), mo)
        self.assertEqual(ts.day(), dy)

        # No time of day was given, so every time field must be zero.
        self.assertEqual(ts.hour(), 0)
        self.assertEqual(ts.minute(), 0)
        self.assertEqual(ts.second(), 0)

        # Converting timeTime() back through gmtime() must give the same
        # calendar date.
        t = time.gmtime(ts.timeTime())
        self.assertEqual(yr, t[0])
        self.assertEqual(mo, t[1])
        self.assertEqual(dy, t[2])

    def checkFullTimeStamp(self):
        """A TimeStamp built from the current GMT time reports its fields."""
        t = time.gmtime()
        ts = TimeStamp(*t[:6])

        # mktime() treats the tuple as local time, hence the time.timezone
        # shift.  The values are compared within EPSILON, like the seconds
        # field below, instead of relying on exact float equality.
        delta = ts.timeTime() + time.timezone - time.mktime(t)
        self.assert_(abs(delta) < EPSILON)

        self.assertEqual(ts.year(), t[0])
        self.assertEqual(ts.month(), t[1])
        self.assertEqual(ts.day(), t[2])

        self.assertEqual(ts.hour(), t[3])
        self.assertEqual(ts.minute(), t[4])
        self.assert_(abs(ts.second() - t[5]) < EPSILON)

    def checkRawTimestamp(self):
        """A TimeStamp rebuilt from raw() equals the original."""
        t = time.gmtime()
        ts1 = TimeStamp(*t[:6])
        ts2 = TimeStamp(ts1.raw())

        self.assertEqual(ts1, ts2)
        self.assertEqual(ts1.timeTime(), ts2.timeTime())
        self.assertEqual(ts1.year(), ts2.year())
        self.assertEqual(ts1.month(), ts2.month())
        self.assertEqual(ts1.day(), ts2.day())
        self.assertEqual(ts1.hour(), ts2.hour())
        self.assertEqual(ts1.minute(), ts2.minute())
        self.assert_(abs(ts1.second() - ts2.second()) < EPSILON)

    def checkDictKey(self):
        """Two different TimeStamps act as two distinct dictionary keys."""
        t = time.gmtime()
        ts1 = TimeStamp(*t[:6])
        # Same month/day/time as ts1 but with the year forced to 2000.
        ts2 = TimeStamp(2000, *t[1:6])

        d = {}
        d[ts1] = 1
        d[ts2] = 2

        self.assertEqual(len(d), 2)

    def checkCompare(self):
        """A later date compares greater than an earlier one."""
        ts1 = TimeStamp(1972, 6, 27)
        ts2 = TimeStamp(1971, 12, 12)
        self.assert_(ts1 > ts2)
        self.assert_(ts2 <= ts1)

    def checkLaterThan(self):
        """laterThan() applied to the stamp itself gives a greater stamp."""
        # Going only by the assertions in this class: x.laterThan(y)
        # returns x when x > y, and otherwise a stamp greater than y.
        t = time.gmtime()
        ts = TimeStamp(*t[:6])
        ts2 = ts.laterThan(ts)
        self.assert_(ts2 > ts)

    # XXX should test for bogus inputs to TimeStamp constructor

    def checkTimeStamp(self):
        """Check a fixed timestamp against known values and orderings."""
        # Alternate test suite
        t = TimeStamp(2002, 1, 23, 10, 48, 5) # GMT
##        self.assertEquals(str(t), '2002-01-23 10:48:05.000000')
        self.assertEqual(t.raw(), '\x03B9H\x15UUU')
        self.assertEqual(TimeStamp('\x03B9H\x15UUU'), t)
        self.assertEqual(t.year(), 2002)
        self.assertEqual(t.month(), 1)
        self.assertEqual(t.day(), 23)
        self.assertEqual(t.hour(), 10)
        self.assertEqual(t.minute(), 48)
        self.assertEqual(round(t.second()), 5)
        self.assertEqual(t.timeTime(), 1011782885)
        t1 = TimeStamp(2002, 1, 23, 10, 48, 10)
##        self.assertEquals(str(t1), '2002-01-23 10:48:10.000000')
        # Every rich comparison, in both its true and false direction.
        self.assert_(t == t)
        self.assert_(t != t1)
        self.assert_(t < t1)
        self.assert_(t <= t1)
        self.assert_(t1 >= t)
        self.assert_(t1 > t)
        self.failIf(t == t1)
        self.failIf(t != t)
        self.failIf(t > t1)
        self.failIf(t >= t1)
        self.failIf(t1 < t)
        self.failIf(t1 <= t)
        self.assertEqual(cmp(t, t), 0)
        self.assertEqual(cmp(t, t1), -1)
        self.assertEqual(cmp(t1, t), 1)
        self.assertEqual(t1.laterThan(t), t1)
        self.assert_(t.laterThan(t1) > t1)
        self.assertEqual(TimeStamp(2002,1,23), TimeStamp(2002,1,23,0,0,0))

def test_suite():
    """Return a suite of the ``check``-prefixed TimeStampTests methods."""
    prefix = 'check'
    return unittest.makeSuite(TimeStampTests, prefix)


=== Added File Zope3/src/zodb/tests/test_txn.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""High-level tests of the transaction interface"""

import os
import tempfile
import unittest

from transaction import get_transaction

import zodb
from zodb.db import DB
from zodb.storage.file import FileStorage
from zodb.storage.tests.minpo import MinPO
from zodb.interfaces import RollbackError

class TransactionTestBase(unittest.TestCase):
    """Fixture exposing the root of a database on a temporary FileStorage."""

    def setUp(self):
        self.fs_path = tempfile.mktemp()
        self.fs = FileStorage(self.fs_path)
        connection = DB(self.fs).open()
        self.root = connection.root()

    def tearDown(self):
        # Abort whatever the test left pending, then close the storage and
        # delete the data file together with its companion files.
        get_transaction().abort()
        self.fs.close()
        leftovers = [self.fs_path + suffix
                     for suffix in ('', '.index', '.lock', '.tmp')]
        for leftover in leftovers:
            if os.path.exists(leftover):
                os.unlink(leftover)

class BasicTests:
    """Commit and abort scenarios that read and write ``self.root``.

    Each test takes an optional ``subtrans`` flag.  When it is true, the
    step under test takes a savepoint instead of committing.
    """

    def _checkpoint(self, subtrans):
        # Savepoint for the subtransaction variant, plain commit otherwise.
        txn = get_transaction()
        if subtrans:
            txn.savepoint()
        else:
            txn.commit()

    def testSingleCommit(self, subtrans=None):
        self.root["a"] = MinPO("a")
        self._checkpoint(subtrans)
        self.assertEqual(self.root["a"].value, "a")

    def testMultipleCommits(self, subtrans=None):
        obj = MinPO("a")
        self.root["a"] = obj
        get_transaction().commit()
        obj.extra_attr = MinPO("b")
        self._checkpoint(subtrans)
        # Drop the local reference so the checks go through the root.
        del obj
        stored = self.root["a"]
        self.assertEqual(stored.value, "a")
        self.assertEqual(self.root["a"].extra_attr, MinPO("b"))

    def testCommitAndAbort(self, subtrans=None):
        obj = MinPO("a")
        self.root["a"] = obj
        self._checkpoint(subtrans)
        obj.extra_attr = MinPO("b")
        get_transaction().abort()
        del obj
        if not subtrans:
            # The commit kept "a"; only the later attribute was aborted.
            self.assertEqual(self.root["a"].value, "a")
            self.assert_(not hasattr(self.root["a"], 'extra_attr'))
        else:
            # After a savepoint, the abort is expected to remove "a" too.
            self.assert_("a" not in self.root)

class SubtransTests(BasicTests):
    """Re-run each BasicTests scenario with its ``subtrans`` flag set."""

    def wrap_test(self, klass, meth_name):
        """Call klass.meth_name on self with subtrans=1, then commit."""
        scenario = getattr(klass, meth_name)
        scenario(self, 1)
        get_transaction().commit()

    def testSubSingleCommit(self):
        return self.wrap_test(BasicTests, "testSingleCommit")

    def testSubMultipleCommits(self):
        return self.wrap_test(BasicTests, "testMultipleCommits")

    def testSubCommitAndAbort(self):
        return self.wrap_test(BasicTests, "testCommitAndAbort")

class AllTests(TransactionTestBase, SubtransTests):
    """Concrete test case: the storage fixture plus all inherited scenarios."""

    def testSavepointAndRollback(self):
        """Check which keys survive a series of savepoints and rollbacks.

        The statement order is the test: each assertion describes the
        state produced by everything before it.
        """
        self.root["a"] = MinPO()
        rb1 = get_transaction().savepoint()
        self.root["b"] = MinPO()
        rb2 = get_transaction().savepoint()
        self.assertEqual(len(self.root), 2)
        self.assert_("a" in self.root)
        self.assert_("b" in self.root)

        # Rolling back rb2 removes "b", which was added before rb2 was
        # taken; "a" (covered by rb1) stays.
        rb2.rollback()
        self.assertEqual(len(self.root), 1)
        self.assert_("a" in self.root)
        self.assert_("b" not in self.root)

        self.root["c"] = MinPO()
        rb3 = get_transaction().savepoint()
        self.root["d"] = MinPO()
        rb4 = get_transaction().savepoint()
        # After rolling back the earlier rb3, the later rb4 can no longer
        # be rolled back.
        rb3.rollback()
        self.assertRaises(RollbackError, rb4.rollback)

        self.root["e"] = MinPO()
        rb5 = get_transaction().savepoint()
        self.root["f"] = MinPO()
        rb6 = get_transaction().savepoint()
        self.root["g"] = MinPO()
        rb6.rollback()
        self.root["h"] = MinPO()
        # Expected survivors: "a" and "e" (covered by savepoints that were
        # never rolled back) and "h" (added after the last rollback).
        self.assertEqual(len(self.root), 3)
        for name in "a", "e", "h":
            self.assert_(name in self.root)
        for name in "b", "c", "d", "f", "g":
            self.assert_(name not in self.root)

def test_suite():
    """Return a suite with the tests collected from AllTests."""
    suite = unittest.makeSuite(AllTests)
    return suite

def main():
    """Run this module's suite with a text test runner."""
    unittest.TextTestRunner().run(test_suite())

# Allow running this test module directly as a script.
if __name__ == "__main__":
    main()


=== Added File Zope3/src/zodb/tests/test_utils.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Test the routines to convert between long and 64-bit strings"""

import random
import unittest

# Number of random values generated for each of the two ranges in TestUtils.
NUM = 100

from zodb.utils import p64, u64

class TestUtils(unittest.TestCase):

    small = [random.randrange(1, 1L<<32, int=long)
             for i in range(NUM)]
    large = [random.randrange(1L<<32, 1L<<64, int=long)
             for i in range(NUM)]
    all = small + large
    
    def checkLongToStringToLong(self):
        for num in self.all:
            s = p64(num)
            n2 = u64(s)
            self.assertEquals(num, n2, "u64() failed")

    def checkKnownConstants(self):
        self.assertEquals("\000\000\000\000\000\000\000\001", p64(1))
        self.assertEquals("\000\000\000\001\000\000\000\000", p64(1L<<32))
        self.assertEquals(u64("\000\000\000\000\000\000\000\001"), 1)
        self.assertEquals(u64("\000\000\000\001\000\000\000\000"), 1L<<32)

def test_suite():
    """Return a suite of the ``check``-prefixed TestUtils methods."""
    prefix = 'check'
    return unittest.makeSuite(TestUtils, prefix)
            
if __name__ == "__main__":
    # The test methods are named check*, which the loader's default
    # "test" prefix would miss, so use a loader configured for "check".
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "check"
    unittest.main(testLoader=loader)
    


=== Added File Zope3/src/zodb/tests/test_zodb.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
import os
import unittest
import tempfile

import ZODB.DB, ZODB.FileStorage
from zodb.utils import u64
from zodb.tests.undo import TransactionalUndoDB
from persistence.dict import PersistentDict
from transaction import get_transaction

# Path of the storage file shared by every test in this module; tearDown
# removes it and its companion files after each test.
# NOTE(review): tempfile.mktemp() only returns a name, so another process
# could create the file first -- confirm this is acceptable for the tests.
_fsname = tempfile.mktemp() + ".fs"

class ExportImportTests:
    """Mixin checking that an object can be exported and re-imported.

    The concrete test class must provide ``self._db`` and a ``populate()``
    method that stores the test data under ``root['test']``.
    """

    def duplicate(self, abort_it, dup_name):
        """Export root['test'] to a temp file and import it as root[dup_name].

        The transaction is aborted when abort_it is true and committed
        otherwise.  If anything in the export/import step raises, the
        transaction is aborted and the exception is re-raised.
        """
        conn = self._db.open()
        try:
            root = conn.root()
            ob = root['test']
            self.assert_(len(ob) > 10, 'Insufficient test data')
            try:
                f = tempfile.TemporaryFile()
                # Close the temporary file even when export or import
                # fails, so a failing test does not leak the handle.
                try:
                    ob._p_jar.exportFile(ob._p_oid, f)
                    self.assert_(f.tell() > 0, 'Did not export correctly')
                    f.seek(0)
                    new_ob = ob._p_jar.importFile(f)
                    root[dup_name] = new_ob
                finally:
                    f.close()
                if abort_it:
                    get_transaction().abort()
                else:
                    get_transaction().commit()
            except Exception:
                get_transaction().abort()
                raise
        finally:
            conn.close()

    def verify(self, abort_it, dup_name):
        """Check the outcome of duplicate() through a fresh connection.

        With abort_it true the duplicate must be absent.  Otherwise it
        must match the original's contents while sharing no object ids
        with it.
        """
        get_transaction().begin()
        # Verify the duplicate.
        conn = self._db.open()
        try:
            root = conn.root()
            ob = root['test']
            try:
                ob2 = root[dup_name]
            except KeyError:
                if abort_it:
                    # Passed the test.
                    return
                else:
                    raise
            else:
                if abort_it:
                    oid = ob2._p_oid
                    if oid is not None:
                        oid = u64(oid)
                    # Put the diagnostics in the failure message rather
                    # than printing them to stdout.
                    self.fail("Did not abort duplication: oid=%r class=%r "
                              "state=%r object=%r"
                              % (oid, ob2.__class__, ob2._p_state, ob2))
            l1 = list(ob.items())
            l1.sort()
            l2 = list(ob2.items())
            l2.sort()
            # Compare each value by its entry at key 0.
            l1 = [(k, v[0]) for k, v in l1]
            l2 = [(k, v[0]) for k, v in l2]
            self.assertEqual(l1, l2, 'Duplicate did not match')
            self.assert_(ob._p_oid != ob2._p_oid, 'Did not duplicate')
            self.assertEqual(ob._p_jar, ob2._p_jar, 'Not same connection')
            # No value of the duplicate may share an oid with a value of
            # the original.
            oids = {}
            for v in ob.values():
                oids[v._p_oid] = 1
            for v in ob2.values():
                self.assert_(v._p_oid not in oids,
                             'Did not fully separate duplicate from original')
            get_transaction().commit()
        finally:
            conn.close()

    def checkDuplicate(self, abort_it=False, dup_name='test_duplicate'):
        """Populate the database, duplicate the test object, verify it."""
        self.populate()
        get_transaction().begin()
        get_transaction().note('duplication')
        self.duplicate(abort_it, dup_name)
        self.verify(abort_it, dup_name)

    def checkDuplicateAborted(self):
        """Same as checkDuplicate, but the duplication is aborted."""
        self.checkDuplicate(abort_it=True, dup_name='test_duplicate_aborted')

class ZODBTests(ExportImportTests, TransactionalUndoDB,
                unittest.TestCase):
    """Database-level tests run against a FileStorage at ``_fsname``."""

    def setUp(self):
        # Imported here because the module-level import still names the
        # pre-rename ``ZODB`` package.  zodb.db.DB and
        # zodb.storage.file.FileStorage are the renamed modules that the
        # sibling test_txn module imports.
        # NOTE(review): assumes FileStorage kept its ``create`` argument
        # through the rename -- confirm.  The stale module-level
        # ``import ZODB.DB, ZODB.FileStorage`` should also be dropped.
        from zodb.db import DB
        from zodb.storage.file import FileStorage
        self._db = DB(FileStorage(_fsname, create=1))
        self._conn = self._db.open()
        self._root = self._conn.root()

    def populate(self):
        """Store 100 small persistent dicts under root['test'] and commit."""
        get_transaction().begin()
        conn = self._db.open()
        # Close the connection even if storing or committing fails, as
        # duplicate() and verify() do.
        try:
            root = conn.root()
            root['test'] = pm = PersistentDict()
            for n in range(100):
                pm[n] = PersistentDict({0: 100 - n})
            get_transaction().note('created test data')
            get_transaction().commit()
        finally:
            conn.close()

    def checkModifyGhost(self):
        """Setting an attribute after _p_deactivate() is tracked and saved."""
        self.populate()
        root = self._db.open().root()
        o = root["test"][5]
        # NOTE(review): going by the test name, activate followed by
        # deactivate is meant to leave ``o`` a ghost -- confirm.
        o._p_activate()
        o._p_deactivate()
        o.anattr = "anattr"
        self.assert_(o._p_changed)
        get_transaction().commit()
        self.assert_(not o._p_changed)
        # NOTE(review): deleting _p_changed presumably discards the loaded
        # state so the attribute is re-read from the database -- confirm.
        del o._p_changed
        self.assertEqual(o.anattr, "anattr")

    def tearDown(self):
        # Close the database, then remove the storage file and the
        # companion files that may exist next to it.
        self._db.close()
        for ext in '', '.old', '.tmp', '.lock', '.index':
            path = _fsname + ext
            if os.path.exists(path):
                os.remove(path)

def test_suite():
    """Return a suite of the ``check``-prefixed ZODBTests methods."""
    prefix = 'check'
    return unittest.makeSuite(ZODBTests, prefix)

# Allow running this test module directly as a script.
if __name__=='__main__':
    unittest.TextTestRunner().run(test_suite())


=== Added File Zope3/src/zodb/tests/undo.py === (467/567 lines abridged)
"""Check transactional undo performed via the database."""

import time
import unittest

from zodb import POSException
from zodb.ztransaction import Transaction
from zodb.utils import u64, p64, z64
from zodb.db import DB
from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_pickle, zodb_unpickle

from persistence import Persistent
from transaction import get_transaction

class C(Persistent):
    """Trivial persistent class with no behaviour of its own."""

class TransactionalUndoDB(unittest.TestCase):

    def checkSimpleTransactionalUndo(self):
        obj = MinPO(23)
        self._root["obj"] = obj
        get_transaction().note("23")
        get_transaction().commit()

        obj.value = 24
        get_transaction().note("24")
        get_transaction().commit()
        
        obj.value = 25
        get_transaction().note("25")
        get_transaction().commit()

        info = self._db.undoInfo()
        tid = info[0]['id']
        self._db.undo(tid)
        get_transaction().commit()
        self._conn.sync()
        self.assertEqual(obj.value, 24)

        info = self._db.undoInfo()
        tid = info[2]['id']
        self._db.undo(tid)
        get_transaction().commit()
        self._conn.sync()
        self.assertEqual(obj.value, 23)

        # Undo object creation
        info = self._db.undoInfo()

[-=- -=- -=- 467 lines omitted -=- -=- -=-]

                tid = info[base + j]['id']
                s.transactionalUndo(tid, t)
            s.tpc_vote(t)
            s.tpc_finish(t)
        
        for i in range(BATCHES):
            undo(i)

        # There are now (2 + OBJECTS) * BATCHES transactions:
        #     BATCHES original transactions, followed by
        #     OBJECTS * BATCHES modifications, followed by
        #     BATCHES undos

        iter = s.iterator()
        offset = 0

        eq = self.assertEqual

        for i in range(BATCHES):
            txn = iter[offset]
            offset += 1
            
            tid = p64(i + 1)
            eq(txn.tid, tid)

            L1 = [(rec.oid, rec.serial, rec.data_txn) for rec in txn]
            L2 = [(oid, revid, None) for _tid, oid, revid in orig
                  if _tid == tid]
            
            eq(L1, L2)

        for i in range(BATCHES * OBJECTS):
            txn = iter[offset]
            offset += 1
            eq(len([rec for rec in txn if rec.data_txn is None]), 1)

        for i in range(BATCHES):
            txn = iter[offset]
            offset += 1

            # The undos are performed in reverse order.
            otid = p64(BATCHES - i)
            L1 = [(rec.oid, rec.data_txn) for rec in txn]
            L2 = [(oid, otid) for _tid, oid, revid in orig
                  if _tid == otid]
            L1.sort()
            L2.sort()
            eq(L1, L2)

        self.assertRaises(IndexError, iter.__getitem__, offset)