mavsdk_tests: properly exit on Ctrl+C

With threads we need to manually take care of it, otherwise it gets
messy and we need to press Ctrl+C multiple times.
This commit is contained in:
Julian Oes
2020-03-11 14:00:51 +01:00
committed by Nuno Marques
parent 2c2e314ffe
commit e2c80e546d
+113 -106
View File
@@ -8,6 +8,7 @@ import json
import os import os
import psutil import psutil
import re import re
import signal
import subprocess import subprocess
import sys import sys
import threading import threading
@@ -304,7 +305,10 @@ def main():
print("Creating directory: {}".format(args.log_dir)) print("Creating directory: {}".format(args.log_dir))
os.makedirs(args.log_dir, exist_ok=True) os.makedirs(args.log_dir, exist_ok=True)
sys.exit(run(args, config)) tester = Tester()
signal.signal(signal.SIGINT, tester.sigint_handler)
sys.exit(tester.run(args, config))
def determine_tests(filter): def determine_tests(filter):
@@ -358,134 +362,137 @@ def is_everything_ready(config):
return result return result
def run(args, config): class Tester:
overall_success = True def __init__(self):
self.active_runners = []
for iteration in range(args.iterations): def run(self, args, config):
if args.iterations > 1: overall_success = True
print("Test iteration: {}".format(iteration + 1, args.iterations))
was_success = run_test_group(args, config) for iteration in range(args.iterations):
if args.iterations > 1:
print("Test iteration: {}".
format(iteration + 1, args.iterations))
if not was_success: was_success = self.run_test_group(args, config)
overall_success = False
if args.iterations > 1 and not was_success and args.abort_early:
print("Aborting with a failure in test run {}/{}".
format(iteration + 1, args.iterations))
break
if overall_success:
print(color.GREEN + "Overall result: success!" + color.END)
return 0
else:
print(color.RED + "Overall result: failure!" + color.END)
return 1
def run_test_group(args, config):
overall_success = True
tests = config["tests"]
if args.model == 'all':
models = tests
else:
found = False
for elem in tests:
if elem['model'] == args.model:
models = [elem]
found = True
if not found:
print("Specified model is not defined")
models = []
for group in models:
print(color.BOLD + "==> Running tests for '{}' with filter '{}'"
.format(group['model'], group['test_filter']) + color.END)
tests = determine_tests(group['test_filter'])
for i, test in enumerate(tests):
print("--> Test {} of {}: '{}' running ...".
format(i+1, len(tests), test))
was_success = run_test(test, group, args, config)
if was_success:
print(color.GREEN + "--- Test {} of {}: '{}' succeeded.".
format(i+1, len(tests), test) + color.END)
else:
print(color.RED + "--- Test {} of {}: '{}' failed.".
format(i+1, len(tests), test) + color.END)
if not was_success: if not was_success:
overall_success = False overall_success = False
if not was_success and args.abort_early: if args.iterations > 1 and not was_success and args.abort_early:
print("Aborting early") print("Aborting with a failure in test run {}/{}".
return False format(iteration + 1, args.iterations))
break
return overall_success if overall_success:
print(color.GREEN + "Overall result: success!" + color.END)
return 0
else:
print(color.RED + "Overall result: failure!" + color.END)
return 1
def run_test_group(self, args, config):
overall_success = True
def run_test(test, group, args, config): tests = config["tests"]
speed_factor = args.speed_factor if args.model == 'all':
if "max_speed_factor" in group: models = tests
speed_factor = min(int(speed_factor), group["max_speed_factor"]) else:
found = False
for elem in tests:
if elem['model'] == args.model:
models = [elem]
found = True
if not found:
print("Specified model is not defined")
models = []
if config['mode'] == 'sitl': for group in models:
px4_runner = Px4Runner( print(color.BOLD + "==> Running tests for '{}' with filter '{}'"
group['model'], os.getcwd(), args.log_dir, speed_factor, .format(group['model'], group['test_filter']) + color.END)
args.debugger, args.verbose)
px4_runner.start(group)
if config['simulator'] == 'gazebo': tests = determine_tests(group['test_filter'])
gzserver_runner = GzserverRunner(
for i, test in enumerate(tests):
print("--> Test {} of {}: '{}' running ...".
format(i+1, len(tests), test))
was_success = self.run_test(test, group, args, config)
if was_success:
print(color.GREEN + "--- Test {} of {}: '{}' succeeded.".
format(i+1, len(tests), test) + color.END)
else:
print(color.RED + "--- Test {} of {}: '{}' failed.".
format(i+1, len(tests), test) + color.END)
if not was_success:
overall_success = False
if not was_success and args.abort_early:
print("Aborting early")
return False
return overall_success
def run_test(self, test, group, args, config):
self.active_runners = []
speed_factor = args.speed_factor
if "max_speed_factor" in group:
speed_factor = min(int(speed_factor), group["max_speed_factor"])
if config['mode'] == 'sitl':
px4_runner = Px4Runner(
group['model'], os.getcwd(), args.log_dir, speed_factor, group['model'], os.getcwd(), args.log_dir, speed_factor,
args.verbose) args.debugger, args.verbose)
gzserver_runner.start(group) px4_runner.start(group)
self.active_runners.append(px4_runner)
if args.gui: if config['simulator'] == 'gazebo':
gzclient_runner = GzclientRunner( gzserver_runner = GzserverRunner(
os.getcwd(), args.log_dir, args.verbose) group['model'], os.getcwd(), args.log_dir, speed_factor,
gzclient_runner.start(group) args.verbose)
gzserver_runner.start(group)
self.active_runners.append(gzserver_runner)
test_runner = TestRunner(os.getcwd(), args.log_dir, group, test, if args.gui:
config['mavlink_connection'], args.verbose) gzclient_runner = GzclientRunner(
test_runner.start(group) os.getcwd(), args.log_dir, args.verbose)
gzclient_runner.start(group)
self.active_runners.append(gzclient_runner)
while test_runner.time_elapsed_s() < group['timeout_min']*60: test_runner = TestRunner(os.getcwd(), args.log_dir, group, test,
returncode = test_runner.poll() config['mavlink_connection'], args.verbose)
if returncode is not None: test_runner.start(group)
is_success = (returncode == 0) self.active_runners.append(test_runner)
break
if args.verbose: while test_runner.time_elapsed_s() < group['timeout_min']*60:
test_runner.print_output() returncode = test_runner.poll()
if returncode is not None:
is_success = (returncode == 0)
break
if config['mode'] == 'sitl': if args.verbose:
px4_runner.print_output() for runner in self.active_runners:
runner.print_output()
if config['simulator'] == 'gazebo': else:
gzserver_runner.print_output() print(color.BOLD + "Test timeout of {} mins triggered!".
format(group['timeout_min']) + color.END)
is_success = False
if args.gui: for runner in self.active_runners:
gzclient_runner.print_output() runner.stop()
else:
print(color.BOLD + "Test timeout of {} mins triggered!".
format(group['timeout_min']) + color.END)
is_success = False
test_runner.stop() return is_success
if config['mode'] == 'sitl': def sigint_handler(self, sig, frame):
if config['simulator'] == 'gazebo': print("Received SIGINT")
if args.gui: print("Stopping all processes ...")
gzclient_runner.stop() for runner in self.active_runners:
gzserver_runner.stop() runner.stop()
px4_runner.stop() print("Stopping all processes done.")
sys.exit(-sig)
return is_success
if __name__ == '__main__': if __name__ == '__main__':