[Zodb-checkins] SVN: ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py put files in separate directories

Godefroid Chapelle gotcha at bubblenet.be
Wed Dec 2 12:17:27 EST 2009


Log message for revision 106185:
  put files in separate directories
  use TestCase API
  

Changed:
  U   ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py

-=-
Modified: ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py
===================================================================
--- ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py	2009-12-02 16:45:39 UTC (rev 106184)
+++ ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py	2009-12-02 17:17:27 UTC (rev 106185)
@@ -18,36 +18,22 @@
 import sys
 import random
 import time
-import glob
 import shutil
 
 import ZODB
 from ZODB import FileStorage
 import transaction
+from BTrees.OOBTree import OOBTree
 
 from ZODB.scripts import tests as tests_module
 
 REPOZO = os.path.join(os.path.dirname(sys.argv[0]), 'repozo')
 
 
-def cleanup(basedir):
-    globData = os.path.join(basedir, 'Data.*')
-    globCopy = os.path.join(basedir, 'Copy.*')
-    backupDir = os.path.join(basedir, 'backup')
-    for fname in glob.glob(globData) + glob.glob(globCopy):
-        os.remove(fname)
-
-    if os.path.isdir(backupDir):
-        for fname in os.listdir(backupDir):
-            os.remove(os.path.join(backupDir, fname))
-        os.rmdir(backupDir)
-
-
 class OurDB:
 
-    def __init__(self, basedir):
-        from BTrees.OOBTree import OOBTree
-        self.basedir = basedir
+    def __init__(self, dir):
+        self.dir = dir
         self.getdb()
         conn = self.db.open()
         conn.root()['tree'] = OOBTree()
@@ -55,7 +41,7 @@
         self.close()
 
     def getdb(self):
-        storage_filename = os.path.join(self.basedir, 'Data.fs')
+        storage_filename = os.path.join(self.dir, 'Data.fs')
         storage = FileStorage.FileStorage(storage_filename)
         self.db = ZODB.DB(storage)
 
@@ -88,84 +74,101 @@
 
 
 # Do recovery to time 'when', and check that it's identical to correctpath.
-def check(correctpath='Data.fs', when=None):
-    if when is None:
-        extra = ''
-    else:
-        extra = ' -D ' + when
-    cmd = REPOZO + ' -vRr backup -o Copy.fs' + extra
-    os.system(cmd)
-    f = file(correctpath, 'rb')
-    g = file('Copy.fs', 'rb')
-    fguts = f.read()
-    gguts = g.read()
-    f.close()
-    g.close()
-    if fguts != gguts:
-        raise ValueError("guts don't match\n"
-                         "    correctpath=%r when=%r\n"
-                         "    cmd=%r" % (correctpath, when, cmd))
+class RepozoTest(unittest.TestCase):
 
+    def setUp(self):
+        # compute directory names
+        self.basedir = os.path.dirname(tests_module.__file__)
+        self.backupdir = os.path.join(self.basedir, 'backup')
+        self.datadir = os.path.join(self.basedir, 'data')
+        self.restoredir = os.path.join(self.basedir, 'restore')
+        self.copydir = os.path.join(self.basedir, 'copy')
+        self.currdir = os.getcwd()
+        # ensure they have all been deleted
+        self.cleanup()
+        # create empty directories
+        os.mkdir(self.backupdir)
+        os.mkdir(self.datadir)
+        os.mkdir(self.restoredir)
+        os.mkdir(self.copydir)
+        os.chdir(self.datadir)
+        self.db = OurDB(self.datadir)
 
-def main(basedir, d):
-    # Every 9th time thru the loop, we save a full copy of Data.fs,
-    # and at the end we ensure we can reproduce those too.
-    saved_snapshots = []  # list of (name, time) pairs for copies.
+    def tearDown(self):
+        self.cleanup()
+        os.chdir(self.currdir)
 
-    for i in range(100):
-        print i
-        # Make some mutations.
-        d.mutate()
+    def testRepozo(self):
+        self.saved_snapshots = []  # list of (name, time) pairs for copies.
 
+        for i in range(100):
+            self.mutate_pack_backup(i)
+
+        # Verify snapshots can be reproduced exactly.
+        for copyname, copytime in self.saved_snapshots:
+            print "Checking that", copyname, "at", copytime, "is reproducible."
+            self.assertRestored(copyname, copytime)
+
+    def mutate_pack_backup(self, i):
+        self.db.mutate()
+
         # Pack about each tenth time.
         if random.random() < 0.1:
             print "packing"
-            d.pack()
-            d.close()
+            self.db.pack()
+            self.db.close()
 
         # Make an incremental backup, half the time with gzip (-z).
         if random.random() < 0.5:
-            os.system(REPOZO + ' -vBQr backup -f Data.fs')
+            cmd = REPOZO + ' -vBQr %s -f Data.fs'
         else:
-            os.system(REPOZO + ' -zvBQr backup -f Data.fs')
+            cmd = REPOZO + ' -zvBQr %s -f Data.fs'
+        os.system(cmd % self.backupdir)
 
+        # Save snapshots to assert that dated restores are possible
         if i % 9 == 0:
+            srcname = os.path.join(self.datadir, 'Data.fs')
             copytime = '%04d-%02d-%02d-%02d-%02d-%02d' % (time.gmtime()[:6])
-            copyname = os.path.join(basedir, 'backup', "Data%d" % i) + '.fs'
-            srcname = os.path.join(basedir, 'Data.fs')
+            copyname = os.path.join(self.copydir, "Data%d.fs" % i)
             shutil.copyfile(srcname, copyname)
-            saved_snapshots.append((copyname, copytime))
+            self.saved_snapshots.append((copyname, copytime))
 
         # Make sure the clock moves at least a second.
         time.sleep(1.01)
 
         # Verify current Data.fs can be reproduced exactly.
-        check()
+        self.assertRestored()
 
-    # Verify snapshots can be reproduced exactly.
-    for copyname, copytime in saved_snapshots:
-        print "Checking that", copyname, "at", copytime, "is reproducible."
-        check(copyname, copytime)
+    def assertRestored(self, correctpath='Data.fs', when=None):
+        if when is None:
+            extra = ''
+        else:
+            extra = ' -D ' + when
+        # restore to Restored.fs
+        restoredfile = os.path.join(self.restoredir, 'Restored.fs')
+        cmd = REPOZO + ' -vRr %s -o ' + restoredfile + extra
+        os.system(cmd % self.backupdir)
 
+        # check restored file content is equal to file that was backed up
+        f = file(correctpath, 'rb')
+        g = file(restoredfile, 'rb')
+        fguts = f.read()
+        gguts = g.read()
+        f.close()
+        g.close()
+        msg = ("guts don't match\ncorrectpath=%r when=%r\n cmd=%r" %
+            (correctpath, when, cmd))
+        self.assertEquals(fguts, gguts, msg)
 
-class RepozoTest(unittest.TestCase):
+    def cleanup(self):
+        for dir in [self.datadir, self.backupdir, self.restoredir,
+                self.copydir]:
+            if os.path.isdir(dir):
+                for fname in os.listdir(dir):
+                    os.remove(os.path.join(dir, fname))
+                os.rmdir(dir)
 
-    def setUp(self):
-        self.basedir = os.path.dirname(tests_module.__file__)
-        self.currdir = os.getcwd()
-        os.chdir(self.basedir)
-        cleanup(self.basedir)
-        os.mkdir(os.path.join(self.basedir, 'backup'))
-        self.d = OurDB(self.basedir)
 
-    def tearDown(self):
-        cleanup(self.basedir)
-        os.chdir(self.currdir)
-
-    def testDummy(self):
-        main(self.basedir, self.d)
-
-
 def test_suite():
     suite = unittest.TestSuite()
     suite.addTest(unittest.makeSuite(RepozoTest))



More information about the Zodb-checkins mailing list