mirror of
https://github.com/PX4/PX4-Autopilot.git
synced 2026-03-23 18:13:49 +08:00
test: add ros integration test runner script & config
This commit is contained in:
@@ -376,3 +376,29 @@ class TestRunnerMavsdk(Runner):
|
||||
"--url", mavlink_connection,
|
||||
"--speed-factor", str(speed_factor),
|
||||
case]
|
||||
|
||||
class TestRunnerRos(Runner):
|
||||
def __init__(self,
|
||||
workspace_dir: str,
|
||||
log_dir: str,
|
||||
model: str,
|
||||
case: str,
|
||||
verbose: bool,
|
||||
ros_package_build_dir: str):
|
||||
super().__init__(log_dir, model, case, verbose)
|
||||
self.name = "integration_tests"
|
||||
self.cwd = workspace_dir
|
||||
self.cmd = "nice"
|
||||
self.args = ["-17",
|
||||
os.path.join(
|
||||
ros_package_build_dir,
|
||||
"integration_tests"),
|
||||
"--gtest_filter="+case, "--gtest_color=yes"]
|
||||
|
||||
def get_output_line(self) -> Optional[str]:
|
||||
line = super().get_output_line()
|
||||
if line is not None:
|
||||
# colorize assertion failures & errors
|
||||
if 'Failure' in line or '[ERROR]' in line or '[FATAL]' in line:
|
||||
line = colorize(line, color.RED)
|
||||
return line
|
||||
|
||||
247
test/ros_test_runner.py
Executable file
247
test/ros_test_runner.py
Executable file
@@ -0,0 +1,247 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import psutil # type: ignore
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
from mavsdk_tests.integration_test_runner import test_runner, process_helper as ph, logger_helper
|
||||
from typing import Any, Dict, List, NoReturn
|
||||
|
||||
|
||||
class MicroXrceAgent:
|
||||
def __init__(self, verbose: bool):
|
||||
self._verbose = verbose
|
||||
# Potential binary names to check for
|
||||
self._binary_names = ['MicroXRCEAgent', 'micro-xrce-dds-agent']
|
||||
self._proc = None
|
||||
|
||||
def is_running(self) -> bool:
|
||||
return any(is_running(name) for name in self._binary_names)
|
||||
|
||||
def start_process(self):
|
||||
if self._verbose:
|
||||
print('Starting micro-xrce-dds-agent')
|
||||
assert self._proc is None
|
||||
|
||||
for name in self._binary_names:
|
||||
try:
|
||||
self._proc = subprocess.Popen([name] + 'udp4 -p 8888'.split())
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
else: # Process was started
|
||||
break
|
||||
if self._proc is None:
|
||||
raise RuntimeError(f'Failed to start agent, tried these binaries: {self._binary_names}.\n'
|
||||
'Make sure it is installed and available. '
|
||||
'You can also run it manually before running this script')
|
||||
|
||||
def stop_process_if_started(self):
|
||||
if self._proc is None:
|
||||
return
|
||||
|
||||
if self._verbose:
|
||||
print('Stopping micro-xrce-dds-agent')
|
||||
self._proc.kill()
|
||||
self._proc = None
|
||||
|
||||
|
||||
class TesterInterfaceRos(test_runner.TesterInterface):
|
||||
|
||||
def __init__(self, ros_package_build_dir: str):
|
||||
self._ros_package_build_dir = ros_package_build_dir
|
||||
|
||||
def query_test_cases(self, build_dir: str, filter: str) -> List[str]:
|
||||
cmd = os.path.join(self._ros_package_build_dir, 'integration_tests')
|
||||
args = ["--gtest_list_tests", "--gtest_filter=" + filter]
|
||||
# Example output:
|
||||
# Running main() from ros2_install/install/gtest_vendor/src/gtest_vendor/src/gtest_main.cc
|
||||
# Tester.
|
||||
# runModeTests
|
||||
p = subprocess.Popen(
|
||||
[cmd] + args,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
assert p.stdout is not None
|
||||
lines = str(p.stdout.read().decode("utf-8")).strip().split('\n')
|
||||
if lines[0].startswith('Running main'):
|
||||
del lines[0]
|
||||
cases = []
|
||||
test_suite_name = None
|
||||
for line in lines:
|
||||
if line.startswith(' '):
|
||||
assert test_suite_name is not None
|
||||
cases.append(test_suite_name + line.strip())
|
||||
else:
|
||||
test_suite_name = line
|
||||
return cases
|
||||
|
||||
def create_test_runner(self,
|
||||
workspace_dir: str,
|
||||
log_dir: str,
|
||||
model: str,
|
||||
case: str,
|
||||
config: Dict[str, Any],
|
||||
speed_factor: float,
|
||||
verbose: bool,
|
||||
build_dir: str) -> ph.Runner:
|
||||
return ph.TestRunnerRos(
|
||||
workspace_dir,
|
||||
log_dir,
|
||||
model,
|
||||
case,
|
||||
verbose,
|
||||
self._ros_package_build_dir)
|
||||
|
||||
def rootfs_base_dirname(self) -> str:
|
||||
return "tmp_ros_tests"
|
||||
|
||||
|
||||
def main() -> NoReturn:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--list-cases", action='store_true',
|
||||
help="List available test cases")
|
||||
parser.add_argument("--log-dir",
|
||||
help="Directory for log files", default="logs")
|
||||
parser.add_argument("--iterations", type=int, default=1,
|
||||
help="how often to run all tests")
|
||||
parser.add_argument("--abort-early", action='store_true',
|
||||
help="abort on first unsuccessful test")
|
||||
parser.add_argument("--gui", default=False, action='store_true',
|
||||
help="display the visualization for a simulation")
|
||||
parser.add_argument("--model", type=str, default='all',
|
||||
help="only run tests for one model")
|
||||
parser.add_argument("--case", type=str, default='all',
|
||||
help="only run tests for one case "
|
||||
"(or multiple cases with wildcard '*')")
|
||||
parser.add_argument("--debugger", default="",
|
||||
help="choice from valgrind, callgrind, gdb, lldb")
|
||||
parser.add_argument("--upload", default=False, action='store_true',
|
||||
help="Upload logs to logs.px4.io")
|
||||
parser.add_argument("--verbose", default=False, action='store_true',
|
||||
help="enable more verbose output")
|
||||
parser.add_argument("--config-file", help="JSON config file to use",
|
||||
default="test/ros_tests/config.json")
|
||||
parser.add_argument("--build-dir", type=str,
|
||||
default='build/px4_sitl_default/',
|
||||
help="relative path where the built files are stored")
|
||||
parser.add_argument("--px4-ros2-interface-lib-build-dir", type=str,
|
||||
default=None,
|
||||
help="path which contains the integration_tests binary. "
|
||||
"If not provided, it is determined via 'ros2 pkg'")
|
||||
args = parser.parse_args()
|
||||
|
||||
ros_package_build_dir = args.px4_ros2_interface_lib_build_dir
|
||||
if ros_package_build_dir is None:
|
||||
ros_package_build_dir = lookup_px4_ros2_cpp_build_dir()
|
||||
if args.verbose:
|
||||
print(f'px4_ros2_cpp build dir: {ros_package_build_dir}')
|
||||
|
||||
tester_interface = TesterInterfaceRos(ros_package_build_dir)
|
||||
|
||||
if args.list_cases:
|
||||
for case in tester_interface.query_test_cases(args.build_dir, '*'):
|
||||
print(f'{case}')
|
||||
sys.exit(0)
|
||||
|
||||
with open(args.config_file) as json_file:
|
||||
config = json.load(json_file)
|
||||
|
||||
if config["mode"] != "sitl" and args.gui:
|
||||
print("--gui is not compatible with the mode '{}'"
|
||||
.format(config["mode"]))
|
||||
sys.exit(1)
|
||||
|
||||
if not is_everything_ready(config, args.build_dir):
|
||||
sys.exit(1)
|
||||
|
||||
if args.verbose:
|
||||
print("Creating directory: {}".format(args.log_dir))
|
||||
os.makedirs(args.log_dir, exist_ok=True)
|
||||
|
||||
speed_factor = 1 # Not (yet) supported
|
||||
tester = test_runner.Tester(
|
||||
config,
|
||||
args.iterations,
|
||||
args.abort_early,
|
||||
speed_factor,
|
||||
args.model,
|
||||
args.case,
|
||||
args.debugger,
|
||||
args.log_dir,
|
||||
args.gui,
|
||||
args.verbose,
|
||||
args.upload,
|
||||
args.build_dir,
|
||||
tester_interface
|
||||
)
|
||||
signal.signal(signal.SIGINT, tester.sigint_handler)
|
||||
|
||||
# Automatically start & stop the XRCE Agent if not running already
|
||||
micro_xrce_agent = MicroXrceAgent(args.verbose)
|
||||
if not micro_xrce_agent.is_running():
|
||||
micro_xrce_agent.start_process()
|
||||
|
||||
try:
|
||||
result = tester.run()
|
||||
finally:
|
||||
micro_xrce_agent.stop_process_if_started()
|
||||
|
||||
sys.exit(0 if result else 1)
|
||||
|
||||
|
||||
def is_running(process_name: str) -> bool:
|
||||
for proc in psutil.process_iter(attrs=['name']):
|
||||
if proc.info['name'] == process_name:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def is_everything_ready(config: Dict[str, str], build_dir: str) -> bool:
|
||||
result = True
|
||||
|
||||
if config['mode'] == 'sitl':
|
||||
if is_running('px4'):
|
||||
print("px4 process already running\n"
|
||||
"run `killall px4` and try again")
|
||||
result = False
|
||||
if not os.path.isfile(os.path.join(build_dir, 'bin/px4')):
|
||||
print("PX4 SITL is not built\n"
|
||||
"run `DONT_RUN=1 make px4_sitl gazebo` or "
|
||||
"`DONT_RUN=1 make px4_sitl_default gazebo`")
|
||||
result = False
|
||||
if config['simulator'] == 'gazebo':
|
||||
if is_running('gzserver'):
|
||||
print("gzserver process already running\n"
|
||||
"run `killall gzserver` and try again")
|
||||
result = False
|
||||
if is_running('gzclient'):
|
||||
print("gzclient process already running\n"
|
||||
"run `killall gzclient` and try again")
|
||||
result = False
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def lookup_px4_ros2_cpp_build_dir():
|
||||
"""Get the ROS2 build directory for px4_ros2_cpp"""
|
||||
try:
|
||||
p = subprocess.Popen(
|
||||
'ros2 pkg prefix px4_ros2_cpp'.split(),
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
except FileNotFoundError:
|
||||
print('Error: command "ros2" not found. Did you source the ros workspace?')
|
||||
sys.exit(1)
|
||||
assert p.stdout is not None
|
||||
lines = str(p.stdout.read().decode("utf-8")).strip().split('\n')
|
||||
assert len(lines) == 1
|
||||
return os.path.join(lines[0], '../../build/px4_ros2_cpp')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
13
test/ros_tests/config.json
Normal file
13
test/ros_tests/config.json
Normal file
@@ -0,0 +1,13 @@
|
||||
{
|
||||
"mode": "sitl",
|
||||
"simulator": "gazebo",
|
||||
"tests":
|
||||
[
|
||||
{
|
||||
"model": "iris",
|
||||
"vehicle": "iris",
|
||||
"test_filter": "Tester.*",
|
||||
"timeout_min": 10
|
||||
}
|
||||
]
|
||||
}
|
||||
Reference in New Issue
Block a user