[Zodb-checkins] SVN: ZODB/branches/tseaver-better_repozo_tests/src/ZODB/scripts/tests/test_repozo.py Add tests for 'do_full_backup' / 'do_incremental_backup'.

Tres Seaver tseaver at palladion.com
Fri May 14 13:44:37 EDT 2010


Log message for revision 112306:
  Add tests for 'do_full_backup' / 'do_incremental_backup'.

Changed:
  U   ZODB/branches/tseaver-better_repozo_tests/src/ZODB/scripts/tests/test_repozo.py

-=-
Modified: ZODB/branches/tseaver-better_repozo_tests/src/ZODB/scripts/tests/test_repozo.py
===================================================================
--- ZODB/branches/tseaver-better_repozo_tests/src/ZODB/scripts/tests/test_repozo.py	2010-05-14 17:43:39 UTC (rev 112305)
+++ ZODB/branches/tseaver-better_repozo_tests/src/ZODB/scripts/tests/test_repozo.py	2010-05-14 17:44:37 UTC (rev 112306)
@@ -13,12 +13,21 @@
 ##############################################################################
 import unittest
 import os
+try:
+    # the hashlib module is available from Python 2.5
+    from hashlib import md5
+except ImportError:
+    # the md5 module is deprecated in Python 2.6
+    from md5 import new as md5
+
 import ZODB.tests.util  # layer used at class scope
 
 _NOISY = os.environ.get('NOISY_REPOZO_TEST_OUTPUT')
 
 class OurDB:
 
+    _file_name = None
+
     def __init__(self, dir):
         from BTrees.OOBTree import OOBTree
         import transaction
@@ -27,12 +36,13 @@
         conn = self.db.open()
         conn.root()['tree'] = OOBTree()
         transaction.commit()
+        self.pos = self.db.storage._pos
         self.close()
 
     def getdb(self):
         from ZODB import DB
         from ZODB.FileStorage import FileStorage
-        storage_filename = os.path.join(self.dir, 'Data.fs')
+        self._file_name = storage_filename = os.path.join(self.dir, 'Data.fs')
         storage = FileStorage(storage_filename)
         self.db = DB(storage)
 
@@ -63,6 +73,7 @@
                 if keys:
                     del tree[keys[0]]
         transaction.commit()
+        self.pos = self.db.storage._pos
         self.close()
 
 
@@ -96,7 +107,7 @@
         from ZODB.scripts.repozo import main
         main(argv)
 
-    def testRepozo(self):
+    def test_via_monte_carlo(self):
         self.saved_snapshots = []  # list of (name, time) pairs for copies.
 
         for i in range(100):
@@ -168,33 +179,47 @@
             (correctpath, when, ' '.join(argv)))
         self.assertEquals(fguts, gguts, msg)
 
-class Test_delete_old_backups(unittest.TestCase):
 
+class TestBase:
+
     _repository_directory = None
+    _data_directory = None
 
     def tearDown(self):
         if self._repository_directory is not None:
             from shutil import rmtree
             rmtree(self._repository_directory)
+        if self._data_directory is not None:
+            from shutil import rmtree
+            rmtree(self._data_directory)
 
-    def _callFUT(self, options=None, filenames=()):
-        from ZODB.scripts.repozo import delete_old_backups
-        if options is None:
-            options = self._makeOptions(filenames)
-        delete_old_backups(options)
+    def _makeOptions(self, **kw):
+        import tempfile
+        self._repository_directory = tempfile.mkdtemp()
+        class Options(object):
+            repository = self._repository_directory
+            def __init__(self, **kw):
+                self.__dict__.update(kw)
+        return Options(**kw)
 
+
+class Test_delete_old_backups(TestBase, unittest.TestCase):
+
     def _makeOptions(self, filenames=()):
-        import tempfile
-        dir = self._repository_directory = tempfile.mkdtemp()
+        options = super(Test_delete_old_backups, self)._makeOptions()
         for filename in filenames:
-            fqn = os.path.join(dir, filename)
+            fqn = os.path.join(options.repository, filename)
             f = open(fqn, 'wb')
             f.write('testing delete_old_backups')
             f.close()
-        class Options(object):
-            repository = dir
-        return Options()
+        return options
 
+    def _callFUT(self, options=None, filenames=()):
+        from ZODB.scripts.repozo import delete_old_backups
+        if options is None:
+            options = self._makeOptions(filenames)
+        return delete_old_backups(options)
+
     def test_empty_dir_doesnt_raise(self):
         self._callFUT()
         self.assertEqual(len(os.listdir(self._repository_directory)), 0)
@@ -254,8 +279,153 @@
             fqn = os.path.join(self._repository_directory, name)
             self.failUnless(os.path.isfile(fqn))
 
+
+class Test_do_full_backup(TestBase, unittest.TestCase):
+
+    def _callFUT(self, options):
+        from ZODB.scripts.repozo import do_full_backup
+        return do_full_backup(options)
+
+    def _makeDB(self):
+        import tempfile
+        datadir = self._data_directory = tempfile.mkdtemp()
+        return OurDB(self._data_directory)
+
+    def test_dont_overwrite_existing_file(self):
+        from ZODB.scripts.repozo import WouldOverwriteFiles
+        from ZODB.scripts.repozo import gen_filename
+        db = self._makeDB()
+        options = self._makeOptions(full=True,
+                                    file=db._file_name,
+                                    gzip=False,
+                                    test_now = (2010, 5, 14, 10, 51, 22),
+                                   )
+        f = open(os.path.join(self._repository_directory,
+                              gen_filename(options)), 'w')
+        f.write('TESTING')
+        f.flush()
+        f.close()
+        self.assertRaises(WouldOverwriteFiles, self._callFUT, options)
+
+    def test_empty(self):
+        from ZODB.scripts.repozo import gen_filename
+        db = self._makeDB()
+        options = self._makeOptions(file=db._file_name,
+                                    gzip=False,
+                                    killold=False,
+                                    test_now = (2010, 5, 14, 10, 51, 22),
+                                   )
+        self._callFUT(options)
+        target = os.path.join(self._repository_directory,
+                              gen_filename(options))
+        original = open(db._file_name, 'rb').read()
+        self.assertEqual(open(target, 'rb').read(), original)
+        datfile = os.path.join(self._repository_directory,
+                               gen_filename(options, '.dat'))
+        self.assertEqual(open(datfile).read(),
+                         '%s 0 %d %s\n' %
+                            (target, len(original), md5(original).hexdigest()))
+
+
+class Test_do_incremental_backup(TestBase, unittest.TestCase):
+
+    def _callFUT(self, options, reposz, repofiles):
+        from ZODB.scripts.repozo import do_incremental_backup
+        return do_incremental_backup(options, reposz, repofiles)
+
+    def _makeDB(self):
+        import tempfile
+        datadir = self._data_directory = tempfile.mkdtemp()
+        return OurDB(self._data_directory)
+
+    def test_dont_overwrite_existing_file(self):
+        from ZODB.scripts.repozo import WouldOverwriteFiles
+        from ZODB.scripts.repozo import gen_filename
+        from ZODB.scripts.repozo import find_files
+        db = self._makeDB()
+        options = self._makeOptions(full=False,
+                                    file=db._file_name,
+                                    gzip=False,
+                                    test_now = (2010, 5, 14, 10, 51, 22),
+                                    date = None,
+                                   )
+        f = open(os.path.join(self._repository_directory,
+                              gen_filename(options)), 'w')
+        f.write('TESTING')
+        f.flush()
+        f.close()
+        repofiles = find_files(options)
+        self.assertRaises(WouldOverwriteFiles,
+                          self._callFUT, options, 0, repofiles)
+
+    def test_no_changes(self):
+        from ZODB.scripts.repozo import gen_filename
+        db = self._makeDB()
+        oldpos = db.pos
+        options = self._makeOptions(file=db._file_name,
+                                    gzip=False,
+                                    killold=False,
+                                    test_now = (2010, 5, 14, 10, 51, 22),
+                                    date = None,
+                                   )
+        fullfile = os.path.join(self._repository_directory,
+                                '2010-05-14-00-00-00.fs')
+        original = open(db._file_name, 'rb').read()
+        last = len(original)
+        f = open(fullfile, 'wb')
+        f.write(original)
+        f.flush()
+        f.close()
+        datfile = os.path.join(self._repository_directory,
+                                '2010-05-14-00-00-00.dat')
+        repofiles = [fullfile, datfile]
+        self._callFUT(options, oldpos, repofiles)
+        target = os.path.join(self._repository_directory,
+                              gen_filename(options))
+        self.assertEqual(open(target, 'rb').read(), '')
+        self.assertEqual(open(datfile).read(),
+                         '%s %d %d %s\n' %
+                            (target, oldpos, oldpos, md5('').hexdigest()))
+
+    def test_w_changes(self):
+        from ZODB.scripts.repozo import gen_filename
+        db = self._makeDB()
+        oldpos = db.pos
+        options = self._makeOptions(file=db._file_name,
+                                    gzip=False,
+                                    killold=False,
+                                    test_now = (2010, 5, 14, 10, 51, 22),
+                                    date = None,
+                                   )
+        fullfile = os.path.join(self._repository_directory,
+                                '2010-05-14-00-00-00.fs')
+        original = open(db._file_name, 'rb').read()
+        f = open(fullfile, 'wb')
+        f.write(original)
+        f.flush()
+        f.close()
+        datfile = os.path.join(self._repository_directory,
+                                '2010-05-14-00-00-00.dat')
+        repofiles = [fullfile, datfile]
+        db.mutate()
+        newpos = db.pos
+        self._callFUT(options, oldpos, repofiles)
+        target = os.path.join(self._repository_directory,
+                              gen_filename(options))
+        f = open(db._file_name, 'rb')
+        f.seek(oldpos)
+        increment = f.read()
+        self.assertEqual(open(target, 'rb').read(), increment)
+        self.assertEqual(open(datfile).read(),
+                         '%s %d %d %s\n' %
+                            (target, oldpos, newpos,
+                             md5(increment).hexdigest()))
+
+
 def test_suite():
     return unittest.TestSuite([
         unittest.makeSuite(RepozoTests),
         unittest.makeSuite(Test_delete_old_backups),
+        unittest.makeSuite(Test_do_full_backup),
+        unittest.makeSuite(Test_do_incremental_backup),
     ])



More information about the Zodb-checkins mailing list