[Zope3-checkins] SVN: zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py More refactoring: move all the output related to running the tests into

Marius Gedminas marius at pov.lt
Fri Jul 13 06:56:29 EDT 2007


Log message for revision 77841:
  More refactoring: move all the output related to running the tests into
  OutputFormatter.
  
  Noticed a couple of potential bugs in other parts of the code and added
  XXX/TODO comments.
  
  

Changed:
  U   zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py

-=-
Modified: zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py
===================================================================
--- zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py	2007-07-13 10:52:26 UTC (rev 77840)
+++ zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py	2007-07-13 10:56:28 UTC (rev 77841)
@@ -256,6 +256,54 @@
     # resume_layer() reasigns sys.stderr for this reason, but be careful
     # and don't store the original one in __init__ or something.
 
+    max_width = 80
+
+    def __init__(self, options):
+        self.options = options
+        self.last_width = 0
+        self.compute_max_width()
+
+    progress = property(lambda self: self.options.progress)
+    verbose = property(lambda self: self.options.verbose)
+
+    def compute_max_width(self):
+        """Try to determine the terminal width."""
+        if self.progress: # XXX: mgedmin: why not do this always?
+            try:
+                # Note that doing this every time is more test friendly.
+                import curses
+            except ImportError:
+                # avoid reimporting a broken module in python 2.3
+                sys.modules['curses'] = None
+            else:
+                try:
+                    curses.setupterm()
+                except TypeError:
+                    pass
+                else:
+                    self.max_width = curses.tigetnum('cols')
+
+    def getShortDescription(self, test, room):
+        """Return a description of a test that fits in ``room`` characters."""
+        room -= 1
+        s = str(test)
+        if len(s) > room:
+            pos = s.find(" (")
+            if pos >= 0:
+                w = room - (pos + 5)
+                if w < 1:
+                    # first portion (test method name) is too long
+                    s = s[:room-3] + "..."
+                else:
+                    pre = s[:pos+2]
+                    post = s[-w:]
+                    s = "%s...%s" % (pre, post)
+            else:
+                w = room - 4
+                s = '... ' + s[-w:]
+
+        return ' ' + s[:room]
+
     def info(self, message):
         """Print an informative message."""
         print message
@@ -264,6 +312,14 @@
         """Report an error."""
         print message
 
+    def error_with_banner(self, message):
+        """Report an error with a big ASCII banner."""
+        print
+        print '*'*70
+        print message
+        print '*'*70
+        print
+
     def profiler_stats(self, stats):
         """Report profiler stats."""
         stats.print_stats(50)
@@ -328,6 +384,7 @@
         The next output operation should be stop_set_up().
         """
         print "  Set up %s" % layer_name,
+        # TODO: flush sys.stdout
 
     def stop_set_up(self, seconds):
         """Report that we've set up a layer.
@@ -343,6 +400,7 @@
         tear_down_not_supported().
         """
         print "  Tear down %s" % layer_name,
+        # TODO: flush sys.stdout
 
     def stop_tear_down(self, seconds):
         """Report that we've tore down a layer.
@@ -358,7 +416,96 @@
         """
         print "... not supported"
 
+    def start_test(self, test, tests_run, total_tests):
+        """Report that we're about to run a test.
 
+        The next output operation should be test_success(), test_error(), or
+        test_failure().
+        """
+        self.test_width = 0
+        if self.progress:
+            if self.last_width:
+                sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
+
+            s = "    %d/%d (%.1f%%)" % (tests_run, total_tests,
+                                        tests_run * 100.0 / total_tests)
+            sys.stdout.write(s)
+            self.test_width += len(s)
+            if self.verbose == 1:
+                room = self.max_width - self.test_width - 1
+                s = self.getShortDescription(test, room)
+                sys.stdout.write(s)
+                self.test_width += len(s)
+
+        elif self.verbose == 1:
+            sys.stdout.write('.' * test.countTestCases())
+
+        if self.verbose > 1:
+            s = str(test)
+            sys.stdout.write(' ')
+            sys.stdout.write(s)
+            self.test_width += len(s) + 1
+
+        sys.stdout.flush()
+
+    def test_success(self, test, seconds):
+        """Report that a test was successful.
+
+        Should be called right after start_test().
+
+        The next output operation should be stop_test().
+        """
+        if self.verbose > 2:
+            s = " (%.3f s)" % seconds
+            sys.stdout.write(s)
+            self.test_width += len(s) + 1
+
+    def test_error(self, test, seconds, exc_info):
+        """Report that an error occurred while running a test.
+
+        Should be called right after start_test().
+
+        The next output operation should be stop_test().
+        """
+        if self.verbose > 2:
+            print " (%.3f s)" % seconds
+        print
+        self._print_traceback("Error in test %s" % test, exc_info)
+        self.test_width = self.last_width = 0
+
+    def test_failure(self, test, seconds, exc_info):
+        """Report that a test failed.
+
+        Should be called right after start_test().
+
+        The next output operation should be stop_test().
+        """
+        if self.verbose > 2:
+            print " (%.3f s)" % seconds
+        print
+        self._print_traceback("Failure in test %s" % test, exc_info)
+        self.test_width = self.last_width = 0
+
+    def _print_traceback(self, msg, exc_info):
+        # TODO: Inline print_traceback here
+        print_traceback(msg, exc_info)
+
+    def stop_test(self, test):
+        """Clean up the output state after a test."""
+        if self.progress:
+            self.last_width = self.test_width
+        elif self.verbose > 1:
+            print
+        sys.stdout.flush()
+
+    def stop_tests(self):
+        """Clean up the output state after a collection of tests."""
+        if self.progress and self.last_width:
+            sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
+        if self.verbose == 1 or self.progress:
+            print
+
+
 def run(defaults=None, args=None):
     if args is None:
         args = sys.argv
@@ -703,10 +850,7 @@
                 test.__dict__.update(state)
 
         t = time.time() - t
-        if options.verbose == 1 or options.progress:
-            result.stopTests()
-            # XXX: figure out how to move this print into the OutputFormatter
-            print
+        output.stop_tests()
         failures.extend(result.failures)
         errors.extend(result.errors)
         output.summary(result.testsRun, len(result.failures), len(result.errors), t)
@@ -867,8 +1011,6 @@
 
 class TestResult(unittest.TestResult):
 
-    max_width = 80
-
     def __init__(self, options, tests, layer_name=None):
         unittest.TestResult.__init__(self)
         self.options = options
@@ -880,48 +1022,11 @@
             self.layers = order_by_bases(layers)
         else:
             self.layers = []
-        if options.progress:
-            count = 0
-            for test in tests:
-                count += test.countTestCases()
-            self.count = count
-        self.last_width = 0
+        count = 0
+        for test in tests:
+            count += test.countTestCases()
+        self.count = count
 
-        if options.progress:
-            try:
-                # Note that doing this every time is more test friendly.
-                import curses
-            except ImportError:
-                # avoid reimporting a broken module in python 2.3
-                sys.modules['curses'] = None
-            else:
-                try:
-                    curses.setupterm()
-                except TypeError:
-                    pass
-                else:
-                    self.max_width = curses.tigetnum('cols')
-
-    def getShortDescription(self, test, room):
-        room -= 1
-        s = str(test)
-        if len(s) > room:
-            pos = s.find(" (")
-            if pos >= 0:
-                w = room - (pos + 5)
-                if w < 1:
-                    # first portion (test method name) is too long
-                    s = s[:room-3] + "..."
-                else:
-                    pre = s[:pos+2]
-                    post = s[-w:]
-                    s = "%s...%s" % (pre, post)
-            else:
-                w = room - 4
-                s = '... ' + s[-w:]
-
-        return ' ' + s[:room]
-
     def testSetUp(self):
         """A layer may define a setup method to be called before each
         individual test.
@@ -945,103 +1050,47 @@
     def startTest(self, test):
         self.testSetUp()
         unittest.TestResult.startTest(self, test)
-        testsRun = self.testsRun - 1
+        testsRun = self.testsRun - 1 # subtract the one the base class added
         count = test.countTestCases()
         self.testsRun = testsRun + count
-        options = self.options
-        self.test_width = 0
 
-        # TODO: figure out how to move this to OutputFormatter
-        if options.progress:
-            if self.last_width:
-                sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
+        self.options.output.start_test(test, self.testsRun, self.count)
 
-            s = "    %d/%d (%.1f%%)" % (
-                self.testsRun, self.count,
-                (self.testsRun) * 100.0 / self.count
-                )
-            sys.stdout.write(s)
-            self.test_width += len(s)
-            if options.verbose == 1:
-                room = self.max_width - self.test_width - 1
-                s = self.getShortDescription(test, room)
-                sys.stdout.write(s)
-                self.test_width += len(s)
-
-        elif options.verbose == 1:
-            for i in range(count):
-                sys.stdout.write('.')
-                testsRun += 1
-
-        if options.verbose > 1:
-            s = str(test)
-            sys.stdout.write(' ')
-            sys.stdout.write(s)
-            self.test_width += len(s) + 1
-
-        sys.stdout.flush()
-
         self._threads = threading.enumerate()
         self._start_time = time.time()
 
     def addSuccess(self, test):
-        # TODO: figure out how to move this to OutputFormatter
-        if self.options.verbose > 2:
-            t = max(time.time() - self._start_time, 0.0)
-            s = " (%.3f s)" % t
-            sys.stdout.write(s)
-            self.test_width += len(s) + 1
+        t = max(time.time() - self._start_time, 0.0)
+        self.options.output.test_success(test, t)
 
     def addError(self, test, exc_info):
-        # TODO: figure out how to move this to OutputFormatter
-        if self.options.verbose > 2:
-            print " (%.3f s)" % (time.time() - self._start_time)
+        self.options.output.test_error(test, time.time() - self._start_time,
+                                       exc_info)
 
         unittest.TestResult.addError(self, test, exc_info)
-        print
-        self._print_traceback("Error in test %s" % test, exc_info)
 
         if self.options.post_mortem:
             if self.options.resume_layer:
-                print
-                print '*'*70
-                print ("Can't post-mortem debug when running a layer"
-                       " as a subprocess!")
-                print '*'*70
-                print
+                self.options.output.error_with_banner("Can't post-mortem debug"
+                                                      " when running a layer"
+                                                      " as a subprocess!")
             else:
                 post_mortem(exc_info)
 
-        self.test_width = self.last_width = 0
-
     def addFailure(self, test, exc_info):
-        # TODO: figure out how to move this to OutputFormatter
+        self.options.output.test_failure(test, time.time() - self._start_time,
+                                         exc_info)
 
-
-        if self.options.verbose > 2:
-            print " (%.3f s)" % (time.time() - self._start_time)
-
         unittest.TestResult.addFailure(self, test, exc_info)
-        print
-        self._print_traceback("Failure in test %s" % test, exc_info)
 
         if self.options.post_mortem:
+            # XXX: mgedmin: why isn't there a resume_layer check here like
+            # in addError?
             post_mortem(exc_info)
 
-        self.test_width = self.last_width = 0
-
-
-    def stopTests(self):
-        # TODO: figure out how to move this to OutputFormatter
-        if self.options.progress and self.last_width:
-            sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
-
     def stopTest(self, test):
         self.testTearDown()
-        if self.options.progress:
-            self.last_width = self.test_width
-        elif self.options.verbose > 1:
-            print
+        self.options.output.stop_test(test)
 
         # TODO: figure out how to move this to OutputFormatter
         if gc.garbage:
@@ -1061,13 +1110,7 @@
             print test
             print "New thread(s):", new_threads
 
-        sys.stdout.flush()
 
-
-    def _print_traceback(self, msg, exc_info):
-        # TODO: figure out how to move this to OutputFormatter
-        print_traceback(msg, exc_info)
-
 doctest_template = """
 File "%s", line %s, in %s
 
@@ -1092,7 +1135,6 @@
 
 
 def print_traceback(msg, exc_info):
-    # TODO: figure out how to move this to OutputFormatter
     print
     print msg
 
@@ -1982,7 +2024,7 @@
     merge_options(options, defaults)
     options.original_testrunner_args = original_testrunner_args
 
-    options.output = OutputFormatter()
+    options.output = OutputFormatter(options)
 
     options.fail = False
 



More information about the Zope3-Checkins mailing list