[Checkins]
SVN: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
added continuous_storage_iterator tests
Thomas Lotze
tl at gocept.com
Tue Feb 19 09:27:15 EST 2008
Log message for revision 84058:
added continuous_storage_iterator tests
Changed:
U gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
-=-
Modified: gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py
===================================================================
--- gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py 2008-02-19 14:16:57 UTC (rev 84057)
+++ gocept.zeoraid/trunk/src/gocept/zeoraid/tests/test_recovery.py 2008-02-19 14:27:14 UTC (rev 84058)
@@ -27,6 +27,47 @@
import gocept.zeoraid.recovery
+class ContinuousStorageIterator(ZODB.tests.StorageTestBase.StorageTestBase):
+
+ def setUp(self):
+ self._storage = ZODB.FileStorage.FileStorage(tempfile.mktemp())
+
+ def tearDown(self):
+ self._storage.close()
+ self._storage.cleanup()
+
+ def test_empty_storage(self):
+ iterator = gocept.zeoraid.recovery.continuous_storage_iterator(
+ self._storage)
+ self.assertEquals([], list(iterator))
+
+ def test_fixed_storage(self):
+ self._dostore()
+ iterator = gocept.zeoraid.recovery.continuous_storage_iterator(
+ self._storage)
+ self.assertEquals(1, len(list(iterator)))
+
+ def test_early_growing_storage(self):
+ t1 = self._dostore()
+ t2 = self._dostore()
+ iterator = gocept.zeoraid.recovery.continuous_storage_iterator(
+ self._storage)
+ self.assertEquals(t1, iterator.next().tid)
+ t3 = self._dostore()
+ self.assertEquals(t2, iterator.next().tid)
+ self.assertEquals(t3, iterator.next().tid)
+ self.assertRaises(StopIteration, iterator.next)
+
+ def test_late_growing_storage(self):
+ t1 = self._dostore()
+ iterator = gocept.zeoraid.recovery.continuous_storage_iterator(
+ self._storage)
+ self.assertEquals(t1, iterator.next().tid)
+ t2 = self._dostore()
+ self.assertEquals(t2, iterator.next().tid)
+ self.assertRaises(StopIteration, iterator.next)
+
+
class OnlineRecovery(unittest.TestCase):
def store(self, storages, tid=None, status=' ', user=None,
@@ -213,5 +254,6 @@
def test_suite():
suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(ContinuousStorageIterator))
suite.addTest(unittest.makeSuite(OnlineRecovery))
return suite
More information about the Checkins mailing list