[Zope3-checkins] CVS: Zope3/src/zope/fssync/tests - test_fsmerger.py:1.1 test_fsutil.py:1.1 test_error.py:1.2 test_merger.py:1.8

Guido van Rossum guido@python.org
Wed, 14 May 2003 18:16:10 -0400


Update of /cvs-repository/Zope3/src/zope/fssync/tests
In directory cvs.zope.org:/tmp/cvs-serv8605/tests

Modified Files:
	test_error.py test_merger.py 
Added Files:
	test_fsmerger.py test_fsutil.py 
Log Message:
More refactoring.
The new FSMerger class has some unit tests
(though not nearly enough).

=== Added File Zope3/src/zope/fssync/tests/test_fsmerger.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for the FSMerger class.

$Id: test_fsmerger.py,v 1.1 2003/05/14 22:16:09 gvanrossum Exp $
"""

import os
import shutil
import unittest
import tempfile

from os.path import exists, isdir, isfile, realpath, normcase, split, join

from zope.fssync.fsmerger import FSMerger

class MockMetadata(object):

    def __init__(self):
        self.database = {}

    def getentry(self, filename):
        key, filename = self.makekey(filename)
        if key not in self.database:
            self.database[key] = {}
        return self.database[key]

    def getnames(self, dirpath):
        dirkey, dirpath = self.makekey(dirpath)
        names = []
        for key in self.database:
            head, tail = split(key)
            if head == dirkey:
                names.append(tail)
        return names

    def flush(self):
        pass

    # These only exist for the test framework

    def makekey(self, path):
        path = realpath(path)
        key = normcase(path)
        return key, path

    def setmetadata(self, filename, metadata={}):
        key, filename = self.makekey(filename)
        if key not in self.database:
            self.database[key] = {"path": filename}
        self.database[key].update(metadata)

    def delmetadata(self, filename):
        key, filename = self.makekey(filename)
        if key in self.database:
            del self.database[key]

    def dump(self):
        return dict([(k, v) for (k, v) in self.database.iteritems() if v])

class TestFSMerger(unittest.TestCase):

    def setUp(self):
        # Create a mock metadata database
        self.metadata = MockMetadata()
        # Create a list of temporary names to be removed in tearDown
        self.tempnames = []
        # Create a handy entry
        self.entry = {"path": "/foo"}

    def tearDown(self):
        # Clean up temporary files and directories
        for fn in self.tempnames:
            if isdir(fn):
                shutil.rmtree(fn)
            elif isfile(fn):
                os.remove(fn)

    def adddir(self):
        # Create and register a temporary directory
        dir = tempfile.mktemp()
        self.tempnames.append(dir)
        os.mkdir(dir)
        return dir

    def ensuredir(self, dir):
        # Ensure that a given directory exists
        if not isdir(dir):
            os.makedirs(dir)

    def addfile(self, dir, path, data, entry=None):
        # Create a file or directory and write some data to it.  If
        # entry is not None, add it as the file's entry.  If data is a
        # dict, create a directory; the dict key/value entries are
        # used as name/data for the directory contents; if entry is
        # not None, entries are also synthesized for the directory
        # contents.
        path = join(dir, path)
        if entry is not None:
            self.addentry(path, entry)
        if isinstance(data, dict):
            self.ensuredir(path)
            pentry = self.metadata.getentry(path)
            for x in data:
                if entry is not None:
                    newentry = entry.copy()
                    newentry["path"] += "/" + x
                else:
                    newentry = None
                self.addfile(path, x, data[x], newentry)
        elif data is not None:
            f = open(path, "w")
            try:
                f.write(data)
            finally:
                f.close()
        return path

    def addorigfile(self, dir, path, data):
        # Create the 'original' for a file or directory and write data to it
        if isinstance(data, dict):
            path = join(dir, path)
            self.ensuredir(path)
            for x in data:
                self.addorigfile(path, x, data[x])
            return None
        else:
            origdir = join(dir, "@@Zope", "Original")
            self.ensuredir(origdir)
            return self.addfile(origdir, path, data)

    def checkfile(self, path, expected_data):
        # Assert that a file or directory contains the expected data
        if isinstance(expected_data, dict):
            self.assert_(isdir(path))
            for x in expected_data:
                self.checkfile(join(path, x), expected_data[x])
        elif expected_data is None:
            self.assert_(not exists(path))
        else:
            f = open(path, "r")
            try:
                actual_data = f.read()
            finally:
                f.close()
            self.assertEqual(actual_data, expected_data)

    def checkorigfile(self, path, expected_data):
        # Assert that the 'original' contains the expected data
        if isinstance(expected_data, dict):
            self.assert_(isdir(path))
            for x in expected_data:
                self.checkorigfile(join(path, x), expected_data[x])
        else:
            head, tail = split(path)
            self.checkfile(join(head, "@@Zope", "Original", tail),
                           expected_data)

    def addentry(self, path, entry):
        e = self.metadata.getentry(path)
        e.update(entry)

    def checkentry(self, path, entry):
        e = self.metadata.getentry(path)
        self.assertEqual(e, entry)

    def test_ignore(self):
        m = FSMerger(None, None)
        self.assertEqual(m.ignore("foo"), False)
        self.assertEqual(m.ignore("foo~"), True)

    def test_reportaction(self):
        reports = []
        m = FSMerger(None, reports.append)
        for action in "Fix", "Copy", "Merge", "Delete", "Nothing":
            for state in ("Conflict", "Uptodate", "Modified",
                          "Added", "Removed", "Spurious", "Nonexistent"):
                m.reportaction(action, state, action+state)
        # I cheated a little here: the list of expected reports was
        # constructed by printing the actual list of reports in a
        # previous run.  But then I carefully looked it over and
        # verified that all was as expected, so now at least it serves
        # as a regression test.
        self.assertEqual(reports,
                         ['C FixConflict',
                          'U FixUptodate',
                          'M FixModified',
                          'A FixAdded',
                          'R FixRemoved',
                          '? FixSpurious',
                          '* FixNonexistent',

                          'C CopyConflict',
                          'U CopyUptodate',
                          'M CopyModified',
                          'A CopyAdded',
                          'R CopyRemoved',
                          '? CopySpurious',
                          '* CopyNonexistent',

                          'C MergeConflict',
                          'U MergeUptodate',
                          'M MergeModified',
                          'A MergeAdded',
                          'R MergeRemoved',
                          '? MergeSpurious',
                          '* MergeNonexistent',

                          'C DeleteConflict',
                          '* DeleteUptodate',
                          'M DeleteModified',
                          'A DeleteAdded',
                          'R DeleteRemoved',
                          '? DeleteSpurious',
                          'D DeleteNonexistent',

                          'C NothingConflict',
                          '* NothingUptodate',
                          'M NothingModified',
                          'A NothingAdded',
                          'R NothingRemoved',
                          '? NothingSpurious',
                          '* NothingNonexistent'])

    def test_reportdir(self):
        reports = []
        m = FSMerger(None, reports.append)
        m.reportdir("X", "foo")
        self.assertEqual(reports, ["X foo" + os.sep])

    def mergetest(self, name, localdata, origdata, remotedata,
                  localentry, remoteentry,
                  expected_reports_template,
                  expected_localdata, expected_origdata, expected_remotedata,
                  expected_localentry, expected_remoteentry):
        # Generic test setup to test merging files
        reports = []
        m = FSMerger(self.metadata, reports.append)

        localtopdir = self.adddir()
        remotetopdir = self.adddir()
        localdir = join(localtopdir, "local")
        remotedir = join(remotetopdir, "remote")
        os.mkdir(localdir)
        os.mkdir(remotedir)

        localfile = self.addfile(localdir, name, localdata, localentry)
        origfile = self.addorigfile(localdir, name, origdata)
        remotefile = self.addfile(remotedir, name, remotedata, remoteentry)

        m.merge_dirs(localdir, remotedir)

        expected_reports = []
        for er in expected_reports_template:
            er = er.replace("%l", localfile)
            er = er.replace("%r", remotefile)
            expected_reports.append(er)
        filtered_reports = [r for r in reports if r[0] not in "*/"]
        self.assertEqual(filtered_reports, expected_reports)

        if isinstance(expected_localdata, str):
            expected_localdata = expected_localdata.replace("%l", localfile)
            expected_localdata = expected_localdata.replace("%r", remotefile)

        self.checkfile(localfile, expected_localdata)
        self.checkorigfile(localfile, expected_origdata)
        self.checkfile(remotefile, expected_remotedata)

        if callable(expected_localentry):
            expected_localentry = expected_localentry(localfile)
        self.checkentry(localfile, expected_localentry)
        self.checkentry(remotefile, expected_remoteentry)

    def test_merge_nochange(self):
        self.mergetest("foo", "a", "a", "a", self.entry, self.entry,
                       [], "a", "a", "a", self.entry, self.entry)

    def test_merge_localchange(self):
        self.mergetest("foo", "b", "a", "a", self.entry, self.entry,
                       ["M %l"], "b", "a", "a", self.entry, self.entry)

    def test_merge_remotechange(self):
        self.mergetest("foo", "a", "a", "b", self.entry, self.entry,
                       ["U %l"], "b", "b", "b", self.entry, self.entry)

    def test_merge_diff3(self):
        self.mergetest("foo", "a\nl\n", "a\n", "r\na\n",
                       self.entry, self.entry,
                       ["M %l"], "r\na\nl\n", "r\na\n", "r\na\n",
                       self.entry, self.entry)

    def test_merge_equal(self):
        self.mergetest("foo", "ab", "a", "ab", self.entry, self.entry,
                       ["U %l"], "ab", "ab", "ab", self.entry, self.entry)

    def test_merge_conflict(self):
        conflict = "<<<<<<< %l\nl\n=======\nr\n>>>>>>> %r\n"
        self.mergetest("foo", "l\n", "a\n", "r\n", self.entry, self.entry,
                       ["C %l"], conflict, "r\n", "r\n",
                       self.make_conflict_entry, self.entry)

    def make_conflict_entry(self, local):
        e = {"conflict": os.path.getmtime(local)}
        e.update(self.entry)
        return e

    def test_new_directory(self):
        self.mergetest("foo", None, None, {"x": "x"}, None, self.entry,
                       ["N %l/", "U %l/x"],
                       {"x": "x"}, {"x": "x"}, {"x": "x"},
                       self.entry, self.entry)

def test_suite():
    s = unittest.TestSuite()
    s.addTest(unittest.makeSuite(TestFSMerger))
    return s

def test_main():
    unittest.TextTestRunner().run(test_suite())

if __name__=='__main__':
    test_main()


=== Added File Zope3/src/zope/fssync/tests/test_fsutil.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for the functions in the fsutil module.

$Id: test_fsutil.py,v 1.1 2003/05/14 22:16:09 gvanrossum Exp $
"""

import os
import tempfile
import unittest

from os.path import split, join, exists, isdir, isfile

from zope.fssync import fsutil

class TestFSUtil(unittest.TestCase):

    def test_split(self):
        self.assertEqual(fsutil.split(join("foo", "bar")), ("foo", "bar"))
        self.assertEqual(fsutil.split("foo"), (os.curdir, "foo"))
        self.assertEqual(fsutil.split(join("foo", "")), (os.curdir, "foo"))
        self.assertEqual(fsutil.split(join("foo", os.curdir)),
                         (os.curdir, "foo"))
        self.assertEqual(fsutil.split(join("foo", "bar", os.pardir)),
                         (os.curdir, "foo"))
        self.assertEqual(fsutil.split(os.curdir), split(os.getcwd()))

    def test_getspecial(self):
        self.assertEqual(fsutil.getspecial("foo/bar", "X"), "foo/@@Zope/X/bar")

    def test_getoriginal(self):
        self.assertEqual(fsutil.getoriginal("foo/bar"),
                         "foo/@@Zope/Original/bar")

    def test_getextra(self):
        self.assertEqual(fsutil.getextra("foo/bar"), "foo/@@Zope/Extra/bar")

    def test_getannotations(self):
        self.assertEqual(fsutil.getannotations("foo/bar"),
                         "foo/@@Zope/Annotations/bar")

    def test_ensuredir(self):
        tmpdir = tempfile.mktemp()
        try:
            self.assertEqual(exists(tmpdir), False)
            self.assertEqual(isdir(tmpdir), False)
            fsutil.ensuredir(tmpdir)
            self.assertEqual(isdir(tmpdir), True)
            fsutil.ensuredir(tmpdir)
            self.assertEqual(isdir(tmpdir), True)
        finally:
            if isdir(tmpdir):
                os.rmdir(tmpdir)

    def test_ensuredir_error(self):
        tmpfile = tempfile.mktemp()
        try:
            f = open(tmpfile, "w")
            try:
                f.write("x\n")
            finally:
                f.close()
            self.assertRaises(OSError, fsutil.ensuredir, tmpfile)
        finally:
            if isfile(tmpfile):
                os.remove(tmpfile)

def test_suite():
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(TestFSUtil)

def test_main():
    unittest.TextTestRunner().run(test_suite())

if __name__=='__main__':
    test_main()


=== Zope3/src/zope/fssync/tests/test_error.py 1.1 => 1.2 ===
--- Zope3/src/zope/fssync/tests/test_error.py:1.1	Tue May 13 13:32:51 2003
+++ Zope3/src/zope/fssync/tests/test_error.py	Wed May 14 18:16:09 2003
@@ -20,7 +20,7 @@
 
 import unittest
 
-from zope.fssync.fssync import Error
+from zope.fssync.fsutil import Error
 
 class TestError(unittest.TestCase):
 


=== Zope3/src/zope/fssync/tests/test_merger.py 1.7 => 1.8 ===
--- Zope3/src/zope/fssync/tests/test_merger.py:1.7	Wed May 14 15:00:16 2003
+++ Zope3/src/zope/fssync/tests/test_merger.py	Wed May 14 18:16:09 2003
@@ -212,11 +212,11 @@
         self.runtest(None, "a", "a", removed, {}, None, "Nothing", "Removed")
 
     def test_remote_removed(self):
-        self.runtest("a", "a", None, {}, None, None, "Remove", "Nonexistent")
+        self.runtest("a", "a", None, {}, None, None, "Delete", "Nonexistent")
 
     def test_both_removed(self):
         self.runtest(None, "a", None, removed, None,
-                     None, "Remove", "Nonexistent")
+                     None, "Delete", "Nonexistent")
 
     def test_local_lost_remote_unchanged(self):
         self.runtest(None, "a", "a", {}, {}, "a", "Copy", "Uptodate")
@@ -225,7 +225,7 @@
         self.runtest(None, "a", "b", {}, {}, "b", "Copy", "Uptodate")
 
     def test_local_lost_remote_removed(self):
-        self.runtest(None, "a", None, {}, None, None, "Remove", "Nonexistent")
+        self.runtest(None, "a", None, {}, None, None, "Delete", "Nonexistent")
 
     def test_spurious(self):
         self.runtest("a", None, None, None, None, "a", "Nothing", "Spurious")