[Checkins] SVN: gocept.zeoraid/branches/dirceu-addstoragetool/src/gocept/zeoraid/tests/test_basics.py Adding a test to raid_reload

Dirceu Pereira Tiegs dirceutiegs at gmail.com
Sun Aug 10 21:28:32 EDT 2008


Log message for revision 89609:
  Adding a test to raid_reload

Changed:
  U   gocept.zeoraid/branches/dirceu-addstoragetool/src/gocept/zeoraid/tests/test_basics.py

-=-
Modified: gocept.zeoraid/branches/dirceu-addstoragetool/src/gocept/zeoraid/tests/test_basics.py
===================================================================
--- gocept.zeoraid/branches/dirceu-addstoragetool/src/gocept/zeoraid/tests/test_basics.py	2008-08-11 00:28:26 UTC (rev 89608)
+++ gocept.zeoraid/branches/dirceu-addstoragetool/src/gocept/zeoraid/tests/test_basics.py	2008-08-11 01:28:32 UTC (rev 89609)
@@ -1373,9 +1373,59 @@
     pass
 
 
+class ExtensionMethodsTests(ZEOStorageBackendTests):
+    """Tests for storage extension methods (currently ``raid_reload``)."""
+
+    def test_reload(self):
+        # Verifies that raid_reload() adds a back-end storage listed in the
+        # re-read configuration file (5 storages before, 6 after).
+        import os  # local import: wraps the descriptor mkstemp() returns
+
+        # create and start a new ZEO server
+        port = get_port()
+        zconf = forker.ZEOConfig(('', port))
+        zport, adminaddr, pid, path = forker.start_zeo_server(
+            self.getConfig(), zconf, port)
+        self._pids.append(pid)
+        self._servers.append(adminaddr)
+        self._storages.append(ZEOOpener(zport, storage='1',
+                                        min_disconnect_poll=0.5, wait=1,
+                                        wait_timeout=60))
+
+        # create a config file with this additional ZEO server and save it
+        file_contents = """%%import gocept.zeoraid\n<zeo>\n\taddress 127.0.0.1:%s\n</zeo>\n\n<raidstorage main>\n""" % get_port()
+        for count, storage in enumerate(self._storages):
+            file_contents += """\t<zeoclient %s>\n\t\tserver %s:%s\n\t\tstorage 1\n\t</zeoclient>\n\n""" % (count+1, storage.name[0], storage.name[1])
+        file_contents += """</raidstorage>\n<eventlog>\n\t<logfile>\n\t\tpath STDOUT\n\t</logfile>\n</eventlog>"""
+
+        # mkstemp() creates the file itself and hands back an open
+        # descriptor, so no other process can claim the name first
+        # (mktemp() only returns a name and is documented as unsafe).
+        fd, filename = tempfile.mkstemp()
+        # NOTE(review): this rebinds the list; if the fixture already tracks
+        # files in _server_storage_files those entries are dropped - confirm.
+        self._server_storage_files = [filename]
+        f = os.fdopen(fd, 'w')
+        try:
+            f.write(file_contents)
+        finally:
+            # release the handle even if the write fails
+            f.close()
+
+        # test if the new ZEO server is added as a storage
+        self.assertEqual(len(self._storage.storages.items()), 5)
+        self._storage.raid_reload(filename)
+        self.assertEqual(len(self._storage.storages.items()), 6)
+
+        # do a simple store to see if anything breaks
+        oid = self._storage.new_oid()
+        self._dostore(oid=oid)
+
+
 def test_suite():
     suite = unittest.TestSuite()
     suite.addTest(unittest.makeSuite(ZEOReplicationStorageTests, "check"))
     suite.addTest(unittest.makeSuite(FailingStorageTests))
     suite.addTest(unittest.makeSuite(FailingStorageSharedBlobTests))
+    suite.addTest(unittest.makeSuite(ExtensionMethodsTests))
     return suite



More information about the Checkins mailing list