[Checkins] SVN: zope.testing/trunk/src/zope/testing/testrunner.py Merge the output-refactoring branch with

Marius Gedminas marius at pov.lt
Sat Jul 14 17:59:42 EDT 2007


Log message for revision 77981:
  Merge the output-refactoring branch with
  
    svn merge -r 77825:77855 svn+ssh://svn.zope.org/repos/main/zope.testing/branches/output-refactoring .
  
  Extract all output operations into a single class (OutputFormatter).  The goal
  is to have the output logic in a single place so it's easier to modify (e.g.
  add colours if the user asks for them), or replace it with alternative output
  formats (e.g., HTML or GUI).
  
  

Changed:
  U   zope.testing/trunk/src/zope/testing/testrunner.py

-=-
Modified: zope.testing/trunk/src/zope/testing/testrunner.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner.py	2007-07-14 21:54:23 UTC (rev 77980)
+++ zope.testing/trunk/src/zope/testing/testrunner.py	2007-07-14 21:59:41 UTC (rev 77981)
@@ -246,6 +246,321 @@
             return "__init__.pyo" in fnamelist
     return False
 
+
+doctest_template = """
+File "%s", line %s, in %s
+
+%s
+Want:
+%s
+Got:
+%s
+"""
+
+
+class OutputFormatter(object):
+    """Test runner output formatter."""
+
+    # Implementation note: be careful about printing stuff to sys.stderr.
+    # It is used for interprocess communication between the parent and the
+    # child test runner, when you run some test layers in a subprocess.
+    # resume_layer() reassigns sys.stderr for this reason, but be careful
+    # and don't store the original one in __init__ or something.
+
+    max_width = 80
+
+    def __init__(self, options):
+        self.options = options
+        self.last_width = 0
+        self.compute_max_width()
+
+    progress = property(lambda self: self.options.progress)
+    verbose = property(lambda self: self.options.verbose)
+
+    def compute_max_width(self):
+        """Try to determine the terminal width."""
+        try:
+            # Note that doing this every time is more test friendly.
+            import curses
+        except ImportError:
+            # avoid reimporting a broken module in python 2.3
+            sys.modules['curses'] = None
+        else:
+            try:
+                curses.setupterm()
+            except TypeError:
+                pass
+            else:
+                self.max_width = curses.tigetnum('cols')
+
+    def getShortDescription(self, test, room):
+        """Return a description of a test that fits in ``room`` characters."""
+        room -= 1
+        s = str(test)
+        if len(s) > room:
+            pos = s.find(" (")
+            if pos >= 0:
+                w = room - (pos + 5)
+                if w < 1:
+                    # first portion (test method name) is too long
+                    s = s[:room-3] + "..."
+                else:
+                    pre = s[:pos+2]
+                    post = s[-w:]
+                    s = "%s...%s" % (pre, post)
+            else:
+                w = room - 4
+                s = '... ' + s[-w:]
+
+        return ' ' + s[:room]
+
+    def info(self, message):
+        """Print an informative message."""
+        print message
+
+    def error(self, message):
+        """Report an error."""
+        print message
+
+    def error_with_banner(self, message):
+        """Report an error with a big ASCII banner."""
+        print
+        print '*'*70
+        print message
+        print '*'*70
+        print
+
+    def profiler_stats(self, stats):
+        """Report profiler stats."""
+        stats.print_stats(50)
+
+    def import_errors(self, import_errors):
+        """Report test-module import errors (if any)."""
+        if import_errors:
+            print "Test-module import failures:"
+            for error in import_errors:
+                self.print_traceback("Module: %s\n" % error.module,
+                                     error.exc_info),
+            print
+
+    def tests_with_errors(self, errors):
+        """Report names of tests with errors (if any)."""
+        if errors:
+            print
+            print "Tests with errors:"
+            for test, exc_info in errors:
+                print "  ", test
+
+    def tests_with_failures(self, failures):
+        """Report names of tests with failures (if any)."""
+        if failures:
+            print
+            print "Tests with failures:"
+            for test, exc_info in failures:
+                print "  ", test
+
+    def modules_with_import_problems(self, import_errors):
+        """Report names of modules with import problems (if any)."""
+        if import_errors:
+            print
+            print "Test-modules with import problems:"
+            for test in import_errors:
+                print "  " + test.module
+
+    def totals(self, n_tests, n_failures, n_errors):
+        """Report totals (number of tests, failures, and errors)."""
+        print "Total: %s tests, %s failures, %s errors" % (
+                        n_tests, n_failures, n_errors)
+
+    def summary(self, n_tests, n_failures, n_errors, n_seconds):
+        """Summarize the results."""
+        print ("  Ran %s tests with %s failures and %s errors in %.3f seconds."
+               % (n_tests, n_failures, n_errors, n_seconds))
+
+    def list_of_tests(self, tests, layer_name):
+        """Report a list of test names."""
+        print "Listing %s tests:" % layer_name
+        for test in tests:
+            print ' ', test
+
+    def garbage(self, garbage):
+        """Report garbage generated by tests."""
+        if garbage:
+            print "Tests generated new (%d) garbage:" % len(garbage)
+            print garbage
+
+    def test_garbage(self, test, garbage):
+        """Report garbage generated by a test."""
+        if garbage:
+            print "The following test left garbage:"
+            print test
+            print garbage
+
+    def test_threads(self, test, new_threads):
+        """Report threads left behind by a test."""
+        if new_threads:
+            print "The following test left new threads behind:"
+            print test
+            print "New thread(s):", new_threads
+
+    def refcounts(self, rc, prev):
+        """Report a change in reference counts."""
+        print "  sys refcount=%-8d change=%-6d" % (rc, rc - prev)
+
+    def detailed_refcounts(self, track, rc, prev):
+        """Report a change in reference counts, with extra detail."""
+        print ("  sum detail refcount=%-8d"
+               " sys refcount=%-8d"
+               " change=%-6d"
+               % (track.n, rc, rc - prev))
+        track.output()
+
+    def start_set_up(self, layer_name):
+        """Report that we're setting up a layer.
+
+        The next output operation should be stop_set_up().
+        """
+        print "  Set up %s" % layer_name,
+        sys.stdout.flush()
+
+    def stop_set_up(self, seconds):
+        """Report that we've set up a layer.
+
+        Should be called right after start_set_up().
+        """
+        print "in %.3f seconds." % seconds
+
+    def start_tear_down(self, layer_name):
+        """Report that we're tearing down a layer.
+
+        The next output operation should be stop_tear_down() or
+        tear_down_not_supported().
+        """
+        print "  Tear down %s" % layer_name,
+        sys.stdout.flush()
+
+    def stop_tear_down(self, seconds):
+        """Report that we've torn down a layer.
+
+        Should be called right after start_tear_down().
+        """
+        print "in %.3f seconds." % seconds
+
+    def tear_down_not_supported(self):
+        """Report that we could not tear down a layer.
+
+        Should be called right after start_tear_down().
+        """
+        print "... not supported"
+
+    def start_test(self, test, tests_run, total_tests):
+        """Report that we're about to run a test.
+
+        The next output operation should be test_success(), test_error(), or
+        test_failure().
+        """
+        self.test_width = 0
+        if self.progress:
+            if self.last_width:
+                sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
+
+            s = "    %d/%d (%.1f%%)" % (tests_run, total_tests,
+                                        tests_run * 100.0 / total_tests)
+            sys.stdout.write(s)
+            self.test_width += len(s)
+            if self.verbose == 1:
+                room = self.max_width - self.test_width - 1
+                s = self.getShortDescription(test, room)
+                sys.stdout.write(s)
+                self.test_width += len(s)
+
+        elif self.verbose == 1:
+            sys.stdout.write('.' * test.countTestCases())
+
+        if self.verbose > 1:
+            s = str(test)
+            sys.stdout.write(' ')
+            sys.stdout.write(s)
+            self.test_width += len(s) + 1
+
+        sys.stdout.flush()
+
+    def test_success(self, test, seconds):
+        """Report that a test was successful.
+
+        Should be called right after start_test().
+
+        The next output operation should be stop_test().
+        """
+        if self.verbose > 2:
+            s = " (%.3f s)" % seconds
+            sys.stdout.write(s)
+            self.test_width += len(s) + 1
+
+    def test_error(self, test, seconds, exc_info):
+        """Report that an error occurred while running a test.
+
+        Should be called right after start_test().
+
+        The next output operation should be stop_test().
+        """
+        if self.verbose > 2:
+            print " (%.3f s)" % seconds
+        print
+        self.print_traceback("Error in test %s" % test, exc_info)
+        self.test_width = self.last_width = 0
+
+    def test_failure(self, test, seconds, exc_info):
+        """Report that a test failed.
+
+        Should be called right after start_test().
+
+        The next output operation should be stop_test().
+        """
+        if self.verbose > 2:
+            print " (%.3f s)" % seconds
+        print
+        self.print_traceback("Failure in test %s" % test, exc_info)
+        self.test_width = self.last_width = 0
+
+    def print_traceback(self, msg, exc_info):
+        """Report an error with a traceback."""
+        print
+        print msg
+
+        v = exc_info[1]
+        if isinstance(v, doctest.DocTestFailureException):
+            tb = v.args[0]
+        elif isinstance(v, doctest.DocTestFailure):
+            tb = doctest_template % (
+                v.test.filename,
+                v.test.lineno + v.example.lineno + 1,
+                v.test.name,
+                v.example.source,
+                v.example.want,
+                v.got,
+                )
+        else:
+            tb = "".join(traceback.format_exception(*exc_info))
+
+        print tb
+
+    def stop_test(self, test):
+        """Clean up the output state after a test."""
+        if self.progress:
+            self.last_width = self.test_width
+        elif self.verbose > 1:
+            print
+        sys.stdout.flush()
+
+    def stop_tests(self):
+        """Clean up the output state after a collection of tests."""
+        if self.progress and self.last_width:
+            sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
+        if self.verbose == 1 or self.progress:
+            print
+
+
 def run(defaults=None, args=None):
     if args is None:
         args = sys.argv
@@ -277,6 +592,8 @@
     if options.fail:
         return True
 
+    output = options.output
+
     options.testrunner_defaults = defaults
     options.resume_layer = resume_layer
     options.resume_number = resume_number
@@ -288,9 +605,9 @@
     if (options.profile
         and sys.version_info[:3] <= (2,4,1)
         and __debug__):
-        print ('Because of a bug in Python < 2.4.1, profiling '
-               'during tests requires the -O option be passed to '
-               'Python (not the test runner).')
+        output.error('Because of a bug in Python < 2.4.1, profiling '
+                     'during tests requires the -O option be passed to '
+                     'Python (not the test runner).')
         sys.exit()
 
     if options.coverage:
@@ -334,7 +651,7 @@
     if options.profile and not options.resume_layer:
         stats = profiler.loadStats(prof_glob)
         stats.sort_stats('cumulative', 'calls')
-        stats.print_stats(50)
+        output.profiler_stats(stats)
 
     if tracer:
         coverdir = os.path.join(os.getcwd(), options.coverage)
@@ -362,26 +679,29 @@
     global _layer_name_cache
     _layer_name_cache = {} # Reset to enforce test isolation
 
+    output = options.output
+
     if options.resume_layer:
         original_stderr = sys.stderr
         sys.stderr = sys.stdout
     elif options.verbose:
         if options.all:
-            print "Running tests at all levels"
+            msg = "Running tests at all levels"
         else:
-            print "Running tests at level %d" % options.at_level
+            msg = "Running tests at level %d" % options.at_level
+        output.info(msg)
 
 
     old_threshold = gc.get_threshold()
     if options.gc:
         if len(options.gc) > 3:
-            print "Too many --gc options"
+            output.error("Too many --gc options")
             sys.exit(1)
         if options.gc[0]:
-            print ("Cyclic garbage collection threshold set to: %s" %
-                   `tuple(options.gc)`)
+            output.info("Cyclic garbage collection threshold set to: %s" %
+                        repr(tuple(options.gc)))
         else:
-            print "Cyclic garbage collection is disabled."
+            output.info("Cyclic garbage collection is disabled.")
 
         gc.set_threshold(*options.gc)
 
@@ -398,12 +718,12 @@
         reporting_flags = doctest.REPORT_NDIFF
     if options.udiff:
         if reporting_flags:
-            print "Can only give one of --ndiff, --udiff, or --cdiff"
+            output.error("Can only give one of --ndiff, --udiff, or --cdiff")
             sys.exit(1)
         reporting_flags = doctest.REPORT_UDIFF
     if options.cdiff:
         if reporting_flags:
-            print "Can only give one of --ndiff, --udiff, or --cdiff"
+            output.error("Can only give one of --ndiff, --udiff, or --cdiff")
             sys.exit(1)
         reporting_flags = doctest.REPORT_CDIFF
     if options.report_only_first_failure:
@@ -430,11 +750,7 @@
     nlayers = 0
     import_errors = tests_by_layer_name.pop(None, None)
 
-    if import_errors:
-        print "Test-module import failures:"
-        for error in import_errors:
-            print_traceback("Module: %s\n" % error.module, error.exc_info),
-        print
+    output.import_errors(import_errors)
 
     if 'unit' in tests_by_layer_name:
         tests = tests_by_layer_name.pop('unit')
@@ -450,9 +766,9 @@
 
             if should_run:
                 if options.list_tests:
-                    list_tests(options, tests, 'unit')
+                    output.list_of_tests(tests, 'unit')
                 else:
-                    print "Running unit tests:"
+                    output.info("Running unit tests:")
                     nlayers += 1
                     ran += run_tests(options, tests, 'unit', failures, errors)
 
@@ -475,7 +791,7 @@
 
     if options.list_tests:
         for layer_name, layer, tests in layers_to_run:
-            list_tests(options, tests, layer_name)
+            output.list_of_tests(tests, layer_name)
         return True
 
     for layer_name, layer, tests in layers_to_run:
@@ -492,11 +808,12 @@
 
     if setup_layers:
         if options.resume_layer == None:
-            print "Tearing down left over layers:"
-        tear_down_unneeded((), setup_layers, True)
+            output.info("Tearing down left over layers:")
+        tear_down_unneeded(options, (), setup_layers, True)
 
     if options.resume_layer:
         sys.stdout.close()
+        # Communicate with the parent.  The protocol is obvious:
         print >> original_stderr, ran, len(failures), len(errors)
         for test, exc_info in failures:
             print >> original_stderr, ' '.join(str(test).strip().split('\n'))
@@ -505,27 +822,13 @@
 
     else:
         if options.verbose > 1:
-            if errors:
-                print
-                print "Tests with errors:"
-                for test, exc_info in errors:
-                    print "  ", test
+            output.tests_with_errors(errors)
+            output.tests_with_failures(failures)
 
-            if failures:
-                print
-                print "Tests with failures:"
-                for test, exc_info in failures:
-                    print "  ", test
-
         if nlayers != 1:
-            print "Total: %s tests, %s failures, %s errors" % (
-                ran, len(failures), len(errors))
+            output.totals(ran, len(failures), len(errors))
 
-        if import_errors:
-            print
-            print "Test-modules with import problems:"
-            for test in import_errors:
-                print "  " + test.module
+        output.modules_with_import_problems(import_errors)
 
     doctest.set_unittest_reportflags(old_reporting_flags)
 
@@ -537,16 +840,14 @@
 
     return not bool(import_errors or failures or errors)
 
-def list_tests(options, tests, layer_name):
-    print "Listing %s tests:" % layer_name
-    for test in tests:
-        print ' ', test
 
 def run_tests(options, tests, name, failures, errors):
     repeat = options.repeat or 1
     repeat_range = iter(range(repeat))
     ran = 0
 
+    output = options.output
+
     gc.collect()
     lgarbage = len(gc.garbage)
 
@@ -556,12 +857,12 @@
             track = TrackRefs()
         rc = sys.gettotalrefcount()
 
-    for i in repeat_range:
+    for iteration in repeat_range:
         if repeat > 1:
-            print "Iteration", i+1
+            output.info("Iteration %d" % (iteration + 1))
 
         if options.verbose > 0 or options.progress:
-            print '  Running:'
+            output.info('  Running:')
         result = TestResult(options, tests, layer_name=name)
 
         t = time.time()
@@ -601,22 +902,15 @@
                 test.__dict__.update(state)
 
         t = time.time() - t
-        if options.verbose == 1 or options.progress:
-            result.stopTests()
-            print
+        output.stop_tests()
         failures.extend(result.failures)
         errors.extend(result.errors)
-        print (
-            "  Ran %s tests with %s failures and %s errors in %.3f seconds." %
-            (result.testsRun, len(result.failures), len(result.errors), t)
-            )
+        output.summary(result.testsRun, len(result.failures), len(result.errors), t)
         ran = result.testsRun
 
         gc.collect()
         if len(gc.garbage) > lgarbage:
-            print ("Tests generated new (%d) garbage:"
-                   % (len(gc.garbage)-lgarbage))
-            print gc.garbage[lgarbage:]
+            output.garbage(gc.garbage[lgarbage:])
             lgarbage = len(gc.garbage)
 
         if options.report_refcounts:
@@ -632,38 +926,34 @@
             rc = sys.gettotalrefcount()
             if options.verbose:
                 track.update()
-                if i:
-                    print (" "
-                           " sum detail refcount=%-8d"
-                           " sys refcount=%-8d"
-                           " change=%-6d"
-                           % (track.n, rc, rc - prev))
-                    if options.verbose:
-                        track.output()
+                if iteration > 0:
+                    output.detailed_refcounts(track, rc, prev)
                 else:
                     track.delta = None
-            elif i:
-                print "  sys refcount=%-8d change=%-6d" % (rc, rc - prev)
+            elif iteration > 0:
+                output.refcounts(rc, prev)
 
     return ran
 
 def run_layer(options, layer_name, layer, tests, setup_layers,
               failures, errors):
 
+    output = options.output
     gathered = []
     gather_layers(layer, gathered)
     needed = dict([(l, 1) for l in gathered])
     if options.resume_number != 0:
-        print "Running %s tests:" % layer_name
-    tear_down_unneeded(needed, setup_layers)
+        output.info("Running %s tests:" % layer_name)
+    tear_down_unneeded(options, needed, setup_layers)
 
     if options.resume_layer != None:
-        print "  Running in a subprocess."
+        output.info( "  Running in a subprocess.")
 
-    setup_layer(layer, setup_layers)
+    setup_layer(options, layer, setup_layers)
     return run_tests(options, tests, layer_name, failures, errors)
 
 def resume_tests(options, layer_name, layers, failures, errors):
+    output = options.output
     layers = [l for (l, _, _) in layers]
     layers = layers[layers.index(layer_name):]
     rantotal = 0
@@ -695,7 +985,7 @@
             for l in subout:
                 sys.stdout.write(l)
         except IOError:
-            print "Error reading subprocess output for", layer_name
+            output.error("Error reading subprocess output for %s" % layer_name)
 
         line = suberr.readline()
         try:
@@ -724,37 +1014,39 @@
 class CanNotTearDown(Exception):
     "Couldn't tear down a test"
 
-def tear_down_unneeded(needed, setup_layers, optional=False):
+def tear_down_unneeded(options, needed, setup_layers, optional=False):
     # Tear down any layers not needed for these tests. The unneeded
     # layers might interfere.
     unneeded = [l for l in setup_layers if l not in needed]
     unneeded = order_by_bases(unneeded)
     unneeded.reverse()
+    output = options.output
     for l in unneeded:
-        print "  Tear down %s" % name_from_layer(l),
+        output.start_tear_down(name_from_layer(l))
         t = time.time()
         try:
             if hasattr(l, 'tearDown'):
                 l.tearDown()
         except NotImplementedError:
-            print "... not supported"
+            output.tear_down_not_supported()
             if not optional:
                 raise CanNotTearDown(l)
         else:
-            print "in %.3f seconds." % (time.time() - t)
+            output.stop_tear_down(time.time() - t)
         del setup_layers[l]
 
-def setup_layer(layer, setup_layers):
+def setup_layer(options, layer, setup_layers):
     assert layer is not object
+    output = options.output
     if layer not in setup_layers:
         for base in layer.__bases__:
             if base is not object:
-                setup_layer(base, setup_layers)
-        print "  Set up %s" % name_from_layer(layer),
+                setup_layer(options, base, setup_layers)
+        output.start_set_up(name_from_layer(layer))
         t = time.time()
         if hasattr(layer, 'setUp'):
             layer.setUp()
-        print "in %.3f seconds." % (time.time() - t)
+        output.stop_set_up(time.time() - t)
         setup_layers[layer] = 1
 
 def dependencies(bases, result):
@@ -764,8 +1056,6 @@
 
 class TestResult(unittest.TestResult):
 
-    max_width = 80
-
     def __init__(self, options, tests, layer_name=None):
         unittest.TestResult.__init__(self)
         self.options = options
@@ -777,48 +1067,11 @@
             self.layers = order_by_bases(layers)
         else:
             self.layers = []
-        if options.progress:
-            count = 0
-            for test in tests:
-                count += test.countTestCases()
-            self.count = count
-        self.last_width = 0
+        count = 0
+        for test in tests:
+            count += test.countTestCases()
+        self.count = count
 
-        if options.progress:
-            try:
-                # Note that doing this every time is more test friendly.
-                import curses
-            except ImportError:
-                # avoid reimporting a broken module in python 2.3
-                sys.modules['curses'] = None
-            else:
-                try:
-                    curses.setupterm()
-                except TypeError:
-                    pass
-                else:
-                    self.max_width = curses.tigetnum('cols')
-
-    def getShortDescription(self, test, room):
-        room -= 1
-        s = str(test)
-        if len(s) > room:
-            pos = s.find(" (")
-            if pos >= 0:
-                w = room - (pos + 5)
-                if w < 1:
-                    # first portion (test method name) is too long
-                    s = s[:room-3] + "..."
-                else:
-                    pre = s[:pos+2]
-                    post = s[-w:]
-                    s = "%s...%s" % (pre, post)
-            else:
-                w = room - 4
-                s = '... ' + s[-w:]
-
-        return ' ' + s[:room]
-
     def testSetUp(self):
         """A layer may define a setup method to be called before each
         individual test.
@@ -842,103 +1095,50 @@
     def startTest(self, test):
         self.testSetUp()
         unittest.TestResult.startTest(self, test)
-        testsRun = self.testsRun - 1
+        testsRun = self.testsRun - 1 # subtract the one the base class added
         count = test.countTestCases()
         self.testsRun = testsRun + count
-        options = self.options
-        self.test_width = 0
 
-        if options.progress:
-            if self.last_width:
-                sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
+        self.options.output.start_test(test, self.testsRun, self.count)
 
-            s = "    %d/%d (%.1f%%)" % (
-                self.testsRun, self.count,
-                (self.testsRun) * 100.0 / self.count
-                )
-            sys.stdout.write(s)
-            self.test_width += len(s)
-            if options.verbose == 1:
-                room = self.max_width - self.test_width - 1
-                s = self.getShortDescription(test, room)
-                sys.stdout.write(s)
-                self.test_width += len(s)
-
-        elif options.verbose == 1:
-            for i in range(count):
-                sys.stdout.write('.')
-                testsRun += 1
-
-        if options.verbose > 1:
-            s = str(test)
-            sys.stdout.write(' ')
-            sys.stdout.write(s)
-            self.test_width += len(s) + 1
-
-        sys.stdout.flush()
-
         self._threads = threading.enumerate()
         self._start_time = time.time()
 
     def addSuccess(self, test):
-        if self.options.verbose > 2:
-            t = max(time.time() - self._start_time, 0.0)
-            s = " (%.3f s)" % t
-            sys.stdout.write(s)
-            self.test_width += len(s) + 1
+        t = max(time.time() - self._start_time, 0.0)
+        self.options.output.test_success(test, t)
 
     def addError(self, test, exc_info):
-        if self.options.verbose > 2:
-            print " (%.3f s)" % (time.time() - self._start_time)
+        self.options.output.test_error(test, time.time() - self._start_time,
+                                       exc_info)
 
         unittest.TestResult.addError(self, test, exc_info)
-        print
-        self._print_traceback("Error in test %s" % test, exc_info)
 
         if self.options.post_mortem:
             if self.options.resume_layer:
-                print
-                print '*'*70
-                print ("Can't post-mortem debug when running a layer"
-                       " as a subprocess!")
-                print '*'*70
-                print
+                self.options.output.error_with_banner("Can't post-mortem debug"
+                                                      " when running a layer"
+                                                      " as a subprocess!")
             else:
                 post_mortem(exc_info)
 
-        self.test_width = self.last_width = 0
-
     def addFailure(self, test, exc_info):
+        self.options.output.test_failure(test, time.time() - self._start_time,
+                                         exc_info)
 
-
-        if self.options.verbose > 2:
-            print " (%.3f s)" % (time.time() - self._start_time)
-
         unittest.TestResult.addFailure(self, test, exc_info)
-        print
-        self._print_traceback("Failure in test %s" % test, exc_info)
 
         if self.options.post_mortem:
+            # XXX: mgedmin: why isn't there a resume_layer check here like
+            # in addError?
             post_mortem(exc_info)
 
-        self.test_width = self.last_width = 0
-
-
-    def stopTests(self):
-        if self.options.progress and self.last_width:
-            sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
-
     def stopTest(self, test):
         self.testTearDown()
-        if self.options.progress:
-            self.last_width = self.test_width
-        elif self.options.verbose > 1:
-            print
+        self.options.output.stop_test(test)
 
         if gc.garbage:
-            print "The following test left garbage:"
-            print test
-            print gc.garbage
+            self.options.output.test_garbage(test, gc.garbage)
             # TODO: Perhaps eat the garbage here, so that the garbage isn't
             #       printed for every subsequent test.
 
@@ -948,26 +1148,9 @@
                              and
                              t not in self._threads)]
         if new_threads:
-            print "The following test left new threads behind:"
-            print test
-            print "New thread(s):", new_threads
+            self.options.output.test_threads(test, new_threads)
 
-        sys.stdout.flush()
 
-
-    def _print_traceback(self, msg, exc_info):
-        print_traceback(msg, exc_info)
-
-doctest_template = """
-File "%s", line %s, in %s
-
-%s
-Want:
-%s
-Got:
-%s
-"""
-
 class FakeInputContinueGenerator:
 
     def readline(self):
@@ -980,27 +1163,6 @@
         return 'c\n'
 
 
-def print_traceback(msg, exc_info):
-    print
-    print msg
-
-    v = exc_info[1]
-    if isinstance(v, doctest.DocTestFailureException):
-        tb = v.args[0]
-    elif isinstance(v, doctest.DocTestFailure):
-        tb = doctest_template % (
-            v.test.filename,
-            v.test.lineno + v.example.lineno + 1,
-            v.test.name,
-            v.example.source,
-            v.example.want,
-            v.got,
-            )
-    else:
-        tb = "".join(traceback.format_exception(*exc_info))
-
-    print tb
-
 def post_mortem(exc_info):
     err = exc_info[1]
     if isinstance(err, (doctest.UnexpectedException, doctest.DocTestFailure)):
@@ -1304,7 +1466,8 @@
             for file in files:
                 if file[-4:] in compiled_sufixes and file[:-1] not in files:
                     fullname = os.path.join(dirname, file)
-                    print "Removing stale bytecode file", fullname
+                    options.output.info("Removing stale bytecode file %s"
+                                        % fullname)
                     os.unlink(fullname)
 
 
@@ -1868,6 +2031,8 @@
     merge_options(options, defaults)
     options.original_testrunner_args = original_testrunner_args
 
+    options.output = OutputFormatter(options)
+
     options.fail = False
 
     if positional:



More information about the Checkins mailing list