[Zope-Checkins] SVN: Zope/trunk/src/App/ Coverage for App.ApplicationManager.ApplicationManager.

Tres Seaver tseaver at palladion.com
Wed Apr 7 21:21:30 EDT 2010


Log message for revision 110620:
  Coverage for App.ApplicationManager.ApplicationManager.

Changed:
  U   Zope/trunk/src/App/ApplicationManager.py
  U   Zope/trunk/src/App/tests/test_ApplicationManager.py

-=-
Modified: Zope/trunk/src/App/ApplicationManager.py
===================================================================
--- Zope/trunk/src/App/ApplicationManager.py	2010-04-08 01:21:28 UTC (rev 110619)
+++ Zope/trunk/src/App/ApplicationManager.py	2010-04-08 01:21:30 UTC (rev 110620)
@@ -318,8 +318,10 @@
         """Return to the main management screen"""
         raise Redirect, URL2+'/manage'
 
-    def process_time(self):
-        s=int(time.time())-self.process_start
+    def process_time(self, _when=None):
+        if _when is None:
+            _when = time.time()
+        s=int(_when)-self.process_start
         d=int(s/86400)
         s=s-(d*86400)
         h=int(s/3600)
@@ -378,11 +380,14 @@
         """
 
     @requestmethod('POST')
-    def manage_pack(self, days=0, REQUEST=None):
+    def manage_pack(self, days=0, REQUEST=None, _when=None):
         """Pack the database"""
 
-        t=time.time()-days*86400
+        if _when is None:
+            _when = time.time()
 
+        t = _when - (days*86400)
+
         db=self._p_jar.db()
         t=db.pack(t)
         if REQUEST is not None:

Modified: Zope/trunk/src/App/tests/test_ApplicationManager.py
===================================================================
--- Zope/trunk/src/App/tests/test_ApplicationManager.py	2010-04-08 01:21:28 UTC (rev 110619)
+++ Zope/trunk/src/App/tests/test_ApplicationManager.py	2010-04-08 01:21:30 UTC (rev 110620)
@@ -1,5 +1,23 @@
 import unittest
 
+class ConfigTestBase:
+
+    def setUp(self):
+        import App.config
+        self._old_config = App.config._config
+
+    def tearDown(self):
+        import App.config
+        App.config._config = self._old_config
+
+    def _makeConfig(self, **kw):
+        import App.config
+        class DummyConfig:
+            pass
+        App.config._config = config = DummyConfig()
+        config.dbtab = DummyDBTab(kw)
+        return config
+
 class FakeConnectionTests(unittest.TestCase):
 
     def _getTargetClass(self):
@@ -15,16 +33,8 @@
         fc = self._makeOne(db, parent_jar)
         self.failUnless(fc.db() is db)
 
-class DatabaseChooserTests(unittest.TestCase):
+class DatabaseChooserTests(ConfigTestBase, unittest.TestCase):
 
-    def setUp(self):
-        import App.config
-        self._old_config = App.config._config
-
-    def tearDown(self):
-        import App.config
-        App.config._config = self._old_config
-
     def _getTargetClass(self):
         from App.ApplicationManager import DatabaseChooser
         return DatabaseChooser
@@ -40,13 +50,6 @@
                 return self
         return Root()
 
-    def _makeConfig(self, **kw):
-        import App.config
-        class DummyConfig:
-            pass
-        App.config._config = config = DummyConfig()
-        config.dbtab = DummyDBTab(kw)
-
     def test_getDatabaseNames_sorted(self):
         self._makeConfig(foo=object(), bar=object(), qux=object())
         dc = self._makeOne('test')
@@ -242,6 +245,225 @@
         self.assertEqual(dm.manage_getSysPath(), list(sys.path))
 
 
+class ApplicationManagerTests(ConfigTestBase, unittest.TestCase):
+
+    def setUp(self):
+        ConfigTestBase.setUp(self)
+        self._tempdirs = ()
+
+    def tearDown(self):
+        import shutil
+        for tempdir in self._tempdirs:
+            shutil.rmtree(tempdir)
+        ConfigTestBase.tearDown(self)
+
+    def _getTargetClass(self):
+        from App.ApplicationManager import ApplicationManager
+        return ApplicationManager
+
+    def _makeOne(self):
+        return self._getTargetClass()()
+
+    def _makeJar(self, dbname, dbsize):
+        class Jar:
+            def db(self):
+                return self._db
+        jar = Jar()
+        jar._db = DummyDB(dbname, dbsize)
+        return jar
+
+    def _makeTempdir(self):
+        import tempfile
+        tmp = tempfile.mkdtemp()
+        self._tempdirs += (tmp,)
+        return tmp
+
+    def _makeFile(self, dir, name, text):
+        import os
+        os.makedirs(dir)
+        fqn = os.path.join(dir, name)
+        f = open(fqn, 'w')
+        f.write(text)
+        f.flush()
+        f.close()
+        return fqn
+
+    def test_ctor_initializes_Products(self):
+        from App.Product import ProductFolder
+        am = self._makeOne()
+        self.failUnless(isinstance(am.Products, ProductFolder))
+
+    def test__canCopy(self):
+        am = self._makeOne()
+        self.failIf(am._canCopy())
+
+    def test_manage_app(self):
+        from zExceptions import Redirect
+        am = self._makeOne()
+        try:
+            am.manage_app('http://example.com/foo')
+        except Redirect, v:
+            self.assertEqual(v.args, ('http://example.com/foo/manage',))
+        else:
+            self.fail('Redirect not raised')
+
+    def test_process_time_seconds(self):
+        am = self._makeOne()
+        am.process_start = 0
+        self.assertEqual(am.process_time(0).strip(), '0 sec')
+        self.assertEqual(am.process_time(1).strip(), '1 sec')
+        self.assertEqual(am.process_time(2).strip(), '2 sec')
+
+    def test_process_time_minutes(self):
+        am = self._makeOne()
+        am.process_start = 0
+        self.assertEqual(am.process_time(60).strip(), '1 min 0 sec')
+        self.assertEqual(am.process_time(61).strip(), '1 min 1 sec')
+        self.assertEqual(am.process_time(62).strip(), '1 min 2 sec')
+        self.assertEqual(am.process_time(120).strip(), '2 min 0 sec')
+        self.assertEqual(am.process_time(121).strip(), '2 min 1 sec')
+        self.assertEqual(am.process_time(122).strip(), '2 min 2 sec')
+
+    def test_process_time_hours(self):
+        am = self._makeOne()
+        am.process_start = 0
+        n1 = 60 * 60
+        n2 = n1 * 2
+        self.assertEqual(am.process_time(n1).strip(),
+                         '1 hour  0 sec')
+        self.assertEqual(am.process_time(n1 + 61).strip(),
+                         '1 hour 1 min 1 sec')
+        self.assertEqual(am.process_time(n2 + 1).strip(),
+                         '2 hours  1 sec')
+        self.assertEqual(am.process_time(n2 + 122).strip(),
+                         '2 hours 2 min 2 sec')
+
+    def test_process_time_days(self):
+        am = self._makeOne()
+        am.process_start = 0
+        n1 = 60 * 60 * 24
+        n2 = n1 * 2
+        self.assertEqual(am.process_time(n1).strip(),
+                         '1 day   0 sec')
+        self.assertEqual(am.process_time(n1 + 3661).strip(),
+                         '1 day 1 hour 1 min 1 sec')
+        self.assertEqual(am.process_time(n2 + 1).strip(),
+                         '2 days   1 sec')
+        self.assertEqual(am.process_time(n2 + 7322).strip(),
+                         '2 days 2 hours 2 min 2 sec')
+
+    def test_thread_get_ident(self):
+        import thread
+        am = self._makeOne()
+        self.assertEqual(am.thread_get_ident(), thread.get_ident())
+
+    def test_db_name(self):
+        am = self._makeOne()
+        am._p_jar = self._makeJar('foo', '')
+        self.assertEqual(am.db_name(), 'foo')
+
+    def test_db_size_string(self):
+        am = self._makeOne()
+        am._p_jar = self._makeJar('foo', 'super')
+        self.assertEqual(am.db_size(), 'super')
+
+    def test_db_size_lt_1_meg(self):
+        am = self._makeOne()
+        am._p_jar = self._makeJar('foo', 4497)
+        self.assertEqual(am.db_size(), '4.4K')
+
+    def test_db_size_gt_1_meg(self):
+        am = self._makeOne()
+        am._p_jar = self._makeJar('foo', (2048 * 1024) + 123240)
+        self.assertEqual(am.db_size(), '2.1M')
+
+    #def test_manage_restart(self):  XXX -- TOO UGLY TO TEST
+    #def test_manage_shutdown(self):  XXX -- TOO UGLY TO TEST
+
+    def test_manage_pack(self):
+        am = self._makeOne()
+        jar = am._p_jar = self._makeJar('foo', '')
+        am.manage_pack(1, _when=86400*2)
+        self.assertEqual(jar._db._packed, 86400)
+
+    def test_revert_points(self):
+        am = self._makeOne()
+        self.assertEqual(list(am.revert_points()), [])
+
+    def test_version_list(self):
+        # XXX this method is too stupid to live:  returning a bare list
+        #     of versions without even tying them to the products?
+        #     and what about products living outside SOFTWARE_HOME?
+        #     Nobody calls it, either
+        import os
+        am = self._makeOne()
+        config = self._makeConfig()
+        swdir = config.softwarehome = self._makeTempdir()
+        foodir = os.path.join(swdir, 'Products', 'foo')
+        self._makeFile(foodir, 'VERSION.TXT', '1.2')
+        bardir = os.path.join(swdir, 'Products', 'bar')
+        self._makeFile(bardir, 'VERSION.txt', '3.4')
+        bazdir = os.path.join(swdir, 'Products', 'baz')
+        self._makeFile(bazdir, 'version.txt', '5.6')
+        versions = am.version_list()
+        self.assertEqual(versions, ['3.4', '5.6', '1.2'])
+
+    def test_getSOFTWARE_HOME_missing(self):
+        am = self._makeOne()
+        config = self._makeConfig()
+        self.assertEqual(am.getSOFTWARE_HOME(), None)
+
+    def test_getSOFTWARE_HOME_present(self):
+        am = self._makeOne()
+        config = self._makeConfig()
+        swdir = config.softwarehome = self._makeTempdir()
+        self.assertEqual(am.getSOFTWARE_HOME(), swdir)
+
+    def test_getZOPE_HOME_present(self):
+        am = self._makeOne()
+        config = self._makeConfig()
+        zopedir = config.zopehome = self._makeTempdir()
+        self.assertEqual(am.getZOPE_HOME(), zopedir)
+
+    def test_getINSTANCE_HOME_present(self):
+        am = self._makeOne()
+        config = self._makeConfig()
+        instdir = config.instancehome = self._makeTempdir()
+        self.assertEqual(am.getINSTANCE_HOME(), instdir)
+
+    def test_getCLIENT_HOME_present(self):
+        am = self._makeOne()
+        config = self._makeConfig()
+        cldir = config.clienthome = self._makeTempdir()
+        self.assertEqual(am.getCLIENT_HOME(), cldir)
+
+    def test_getServers(self):
+        from asyncore import socket_map
+        class DummySocketServer:
+            def __init__(self, port):
+                self.port = port
+        class AnotherSocketServer(DummySocketServer):
+            pass
+        class NotAServer:
+            pass
+        am = self._makeOne()
+        _old_socket_map = socket_map.copy()
+        socket_map.clear()
+        socket_map['foo'] = DummySocketServer(45)
+        socket_map['bar'] = AnotherSocketServer(57)
+        socket_map['qux'] = NotAServer()
+        try:
+            pairs = am.getServers()
+        finally:
+            socket_map.clear()
+            socket_map.update(_old_socket_map)
+        self.assertEqual(len(pairs), 2)
+        self.failUnless((str(DummySocketServer), 'Port: 45') in pairs)
+        self.failUnless((str(AnotherSocketServer), 'Port: 57') in pairs)
+
+    #def test_objectIds(self):  XXX -- TOO UGLY TO TEST (BBB for Zope 2.3!!)
+
+
 class DummyDBTab:
     def __init__(self, databases=None):
         self._databases = databases or {}
@@ -255,10 +477,27 @@
     def getDatabase(self, name):
         return self._databases[name]
 
+class DummyDB:
 
+    _packed = None
+
+    def __init__(self, name, size):
+        self._name = name
+        self._size = size
+
+    def getName(self):
+        return self._name
+
+    def getSize(self):
+        return self._size
+
+    def pack(self, when):
+        self._packed = when
+
 def test_suite():
     return unittest.TestSuite((
         unittest.makeSuite(FakeConnectionTests),
         unittest.makeSuite(DatabaseChooserTests),
         unittest.makeSuite(DebugManagerTests),
+        unittest.makeSuite(ApplicationManagerTests),
     ))



More information about the Zope-Checkins mailing list