[Zope3-checkins] SVN: zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/runner.py checkpoint

Benji York benji at zope.com
Wed Jul 2 20:38:40 EDT 2008


Log message for revision 87943:
  checkpoint
  

Changed:
  U   zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/runner.py

-=-
Modified: zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/runner.py
===================================================================
--- zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/runner.py	2008-07-03 00:38:16 UTC (rev 87942)
+++ zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/runner.py	2008-07-03 00:38:39 UTC (rev 87943)
@@ -16,12 +16,13 @@
 $Id: __init__.py 86232 2008-05-03 15:09:33Z ctheune $
 """
 
-import re
+import Queue
 import cStringIO
 import gc
 import glob
 import os
 import pdb
+import re
 import sys
 import tempfile
 import threading
@@ -358,68 +359,93 @@
     def runTest(self):
         "Layer set up failure."
 
+def resume_a_layer(queue, options, layer_name, failures, errors,
+        resume_number):
+    args = [sys.executable,
+            sys.argv[0],
+            '--resume-layer', layer_name, str(resume_number),
+            ]
+    for d in options.testrunner_defaults:
+        args.extend(['--default', d])
+
+    args.extend(options.original_testrunner_args[1:])
+
+    # this is because of a bug in Python (http://www.python.org/sf/900092)
+    if (options.profile == 'hotshot'
+        and sys.version_info[:3] <= (2,4,1)):
+        args.insert(1, '-O')
+
+    if sys.platform.startswith('win'):
+        args = args[0] + ' ' + ' '.join([
+            ('"' + a.replace('\\', '\\\\').replace('"', '\\"') + '"')
+            for a in args[1:]
+            ])
+
+    subin, subout, suberr = os.popen3(args)
+    while True:
+        try:
+            for l in subout:
+                sys.stdout.write(l)
+        except IOError, e:
+            if e.errno == errno.EINTR:
+                # If reading the subprocess output is interrupted (as
+                # receiving SIGCHLD can do), then retry.
+                continue
+            options.output.error(
+                "Error reading subprocess output for %s" % layer_name)
+            options.output.info(str(e))
+        else:
+            break
+
+    line = suberr.readline()
+    try:
+        ran, nfail, nerr = map(int, line.strip().split())
+    except KeyboardInterrupt:
+        raise
+    except:
+        raise SubprocessError(
+            'No subprocess summary found', line+suberr.read())
+
+    while nfail > 0:
+        nfail -= 1
+        failures.append((suberr.readline().strip(), None))
+    while nerr > 0:
+        nerr -= 1
+        errors.append((suberr.readline().strip(), None))
+
+    queue.put(ran)
+
+
 def resume_tests(options, layer_name, layers, failures, errors):
     output = options.output
+    queue = Queue.Queue()
     layers = [l for (l, _, _) in layers]
     layers = layers[layers.index(layer_name):]
-    rantotal = 0
     resume_number = 0
+    ready_threads = []
     for layer_name in layers:
-        args = [sys.executable,
-                sys.argv[0],
-                '--resume-layer', layer_name, str(resume_number),
-                ]
+        ready_threads.append(threading.Thread(
+            target=resume_a_layer,
+            args=(queue, options, layer_name, failures, errors, resume_number)))
         resume_number += 1
-        for d in options.testrunner_defaults:
-            args.extend(['--default', d])
 
-        args.extend(options.original_testrunner_args[1:])
+    # Now start a few threads at a time.
+    num_threads = 3
+    running_threads = []
+    while ready_threads or running_threads:
+        while len(running_threads) < num_threads and ready_threads:
+            thread = ready_threads.pop(0)
+            thread.start()
+            running_threads.append(thread)
 
-        # this is because of a bug in Python (http://www.python.org/sf/900092)
-        if (options.profile == 'hotshot'
-            and sys.version_info[:3] <= (2,4,1)):
-            args.insert(1, '-O')
+        running_threads[0].join()
+        del running_threads[0]
 
-        if sys.platform.startswith('win'):
-            args = args[0] + ' ' + ' '.join([
-                ('"' + a.replace('\\', '\\\\').replace('"', '\\"') + '"')
-                for a in args[1:]
-                ])
+    # Gather up all the results.
+    rantotal = 0
+    while not queue.empty():
+        rantotal += queue.get()
 
-        subin, subout, suberr = os.popen3(args)
-        while True:
-            try:
-                for l in subout:
-                    sys.stdout.write(l)
-            except IOError, e:
-                if e.errno == errno.EINTR:
-                    # If the subprocess dies before we finish reading its
-                    # output, a SIGCHLD signal can interrupt the reading.
-                    # The correct thing to to in that case is to retry.
-                    continue
-                output.error("Error reading subprocess output for %s" % layer_name)
-                output.info(str(e))
-            else:
-                break
-
-        line = suberr.readline()
-        try:
-            ran, nfail, nerr = map(int, line.strip().split())
-        except KeyboardInterrupt:
-            raise
-        except:
-            raise SubprocessError(
-                'No subprocess summary found', line+suberr.read())
-
-        while nfail > 0:
-            nfail -= 1
-            failures.append((suberr.readline().strip(), None))
-        while nerr > 0:
-            nerr -= 1
-            errors.append((suberr.readline().strip(), None))
-
-        rantotal += ran
-
     return rantotal
 
 



More information about the Zope3-Checkins mailing list