[Checkins] SVN: zope.testing/trunk/ reduce batching and improve output for subprocesses
Gary Poster
gary.poster at canonical.com
Mon Aug 10 16:57:09 EDT 2009
Log message for revision 102634:
reduce batching and improve output for subprocesses
Changed:
U zope.testing/trunk/CHANGES.txt
U zope.testing/trunk/src/zope/testing/testrunner/filter.py
U zope.testing/trunk/src/zope/testing/testrunner/formatter.py
U zope.testing/trunk/src/zope/testing/testrunner/process.py
U zope.testing/trunk/src/zope/testing/testrunner/runner.py
A zope.testing/trunk/src/zope/testing/testrunner/testrunner-ex/sampletests_buffering.py
A zope.testing/trunk/src/zope/testing/testrunner/testrunner-layers-buff.txt
U zope.testing/trunk/src/zope/testing/testrunner/tests.py
-=-
Modified: zope.testing/trunk/CHANGES.txt
===================================================================
--- zope.testing/trunk/CHANGES.txt 2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/CHANGES.txt 2009-08-10 20:57:09 UTC (rev 102634)
@@ -4,13 +4,23 @@
3.8.1 (unreleased)
==================
-- Avoid to hardcode sys.argv[0] as script;
- allow for instance Zope 2 `bin/instance test` (LP#407916).
+- Avoid hardcoding sys.argv[0] as script;
+ allow, for instance, Zope 2's `bin/instance test` (LP#407916).
-- Produce a clear error message when subprocess doesn't follow the
+- Produce a clear error message when a subprocess doesn't follow the
zope.testing.testrunner protocol (LP#407916).
+- Do not unnecessarily squelch verbose output in a subprocess when there are
+ not multiple subprocesses.
+- Do not unnecessarily batch subprocess output, which can stymie automated and
+ human processes for identifying hung tests.
+
+- Include incremental output when there are multiple subprocesses and a
+ verbosity of -vv or greater is requested. This again is not batched,
+ supporting automated processes and humans looking for hung tests.
+
+
3.8.0 (2009-07-24)
==================
Modified: zope.testing/trunk/src/zope/testing/testrunner/filter.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/filter.py 2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/src/zope/testing/testrunner/filter.py 2009-08-10 20:57:09 UTC (rev 102634)
@@ -65,7 +65,8 @@
# No pattern matched this name so we remove it
layers.pop(name)
- if self.runner.options.verbose:
+ if (self.runner.options.verbose and
+ not self.runner.options.resume_layer):
if self.runner.options.all:
msg = "Running tests at all levels"
else:
Modified: zope.testing/trunk/src/zope/testing/testrunner/formatter.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/formatter.py 2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/src/zope/testing/testrunner/formatter.py 2009-08-10 20:57:09 UTC (rev 102634)
@@ -52,6 +52,9 @@
progress = property(lambda self: self.options.progress)
verbose = property(lambda self: self.options.verbose)
+ in_subprocess = property(
+ lambda self: self.options.resume_layer is not None and
+ self.options.processes > 1)
def compute_max_width(self):
"""Try to determine the terminal width."""
@@ -263,6 +266,12 @@
elif self.verbose == 1:
sys.stdout.write('.' * test.countTestCases())
+
+ elif self.in_subprocess:
+ sys.stdout.write('.' * test.countTestCases())
+ # Give the parent process a new line so it sees the progress
+ # in a timely manner.
+ sys.stdout.write('\n')
if self.verbose > 1:
s = str(test)
Modified: zope.testing/trunk/src/zope/testing/testrunner/process.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/process.py 2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/src/zope/testing/testrunner/process.py 2009-08-10 20:57:09 UTC (rev 102634)
@@ -31,7 +31,13 @@
def global_setup(self):
self.original_stderr = sys.stderr
sys.stderr = sys.stdout
- self.runner.options.verbose = False
+ if self.runner.options.processes > 1:
+ # If we only have one subprocess, there's absolutely
+ # no reason to squelch. We will let the messages through in a
+ # timely manner, if they have been requested. On the other hand, if
+ # there are multiple processes, we do squelch to 0.
+ self.runner.options.verbose = 0
+ self.progress = False
def report(self):
sys.stdout.close()
Modified: zope.testing/trunk/src/zope/testing/testrunner/runner.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/runner.py 2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/src/zope/testing/testrunner/runner.py 2009-08-10 20:57:09 UTC (rev 102634)
@@ -20,6 +20,7 @@
import cStringIO
import gc
+import Queue
import re
import sys
import threading
@@ -229,11 +230,12 @@
if should_resume:
setup_layers = None
if layers_to_run:
- self.ran += resume_tests(self.script_parts, self.options, self.features,
+ self.ran += resume_tests(
+ self.script_parts, self.options, self.features,
layers_to_run, self.failures, self.errors)
if setup_layers:
- if self.options.resume_layer == None:
+ if self.options.resume_layer is None:
self.options.output.info("Tearing down left over layers:")
tear_down_unneeded(self.options, (), setup_layers, True)
@@ -355,7 +357,7 @@
output.info("Running %s tests:" % layer_name)
tear_down_unneeded(options, needed, setup_layers)
- if options.resume_layer != None:
+ if options.resume_layer is not None:
output.info_suboptimal(" Running in a subprocess.")
try:
@@ -378,8 +380,9 @@
"Layer set up failure."
-def spawn_layer_in_subprocess(result, script_parts, options, features, layer_name, layer,
- failures, errors, resume_number):
+def spawn_layer_in_subprocess(result, script_parts, options, features,
+ layer_name, layer, failures, errors,
+ resume_number):
try:
# BBB
if script_parts is None:
@@ -408,9 +411,34 @@
child = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
close_fds=not sys.platform.startswith('win'))
- subout, suberr = child.communicate()
- erriter = iter(suberr.splitlines())
+ while True:
+ try:
+ while True:
+ # We use readline() instead of iterating over stdout
+ # because it appears that iterating over stdout causes a
+ # lot more buffering to take place (probably so it can
+ # return its lines as a batch). We don't want too much
+ # buffering because this foils automatic and human monitors
+ # trying to verify that the subprocess is still alive.
+ l = child.stdout.readline()
+ if not l:
+ break
+ result.write(l)
+ except IOError, e:
+ if e.errno == errno.EINTR:
+ # If the subprocess dies before we finish reading its
+ # output, a SIGCHLD signal can interrupt the reading.
+ # The correct thing to do in that case is to retry.
+ continue
+ output.error(
+ "Error reading subprocess output for %s" % layer_name)
+ output.info(str(e))
+ else:
+ break
+
+ # Now stderr should be ready to read the whole thing.
+ erriter = iter(child.stderr.read().splitlines())
nfail = nerr = 0
for line in erriter:
try:
@@ -431,37 +459,88 @@
nerr -= 1
errors.append((erriter.next().strip(), None))
- result.stdout.append(subout)
-
finally:
result.done = True
-class SubprocessResult(object):
+class AbstractSubprocessResult(object):
+ """A result of a subprocess layer run."""
- def __init__(self):
- self.num_ran = 0
+ num_ran = 0
+ done = False
+
+ def __init__(self, layer_name, queue):
+ self.layer_name = layer_name
+ self.queue = queue
self.stdout = []
- self.done = False
+ def write(self, out):
+ """Receive a line of the subprocess out."""
+
+class DeferredSubprocessResult(AbstractSubprocessResult):
+ """Keeps stdout around for later processing."""
+
+ def write(self, out):
+ if not _is_dots(out):
+ self.stdout.append(out)
+
+
+class ImmediateSubprocessResult(AbstractSubprocessResult):
+ """Sends complete output to queue."""
+
+ def write(self, out):
+ sys.stdout.write(out)
+ # Help keep-alive monitors (human or automated) keep up-to-date.
+ sys.stdout.flush()
+
+
+_is_dots = re.compile(r'\.+\n').match
+class KeepaliveSubprocessResult(AbstractSubprocessResult):
+ "Keeps stdout for later processing; sends marks to queue to show activity."
+
+ _done = False
+
+ def _set_done(self, value):
+ self._done = value
+ assert value, 'Internal error: unexpectedly setting done to False'
+ self.queue.put((self.layer_name, ' LAYER FINISHED'))
+ done = property(lambda self: self._done, _set_done)
+
+ def write(self, out):
+ if _is_dots(out):
+ self.queue.put((self.layer_name, out.strip()))
+ else:
+ self.stdout.append(out)
+
+
def resume_tests(script_parts, options, features, layers, failures, errors):
results = []
+ stdout_queue = None
+ if options.processes == 1:
+ result_factory = ImmediateSubprocessResult
+ elif options.verbose > 1:
+ result_factory = KeepaliveSubprocessResult
+ stdout_queue = Queue.Queue()
+ else:
+ result_factory = DeferredSubprocessResult
resume_number = int(options.processes > 1)
ready_threads = []
for layer_name, layer, tests in layers:
- result = SubprocessResult()
+ result = result_factory(layer_name, stdout_queue)
results.append(result)
ready_threads.append(threading.Thread(
target=spawn_layer_in_subprocess,
- args=(result, script_parts, options, features, layer_name, layer, failures,
- errors, resume_number)))
+ args=(result, script_parts, options, features, layer_name, layer,
+ failures, errors, resume_number)))
resume_number += 1
# Now start a few threads at a time.
running_threads = []
results_iter = iter(results)
current_result = results_iter.next()
+ last_layer_intermediate_output = None
+ output = None
while ready_threads or running_threads:
while len(running_threads) < options.processes and ready_threads:
thread = ready_threads.pop(0)
@@ -472,15 +551,36 @@
if not thread.isAlive():
del running_threads[index]
- # We want to display results in the order they would have been
- # displayed, had the work not been done in parallel.
+ # Clear out any messages in queue
+ while stdout_queue is not None:
+ previous_output = output
+ try:
+ layer_name, output = stdout_queue.get(False)
+ except Queue.Empty:
+ break
+ if layer_name != last_layer_intermediate_output:
+ # Clarify what layer is reporting activity.
+ if previous_output is not None:
+ sys.stdout.write(']\n')
+ sys.stdout.write(
+ '[Parallel tests running in %s:\n ' % (layer_name,))
+ last_layer_intermediate_output = layer_name
+ sys.stdout.write(output)
+ # Display results in the order they would have been displayed, had the
+ # work not been done in parallel.
while current_result and current_result.done:
+ if output is not None:
+ sys.stdout.write(']\n')
+ output = None
map(sys.stdout.write, current_result.stdout)
try:
current_result = results_iter.next()
except StopIteration:
current_result = None
+
+ # Help keep-alive monitors (human or automated) keep up-to-date.
+ sys.stdout.flush()
time.sleep(0.01) # Keep the loop from being too tight.
# Return the total number of tests run.
@@ -488,8 +588,8 @@
def tear_down_unneeded(options, needed, setup_layers, optional=False):
- # Tear down any layers not needed for these tests. The unneeded
- # layers might interfere.
+ # Tear down any layers not needed for these tests. The unneeded layers
+ # might interfere.
unneeded = [l for l in setup_layers if l not in needed]
unneeded = order_by_bases(unneeded)
unneeded.reverse()
Added: zope.testing/trunk/src/zope/testing/testrunner/testrunner-ex/sampletests_buffering.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/testrunner-ex/sampletests_buffering.py (rev 0)
+++ zope.testing/trunk/src/zope/testing/testrunner/testrunner-ex/sampletests_buffering.py 2009-08-10 20:57:09 UTC (rev 102634)
@@ -0,0 +1,70 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Sample tests with sleep and layers that can't be torn down
+
+$Id$
+"""
+
+import unittest, time
+
+class Layer1:
+
+ def setUp(self):
+ pass
+ setUp = classmethod(setUp)
+
+ def tearDown(self):
+ raise NotImplementedError
+ tearDown = classmethod(tearDown)
+
+
+class Layer2:
+
+ def setUp(self):
+ pass
+ setUp = classmethod(setUp)
+
+ def tearDown(self):
+ raise NotImplementedError
+ tearDown = classmethod(tearDown)
+
+
+class TestSomething1(unittest.TestCase):
+
+ layer = Layer1
+
+ def test_something(self):
+ pass
+
+
+class TestSomething2(unittest.TestCase):
+
+ layer = Layer2
+
+ def test_something(self):
+ time.sleep(0.5)
+
+ def test_something2(self):
+ time.sleep(0.5)
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestSomething1))
+ suite.addTest(unittest.makeSuite(TestSomething2))
+ return suite
+
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
Added: zope.testing/trunk/src/zope/testing/testrunner/testrunner-layers-buff.txt
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/testrunner-layers-buff.txt (rev 0)
+++ zope.testing/trunk/src/zope/testing/testrunner/testrunner-layers-buff.txt 2009-08-10 20:57:09 UTC (rev 102634)
@@ -0,0 +1,142 @@
+This is a test for a fix in buffering of output from a layer in a subprocess.
+
+Prior to the change that this tests, output from within a test layer in a
+subprocess would be buffered. This could wreak havoc on supervising processes
+(or humans) that would kill a test run if no output was seen for some period of
+time.
+
+First, we wrap stdout with an object that instruments it. It notes the time at
+which a given line was written.
+
+ >>> import os, sys, datetime
+ >>> class RecordingStreamWrapper:
+ ... def __init__(self, wrapped):
+ ... self.record = []
+ ... self.wrapped = wrapped
+ ... def write(self, out):
+ ... self.record.append((out, datetime.datetime.now()))
+ ... self.wrapped.write(out)
+ ... def flush(self):
+ ... self.wrapped.flush()
+ ...
+ >>> sys.stdout = RecordingStreamWrapper(sys.stdout)
+
+Now we actually call our test. If you open the file to which we are referring
+here (zope/testing/testrunner-ex/sampletests_buffering.py) you will see two test
+suites, each with its own layer that does not know how to tear down. This
+forces the second suite to be run in a subprocess.
+
+That second suite has two tests. Both sleep for half a second each.
+
+ >>> directory_with_tests = os.path.join(this_directory, 'testrunner-ex')
+ >>> from zope.testing import testrunner
+ >>> defaults = [
+ ... '--path', directory_with_tests,
+ ... ]
+ >>> argv = [sys.argv[0],
+ ... '-vv', '--tests-pattern', '^sampletests_buffering.*']
+
+ >>> try:
+ ... testrunner.run_internal(defaults, argv)
+ ... record = sys.stdout.record
+ ... finally:
+ ... sys.stdout = sys.stdout.wrapped
+ ...
+ Running tests at level 1
+ Running sampletests_buffering.Layer1 tests:
+ Set up sampletests_buffering.Layer1 in N.NNN seconds.
+ Running:
+ test_something (sampletests_buffering.TestSomething1)
+ Ran 1 tests with 0 failures and 0 errors in N.NNN seconds.
+ Running sampletests_buffering.Layer2 tests:
+ Tear down sampletests_buffering.Layer1 ... not supported
+ Running in a subprocess.
+ Set up sampletests_buffering.Layer2 in N.NNN seconds.
+ Running:
+ test_something (sampletests_buffering.TestSomething2)
+ test_something2 (sampletests_buffering.TestSomething2)
+ Ran 2 tests with 0 failures and 0 errors in N.NNN seconds.
+ Tear down sampletests_buffering.Layer2 ... not supported
+ Total: 3 tests, 0 failures, 0 errors in N.NNN seconds.
+ False
+
+Now we actually check the results we care about. We should see that there are
+two pauses of about half a second, one around the first test and one around the
+second. Before the change that this test verfies, there was a single pause of
+more than a second after the second suite ran.
+
+ >>> pause = datetime.timedelta(seconds=0.3)
+ >>> last_line, last_time = record.pop(0)
+ >>> for line, time in record:
+ ... if (time-last_time >= pause and
+ ... line != ' Running in a subprocess.\n'):
+ ... # We paused!
+ ... print 'PAUSE FOUND BETWEEN THESE LINES:'
+ ... print ''.join([last_line, line, '-'*70])
+ ... last_line, last_time = line, time
+ PAUSE FOUND BETWEEN THESE LINES:
+ Running:
+ test_something (sampletests_buffering.TestSomething2)
+ ----------------------------------------------------------------------
+ PAUSE FOUND BETWEEN THESE LINES:
+ test_something (sampletests_buffering.TestSomething2)
+ test_something2 (sampletests_buffering.TestSomething2)
+ ----------------------------------------------------------------------
+
+Because this is a test based on timing, it may be somewhat fragile. However,
+on a relatively slow machine, this timing works out fine; I'm hopeful that this
+test will not be a source of spurious errors. If it is, we will have to
+readdress.
+
+Now let's do the same thing, but with multiple processes at once. We'll get
+a different result that has similar characteristics.
+
+ >>> sys.stdout = RecordingStreamWrapper(sys.stdout)
+ >>> argv.extend(['-j', '2'])
+ >>> try:
+ ... testrunner.run_internal(defaults, argv)
+ ... record = sys.stdout.record
+ ... finally:
+ ... sys.stdout = sys.stdout.wrapped
+ ...
+ Running tests at level 1
+ Running sampletests_buffering.Layer1 tests:
+ Set up sampletests_buffering.Layer1 in N.NNN seconds.
+ Running:
+ test_something (sampletests_buffering.TestSomething1)
+ Ran 1 tests with 0 failures and 0 errors in N.NNN seconds.
+ [Parallel tests running in sampletests_buffering.Layer2:
+ .. LAYER FINISHED]
+ Running sampletests_buffering.Layer2 tests:
+ Running in a subprocess.
+ Set up sampletests_buffering.Layer2 in N.NNN seconds.
+ Ran 2 tests with 0 failures and 0 errors in N.NNN seconds.
+ Total: 3 tests, 0 failures, 0 errors in N.NNN seconds.
+ False
+
+Notice that, with a -vv (or greater) verbosity, the parallel test run includes
+a progress report to keep track of tests run in the various layers. Because
+the actual results are saved to be displayed assembled in the original test
+order, the progress report shows up before we are given the news that the
+testrunner is starting Layer2. This is counterintuitive, but lets us keep the
+primary reporting information for the given layer in one location, while also
+giving us progress reports that can be used for keepalive analysis by a human or
+automated agent. In particular for the second point, notice below that, as
+before, the progress output is not buffered.
+
+ >>> last_line, last_time = record.pop(0)
+ >>> for line, time in record:
+ ... if (time-last_time >= pause and
+ ... line != ' Running in a subprocess.\n'):
+ ... # We paused!
+ ... print 'PAUSE FOUND BETWEEN THIS OUTPUT:'
+ ... print '\n'.join([last_line, line, '-'*70])
+ ... last_line, last_time = line, time
+ PAUSE FOUND BETWEEN THIS OUTPUT:
+ .
+ .
+ ----------------------------------------------------------------------
+ PAUSE FOUND BETWEEN THIS OUTPUT:
+ .
+ LAYER FINISHED
+ ----------------------------------------------------------------------
Modified: zope.testing/trunk/src/zope/testing/testrunner/tests.py
===================================================================
--- zope.testing/trunk/src/zope/testing/testrunner/tests.py 2009-08-10 18:23:10 UTC (rev 102633)
+++ zope.testing/trunk/src/zope/testing/testrunner/tests.py 2009-08-10 20:57:09 UTC (rev 102634)
@@ -150,6 +150,7 @@
'testrunner-debugging.txt',
'testrunner-edge-cases.txt',
'testrunner-errors.txt',
+ 'testrunner-layers-buff.txt',
'testrunner-layers-ntd.txt',
'testrunner-layers.txt',
'testrunner-layers-api.txt',
More information about the Checkins
mailing list