[Checkins] SVN: zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py Extract (about half of) output operations into a single class.

Marius Gedminas marius at pov.lt
Fri Jul 13 06:06:09 EDT 2007


Log message for revision 77826:
  Extract (about half of) output operations into a single class.
  
  The goal is to have the output logic in a single place so it's easier to modify
  (e.g. add colours if the user asks for them), or replace it with alternative
  output formats (e.g., HTML or GUI).
  
  

Changed:
  U   zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py

-=-
Modified: zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py
===================================================================
--- zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py	2007-07-13 10:04:33 UTC (rev 77825)
+++ zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py	2007-07-13 10:06:09 UTC (rev 77826)
@@ -246,6 +246,83 @@
             return "__init__.pyo" in fnamelist
     return False
 
+
+class OutputFormatter(object):
+    """Test runner output formatter."""
+
+    # Implementation note: be careful about printing stuff to sys.stderr.
+    # It is used for interprocess communication between the parent and the
+    # child test runner, when you run some test layers in a subprocess.
+    # resume_layer() reasigns sys.stderr for this reason, but be careful
+    # and don't store the original one in __init__ or something.
+
+    def info(self, message):
+        """Print an informative message."""
+        print message
+
+    def error(self, message):
+        """Report an error."""
+        print message
+
+    def profiler_stats(self, stats):
+        """Report profiler stats."""
+        stats.print_stats(50)
+
+    def import_errors(self, import_errors):
+        """Report test-module import errors (if any)."""
+        if import_errors:
+            print "Test-module import failures:"
+            for error in import_errors:
+                print_traceback("Module: %s\n" % error.module, error.exc_info),
+            print
+
+    def tests_with_errors(self, errors):
+        """Report names of tests with errors (if any)."""
+        if errors:
+            print
+            print "Tests with errors:"
+            for test, exc_info in errors:
+                print "  ", test
+
+    def tests_with_failures(self, failures):
+        """Report names of tests with failures (if any)."""
+        if failures:
+            print
+            print "Tests with failures:"
+            for test, exc_info in failures:
+                print "  ", test
+
+    def modules_with_import_problems(self, import_errors):
+        """Report names of modules with import problems (if any)."""
+        if import_errors:
+            print
+            print "Test-modules with import problems:"
+            for test in import_errors:
+                print "  " + test.module
+
+    def totals(self, n_tests, n_failures, n_errors):
+        """Report totals (number of tests, failures, and errors)."""
+        print "Total: %s tests, %s failures, %s errors" % (
+                        n_tests, n_failures, n_errors)
+
+    def summary(self, n_tests, n_failures, n_errors, n_seconds):
+        """Summarize the results."""
+        print ("  Ran %s tests with %s failures and %s errors in %.3f seconds."
+               % (n_tests, n_failures, n_errors, n_seconds))
+
+    def list_of_tests(self, tests, layer_name):
+        """Report a list of test names."""
+        print "Listing %s tests:" % layer_name
+        for test in tests:
+            print ' ', test
+
+    def garbage(self, garbage):
+        """Report garbage generated by tests."""
+        if garbage:
+            print "Tests generated new (%d) garbage:" % len(garbage)
+            print garbage
+
+
 def run(defaults=None, args=None):
     if args is None:
         args = sys.argv
@@ -277,6 +354,8 @@
     if options.fail:
         return True
 
+    output = options.output
+
     options.testrunner_defaults = defaults
     options.resume_layer = resume_layer
     options.resume_number = resume_number
@@ -288,9 +367,9 @@
     if (options.profile
         and sys.version_info[:3] <= (2,4,1)
         and __debug__):
-        print ('Because of a bug in Python < 2.4.1, profiling '
-               'during tests requires the -O option be passed to '
-               'Python (not the test runner).')
+        output.error('Because of a bug in Python < 2.4.1, profiling '
+                     'during tests requires the -O option be passed to '
+                     'Python (not the test runner).')
         sys.exit()
 
     if options.coverage:
@@ -334,7 +413,7 @@
     if options.profile and not options.resume_layer:
         stats = profiler.loadStats(prof_glob)
         stats.sort_stats('cumulative', 'calls')
-        stats.print_stats(50)
+        output.profiler_stats(stats)
 
     if tracer:
         coverdir = os.path.join(os.getcwd(), options.coverage)
@@ -362,26 +441,29 @@
     global _layer_name_cache
     _layer_name_cache = {} # Reset to enforce test isolation
 
+    output = options.output
+
     if options.resume_layer:
         original_stderr = sys.stderr
         sys.stderr = sys.stdout
     elif options.verbose:
         if options.all:
-            print "Running tests at all levels"
+            msg = "Running tests at all levels"
         else:
-            print "Running tests at level %d" % options.at_level
+            msg = "Running tests at level %d" % options.at_level
+        output.info(msg)
 
 
     old_threshold = gc.get_threshold()
     if options.gc:
         if len(options.gc) > 3:
-            print "Too many --gc options"
+            output.error("Too many --gc options")
             sys.exit(1)
         if options.gc[0]:
-            print ("Cyclic garbage collection threshold set to: %s" %
-                   `tuple(options.gc)`)
+            output.info("Cyclic garbage collection threshold set to: %s" %
+                        repr(tuple(options.gc)))
         else:
-            print "Cyclic garbage collection is disabled."
+            output.info("Cyclic garbage collection is disabled.")
 
         gc.set_threshold(*options.gc)
 
@@ -398,12 +480,12 @@
         reporting_flags = doctest.REPORT_NDIFF
     if options.udiff:
         if reporting_flags:
-            print "Can only give one of --ndiff, --udiff, or --cdiff"
+            output.error("Can only give one of --ndiff, --udiff, or --cdiff")
             sys.exit(1)
         reporting_flags = doctest.REPORT_UDIFF
     if options.cdiff:
         if reporting_flags:
-            print "Can only give one of --ndiff, --udiff, or --cdiff"
+            output.error("Can only give one of --ndiff, --udiff, or --cdiff")
             sys.exit(1)
         reporting_flags = doctest.REPORT_CDIFF
     if options.report_only_first_failure:
@@ -430,11 +512,7 @@
     nlayers = 0
     import_errors = tests_by_layer_name.pop(None, None)
 
-    if import_errors:
-        print "Test-module import failures:"
-        for error in import_errors:
-            print_traceback("Module: %s\n" % error.module, error.exc_info),
-        print
+    output.import_errors(import_errors)
 
     if 'unit' in tests_by_layer_name:
         tests = tests_by_layer_name.pop('unit')
@@ -452,7 +530,7 @@
                 if options.list_tests:
                     list_tests(options, tests, 'unit')
                 else:
-                    print "Running unit tests:"
+                    output.info("Running unit tests:")
                     nlayers += 1
                     ran += run_tests(options, tests, 'unit', failures, errors)
 
@@ -492,11 +570,12 @@
 
     if setup_layers:
         if options.resume_layer == None:
-            print "Tearing down left over layers:"
+            output.info("Tearing down left over layers:")
         tear_down_unneeded((), setup_layers, True)
 
     if options.resume_layer:
         sys.stdout.close()
+        # Communicate with the parent.  The protocol is obvious:
         print >> original_stderr, ran, len(failures), len(errors)
         for test, exc_info in failures:
             print >> original_stderr, ' '.join(str(test).strip().split('\n'))
@@ -505,27 +584,13 @@
 
     else:
         if options.verbose > 1:
-            if errors:
-                print
-                print "Tests with errors:"
-                for test, exc_info in errors:
-                    print "  ", test
+            output.tests_with_errors(errors)
+            output.tests_with_failures(failures)
 
-            if failures:
-                print
-                print "Tests with failures:"
-                for test, exc_info in failures:
-                    print "  ", test
-
         if nlayers != 1:
-            print "Total: %s tests, %s failures, %s errors" % (
-                ran, len(failures), len(errors))
+            output.totals(ran, len(failures), len(errors))
 
-        if import_errors:
-            print
-            print "Test-modules with import problems:"
-            for test in import_errors:
-                print "  " + test.module
+        output.modules_with_import_problems(import_errors)
 
     doctest.set_unittest_reportflags(old_reporting_flags)
 
@@ -538,15 +603,16 @@
     return not bool(import_errors or failures or errors)
 
 def list_tests(options, tests, layer_name):
-    print "Listing %s tests:" % layer_name
-    for test in tests:
-        print ' ', test
+    # TODO: inline this method
+    options.output.list_of_tests(tests, layer_name)
 
 def run_tests(options, tests, name, failures, errors):
     repeat = options.repeat or 1
     repeat_range = iter(range(repeat))
     ran = 0
 
+    output = options.output
+
     gc.collect()
     lgarbage = len(gc.garbage)
 
@@ -558,10 +624,10 @@
 
     for i in repeat_range:
         if repeat > 1:
-            print "Iteration", i+1
+            output.info("Iteration %d" % (i+1))
 
         if options.verbose > 0 or options.progress:
-            print '  Running:'
+            output.info('  Running:')
         result = TestResult(options, tests, layer_name=name)
 
         t = time.time()
@@ -603,20 +669,16 @@
         t = time.time() - t
         if options.verbose == 1 or options.progress:
             result.stopTests()
+            # XXX: figure out how to move this print into the OutputFormatter
             print
         failures.extend(result.failures)
         errors.extend(result.errors)
-        print (
-            "  Ran %s tests with %s failures and %s errors in %.3f seconds." %
-            (result.testsRun, len(result.failures), len(result.errors), t)
-            )
+        output.summary(result.testsRun, len(result.failures), len(result.errors), t)
         ran = result.testsRun
 
         gc.collect()
         if len(gc.garbage) > lgarbage:
-            print ("Tests generated new (%d) garbage:"
-                   % (len(gc.garbage)-lgarbage))
-            print gc.garbage[lgarbage:]
+            output.garbage(gc.garbage[lgarbage:])
             lgarbage = len(gc.garbage)
 
         if options.report_refcounts:
@@ -630,6 +692,7 @@
 
             prev = rc
             rc = sys.gettotalrefcount()
+            # TODO: move the output into OutputFormatter
             if options.verbose:
                 track.update()
                 if i:
@@ -650,20 +713,22 @@
 def run_layer(options, layer_name, layer, tests, setup_layers,
               failures, errors):
 
+    output = options.output
     gathered = []
     gather_layers(layer, gathered)
     needed = dict([(l, 1) for l in gathered])
     if options.resume_number != 0:
-        print "Running %s tests:" % layer_name
+        output.info("Running %s tests:" % layer_name)
     tear_down_unneeded(needed, setup_layers)
 
     if options.resume_layer != None:
-        print "  Running in a subprocess."
+        output.info( "  Running in a subprocess.")
 
     setup_layer(layer, setup_layers)
     return run_tests(options, tests, layer_name, failures, errors)
 
 def resume_tests(options, layer_name, layers, failures, errors):
+    output = options.output
     layers = [l for (l, _, _) in layers]
     layers = layers[layers.index(layer_name):]
     rantotal = 0
@@ -695,7 +760,7 @@
             for l in subout:
                 sys.stdout.write(l)
         except IOError:
-            print "Error reading subprocess output for", layer_name
+            output.error("Error reading subprocess output for %s" % layer_name)
 
         line = suberr.readline()
         try:
@@ -731,6 +796,7 @@
     unneeded = order_by_bases(unneeded)
     unneeded.reverse()
     for l in unneeded:
+        # TODO: figure out how to move this to OutputFormatter
         print "  Tear down %s" % name_from_layer(l),
         t = time.time()
         try:
@@ -750,6 +816,7 @@
         for base in layer.__bases__:
             if base is not object:
                 setup_layer(base, setup_layers)
+        # TODO: figure out how to move this to OutputFormatter
         print "  Set up %s" % name_from_layer(layer),
         t = time.time()
         if hasattr(layer, 'setUp'):
@@ -848,6 +915,7 @@
         options = self.options
         self.test_width = 0
 
+        # TODO: figure out how to move this to OutputFormatter
         if options.progress:
             if self.last_width:
                 sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
@@ -881,6 +949,7 @@
         self._start_time = time.time()
 
     def addSuccess(self, test):
+        # TODO: figure out how to move this to OutputFormatter
         if self.options.verbose > 2:
             t = max(time.time() - self._start_time, 0.0)
             s = " (%.3f s)" % t
@@ -888,6 +957,7 @@
             self.test_width += len(s) + 1
 
     def addError(self, test, exc_info):
+        # TODO: figure out how to move this to OutputFormatter
         if self.options.verbose > 2:
             print " (%.3f s)" % (time.time() - self._start_time)
 
@@ -909,6 +979,7 @@
         self.test_width = self.last_width = 0
 
     def addFailure(self, test, exc_info):
+        # TODO: figure out how to move this to OutputFormatter
 
 
         if self.options.verbose > 2:
@@ -925,6 +996,7 @@
 
 
     def stopTests(self):
+        # TODO: figure out how to move this to OutputFormatter
         if self.options.progress and self.last_width:
             sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
 
@@ -935,6 +1007,7 @@
         elif self.options.verbose > 1:
             print
 
+        # TODO: figure out how to move this to OutputFormatter
         if gc.garbage:
             print "The following test left garbage:"
             print test
@@ -956,6 +1029,7 @@
 
 
     def _print_traceback(self, msg, exc_info):
+        # TODO: figure out how to move this to OutputFormatter
         print_traceback(msg, exc_info)
 
 doctest_template = """
@@ -971,6 +1045,7 @@
 class FakeInputContinueGenerator:
 
     def readline(self):
+        # TODO: figure out how to move this to OutputFormatter
         print  'c\n'
         print '*'*70
         print ("Can't use pdb.set_trace when running a layer"
@@ -981,6 +1056,7 @@
 
 
 def print_traceback(msg, exc_info):
+    # TODO: figure out how to move this to OutputFormatter
     print
     print msg
 
@@ -1304,7 +1380,8 @@
             for file in files:
                 if file[-4:] in compiled_sufixes and file[:-1] not in files:
                     fullname = os.path.join(dirname, file)
-                    print "Removing stale bytecode file", fullname
+                    options.output.info("Removing stale bytecode file %s"
+                                        % fullname)
                     os.unlink(fullname)
 
 
@@ -1422,6 +1499,7 @@
 
 
     def output(self):
+        # TODO: figure out how to move this to OutputFormatter        
         printed = False
         s1 = s2 = 0
         for t, delta1, delta2 in self.delta:
@@ -1868,6 +1946,8 @@
     merge_options(options, defaults)
     options.original_testrunner_args = original_testrunner_args
 
+    options.output = OutputFormatter()
+
     options.fail = False
 
     if positional:



More information about the Checkins mailing list