mirror of
https://github.com/PX4/PX4-Autopilot.git
synced 2026-05-20 11:23:06 +08:00
mavsdk_tests: add filter for cases
This commit is contained in:
@@ -29,7 +29,9 @@ def main() -> NoReturn:
|
||||
parser.add_argument("--gui", default=False, action='store_true',
|
||||
help="Display gzclient with simulation")
|
||||
parser.add_argument("--model", type=str, default='all',
|
||||
help="Specify which model to run")
|
||||
help="Only run tests for one model")
|
||||
parser.add_argument("--case", type=str, default='all',
|
||||
help="Only run tests for one case")
|
||||
parser.add_argument("--debugger", default="",
|
||||
help="valgrind callgrind gdb lldb")
|
||||
parser.add_argument("--verbose", default=False, action='store_true',
|
||||
@@ -58,6 +60,7 @@ def main() -> NoReturn:
|
||||
args.abort_early,
|
||||
args.speed_factor,
|
||||
args.model,
|
||||
args.case,
|
||||
args.debugger,
|
||||
args.log_dir,
|
||||
args.gui,
|
||||
@@ -114,6 +117,7 @@ class Tester:
|
||||
abort_early: bool,
|
||||
speed_factor: float,
|
||||
model: str,
|
||||
case: str,
|
||||
debugger: str,
|
||||
log_dir: str,
|
||||
gui: bool,
|
||||
@@ -122,7 +126,7 @@ class Tester:
|
||||
self.active_runners: List[ph.Runner]
|
||||
self.iterations = iterations
|
||||
self.abort_early = abort_early
|
||||
self.tests = self.determine_tests(config['tests'], model)
|
||||
self.tests = self.determine_tests(config['tests'], model, case)
|
||||
self.speed_factor = speed_factor
|
||||
self.debugger = debugger
|
||||
self.log_dir = log_dir
|
||||
@@ -132,17 +136,23 @@ class Tester:
|
||||
self.combined_log_fd = TextIO
|
||||
|
||||
@classmethod
|
||||
def determine_tests(cls, tests: List[Dict[str, Any]], model: str) \
|
||||
-> List[Dict[str, Any]]:
|
||||
def determine_tests(cls,
|
||||
tests: List[Dict[str, Any]],
|
||||
model: str,
|
||||
case: str) -> List[Dict[str, Any]]:
|
||||
for test in tests:
|
||||
test['selected'] = (model == 'all') or model == test['model']
|
||||
test['selected'] = (model == 'all' or model == test['model'])
|
||||
test['cases'] = dict.fromkeys(
|
||||
cls.determine_test_cases(test['test_filter']))
|
||||
cls.query_test_cases(test['test_filter']))
|
||||
for key in test['cases'].keys():
|
||||
test['cases'][key] = {
|
||||
'selected': (test['selected'] and
|
||||
(case == 'all' or case == key))}
|
||||
|
||||
return tests
|
||||
|
||||
@staticmethod
|
||||
def determine_test_cases(filter: str) -> List[str]:
|
||||
def query_test_cases(filter: str) -> List[str]:
|
||||
cmd = os.getcwd() + "/build/px4_sitl_default/mavsdk_tests/mavsdk_tests"
|
||||
args = ["--list-test-names-only", filter]
|
||||
p = subprocess.Popen(
|
||||
@@ -170,19 +180,12 @@ class Tester:
|
||||
return self.was_overall_pass()
|
||||
|
||||
def show_plans(self) -> None:
|
||||
n_models = sum(map(
|
||||
lambda test: 1 if test['selected'] else 0,
|
||||
self.tests))
|
||||
n_cases = sum(map(
|
||||
lambda test: len(test['cases']) if test['selected'] else 0,
|
||||
self.tests))
|
||||
|
||||
print()
|
||||
print(colorize(
|
||||
"About to run {} test case{} for {} selected model{} "
|
||||
"({} iteration{}):".
|
||||
format(n_cases, self.plural_s(n_cases),
|
||||
n_models, self.plural_s(n_models),
|
||||
format(self.num_cases(), self.plural_s(self.num_cases()),
|
||||
self.num_models(), self.plural_s(self.num_models()),
|
||||
self.iterations, self.plural_s(self.iterations)),
|
||||
color.BOLD))
|
||||
|
||||
@@ -193,16 +196,44 @@ class Tester:
|
||||
color.BOLD), color.GRAY))
|
||||
continue
|
||||
print(colorize(" - {}:".format(test['model']), color.BOLD))
|
||||
for case in test['cases']:
|
||||
print(" - '{}'".format(case))
|
||||
for key, case in test['cases'].items():
|
||||
if case['selected']:
|
||||
print(" - '{}'".format(key))
|
||||
else:
|
||||
print(colorize(" - '{}' (not selected)".format(key),
|
||||
color.GRAY))
|
||||
print()
|
||||
|
||||
def num_models(self) -> int:
|
||||
return sum(map(
|
||||
lambda test: 1 if test['selected'] else 0,
|
||||
self.tests))
|
||||
|
||||
def num_cases(self) -> int:
|
||||
n_cases = 0
|
||||
for test in self.tests:
|
||||
if not test['selected']:
|
||||
continue
|
||||
n_cases += self.num_cases_for_test(test)
|
||||
|
||||
return n_cases
|
||||
|
||||
def num_cases_for_test(self, test: Dict[str, Any]) -> int:
|
||||
n_cases = 0
|
||||
for case in test['cases'].values():
|
||||
if not case['selected']:
|
||||
continue
|
||||
n_cases += 1
|
||||
return n_cases
|
||||
|
||||
def prepare_for_results(self) -> None:
|
||||
for test in self.tests:
|
||||
if not test['selected']:
|
||||
continue
|
||||
for key in test['cases'].keys():
|
||||
test['cases'][key] = {'results': []}
|
||||
for key, value in test['cases'].items():
|
||||
if not value['selected']:
|
||||
continue
|
||||
value['results'] = []
|
||||
|
||||
def run_tests(self) -> None:
|
||||
for iteration in range(self.iterations):
|
||||
@@ -225,22 +256,28 @@ class Tester:
|
||||
"==> Running tests for {}".format(test['model']),
|
||||
color.BOLD))
|
||||
|
||||
for i, case in enumerate(test['cases']):
|
||||
print("--> Test case {} of {}: '{}' running ...".
|
||||
format(i+1, len(test['cases']), case))
|
||||
test_i = 0
|
||||
for key, case_value in test['cases'].items():
|
||||
if not case_value['selected']:
|
||||
continue
|
||||
|
||||
log_dir = self.get_log_dir(iteration, test['model'], case)
|
||||
print("--> Test case {} of {}: '{}' running ...".
|
||||
format(test_i+1,
|
||||
self.num_cases_for_test(test),
|
||||
key))
|
||||
|
||||
log_dir = self.get_log_dir(iteration, test['model'], key)
|
||||
if self.verbose:
|
||||
print("creating log directory: {}"
|
||||
.format(log_dir))
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
|
||||
was_success = self.run_test_case(test, case, log_dir)
|
||||
was_success = self.run_test_case(test, key, log_dir)
|
||||
|
||||
print("--- Test case {} of {}: '{}' {}."
|
||||
.format(i+1,
|
||||
len(test['cases']),
|
||||
case,
|
||||
.format(test_i+1,
|
||||
self.num_cases_for_test(test),
|
||||
key,
|
||||
colorize("succeeded", color.GREEN)
|
||||
if was_success
|
||||
else colorize("failed", color.RED)))
|
||||
@@ -252,6 +289,8 @@ class Tester:
|
||||
print("Aborting early")
|
||||
return False
|
||||
|
||||
test_i += 1
|
||||
|
||||
return iteration_success
|
||||
|
||||
def get_log_dir(self, iteration: int, model: str, case: str) -> str:
|
||||
@@ -269,21 +308,65 @@ class Tester:
|
||||
|
||||
return foldername
|
||||
|
||||
def run_test_case(self, test: Dict[str, Any],
|
||||
case: str, log_dir: str) -> bool:
|
||||
self.active_runners = []
|
||||
|
||||
def get_max_speed_factor(self, test: Dict[str, Any]) -> float:
|
||||
speed_factor = self.speed_factor
|
||||
if "max_speed_factor" in test:
|
||||
speed_factor = min(float(speed_factor), test["max_speed_factor"])
|
||||
return speed_factor
|
||||
|
||||
def run_test_case(self, test: Dict[str, Any],
|
||||
case: str, log_dir: str) -> bool:
|
||||
|
||||
self.start_runners(log_dir, test, case)
|
||||
|
||||
logfile_path = self.determine_logfile_path(log_dir, 'combined')
|
||||
self.start_combined_log(logfile_path)
|
||||
|
||||
test_timeout_s = test['timeout_min']*60
|
||||
while self.active_runners[-1].time_elapsed_s() < test_timeout_s:
|
||||
returncode = self.active_runners[-1].poll()
|
||||
if returncode is not None:
|
||||
is_success = (returncode == 0)
|
||||
break
|
||||
|
||||
self.collect_runner_output()
|
||||
|
||||
else:
|
||||
print(colorize(
|
||||
"Test timeout of {} mins triggered!".
|
||||
format(test['timeout_min']),
|
||||
color.BOLD))
|
||||
is_success = False
|
||||
|
||||
self.stop_runners()
|
||||
self.stop_combined_log()
|
||||
|
||||
result = {'success': is_success,
|
||||
'logfiles': [runner.get_log_filename()
|
||||
for runner in self.active_runners]}
|
||||
test['cases'][case]['results'].append(result)
|
||||
|
||||
if not is_success:
|
||||
if not self.verbose:
|
||||
print(self.get_combined_log(logfile_path))
|
||||
print("Logfiles: ")
|
||||
print(" - {}".format(logfile_path))
|
||||
for runner in self.active_runners:
|
||||
print(" - {}".format(runner.get_log_filename()))
|
||||
return is_success
|
||||
|
||||
def start_runners(self,
|
||||
log_dir: str,
|
||||
test: Dict[str, Any],
|
||||
case: str) -> None:
|
||||
self.active_runners = []
|
||||
if self.config['mode'] == 'sitl':
|
||||
px4_runner = ph.Px4Runner(
|
||||
os.getcwd(),
|
||||
log_dir,
|
||||
test['model'],
|
||||
case,
|
||||
speed_factor,
|
||||
self.get_max_speed_factor(test),
|
||||
self.debugger,
|
||||
self.verbose)
|
||||
self.active_runners.append(px4_runner)
|
||||
@@ -294,7 +377,7 @@ class Tester:
|
||||
log_dir,
|
||||
test['model'],
|
||||
case,
|
||||
self.speed_factor,
|
||||
self.get_max_speed_factor(test),
|
||||
self.verbose)
|
||||
self.active_runners.append(gzserver_runner)
|
||||
|
||||
@@ -314,7 +397,6 @@ class Tester:
|
||||
case,
|
||||
self.config['mavlink_connection'],
|
||||
self.verbose)
|
||||
|
||||
self.active_runners.append(test_runner)
|
||||
|
||||
for runner in self.active_runners:
|
||||
@@ -322,50 +404,22 @@ class Tester:
|
||||
self.determine_logfile_path(log_dir, runner.name))
|
||||
runner.start()
|
||||
|
||||
max_name = max(len(runner.name) for runner in self.active_runners)
|
||||
logfile_path = self.determine_logfile_path(log_dir, 'combined')
|
||||
|
||||
self.start_combined_log(logfile_path)
|
||||
|
||||
while test_runner.time_elapsed_s() < test['timeout_min']*60:
|
||||
returncode = test_runner.poll()
|
||||
if returncode is not None:
|
||||
is_success = (returncode == 0)
|
||||
break
|
||||
|
||||
for runner in self.active_runners:
|
||||
output = runner.get_output()
|
||||
if not output:
|
||||
continue
|
||||
|
||||
output = self.add_name_prefix(max_name, runner.name, output)
|
||||
self.add_to_combined_log(output)
|
||||
if self.verbose:
|
||||
print(output)
|
||||
|
||||
else:
|
||||
print(colorize(
|
||||
"Test timeout of {} mins triggered!".
|
||||
format(test['timeout_min']),
|
||||
color.BOLD))
|
||||
is_success = False
|
||||
|
||||
def stop_runners(self) -> None:
|
||||
for runner in self.active_runners:
|
||||
runner.stop()
|
||||
self.stop_combined_log()
|
||||
|
||||
result = {'success': is_success,
|
||||
'logfiles': [runner.get_log_filename()
|
||||
for runner in self.active_runners]}
|
||||
test['cases'][case]['results'].append(result)
|
||||
def collect_runner_output(self) -> None:
|
||||
max_name = max(len(runner.name) for runner in self.active_runners)
|
||||
|
||||
if not is_success:
|
||||
print(self.get_combined_log(logfile_path))
|
||||
print("Logfiles: ")
|
||||
print(" - {}".format(logfile_path))
|
||||
for runner in self.active_runners:
|
||||
print(" - {}".format(runner.get_log_filename()))
|
||||
return is_success
|
||||
for runner in self.active_runners:
|
||||
output = runner.get_output()
|
||||
if not output:
|
||||
continue
|
||||
|
||||
output = self.add_name_prefix(max_name, runner.name, output)
|
||||
self.add_to_combined_log(output)
|
||||
if self.verbose:
|
||||
print(output, end="")
|
||||
|
||||
def start_combined_log(self, filename: str) -> None:
|
||||
self.log_fd = open(filename, 'w')
|
||||
@@ -402,6 +456,8 @@ class Tester:
|
||||
print(colorize(" - {}:".format(test['model']), color.BOLD))
|
||||
|
||||
for name, case in test['cases'].items():
|
||||
if not case['selected']:
|
||||
continue
|
||||
print(" - '{}': ".format(name), end="")
|
||||
|
||||
n_succeeded = [result['success']
|
||||
@@ -429,6 +485,8 @@ class Tester:
|
||||
continue
|
||||
|
||||
for case in test['cases'].values():
|
||||
if not case['selected']:
|
||||
continue
|
||||
for result in case['results']:
|
||||
if result['success'] is not True:
|
||||
return False
|
||||
@@ -455,8 +513,7 @@ class Tester:
|
||||
-> NoReturn:
|
||||
print("Received SIGINT")
|
||||
print("Stopping all processes ...")
|
||||
for runner in self.active_runners:
|
||||
runner.stop()
|
||||
self.stop_runners()
|
||||
self.stop_combined_log()
|
||||
print("Stopping all processes done.")
|
||||
self.show_detailed_results()
|
||||
|
||||
Reference in New Issue
Block a user