[Checkins] SVN: zope.testing/trunk/ reduce batching and improve output for subprocesses

Gary Poster gary.poster at canonical.com
Mon Aug 10 16:57:09 EDT 2009


Log message for revision 102634:
  reduce batching and improve output for subprocesses

Changed:
  U   zope.testing/trunk/CHANGES.txt
  U   zope.testing/trunk/src/zope/testing/testrunner/filter.py
  U   zope.testing/trunk/src/zope/testing/testrunner/formatter.py
  U   zope.testing/trunk/src/zope/testing/testrunner/process.py
  U   zope.testing/trunk/src/zope/testing/testrunner/runner.py
  A   zope.testing/trunk/src/zope/testing/testrunner/testrunner-ex/sampletests_buffering.py
  A   zope.testing/trunk/src/zope/testing/testrunner/testrunner-layers-buff.txt
  U   zope.testing/trunk/src/zope/testing/testrunner/tests.py

-=-
Modified: zope.testing/trunk/CHANGES.txt
===================================================================
--- zope.testing/trunk/CHANGES.txt	2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/CHANGES.txt	2009-08-10 20:57:09 UTC (rev 102634)
@@ -4,13 +4,23 @@
 3.8.1 (unreleased)
 ==================
 
-- Avoid to hardcode sys.argv[0] as script; 
-  allow for instance Zope 2 `bin/instance test` (LP#407916).
+- Avoid hardcoding sys.argv[0] as script; 
+  allow, for instance, Zope 2's `bin/instance test` (LP#407916).
 
-- Produce a clear error message when subprocess doesn't follow the
+- Produce a clear error message when a subprocess doesn't follow the
   zope.testing.testrunner protocol (LP#407916).
 
+- Do not unnecessarily squelch verbose output in a subprocess when there are
+  not multiple subprocesses.
 
+- Do not unnecessarily batch subprocess output, which can stymie automated and
+  human processes for identifying hung tests.
+
+- Include incremental output when there are multiple subprocesses and a
+  verbosity of -vv or greater is requested.  This again is not batched,
+  supporting automated processes and humans looking for hung tests. 
+
+
 3.8.0 (2009-07-24)
 ==================
 

Modified: zope.testing/trunk/src/zope/testing/testrunner/filter.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/filter.py	2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/src/zope/testing/testrunner/filter.py	2009-08-10 20:57:09 UTC (rev 102634)
@@ -65,7 +65,8 @@
                     # No pattern matched this name so we remove it
                     layers.pop(name)
 
-        if self.runner.options.verbose:
+        if (self.runner.options.verbose and
+            not self.runner.options.resume_layer):
             if self.runner.options.all:
                 msg = "Running tests at all levels"
             else:

Modified: zope.testing/trunk/src/zope/testing/testrunner/formatter.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/formatter.py	2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/src/zope/testing/testrunner/formatter.py	2009-08-10 20:57:09 UTC (rev 102634)
@@ -52,6 +52,9 @@
 
     progress = property(lambda self: self.options.progress)
     verbose = property(lambda self: self.options.verbose)
+    in_subprocess = property(
+        lambda self: self.options.resume_layer is not None and
+                     self.options.processes > 1)
 
     def compute_max_width(self):
         """Try to determine the terminal width."""
@@ -263,6 +266,12 @@
 
         elif self.verbose == 1:
             sys.stdout.write('.' * test.countTestCases())
+        
+        elif self.in_subprocess:
+            sys.stdout.write('.' * test.countTestCases())
+            # Give the parent process a new line so it sees the progress
+            # in a timely manner.
+            sys.stdout.write('\n')
 
         if self.verbose > 1:
             s = str(test)

Modified: zope.testing/trunk/src/zope/testing/testrunner/process.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/process.py	2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/src/zope/testing/testrunner/process.py	2009-08-10 20:57:09 UTC (rev 102634)
@@ -31,7 +31,13 @@
     def global_setup(self):
         self.original_stderr = sys.stderr
         sys.stderr = sys.stdout
-        self.runner.options.verbose = False
+        if self.runner.options.processes > 1:
+            # If we only have one subprocess, there's absolutely
+            # no reason to squelch.  We will let the messages through in a
+            # timely manner, if they have been requested. On the other hand, if
+            # there are multiple processes, we do squelch to 0.
+            self.runner.options.verbose = 0
+        self.progress = False
 
     def report(self):
         sys.stdout.close()

Modified: zope.testing/trunk/src/zope/testing/testrunner/runner.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/runner.py	2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/src/zope/testing/testrunner/runner.py	2009-08-10 20:57:09 UTC (rev 102634)
@@ -20,6 +20,7 @@
 
 import cStringIO
 import gc
+import Queue
 import re
 import sys
 import threading
@@ -229,11 +230,12 @@
         if should_resume:
             setup_layers = None
             if layers_to_run:
-                self.ran += resume_tests(self.script_parts, self.options, self.features,
+                self.ran += resume_tests(
+                    self.script_parts, self.options, self.features,
                     layers_to_run, self.failures, self.errors)
 
         if setup_layers:
-            if self.options.resume_layer == None:
+            if self.options.resume_layer is None:
                 self.options.output.info("Tearing down left over layers:")
             tear_down_unneeded(self.options, (), setup_layers, True)
 
@@ -355,7 +357,7 @@
         output.info("Running %s tests:" % layer_name)
     tear_down_unneeded(options, needed, setup_layers)
 
-    if options.resume_layer != None:
+    if options.resume_layer is not None:
         output.info_suboptimal("  Running in a subprocess.")
 
     try:
@@ -378,8 +380,9 @@
         "Layer set up failure."
 
 
-def spawn_layer_in_subprocess(result, script_parts, options, features, layer_name, layer,
-        failures, errors, resume_number):
+def spawn_layer_in_subprocess(result, script_parts, options, features,
+                              layer_name, layer, failures, errors,
+                              resume_number):
     try:
         # BBB
         if script_parts is None:
@@ -408,9 +411,34 @@
         child = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE,
             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
             close_fds=not sys.platform.startswith('win'))
-        subout, suberr = child.communicate()
 
-        erriter = iter(suberr.splitlines())
+        while True:
+            try:
+                while True:
+                    # We use readline() instead of iterating over stdout
+                    # because it appears that iterating over stdout causes a
+                    # lot more buffering to take place (probably so it can
+                    # return its lines as a batch). We don't want too much
+                    # buffering because this foils automatic and human monitors
+                    # trying to verify that the subprocess is still alive.
+                    l = child.stdout.readline()
+                    if not l:
+                        break
+                    result.write(l)
+            except IOError, e:
+                if e.errno == errno.EINTR:
+                    # If the subprocess dies before we finish reading its
+                    # output, a SIGCHLD signal can interrupt the reading.
+                    # The correct thing to do in that case is to retry.
+                    continue
+                output.error(
+                    "Error reading subprocess output for %s" % layer_name)
+                output.info(str(e))
+            else:
+                break
+
+        # Now stderr should be ready to read the whole thing.
+        erriter = iter(child.stderr.read().splitlines())
         nfail = nerr = 0
         for line in erriter:
             try:
@@ -431,37 +459,88 @@
             nerr -= 1
             errors.append((erriter.next().strip(), None))
 
-        result.stdout.append(subout)
-
     finally:
         result.done = True
 
 
-class SubprocessResult(object):
+class AbstractSubprocessResult(object):
+    """A result of a subprocess layer run."""
 
-    def __init__(self):
-        self.num_ran = 0
+    num_ran = 0
+    done = False
+
+    def __init__(self, layer_name, queue):
+        self.layer_name = layer_name
+        self.queue = queue
         self.stdout = []
-        self.done = False
 
+    def write(self, out):
+        """Receive a line of the subprocess output."""
 
+
+class DeferredSubprocessResult(AbstractSubprocessResult):
+    """Keeps stdout around for later processing."""
+    
+    def write(self, out):
+        if not _is_dots(out):
+            self.stdout.append(out)
+
+
+class ImmediateSubprocessResult(AbstractSubprocessResult):
+    """Writes complete output straight to stdout as it arrives."""
+
+    def write(self, out):
+        sys.stdout.write(out)
+        # Help keep-alive monitors (human or automated) keep up-to-date.
+        sys.stdout.flush()
+
+
+_is_dots = re.compile(r'\.+\n').match
+class KeepaliveSubprocessResult(AbstractSubprocessResult):
+    "Keeps stdout for later processing; sends marks to queue to show activity."
+
+    _done = False
+
+    def _set_done(self, value):
+        self._done = value
+        assert value, 'Internal error: unexpectedly setting done to False'
+        self.queue.put((self.layer_name, ' LAYER FINISHED'))
+    done = property(lambda self: self._done, _set_done) 
+
+    def write(self, out):
+        if _is_dots(out):
+            self.queue.put((self.layer_name, out.strip()))
+        else:
+            self.stdout.append(out)
+
+
 def resume_tests(script_parts, options, features, layers, failures, errors):
     results = []
+    stdout_queue = None
+    if options.processes == 1:
+        result_factory = ImmediateSubprocessResult
+    elif options.verbose > 1:
+        result_factory = KeepaliveSubprocessResult
+        stdout_queue = Queue.Queue()
+    else:
+        result_factory = DeferredSubprocessResult
     resume_number = int(options.processes > 1)
     ready_threads = []
     for layer_name, layer, tests in layers:
-        result = SubprocessResult()
+        result = result_factory(layer_name, stdout_queue)
         results.append(result)
         ready_threads.append(threading.Thread(
             target=spawn_layer_in_subprocess,
-            args=(result, script_parts, options, features, layer_name, layer, failures,
-                errors, resume_number)))
+            args=(result, script_parts, options, features, layer_name, layer,
+                  failures, errors, resume_number)))
         resume_number += 1
 
     # Now start a few threads at a time.
     running_threads = []
     results_iter = iter(results)
     current_result = results_iter.next()
+    last_layer_intermediate_output = None
+    output = None
     while ready_threads or running_threads:
         while len(running_threads) < options.processes and ready_threads:
             thread = ready_threads.pop(0)
@@ -472,15 +551,36 @@
             if not thread.isAlive():
                 del running_threads[index]
 
-        # We want to display results in the order they would have been
-        # displayed, had the work not been done in parallel.
+        # Clear out any messages in queue
+        while stdout_queue is not None:
+            previous_output = output
+            try:
+                layer_name, output = stdout_queue.get(False)
+            except Queue.Empty:
+                break
+            if layer_name != last_layer_intermediate_output:
+                # Clarify what layer is reporting activity.
+                if previous_output is not None:
+                    sys.stdout.write(']\n')
+                sys.stdout.write(
+                    '[Parallel tests running in %s:\n  ' % (layer_name,))
+                last_layer_intermediate_output = layer_name
+            sys.stdout.write(output)
+        # Display results in the order they would have been displayed, had the
+        # work not been done in parallel.
         while current_result and current_result.done:
+            if output is not None:
+                sys.stdout.write(']\n')
+                output = None
             map(sys.stdout.write, current_result.stdout)
 
             try:
                 current_result = results_iter.next()
             except StopIteration:
                 current_result = None
+
+        # Help keep-alive monitors (human or automated) keep up-to-date.
+        sys.stdout.flush()
         time.sleep(0.01) # Keep the loop from being too tight.
 
     # Return the total number of tests run.
@@ -488,8 +588,8 @@
 
 
 def tear_down_unneeded(options, needed, setup_layers, optional=False):
-    # Tear down any layers not needed for these tests. The unneeded
-    # layers might interfere.
+    # Tear down any layers not needed for these tests. The unneeded layers
+    # might interfere.
     unneeded = [l for l in setup_layers if l not in needed]
     unneeded = order_by_bases(unneeded)
     unneeded.reverse()

Added: zope.testing/trunk/src/zope/testing/testrunner/testrunner-ex/sampletests_buffering.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/testrunner-ex/sampletests_buffering.py	                        (rev 0)
+++ zope.testing/trunk/src/zope/testing/testrunner/testrunner-ex/sampletests_buffering.py	2009-08-10 20:57:09 UTC (rev 102634)
@@ -0,0 +1,70 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Sample tests with sleep and layers that can't be torn down
+
+$Id$
+"""
+
+import unittest, time
+
+class Layer1:
+
+    def setUp(self):
+        pass
+    setUp = classmethod(setUp)
+
+    def tearDown(self):
+        raise NotImplementedError
+    tearDown = classmethod(tearDown)
+
+
+class Layer2:
+
+    def setUp(self):
+        pass
+    setUp = classmethod(setUp)
+
+    def tearDown(self):
+        raise NotImplementedError
+    tearDown = classmethod(tearDown)
+
+
+class TestSomething1(unittest.TestCase):
+
+    layer = Layer1
+
+    def test_something(self):
+        pass
+
+
+class TestSomething2(unittest.TestCase):
+
+    layer = Layer2
+
+    def test_something(self):
+        time.sleep(0.5)
+
+    def test_something2(self):
+        time.sleep(0.5)
+
+
+def test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(TestSomething1))
+    suite.addTest(unittest.makeSuite(TestSomething2))
+    return suite
+
+
+if __name__ == '__main__':
+    unittest.main()
\ No newline at end of file

Added: zope.testing/trunk/src/zope/testing/testrunner/testrunner-layers-buff.txt
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/testrunner-layers-buff.txt	                        (rev 0)
+++ zope.testing/trunk/src/zope/testing/testrunner/testrunner-layers-buff.txt	2009-08-10 20:57:09 UTC (rev 102634)
@@ -0,0 +1,142 @@
+This is a test for a fix in buffering of output from a layer in a subprocess.
+
+Prior to the change that this tests, output from within a test layer in a
+subprocess would be buffered.  This could wreak havoc on supervising processes
+(or humans) that would kill a test run if no output was seen for some period of
+time.
+
+First, we wrap stdout with an object that instruments it. It notes the time at
+which a given line was written.
+
+    >>> import os, sys, datetime
+    >>> class RecordingStreamWrapper:
+    ...     def __init__(self, wrapped):
+    ...         self.record = []
+    ...         self.wrapped = wrapped
+    ...     def write(self, out):
+    ...         self.record.append((out, datetime.datetime.now()))
+    ...         self.wrapped.write(out)
+    ...     def flush(self):
+    ...         self.wrapped.flush()
+    ...
+    >>> sys.stdout = RecordingStreamWrapper(sys.stdout)
+
+Now we actually call our test.  If you open the file to which we are referring
+here (zope/testing/testrunner/testrunner-ex/sampletests_buffering.py), you will
+see two test suites, each with its own layer that does not know how to tear
+down.  This forces the second suite to be run in a subprocess.
+
+That second suite has two tests.  Each sleeps for half a second.
+
+    >>> directory_with_tests = os.path.join(this_directory, 'testrunner-ex')
+    >>> from zope.testing import testrunner
+    >>> defaults = [
+    ...     '--path', directory_with_tests,
+    ...     ]
+    >>> argv = [sys.argv[0],
+    ...         '-vv', '--tests-pattern', '^sampletests_buffering.*']
+
+    >>> try:
+    ...     testrunner.run_internal(defaults, argv)
+    ...     record = sys.stdout.record
+    ... finally:
+    ...     sys.stdout = sys.stdout.wrapped
+    ...
+    Running tests at level 1
+    Running sampletests_buffering.Layer1 tests:
+      Set up sampletests_buffering.Layer1 in N.NNN seconds.
+      Running:
+     test_something (sampletests_buffering.TestSomething1)
+      Ran 1 tests with 0 failures and 0 errors in N.NNN seconds.
+    Running sampletests_buffering.Layer2 tests:
+      Tear down sampletests_buffering.Layer1 ... not supported
+      Running in a subprocess.
+      Set up sampletests_buffering.Layer2 in N.NNN seconds.
+      Running:
+     test_something (sampletests_buffering.TestSomething2)
+     test_something2 (sampletests_buffering.TestSomething2)
+      Ran 2 tests with 0 failures and 0 errors in N.NNN seconds.
+      Tear down sampletests_buffering.Layer2 ... not supported
+    Total: 3 tests, 0 failures, 0 errors in N.NNN seconds.
+    False
+
+Now we actually check the results we care about.  We should see that there are
+two pauses of about half a second, one around the first test and one around the
+second.  Before the change that this test verifies, there was a single pause of
+more than a second after the second suite ran.
+
+    >>> pause = datetime.timedelta(seconds=0.3)
+    >>> last_line, last_time = record.pop(0)
+    >>> for line, time in record:
+    ...     if (time-last_time >= pause and
+    ...         line != '  Running in a subprocess.\n'):
+    ...         # We paused!
+    ...         print 'PAUSE FOUND BETWEEN THESE LINES:'
+    ...         print ''.join([last_line, line, '-'*70])
+    ...     last_line, last_time = line, time
+    PAUSE FOUND BETWEEN THESE LINES:
+      Running:
+     test_something (sampletests_buffering.TestSomething2)
+    ----------------------------------------------------------------------
+    PAUSE FOUND BETWEEN THESE LINES:
+     test_something (sampletests_buffering.TestSomething2)
+     test_something2 (sampletests_buffering.TestSomething2)
+    ----------------------------------------------------------------------
+
+Because this is a test based on timing, it may be somewhat fragile.  However,
+on a relatively slow machine, this timing works out fine; I'm hopeful that this
+test will not be a source of spurious errors.  If it is, we will have to
+readdress.
+
+Now let's do the same thing, but with multiple processes at once.  We'll get
+a different result that has similar characteristics.
+
+    >>> sys.stdout = RecordingStreamWrapper(sys.stdout)
+    >>> argv.extend(['-j', '2'])
+    >>> try:
+    ...     testrunner.run_internal(defaults, argv)
+    ...     record = sys.stdout.record
+    ... finally:
+    ...     sys.stdout = sys.stdout.wrapped
+    ...
+    Running tests at level 1
+    Running sampletests_buffering.Layer1 tests:
+      Set up sampletests_buffering.Layer1 in N.NNN seconds.
+      Running:
+     test_something (sampletests_buffering.TestSomething1)
+      Ran 1 tests with 0 failures and 0 errors in N.NNN seconds.
+    [Parallel tests running in sampletests_buffering.Layer2:
+      .. LAYER FINISHED]
+    Running sampletests_buffering.Layer2 tests:
+      Running in a subprocess.
+      Set up sampletests_buffering.Layer2 in N.NNN seconds.
+      Ran 2 tests with 0 failures and 0 errors in N.NNN seconds.
+    Total: 3 tests, 0 failures, 0 errors in N.NNN seconds.
+    False
+
+Notice that, with a -vv (or greater) verbosity, the parallel test run includes
+a progress report to keep track of tests run in the various layers.  Because
+the actual results are saved to be displayed assembled in the original test
+order, the progress report shows up before we are given the news that the
+testrunner is starting Layer2.  This is counterintuitive, but lets us keep the
+primary reporting information for the given layer in one location, while also
+giving us progress reports that can be used for keepalive analysis by a human or
+automated agent. In particular for the second point, notice below that, as
+before, the progress output is not buffered.
+
+    >>> last_line, last_time = record.pop(0)
+    >>> for line, time in record:
+    ...     if (time-last_time >= pause and
+    ...         line != '  Running in a subprocess.\n'):
+    ...         # We paused!
+    ...         print 'PAUSE FOUND BETWEEN THIS OUTPUT:'
+    ...         print '\n'.join([last_line, line, '-'*70])
+    ...     last_line, last_time = line, time
+    PAUSE FOUND BETWEEN THIS OUTPUT:
+    .
+    .
+    ----------------------------------------------------------------------
+    PAUSE FOUND BETWEEN THIS OUTPUT:
+    .
+     LAYER FINISHED
+    ----------------------------------------------------------------------

Modified: zope.testing/trunk/src/zope/testing/testrunner/tests.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/tests.py	2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/src/zope/testing/testrunner/tests.py	2009-08-10 20:57:09 UTC (rev 102634)
@@ -150,6 +150,7 @@
         'testrunner-debugging.txt',
         'testrunner-edge-cases.txt',
         'testrunner-errors.txt',
+        'testrunner-layers-buff.txt',
         'testrunner-layers-ntd.txt',
         'testrunner-layers.txt',
         'testrunner-layers-api.txt',



More information about the Checkins mailing list