[Zope-CVS] CVS: Products/Ape/lib/apelib/tests - __init__.py:1.1 correct.png:1.1 serialtestbase.py:1.1 testall.py:1.1 testimpl.py:1.1 testparams.py:1.1 testserialization.py:1.1 testsqlimpl.py:1.1 teststorage.py:1.1 testzope2fs.py:1.1 testzope2sql.py:1.1 zope2testbase.py:1.1

Shane Hathaway shane@zope.com
Wed, 9 Apr 2003 23:09:59 -0400


Update of /cvs-repository/Products/Ape/lib/apelib/tests
In directory cvs.zope.org:/tmp/cvs-serv32010/lib/apelib/tests

Added Files:
	__init__.py correct.png serialtestbase.py testall.py 
	testimpl.py testparams.py testserialization.py testsqlimpl.py 
	teststorage.py testzope2fs.py testzope2sql.py zope2testbase.py 
Log Message:
Moved apelib into a "lib" subdirectory.  This simplified the
Python hacking required to make apelib a top-level package.  Sorry
about the flood of checkins, but CVS makes a move like this quite painful.


=== Added File Products/Ape/lib/apelib/tests/__init__.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""apelib tests package

$Id: __init__.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""



=== Added File Products/Ape/lib/apelib/tests/correct.png ===
  <Binary-ish file>

=== Added File Products/Ape/lib/apelib/tests/serialtestbase.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Serialization test setup/teardown

$Id: serialtestbase.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""

import ZODB
from Persistence import PersistentMapping
from cPickle import dumps, loads

from apelib.core.classifiers import FixedClassifier
from apelib.core.interfaces import ISerializer
from apelib.core.gateways import CompositeGateway, MappingGateway
from apelib.core.mapper import Mapper
from apelib.core.schemas import RowSequenceSchema
from apelib.core.serializers import CompositeSerializer, StringDataAttribute
from apelib.zodb3.serializers \
     import FixedPersistentMapping, RollCall, RemainingState


class SimpleItemsSerializer:
    """Serializer that turns a mapping's items into (name, pickle) rows."""

    __implements__ = ISerializer

    # Row schema with two fields: the item key and the pickled value.
    schema = RowSequenceSchema()
    schema.addField('name')
    schema.addField('pickle')

    def getSchema(self):
        """Return the row schema shared by all instances."""
        return self.schema

    def serialize(self, object, event):
        """Return a sorted list of (key, pickled value) rows for `object`.

        Each item is reported to `event` through notifySerialized(), and
        the event is asked to ignore the 'data' and '_container'
        attributes (presumably the mapping's internal storage, which
        the rows already cover -- confirm).
        """
        rows = []
        for key, value in object.items():
            pickled = dumps(value)
            rows.append((key, pickled))
            event.notifySerialized(key, value, 0)
        rows.sort()
        for attr_name in ('data', '_container'):
            event.ignoreAttribute(attr_name)
        return rows

    def deserialize(self, object, event, state):
        """Rebuild `object` from the rows in `state`.

        Every value is unpickled, reported to `event` through
        notifyDeserialized(), and finally `object` is re-initialized
        with the resulting dictionary.
        """
        items = {}
        for key, pickled in state:
            value = loads(pickled)
            items[key] = value
            event.notifyDeserialized(key, value)
        object.__init__(items)


class SerialTestBase:
    """Test mixin that assembles a root mapper with two sub-mappers.

    After setUp() the instance exposes `root_mapper`, `om1` (the
    'test_mapper' sub-mapper) and `items_gw` / `props_gw` (the two
    gateways used by 'test_mapper').
    """

    def setUp(self):
        # Classifier for the root mapper: 'test' and 'test2' are
        # registered against the names of the two sub-mappers below.
        classifier = FixedClassifier()
        classifier.register('test', 'test_mapper')
        classifier.register('test2', 'test_mapper_2')

        # Root serializer: two fixed items plus a roll call.
        root_ser = CompositeSerializer('Persistence', 'PersistentMapping')
        fixed_items = FixedPersistentMapping()
        fixed_items.add('TestRoot', ('test',), ('test_mapper',))
        fixed_items.add('TestRoot2', ('test2',), ('test_mapper_2',))
        root_ser.addSerializer('fixed_items', fixed_items)
        root_ser.addSerializer('roll_call', RollCall())

        root = Mapper(None, root_ser, CompositeGateway(), classifier)
        self.root_mapper = root

        # "test_mapper": mapping items plus a "strdata" attribute.
        strdata_ser = CompositeSerializer('Persistence', 'PersistentMapping')
        items_ser = SimpleItemsSerializer()
        strdata_ser.addSerializer('items', items_ser)
        props_ser = StringDataAttribute('strdata')
        strdata_ser.addSerializer('properties', props_ser)
        strdata_ser.addSerializer('roll_call', RollCall())

        strdata_gw = CompositeGateway()
        self.items_gw = MappingGateway(items_ser.getSchema())
        strdata_gw.addGateway('items', self.items_gw)
        self.props_gw = MappingGateway(props_ser.getSchema())
        strdata_gw.addGateway('properties', self.props_gw)

        self.om1 = Mapper(root, strdata_ser, strdata_gw)
        root.addSubMapper('test_mapper', self.om1)

        # "test_mapper_2": mapping items plus a remainder.
        remainder_ser = CompositeSerializer('Persistence', 'PersistentMapping')
        items_ser_2 = SimpleItemsSerializer()
        remainder_ser.addSerializer('items', items_ser_2)
        remaining_state = RemainingState()
        remainder_ser.addSerializer('remainder', remaining_state)

        remainder_gw = CompositeGateway()
        remainder_gw.addGateway(
            'items', MappingGateway(items_ser_2.getSchema()))
        remainder_gw.addGateway(
            'remainder', MappingGateway(remaining_state.getSchema()))

        root.addSubMapper(
            'test_mapper_2', Mapper(root, remainder_ser, remainder_gw))


    def tearDown(self):
        # Nothing to release here; present so subclasses can chain to it.
        pass



=== Added File Products/Ape/lib/apelib/tests/testall.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run all unit tests

To run all tests, invoke this script with the PYTHONPATH environment
variable set.  Example:

PYTHONPATH=~/cvs/Zope/lib/python python testall.py

$Id: testall.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""

import sys, unittest

# Make sure apelib can be imported before the test modules are loaded.
try:
    import apelib
except ImportError:
    # apelib is not directly importable.  Importing the Ape product
    # makes apelib available as a top-level package, so the second
    # import can succeed.
    import Products.Ape
    import apelib

from testserialization import SerializationTests
from testimpl import ApelibImplTests
from teststorage import ApeStorageTests
from testzope2fs import Zope2FSTests, Zope2FSUnderscoreTests
from testparams import ParamsTests

# The PostgreSQL tests are optional: they are imported only when psycopg
# can be imported and a connection can actually be opened.  Otherwise a
# warning is written to stderr and the remaining tests still run.
try:
    import psycopg
except ImportError:
    sys.stderr.write('Warning: could not import psycopg.\n')
    sys.stderr.write('Not running the PostgreSQL tests.\n')
else:
    try:
        # Probe with an empty connection string.
        # NOTE(review): presumably this relies on the driver's default
        # connection settings -- confirm.
        c = psycopg.connect('')
        c.close()
    except psycopg.DatabaseError:
        sys.stderr.write('Warning: could not open a psycopg connection.\n')
        sys.stderr.write('Not running the PostgreSQL tests.\n')
    else:
        # Run the PostgreSQL tests.  Importing the test classes into
        # this module's namespace is what lets unittest.main() find them.
        from testzope2sql import Zope2SQLTests
        from testsqlimpl import ApelibSQLImplTests


if __name__ == '__main__':
    unittest.main()



=== Added File Products/Ape/lib/apelib/tests/testimpl.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Interface implementation tests

$Id: testimpl.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""

import os
import unittest
from types import ListType, TupleType

from Interface import Interface
from Interface.Verify import verifyClass


class InterfaceImplChecker:
    """Mixin with helpers that verify declared interface implementations.

    The helpers let whatever verifyClass() raises propagate, so they are
    meant to be called from test methods.
    """

    def _testObjectImpl(self, c):
        """Verify that `c` implements everything in `c.__implements__`.

        On failure a diagnostic line naming the class and its declared
        interfaces is printed and the original exception is re-raised.
        """
        # Read the declaration before entering the try block.  If the
        # lookup failed inside it, the handler below would reference an
        # unbound `impl` and mask the real error with a NameError.
        impl = c.__implements__
        try:
            self._verify(impl, c)
        except:
            # Catch-all on purpose: report which class failed, then let
            # the original exception continue unchanged.
            print('%s incorrectly implements %s' % (repr(c), repr(impl)))
            raise

    def _testAllInModule(self, m):
        """Check every object in module `m` that declares __implements__.

        Objects for which Interface.isImplementedBy() is true are
        skipped (presumably interface objects themselves -- confirm).
        """
        for value in m.__dict__.values():
            if not hasattr(value, '__implements__'):
                continue
            if Interface.isImplementedBy(value):
                continue
            self._testObjectImpl(value)

    def _testAllInPackage(self, p):
        """Import each module found in package `p` and check it.

        Every directory on `p.__path__` is listed; each distinct base
        name with a .py, .pyc or .pyo extension is imported once.
        '__init__' is never imported this way.
        """
        seen = {'__init__': 1}
        for path in p.__path__:
            for name in os.listdir(path):
                base, ext = os.path.splitext(name)
                if ext.lower() not in ('.py', '.pyc', '.pyo'):
                    continue
                if seen.has_key(base):
                    # Already handled (e.g. both foo.py and foo.pyc exist).
                    continue
                seen[base] = 1
                modname = '%s.%s' % (p.__name__, base)
                # A non-empty fromlist makes __import__ return the
                # submodule itself rather than the top-level package.
                m = __import__(modname, {}, {}, ('__doc__',))
                self._testAllInModule(m)

    def _verify(self, iface, c):
        """Verify `c` against `iface`, recursing as needed.

        `iface` may be a single interface or a (possibly nested) list or
        tuple of interfaces.  For a single interface, its base
        interfaces are verified as well.
        """
        if isinstance(iface, ListType) or isinstance(iface, TupleType):
            for item in iface:
                self._verify(item, c)
        else:
            verifyClass(iface, c)
            for base in iface.getBases():
                self._verify(base, c)

    
class ApelibImplTests(InterfaceImplChecker, unittest.TestCase):
    """Checks interface declarations in the core apelib subpackages.

    Each subpackage is imported inside its own test so that an import
    failure is reported against that test only.
    """

    def testCoreImplementations(self):
        from apelib import core
        self._testAllInPackage(core)

    def testZope2Implementations(self):
        from apelib import zope2
        self._testAllInPackage(zope2)

    def testFSImplementations(self):
        from apelib import fs
        self._testAllInPackage(fs)

    def testZODB3Implementations(self):
        from apelib import zodb3
        self._testAllInPackage(zodb3)

if __name__ == '__main__':
    unittest.main()



=== Added File Products/Ape/lib/apelib/tests/testparams.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of apelib.fs.params

$Id: testparams.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""

import unittest

from apelib.fs.params import stringToParams, paramsToString


class ParamsTests(unittest.TestCase):
    """Tests for stringToParams() and paramsToString()."""

    def testStringToParams(self):
        # Bare words get an empty value; the quoted value has its
        # escapes decoded.
        expected = (
            ('abc', ''),
            ('def', '123 456\n "done" '),
            ('ghi', '4'),
            ('j567', ''),
            )
        source = 'abc def="123 456\\n \\"done\\" " ghi=4 j567 \n'
        parsed = stringToParams(source)
        self.assertEqual(tuple(parsed), expected)

    def testParamsToString(self):
        # Empty values are written as bare keys; other values are
        # quoted and escaped.
        pairs = (
            ('abc', ''),
            ('def', '123 456\n "done" '),
            ('ghi', '4'),
            ('j567', ''),
            )
        expected = 'abc def="123 456\\n \\"done\\" " ghi="4" j567'
        self.assertEqual(paramsToString(pairs), expected)

    def testInvalidKeys(self):
        # This key must be accepted without raising.
        paramsToString((('abc_-09ABC', ''),))
        # Each of these keys must be rejected with ValueError.
        invalid_keys = (
            'a bc',
            'a\nbc',
            '',
            ' abc',
            'abc ',
            'a\tbc',
            'a\rbc',
            'a"bc',
            '0abc',
            )
        for key in invalid_keys:
            self.assertRaises(ValueError, paramsToString, ((key, ''),))


if __name__ == '__main__':
    unittest.main()



=== Added File Products/Ape/lib/apelib/tests/testserialization.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Serialization tests

$Id: testserialization.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""

import unittest

import ZODB
from Persistence import PersistentMapping

from apelib.core.events \
     import LoadEvent, StoreEvent, SerializationEvent, DeserializationEvent
from apelib.core.exceptions import SerializationError
from serialtestbase import SerialTestBase


class SimpleInstance:
    """Plain object that holds a single `data` attribute."""

    def __init__(self, data):
        self.data = data


class SerializationTests(SerialTestBase, unittest.TestCase):
    """Serializer and gateway tests run against the SerialTestBase mappers."""

    def getKeyedObjectSystem(self):
        # XXX works for now
        return None

    def testSerializeAndDeserialize(self):
        # Serialize one object and deserialize the resulting state into
        # another, without going through a gateway.
        source = PersistentMapping()
        source.strdata = '345'
        source['a'] = 'b'
        source['c'] = 'd'
        kos = self.getKeyedObjectSystem()
        mapper = self.root_mapper.getSubMapper('test_mapper')
        ser_event = SerializationEvent(kos, mapper, ('',), source)
        full_state = mapper.getSerializer().serialize(source, ser_event)

        target = PersistentMapping()
        deser_event = DeserializationEvent(kos, mapper, ('',), target)
        mapper.getSerializer().deserialize(target, deser_event, full_state)
        self.assertEqual(source.strdata, target.strdata)
        self.assertEqual(source.items(), target.items())

    def testStoreAndLoad(self):
        # Same round trip, but the state goes through the gateway's
        # store() and load() in between.
        source = PersistentMapping()
        source.strdata = '345'
        source['a'] = 'b'
        source['c'] = 'd'
        kos = self.getKeyedObjectSystem()
        mapper = self.root_mapper.getSubMapper('test_mapper')
        ser_event = SerializationEvent(kos, mapper, ('',), source)
        full_state = mapper.getSerializer().serialize(source, ser_event)
        store_event = StoreEvent(mapper, ('',))
        mapper.getGateway().store(store_event, full_state)

        load_event = LoadEvent(mapper, ('',))
        loaded_state, serial = mapper.getGateway().load(load_event)
        target = PersistentMapping()
        deser_event = DeserializationEvent(kos, mapper, ('',), target)
        mapper.getSerializer().deserialize(target, deser_event, loaded_state)
        self.assertEqual(source.strdata, target.strdata)
        self.assertEqual(source.items(), target.items())

    def testCatchExtraAttribute(self):
        # An attribute that no serializer of "test_mapper" handles
        # ("extra") must make serialization fail.
        source = PersistentMapping()
        source.strdata = '345'
        source.extra = '678'
        source['a'] = 'b'
        source['c'] = 'd'
        kos = self.getKeyedObjectSystem()
        mapper = self.root_mapper.getSubMapper('test_mapper')
        ser_event = SerializationEvent(kos, mapper, ('',), source)
        serialize = mapper.getSerializer().serialize
        self.assertRaises(SerializationError, serialize, source, ser_event)

    def testSharedAttribute(self):
        # Test of an attribute shared between a normal serializer and
        # a remainder serializer.
        shared = SimpleInstance('This is a shared piece of data')
        source = PersistentMapping()
        source.extra = shared
        source['a'] = shared
        kos = self.getKeyedObjectSystem()
        mapper = self.root_mapper.getSubMapper('test_mapper_2')
        ser_event = SerializationEvent(kos, mapper, ('',), source)
        full_state = mapper.getSerializer().serialize(source, ser_event)
        store_event = StoreEvent(mapper, ('',))
        mapper.getGateway().store(store_event, full_state)

        # Now load the state into a different object
        load_event = LoadEvent(mapper, ('',))
        loaded_state, serial = mapper.getGateway().load(load_event)
        target = PersistentMapping()
        deser_event = DeserializationEvent(kos, mapper, ('',), target)
        mapper.getSerializer().deserialize(target, deser_event, loaded_state)
        self.assertEqual(source.extra.data, target.extra.data)
        self.assertEqual(source.keys(), target.keys())

        # Check that both see the *same* object
        self.assert_(target['a'] is target.extra,
                     (target['a'], target.extra))
        # Verify it didn't cheat somehow
        self.assert_(target['a'] is not shared)


if __name__ == '__main__':
    unittest.main()


=== Added File Products/Ape/lib/apelib/tests/testsqlimpl.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Interface implementation tests

$Id: testsqlimpl.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""

import unittest

from testimpl import InterfaceImplChecker


class ApelibSQLImplTests(InterfaceImplChecker, unittest.TestCase):
    """Checks interface declarations in the apelib.sql subpackage."""

    def testSQLImplementations(self):
        from apelib import sql
        self._testAllInPackage(sql)

if __name__ == '__main__':
    unittest.main()



=== Added File Products/Ape/lib/apelib/tests/teststorage.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Storage tests (with data stored in simple mappings)

$Id: teststorage.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""

import unittest
from thread import start_new_thread, allocate_lock

import ZODB
from Persistence import PersistentMapping

from apelib.zodb3.db import ApeDB
from apelib.zodb3.storage import ApeStorage
from apelib.zodb3.resource import StaticResource
from apelib.zodb3.utils import copyOf
from serialtestbase import SerialTestBase


def run_in_thread(f):
    """Run `f()` in a new thread and block until it has returned.

    A lock is used as the completion signal: it is taken here, released
    by the worker when `f` finishes (even if `f` raises), and taken
    again by the caller, which therefore waits for the worker.
    """
    done = allocate_lock()
    done.acquire()

    # Default arguments bind the values without relying on nested scopes.
    def worker(func=f, done=done):
        try:
            func()
        finally:
            done.release()

    start_new_thread(worker, ())
    # Blocks until the worker releases the lock.
    done.acquire()
    done.release()


class ApeStorageTests (SerialTestBase, unittest.TestCase):
    # Tests of ApeStorage and ApeConnection.

    def setUp(self):
        # Build the mappers first, then put a storage and a database
        # on top of the root mapper.
        SerialTestBase.setUp(self)
        resource = StaticResource(self.root_mapper)
        self.storage = ApeStorage(resource)
        self.db = ApeDB(self.storage, resource)

    def tearDown(self):
        # Discard anything a test left uncommitted (tests that modify
        # objects without committing, or that fail mid-transaction), so
        # pending changes cannot leak into the next test's commit.
        get_transaction().abort()
        self.db.close()
        SerialTestBase.tearDown(self)

    def testStoreAndLoad(self):
        # Stores an object through one connection, then checks that
        # other connections see it and that changes made through those
        # connections are stored as well.
        ob = PersistentMapping()
        ob.strdata = '345'
        ob['a'] = 'b'
        ob['c'] = 'd'

        # Placeholder stored under 'TestRoot2'.
        # NOTE(review): presumably needed because the root mapper built
        # in SerialTestBase declares both 'TestRoot' and 'TestRoot2' --
        # confirm.
        dummy = PersistentMapping()

        conn1 = self.db.open()
        # Opened later; kept as None until then so the finally clause
        # knows whether there is anything to close.
        conn2 = None
        conn3 = None
        try:

            # Load the root and create a new object
            root = conn1.root()
            get_transaction().begin()
            root['TestRoot'] = ob
            root['TestRoot2'] = dummy
            get_transaction().commit()
            # ('test',) is the key registered for 'TestRoot' in
            # SerialTestBase.setUp().
            ob1 = conn1.loadStub(('test',))
            self.assertEqual(ob1.strdata, ob.strdata)
            self.assertEqual(ob1.items(), ob.items())

            # Verify a new object was stored and make a change
            get_transaction().begin()
            conn2 = self.db.open()
            ob2 = conn2.loadStub(('test',))
            self.assertEqual(ob2.strdata, ob.strdata)
            self.assertEqual(ob2.items(), ob.items())
            ob2.strdata = '678'
            get_transaction().commit()

            # Verify the change was stored and make another change
            conn3 = self.db.open()
            ob3 = conn3.loadStub(('test',))
            self.assertEqual(ob3.strdata, '678')
            self.assertEqual(ob3.items(), ob.items())
            ob3.strdata = '901'
            get_transaction().commit()
            conn3.close()
            # Cleared so the finally clause does not close the same
            # connection again if the reopen below fails.
            conn3 = None
            conn3 = self.db.open()
            ob3 = conn3.loadStub(('test',))
            self.assertEqual(ob3.strdata, '901')

            # Verify we didn't accidentally change the original object
            self.assertEqual(ob.strdata, '345')

        finally:
            conn1.close()
            if conn2 is not None:
                conn2.close()
            if conn3 is not None:
                conn3.close()


    def testUnmanaged(self):
        # Stores an object under 'TestRoot2' that carries a second
        # persistent object as a plain attribute (the "stowaway"), then
        # checks that changes to either one are stored and visible from
        # other connections.
        ob = PersistentMapping()
        ob['a'] = 'b'
        ob.stowaway = PersistentMapping()
        ob.stowaway['c'] = 'd'

        # Placeholder stored under 'TestRoot'.
        dummy = PersistentMapping()
        dummy.strdata = 'foo'

        conn1 = self.db.open()
        # Opened later; kept as None until then so the finally clause
        # knows whether there is anything to close.
        conn2 = None
        conn3 = None
        try:

            # Load the root and create a new object
            root = conn1.root()
            get_transaction().begin()
            root['TestRoot'] = dummy
            root['TestRoot2'] = ob
            get_transaction().commit()
            # ('test2',) is the key registered for 'TestRoot2' in
            # SerialTestBase.setUp().
            ob1 = conn1.loadStub(('test2',))
            # The storing connection must hand back the very same object.
            self.assert_(ob1 is ob)
            self.assertEqual(ob1.items(), [('a', 'b')])
            self.assertEqual(ob1.stowaway.items(), [('c', 'd')])

            # Verify a new object was stored
            get_transaction().begin()
            conn2 = self.db.open()
            ob2 = conn2.loadStub(('test2',))
            self.assertEqual(ob2.items(), [('a', 'b')])
            self.assertEqual(ob2.stowaway.items(), [('c', 'd')])

            # Make a change only to the unmanaged persistent object
            # (the "stowaway").
            ob.stowaway['c'] = 'e'
            get_transaction().commit()

            # Verify the change was stored and make a change to the
            # managed persistent object.
            conn3 = self.db.open()
            ob3 = conn3.loadStub(('test2',))
            self.assertEqual(ob3.items(), [('a', 'b')])
            self.assertEqual(ob3.stowaway.items(), [('c', 'e')])
            ob3['a'] = 'z'
            get_transaction().commit()
            conn3.close()
            # Cleared so the finally clause does not close the same
            # connection again if the reopen below fails.
            conn3 = None
            conn3 = self.db.open()
            ob3 = conn3.loadStub(('test2',))
            self.assertEqual(ob3['a'], 'z')
            self.assertEqual(ob3.stowaway.items(), [('c', 'e')])

            # Verify we didn't accidentally change the original object.
            self.assertEqual(ob['a'], 'b')

            # sync and verify the current state.
            conn1.sync()
            self.assertEqual(ob1.items(), [('a', 'z')])
            self.assertEqual(ob1.stowaway.items(), [('c', 'e')])

        finally:
            conn1.close()
            if conn2 is not None:
                conn2.close()
            if conn3 is not None:
                conn3.close()


    def testStoreAndLoadBinary(self):
        # Round-trips string data containing every character code from
        # 0 through 255, repeated twice.
        all_chars = ''.join(map(chr, range(256)))
        ob = PersistentMapping()
        ob.strdata = all_chars * 2

        placeholder = PersistentMapping()

        conn = self.db.open()
        try:
            root = conn.root()
            get_transaction().begin()
            root['TestRoot'] = ob
            root['TestRoot2'] = placeholder
            get_transaction().commit()
            loaded = conn.loadStub(('test',))
            self.assertEqual(loaded.strdata, ob.strdata)
            self.assertEqual(loaded.items(), ob.items())
        finally:
            conn.close()


    def _writeBasicObjects(self, conn):
        # Helper: commits a 'TestRoot' object (strdata 'abc') and an
        # empty 'TestRoot2' object through `conn`.  Returns the pair
        # (TestRoot object, TestRoot2 object).
        test_root = PersistentMapping()
        test_root.strdata = 'abc'
        test_root_2 = PersistentMapping()
        root = conn.root()
        get_transaction().begin()
        root['TestRoot'] = test_root
        root['TestRoot2'] = test_root_2
        get_transaction().commit()
        return test_root, test_root_2


    def _changeTestRoot(self):
        # Helper: through a separate connection, set the strdata of
        # 'TestRoot' to 'ghi' and commit.
        other_conn = self.db.open()
        try:
            test_root = other_conn.root()['TestRoot']
            test_root.strdata = 'ghi'
            get_transaction().commit()
        finally:
            other_conn.close()


    def testConflictDetection(self):
        # A change committed from another thread must make this
        # connection's pending commit fail.
        conn = self.db.open()
        try:
            ob, unused = self._writeBasicObjects(conn)
            ob.strdata = 'def'
            run_in_thread(self._changeTestRoot)
            # Verify that "def" doesn't get written, since it
            # conflicts with "ghi".
            expected_error = ZODB.POSException.ConflictError
            commit = get_transaction().commit
            self.assertRaises(expected_error, commit)
            self.assertEqual(ob.strdata, "ghi")
        finally:
            conn.close()


    def testNewObjectConflictDetection(self):
        # Verify a new object won't overwrite existing objects by accident
        conn = self.db.open()
        try:
            ob, unused = self._writeBasicObjects(conn)
            ob.strdata = 'def'
            # Pretend that it's new
            conn.setSerial(ob, '\0' * 8)
            expected_error = ZODB.POSException.ConflictError
            commit = get_transaction().commit
            self.assertRaises(expected_error, commit)
        finally:
            conn.close()


    def testRemainderCyclicReferenceRestoration(self):
        # test whether the remainder pickler properly stores cyclic references
        # back to the object itself.
        cyclic = PersistentMapping()
        cyclic.myself = cyclic

        placeholder = PersistentMapping()
        placeholder.strdata = ''

        writer = self.db.open()
        try:
            root = writer.root()
            get_transaction().begin()
            root['TestRoot'] = placeholder
            root['TestRoot2'] = cyclic
            get_transaction().commit()

            reader = self.db.open()
            try:
                loaded = reader.root()['TestRoot2']
                self.assert_(loaded.myself is loaded)
                # Verify it didn't cheat somehow
                self.assert_(loaded is not cyclic)
            finally:
                reader.close()
        finally:
            writer.close()


    def testCopyOf(self):
        # Verifies the functionality of copyOf().
        source = PersistentMapping()
        source._p_oid = 'xxx'
        self.assertEqual(source._p_oid, 'xxx')  # Precondition
        fish = PersistentMapping()
        source['fish'] = fish
        fish['trout'] = 1
        fish['herring'] = 2

        duplicate = copyOf(source)
        # The copy is a distinct object, its sub-object is copied too,
        # and it carries no oid.
        self.assert_(duplicate is not source)
        self.assert_(duplicate['fish'] is not source['fish'])
        self.assert_(duplicate._p_oid is None)
        self.assertEqual(list(duplicate.keys()), ['fish'])
        self.assertEqual(len(duplicate['fish'].keys()), 2)


    def testCopyOfZClassInstance(self):
        # Verifies that copyOf() can copy instances that look like ZClass
        # instances.
        class weird_class (ZODB.Persistent):
            pass
        # A module name like this is what makes the class "look like" a
        # ZClass for the purposes of this test.
        weird_class.__module__ = '*IAmAZClassModule'
        self.assertEqual(weird_class.__module__, '*IAmAZClassModule')

        source = PersistentMapping()
        source['fishy'] = weird_class()

        duplicate = copyOf(source)
        self.assert_(duplicate is not source)
        self.assert_(duplicate['fishy'] is not source['fishy'])
        self.assert_(duplicate['fishy'].__class__ is weird_class)


    def test_p_serial_untouched(self):
        # _p_serial isn't safe to use for hashes, since _p_mtime
        # interprets it as a date stamp.  Verify Ape doesn't
        # use _p_serial for hashes.
        conn = self.db.open()
        try:
            written = self._writeBasicObjects(conn)
            test_root, test_root_2 = written
            self.assertEqual(test_root._p_serial, "\0" * 8)
            self.assertEqual(test_root_2._p_serial, "\0" * 8)
        finally:
            conn.close()


    def testGetSerial(self):
        # Verifies the behavior of getSerial().
        conn = self.db.open()
        try:
            # An object that was never stored reports the null serial.
            unsaved = PersistentMapping()
            self.assertEqual(conn.getSerial(unsaved), '\0' * 8)
            # A committed object reports something else.
            saved, unused = self._writeBasicObjects(conn)
            self.assertNotEqual(conn.getSerial(saved), '\0' * 8)
        finally:
            conn.close()


    def testGetSerialDetectsNewObjects(self):
        # Verifies the behavior of getSerial() and setSerial().
        conn = self.db.open()
        try:
            ob, unused = self._writeBasicObjects(conn)
            self.assertNotEqual(conn.getSerial(ob), '\0' * 8)
            # Replace the object and verify it gets a new serial.
            # The replacement reuses the oid of the stored 'TestRoot'.
            ob = PersistentMapping()
            ob.strdata = 'cba'
            ob._p_oid = conn.root()['TestRoot']._p_oid
            conn.root()['TestRoot'] = ob
            self.assertEqual(conn.getSerial(ob), '\0' * 8)
        finally:
            conn.close()


    def testSerialCleanup(self):
        # Verify that setSerial() cleans up.
        conn = self.db.open()
        try:
            # Lower the threshold on this connection so only a handful
            # of setSerial() calls are needed.
            conn.SERIAL_CLEANUP_THRESHOLD = 10
            for n in range(conn.SERIAL_CLEANUP_THRESHOLD + 1):
                fake_ob = PersistentMapping()
                fake_ob._p_oid = 'fake_oid_' + str(n)
                size_before = len(conn._serials or ())
                conn.setSerial(fake_ob, '01234567')
                size_after = len(conn._serials)
                if size_after < size_before:
                    # Cleaned up.  Success.
                    break
            else:
                # The table never shrank during the loop.
                self.fail("setSerial() did not clean up")
        finally:
            conn.close()


if __name__ == '__main__':
    unittest.main()



=== Added File Products/Ape/lib/apelib/tests/testzope2fs.py === (507/607 lines abridged)
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test of storing folders on the filesystem via ZODB

$Id: testzope2fs.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""

import os
import sys
from shutil import rmtree
import unittest
from tempfile import mktemp
from cStringIO import StringIO

from ZODB.POSException import ConflictError
from OFS.Image import File, manage_addImage, manage_addFile
from Products.PythonScripts.PythonScript import PythonScript
from Products.PageTemplates.ZopePageTemplate import ZopePageTemplate

from apelib.zodb3.db import ApeDB
from apelib.zodb3.storage import ApeStorage
from apelib.zodb3.resource import StaticResource
from apelib.zope2.fsmapper import createMapper
from apelib.fs.exceptions import FSWriteError
from zope2testbase import Zope2TestBase, Folder


# If the interpreter did not define __file__ for this module, fall back
# to the absolute path of the script named on the command line.
if '__file__' not in globals():
    __file__ = os.path.abspath(sys.argv[0])

# mktemp() only generates a unique path name; nothing is created on
# disk at this point.
tmpdir = mktemp()


class Zope2FSTests (unittest.TestCase, Zope2TestBase):

    def _createMapper(self, path):
        # Mapper factory hook: calls createMapper() with only the path.
        # Zope2FSUnderscoreTests overrides this to pass extra options.
        return createMapper(path)

[-=- -=- -=- 507 lines omitted -=- -=- -=-]

            f.close()
            self.assert_(data.find(content) >= 0)
        finally:
            conn.close()


    def testDottedNames(self):
        # FSConnection should allow dotted names that don't look like
        # property or remainder files.  Two folders with such ids are
        # added to the application and the transaction must commit
        # without raising.
        connection = self.db.open()
        try:
            application = connection.root()['Application']
            for dotted_id in ('.Holidays', '.Holidays.properties.dat'):
                folder = Folder()
                folder.id = dotted_id
                application._setObject(folder.id, folder, set_owner=0)
            get_transaction().commit()
        finally:
            connection.close()


    def testGuessFileContentType(self):
        # Verify that file content type guessing happens.
        data = '<html><body>Cool stuff</body></html>'
        dir = self.conn._expandPath('/')
        f = open(os.path.join(dir, 'testobj'), 'wt')
        f.write(data)
        f.close()

        conn = self.db.open()
        try:
            app = conn.root()['Application']
            self.assert_(hasattr(app, 'testobj'))
            self.assertEqual(app.testobj.content_type, 'text/html')
        finally:
            conn.close()



class Zope2FSUnderscoreTests (Zope2FSTests):
    # Inherits every test from Zope2FSTests; the only difference is
    # that the mapper is created with metadata_prefix='_'.
    # NOTE(review): presumably _createMapper() is called from the
    # inherited setUp -- confirm against Zope2FSTests.
    
    def _createMapper(self, path):
        return createMapper(path, metadata_prefix='_')


# Run the tests defined in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()



=== Added File Products/Ape/lib/apelib/tests/testzope2sql.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test of storing objects in relational databases via ZODB

$Id: testzope2sql.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""

import unittest

from apelib.sql.pg import PsycopgConnection
from apelib.zodb3.db import ApeDB
from apelib.zodb3.storage import ApeStorage
from apelib.zodb3.resource import StaticResource
from apelib.zope2.sqlmapper import createSQLMapper
from zope2testbase import Zope2TestBase


class Zope2SQLTests (unittest.TestCase, Zope2TestBase):
    # Runs the tests inherited from Zope2TestBase against an ApeDB whose
    # storage is built from createSQLMapper() over a Psycopg connection.

    def openConnection(self):
        # Open the connection the mapper is built on.  The arguments
        # ('' and 'test_temp') are handed to PsycopgConnection as-is.
        return PsycopgConnection('', 'test_temp')

    def setUp(self):
        # Build mapper -> resource -> storage -> database, keeping the
        # mapper, connections, gateways, storage and database on the
        # instance so that clear() and tearDown() can reach them.
        conn = self.openConnection()
        self.dm, self.conns, self.gws = createSQLMapper(conn)
        resource = StaticResource(self.dm)
        self.storage = ApeStorage(resource, self.conns)
        self.db = ApeDB(self.storage, resource)

    def clear(self):
        # Call clear() on every gateway that provides one, then commit
        # on the db object of each connection.
        for gw in self.gws:
            if hasattr(gw, 'clear'):
                gw.clear()
        for conn in self.conns:
            conn.db.commit()

    def tearDown(self):
        # Close the database even when clearing fails; without the
        # try/finally a failing clear() would leave the database open.
        try:
            self.clear()
        finally:
            self.db.close()


# Run the tests defined in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()



=== Added File Products/Ape/lib/apelib/tests/zope2testbase.py === (471/571 lines abridged)
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Shared base class for the Zope 2 storage tests (filesystem and SQL)

$Id: zope2testbase.py,v 1.1 2003/04/10 03:09:57 shane Exp $
"""

from cStringIO import StringIO
import time

from Acquisition import aq_base
from ZODB import Persistent, POSException
from Persistence import PersistentMapping
from OFS.Folder import Folder
from OFS.ObjectManager import ObjectManager
from OFS.SimpleItem import SimpleItem
from OFS.Image import manage_addFile
from OFS.DTMLMethod import DTMLMethod
from AccessControl.User import User, UserFolder
from Products.PythonScripts.PythonScript import PythonScript
from Products.ZSQLMethods.SQL import manage_addZSQLMethod
from Products.PageTemplates.ZopePageTemplate import ZopePageTemplate

from apelib.zope2.setup.patches import applySetObPatch


class TestFolder(Folder):
    # Folder subclass that differs from Folder only by its meta_type
    # and by a constructor that takes the title.

    meta_type = 'Zope2FS Test Folder'

    def __init__(self, title):
        # Only stores the title; Folder's own initializer is not called.
        self.title = title


class TestObjectManager(ObjectManager):

    meta_type = 'Zope2FS Test ObjectManager'

    def __init__(self, title):

[-=- -=- -=- 471 lines omitted -=- -=- -=-]

            try:
                # Verify that loading works
                app = conn2.root()['Application']
                f2 = app.Holidays
                user = f2.getOwner()
                self.assertEqual(user.getUserName(), 'shane')
                self.assert_('Elder' in user.getRoles())

                roles = {}
                for role in list(user.getRolesInContext(f2)):
                    if role != 'Authenticated' and role != 'Anonymous':
                        roles[role] = 1
                self.assertEqual(roles, {'Elder':1, 'Missionary':1})
                self.assertEqual(tuple(f2._proxy_roles), ('Manager',))

                roles = {}
                for role in list(f2._View_Permission):
                    roles[role] = 1
                self.assertEqual(roles, {'Elder':1, 'Owner':1})

                # Write some changes to verify that changes work
                f2._owner = None
                del f2._proxy_roles
                f2.__ac_roles__ += ['Teacher']
                get_transaction().commit()
            finally:
                conn2.close()

            # Make sure the changes are seen
            conn.sync()
            self.assert_(f.getOwner() is None, f.getOwner())
            self.assert_(not hasattr(f, '_proxy_roles'))
            self.assertEqual(len(f.__ac_roles__), 3)
            self.assert_('Teacher' in f.__ac_roles__)
        finally:
            conn.close()


    def testModTime(self):
        # Verify _p_mtime is within a reasonable range: after a commit
        # the application's _p_mtime must lie strictly within ten
        # seconds either side of the time sampled before the change.
        connection = self.db.open()
        try:
            started = time.time()
            earliest = started - 10
            latest = started + 10
            application = connection.root()['Application']
            application.title = 'Testing'
            get_transaction().commit()
            self.assert_(application._p_mtime > earliest)
            self.assert_(application._p_mtime < latest)
        finally:
            connection.close()