[Zope3-checkins] SVN: zope.testing/branches/ctheune-cleanup/src/zope/testing/testrunner/runner.py move reporting to its own method

Christian Theune ct at gocept.com
Sun May 4 06:08:27 EDT 2008


Log message for revision 86322:
  move reporting to its own method
  

Changed:
  U   zope.testing/branches/ctheune-cleanup/src/zope/testing/testrunner/runner.py

-=-
Modified: zope.testing/branches/ctheune-cleanup/src/zope/testing/testrunner/runner.py
===================================================================
--- zope.testing/branches/ctheune-cleanup/src/zope/testing/testrunner/runner.py	2008-05-04 09:13:00 UTC (rev 86321)
+++ zope.testing/branches/ctheune-cleanup/src/zope/testing/testrunner/runner.py	2008-05-04 10:08:27 UTC (rev 86322)
@@ -16,6 +16,7 @@
 $Id: __init__.py 86232 2008-05-03 15:09:33Z ctheune $
 """
 
+import re
 import cStringIO
 import gc
 import glob
@@ -40,13 +41,19 @@
 
 real_pdb_set_trace = pdb.set_trace
 
+PYREFCOUNT_PATTERN = re.compile('\[[0-9]+ refs\]')
 
+
 class SubprocessError(Exception):
     """An error occurred when running a subprocess
     """
 
+    def __init__(self, reason, stderr):
+        self.reason = reason
+        self.stderr = stderr
+
     def __str__(self):
-        return self.args
+        return '%s: %s' % (self.reason, self.stderr)
 
 
 class CanNotTearDown(Exception):
@@ -81,6 +88,13 @@
         self.late_initializers = []
         self.early_shutdown = []
 
+        self.ran = 0
+        self.failures = []
+        self.errors = []
+        self.nlayers = 0
+
+        self.show_report = True
+
     def run(self):
         self.configure()
         if self.options.fail:
@@ -102,6 +116,9 @@
                 shutdown()
             self.shutdown_features()
 
+        if self.show_report:
+            self.report()
+
     def configure(self):
         if self.args is None:
             self.args = sys.argv[:]
@@ -222,6 +239,14 @@
         else:
             doctest.set_unittest_reportflags(self.old_reporting_flags)
 
+        # Set up time measurement
+        def start_time_recording():
+            self.start_time = time.time()
+        def stop_time_recording():
+            self.total_time = time.time() - self.start_time
+        self.late_initializers.append(start_time_recording)
+        self.early_shutdown.append(stop_time_recording)
+
     def find_tests(self):
         pass
 
@@ -243,7 +268,7 @@
         output = self.options.output
 
         if self.options.resume_layer:
-            original_stderr = sys.stderr
+            self.original_stderr = sys.stderr
             sys.stderr = sys.stdout
         elif self.options.verbose:
             if self.options.all:
@@ -259,13 +284,9 @@
 
         tests_by_layer_name = find_tests(self.options, found_suites)
 
-        ran = 0
-        failures = []
-        errors = []
-        nlayers = 0
-        import_errors = tests_by_layer_name.pop(None, None)
+        self.import_errors = tests_by_layer_name.pop(None, None)
 
-        output.import_errors(import_errors)
+        output.import_errors(self.import_errors)
 
         if 'unit' in tests_by_layer_name:
             tests = tests_by_layer_name.pop('unit')
@@ -284,9 +305,10 @@
                         output.list_of_tests(tests, 'unit')
                     else:
                         output.info("Running unit tests:")
-                        nlayers += 1
+                        self.nlayers += 1
                         try:
-                            ran += run_tests(self.options, tests, 'unit', failures, errors)
+                            self.ran += run_tests(self.options, tests, 'unit',
+                                                  self.failures, self.errors)
                         except EndRun:
                             self.failed = True
                             return
@@ -307,28 +329,26 @@
                 if filter(None, [pat(layer_name) for pat in self.options.layer])
             ]
 
-
         if self.options.list_tests:
             for layer_name, layer, tests in layers_to_run:
                 output.list_of_tests(tests, layer_name)
             self.failed = False
+            self.show_report = False
             return
 
-        start_time = time.time()
-
         for layer_name, layer, tests in layers_to_run:
-            nlayers += 1
+            self.nlayers += 1
             try:
-                ran += run_layer(self.options, layer_name, layer, tests,
-                                 setup_layers, failures, errors)
+                self.ran += run_layer(self.options, layer_name, layer, tests,
+                                      setup_layers, self.failures, self.errors)
             except EndRun:
                 self.failed = True
                 return
             except CanNotTearDown:
                 setup_layers = None
                 if not self.options.resume_layer:
-                    ran += resume_tests(self.options, layer_name, layers_to_run,
-                                        failures, errors)
+                    self.ran += resume_tests(self.options, layer_name, layers_to_run,
+                                             self.failures, self.errors)
                     break
 
         if setup_layers:
@@ -336,29 +356,8 @@
                 output.info("Tearing down left over layers:")
             tear_down_unneeded(self.options, (), setup_layers, True)
 
-        total_time = time.time() - start_time
+        self.failed = bool(self.import_errors or self.failures or self.errors)
 
-        if self.options.resume_layer:
-            sys.stdout.close()
-            # Communicate with the parent.  The protocol is obvious:
-            print >> original_stderr, ran, len(failures), len(errors)
-            for test, exc_info in failures:
-                print >> original_stderr, ' '.join(str(test).strip().split('\n'))
-            for test, exc_info in errors:
-                print >> original_stderr, ' '.join(str(test).strip().split('\n'))
-
-        else:
-            if self.options.verbose:
-                output.tests_with_errors(errors)
-                output.tests_with_failures(failures)
-
-            if nlayers != 1:
-                output.totals(ran, len(failures), len(errors), total_time)
-
-            output.modules_with_import_problems(import_errors)
-
-        self.failed = bool(import_errors or failures or errors)
-
     def shutdown_features(self):
         doctest.set_unittest_reportflags(self.old_reporting_flags)
 
@@ -375,19 +374,45 @@
             # attempt to unlink a still-open file.
             os.close(self.oshandle)
             if not self.options.resume_layer:
-                stats = self.profiler.loadStats(self.prof_glob)
-                stats.sort_stats('cumulative', 'calls')
-                self.options.output.profiler_stats(stats)
+                self.profiler_stats = self.profiler.loadStats(self.prof_glob)
+                self.profiler_stats.sort_stats('cumulative', 'calls')
 
         if self.tracer:
             self.tracer.stop()
+
+        doctest.set_unittest_reportflags(self.old_reporting_flags)
+
+    def report(self):
+        if self.options.resume_layer:
+            sys.stdout.close()
+            # Communicate with the parent.  The protocol is obvious:
+            print >> self.original_stderr, self.ran, len(self.failures), len(self.errors)
+            for test, exc_info in self.failures:
+                print >> self.original_stderr, ' '.join(str(test).strip().split('\n'))
+            for test, exc_info in self.errors:
+                print >> self.original_stderr, ' '.join(str(test).strip().split('\n'))
+
+        else:
+            if self.options.verbose:
+                self.options.output.tests_with_errors(self.errors)
+                self.options.output.tests_with_failures(self.failures)
+
+            if self.nlayers != 1:
+                self.options.output.totals(self.ran, len(self.failures),
+                                           len(self.errors), self.total_time)
+
+            self.options.output.modules_with_import_problems(
+                self.import_errors)
+
+        if self.options.profile and not self.options.resume_layer:
+            self.options.output.profiler_stats(self.profiler_stats)
+
+        if self.tracer:
             coverdir = os.path.join(os.getcwd(), self.options.coverage)
             r = self.tracer.results()
             r.write_results(summary=True, coverdir=coverdir)
 
-        doctest.set_unittest_reportflags(self.old_reporting_flags)
 
-
 def run_tests(options, tests, name, failures, errors):
     repeat = options.repeat or 1
     repeat_range = iter(range(repeat))
@@ -566,7 +591,8 @@
         except KeyboardInterrupt:
             raise
         except:
-            raise SubprocessError(line+suberr.read())
+            raise SubprocessError(
+                'No subprocess summary found', line+suberr.read())
 
         while nfail > 0:
             nfail -= 1



More information about the Zope3-Checkins mailing list