[Checkins] SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py added continuous_storage_iterator tests

Thomas Lotze tl at gocept.com
Tue Feb 19 09:27:15 EST 2008


Log message for revision 84058:
  added continuous_storage_iterator tests

Changed:
  U   gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py

-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py	2008-02-19 14:16:57 UTC (rev 84057)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py	2008-02-19 14:27:14 UTC (rev 84058)
@@ -27,6 +27,47 @@
 import gocept.zeoraid.recovery
 
 
+class ContinuousStorageIterator(ZODB.tests.StorageTestBase.StorageTestBase):
+
+    def setUp(self):
+        self._storage = ZODB.FileStorage.FileStorage(tempfile.mktemp())
+
+    def tearDown(self):
+        self._storage.close()
+        self._storage.cleanup()
+
+    def test_empty_storage(self):
+        iterator = gocept.zeoraid.recovery.continuous_storage_iterator(
+            self._storage)
+        self.assertEquals([], list(iterator))
+
+    def test_fixed_storage(self):
+        self._dostore()
+        iterator = gocept.zeoraid.recovery.continuous_storage_iterator(
+            self._storage)
+        self.assertEquals(1, len(list(iterator)))
+
+    def test_early_growing_storage(self):
+        t1 = self._dostore()
+        t2 = self._dostore()
+        iterator = gocept.zeoraid.recovery.continuous_storage_iterator(
+            self._storage)
+        self.assertEquals(t1, iterator.next().tid)
+        t3 = self._dostore()
+        self.assertEquals(t2, iterator.next().tid)
+        self.assertEquals(t3, iterator.next().tid)
+        self.assertRaises(StopIteration, iterator.next)
+
+    def test_late_growing_storage(self):
+        t1 = self._dostore()
+        iterator = gocept.zeoraid.recovery.continuous_storage_iterator(
+            self._storage)
+        self.assertEquals(t1, iterator.next().tid)
+        t2 = self._dostore()
+        self.assertEquals(t2, iterator.next().tid)
+        self.assertRaises(StopIteration, iterator.next)
+
+
 class OnlineRecovery(unittest.TestCase):
 
     def store(self, storages, tid=None, status=' ', user=None,
@@ -213,5 +254,6 @@
 
 def test_suite():
     suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(ContinuousStorageIterator))
     suite.addTest(unittest.makeSuite(OnlineRecovery))
     return suite



More information about the Checkins mailing list