mirror of
https://github.com/PX4/PX4-Autopilot.git
synced 2026-05-23 14:47:44 +08:00
6baef6ba88
Restart the Micro-XRCE-DDS Agent before each integration test so DDS graph state from a previous PX4 instance does not leak into the next test. The MicroXRCEAgent is started once per session, but PX4 reboots between tests. The Agent retains writer entries from the previous PX4, so when the new PX4 reconnects, count_publishers() in px4-ros2-interface-lib's waitForFMU returns >0 immediately against a stale entry. Phase 1 (discovery) returns instantly, then Phase 2 (heartbeat) times out waiting for a message on a subscription matched to a dead writer. This is why ModesTest.denyArming (first test) passes while every later ModesTest fails with "timeout while waiting for FMU heartbeat". Adds an optional pre_test_hook on test_runner.Tester so ROS-specific lifecycle stays out of the shared test_runner. The workflow stops starting the Agent externally; ros_test_runner.py owns the lifecycle. Refs #27328 Signed-off-by: Ramon Roche <mrpollo@gmail.com>
273 lines
9.7 KiB
Python
Executable File
273 lines
9.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import psutil # type: ignore
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from mavsdk_tests.integration_test_runner import test_runner, process_helper as ph, logger_helper
|
|
from typing import Any, Dict, List, NoReturn
|
|
|
|
|
|
class MicroXrceAgent:
|
|
def __init__(self, verbose: bool):
|
|
self._verbose = verbose
|
|
# Potential binary names to check for
|
|
self._binary_names = ['MicroXRCEAgent', 'micro-xrce-dds-agent']
|
|
self._proc = None
|
|
|
|
def is_running(self) -> bool:
|
|
return any(is_running(name) for name in self._binary_names)
|
|
|
|
def start_process(self):
|
|
if self._verbose:
|
|
print('Starting micro-xrce-dds-agent')
|
|
assert self._proc is None
|
|
|
|
for name in self._binary_names:
|
|
try:
|
|
self._proc = subprocess.Popen([name] + 'udp4 -p 8888'.split())
|
|
except FileNotFoundError:
|
|
pass
|
|
else: # Process was started
|
|
break
|
|
if self._proc is None:
|
|
raise RuntimeError(f'Failed to start agent, tried these binaries: {self._binary_names}.\n'
|
|
'Make sure it is installed and available. '
|
|
'You can also run it manually before running this script')
|
|
|
|
def stop_process_if_started(self):
|
|
if self._proc is None:
|
|
return
|
|
|
|
if self._verbose:
|
|
print('Stopping micro-xrce-dds-agent')
|
|
self._proc.kill()
|
|
self._proc.wait()
|
|
self._proc = None
|
|
|
|
def restart(self):
|
|
"""Force a fresh Agent process so DDS graph state does not leak across tests.
|
|
|
|
The Agent retains writer entries from prior PX4 instances; a fresh PX4 reconnects
|
|
but the stale entries make count_publishers() return >0 before the new writers
|
|
are matched, breaking waitForFMU's two-phase discovery in px4-ros2-interface-lib.
|
|
"""
|
|
self.stop_process_if_started()
|
|
# Give the OS a moment to release the UDP port before rebinding.
|
|
time.sleep(0.2)
|
|
self.start_process()
|
|
|
|
|
|
class TesterInterfaceRos(test_runner.TesterInterface):
|
|
|
|
def __init__(self, ros_package_build_dir: str):
|
|
self._ros_package_build_dir = ros_package_build_dir
|
|
|
|
def query_test_cases(self, build_dir: str, filter: str) -> List[str]:
|
|
cmd = os.path.join(self._ros_package_build_dir, 'integration_tests')
|
|
args = ["--gtest_list_tests", "--gtest_filter=" + filter]
|
|
# Example output:
|
|
# Running main() from ros2_install/install/gtest_vendor/src/gtest_vendor/src/gtest_main.cc
|
|
# Tester.
|
|
# runModeTests
|
|
p = subprocess.Popen(
|
|
[cmd] + args,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT)
|
|
assert p.stdout is not None
|
|
lines = str(p.stdout.read().decode("utf-8")).strip().split('\n')
|
|
if lines[0].startswith('Running main'):
|
|
del lines[0]
|
|
cases = []
|
|
test_suite_name = None
|
|
for line in lines:
|
|
if line.startswith(' '):
|
|
assert test_suite_name is not None
|
|
cases.append(test_suite_name + line.strip())
|
|
else:
|
|
test_suite_name = line
|
|
return cases
|
|
|
|
def create_test_runner(self,
|
|
workspace_dir: str,
|
|
log_dir: str,
|
|
model: str,
|
|
case: str,
|
|
config: Dict[str, Any],
|
|
speed_factor: float,
|
|
verbose: bool,
|
|
build_dir: str) -> ph.Runner:
|
|
return ph.TestRunnerRos(
|
|
workspace_dir,
|
|
log_dir,
|
|
model,
|
|
case,
|
|
verbose,
|
|
self._ros_package_build_dir)
|
|
|
|
def rootfs_base_dirname(self) -> str:
|
|
return "tmp_ros_tests"
|
|
|
|
|
|
def main() -> NoReturn:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--list-cases", action='store_true',
|
|
help="List available test cases")
|
|
parser.add_argument("--log-dir",
|
|
help="Directory for log files", default="logs")
|
|
parser.add_argument("--iterations", type=int, default=1,
|
|
help="how often to run all tests")
|
|
parser.add_argument("--abort-early", action='store_true',
|
|
help="abort on first unsuccessful test")
|
|
parser.add_argument("--gui", default=False, action='store_true',
|
|
help="display the visualization for a simulation")
|
|
parser.add_argument("--model", type=str, default='all',
|
|
help="only run tests for one model")
|
|
parser.add_argument("--case", type=str, default='all',
|
|
help="only run tests for one case "
|
|
"(or multiple cases with wildcard '*')")
|
|
parser.add_argument("--debugger", default="",
|
|
help="choice from valgrind, callgrind, gdb, lldb")
|
|
parser.add_argument("--upload", default=False, action='store_true',
|
|
help="Upload logs to logs.px4.io")
|
|
parser.add_argument("--force-color", default=False, action='store_true',
|
|
help="Force colorized output")
|
|
parser.add_argument("--verbose", default=False, action='store_true',
|
|
help="enable more verbose output")
|
|
parser.add_argument("--config-file", help="JSON config file to use",
|
|
default="test/ros_tests/config.json")
|
|
parser.add_argument("--build-dir", type=str,
|
|
default='build/px4_sitl_default/',
|
|
help="relative path where the built files are stored")
|
|
parser.add_argument("--px4-ros2-interface-lib-build-dir", type=str,
|
|
default=None,
|
|
help="path which contains the integration_tests binary. "
|
|
"If not provided, it is determined via 'ros2 pkg'")
|
|
args = parser.parse_args()
|
|
|
|
if args.force_color:
|
|
logger_helper.force_color = True
|
|
|
|
ros_package_build_dir = args.px4_ros2_interface_lib_build_dir
|
|
if ros_package_build_dir is None:
|
|
ros_package_build_dir = lookup_px4_ros2_cpp_build_dir()
|
|
if args.verbose:
|
|
print(f'px4_ros2_cpp build dir: {ros_package_build_dir}')
|
|
|
|
tester_interface = TesterInterfaceRos(ros_package_build_dir)
|
|
|
|
if args.list_cases:
|
|
for case in tester_interface.query_test_cases(args.build_dir, '*'):
|
|
print(f'{case}')
|
|
sys.exit(0)
|
|
|
|
with open(args.config_file) as json_file:
|
|
config = json.load(json_file)
|
|
|
|
if config["mode"] != "sitl" and args.gui:
|
|
print("--gui is not compatible with the mode '{}'"
|
|
.format(config["mode"]))
|
|
sys.exit(1)
|
|
|
|
if not is_everything_ready(config, args.build_dir):
|
|
sys.exit(1)
|
|
|
|
if args.verbose:
|
|
print("Creating directory: {}".format(args.log_dir))
|
|
os.makedirs(args.log_dir, exist_ok=True)
|
|
|
|
speed_factor = 1 # Not (yet) supported
|
|
tester = test_runner.Tester(
|
|
config,
|
|
args.iterations,
|
|
args.abort_early,
|
|
speed_factor,
|
|
args.model,
|
|
args.case,
|
|
args.debugger,
|
|
args.log_dir,
|
|
args.gui,
|
|
args.verbose,
|
|
args.upload,
|
|
args.build_dir,
|
|
tester_interface
|
|
)
|
|
signal.signal(signal.SIGINT, tester.sigint_handler)
|
|
|
|
# Automatically start & stop the XRCE Agent if not running already
|
|
micro_xrce_agent = MicroXrceAgent(args.verbose)
|
|
agent_managed_here = not micro_xrce_agent.is_running()
|
|
if agent_managed_here:
|
|
micro_xrce_agent.start_process()
|
|
|
|
def restart_agent(_model: str, _case: str) -> None:
|
|
micro_xrce_agent.restart()
|
|
|
|
tester.pre_test_hook = restart_agent
|
|
|
|
try:
|
|
result = tester.run()
|
|
finally:
|
|
micro_xrce_agent.stop_process_if_started()
|
|
|
|
sys.exit(0 if result else 1)
|
|
|
|
|
|
def is_running(process_name: str) -> bool:
|
|
for proc in psutil.process_iter(attrs=['name']):
|
|
if proc.info['name'] == process_name:
|
|
return True
|
|
return False
|
|
|
|
|
|
def is_everything_ready(config: Dict[str, str], build_dir: str) -> bool:
|
|
result = True
|
|
|
|
if config['mode'] == 'sitl':
|
|
if is_running('px4'):
|
|
print("px4 process already running\n"
|
|
"run `killall px4` and try again")
|
|
result = False
|
|
if not os.path.isfile(os.path.join(build_dir, 'bin/px4')):
|
|
print("PX4 SITL is not built\n"
|
|
"run `DONT_RUN=1 make px4_sitl gazebo` or "
|
|
"`DONT_RUN=1 make px4_sitl_default gazebo`")
|
|
result = False
|
|
if config['simulator'] == 'gazebo':
|
|
if is_running('gzserver'):
|
|
print("gzserver process already running\n"
|
|
"run `killall gzserver` and try again")
|
|
result = False
|
|
if is_running('gzclient'):
|
|
print("gzclient process already running\n"
|
|
"run `killall gzclient` and try again")
|
|
result = False
|
|
|
|
return result
|
|
|
|
|
|
def lookup_px4_ros2_cpp_build_dir():
|
|
"""Get the ROS2 build directory for px4_ros2_cpp"""
|
|
try:
|
|
p = subprocess.Popen(
|
|
'ros2 pkg prefix px4_ros2_cpp'.split(),
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT)
|
|
except FileNotFoundError:
|
|
print('Error: command "ros2" not found. Did you source the ros workspace?')
|
|
sys.exit(1)
|
|
assert p.stdout is not None
|
|
lines = str(p.stdout.read().decode("utf-8")).strip().split('\n')
|
|
assert len(lines) == 1
|
|
return os.path.join(lines[0], '../../build/px4_ros2_cpp')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|