perf(mavlink): remove mutex from forward_message hot path

Remove mavlink_module_mutex from forward_message() to eliminate
per-message lock overhead in the forwarding path.

The mutex is unnecessary because:
- Instances can only be destroyed via destroy_all_instances(), which
  stops all threads before modifying the array
- Pointer reads from mavlink_module_instances[] are naturally atomic
  on aligned architectures (ARM, x86)
- pass_message() has its own per-instance _message_buffer_mutex

Benchmark results (SITL SIH, 2 connections, wall-clock timing):

  200 Hz (400 msg/s): avg 648 ns -> 563 ns (-13%)
  500 Hz (1000 msg/s): avg 469 ns -> 407 ns (-13%)
  Mutex lock section: avg 327 ns -> 262 ns (-20%)

Benchmark gist: https://gist.github.com/mrpollo/167d4ec7436fb773bf193be6c2800639

Also includes a MAVSDK C++ traffic generator tool for reproducing
the benchmark.

Signed-off-by: Ramon Roche <mrpollo@gmail.com>
This commit is contained in:
Ramon Roche
2026-03-31 18:15:20 -07:00
parent 7b8fc2efaf
commit 49e5550b94
4 changed files with 538 additions and 1 deletions
+4 -1
View File
@@ -513,7 +513,10 @@ Mavlink::forward_message(const mavlink_message_t *msg, Mavlink *self)
return;
}
LockGuard lg{mavlink_module_mutex};
// No mutex needed: instances can't be destroyed while receiver threads run
// (destroy_all_instances() stops all threads first). Pointer reads are
// naturally atomic on aligned architectures. pass_message() has its own
// per-instance _message_buffer_mutex for the ring buffer.
for (Mavlink *inst : mavlink_module_instances) {
if (inst && (inst != self) && (inst->get_forwarding_on())) {
+26
View File
@@ -0,0 +1,26 @@
cmake_minimum_required(VERSION 3.5.1)
project(mavlink_forwarding_bench CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
find_package(MAVSDK REQUIRED)
find_package(Threads REQUIRED)
add_executable(mavlink_forwarding_bench
test_mavlink_forwarding_bench.cpp
)
target_link_libraries(mavlink_forwarding_bench
MAVSDK::mavsdk
${CMAKE_THREAD_LIBS_INIT}
)
target_compile_options(mavlink_forwarding_bench
PRIVATE
-Wall
-Wextra
-Werror
)
+128
View File
@@ -0,0 +1,128 @@
#!/bin/bash
#
# MAVLink forwarding mutex benchmark runner.
#
# Builds the benchmark tool, starts PX4 SITL SIH, runs the benchmark,
# then collects perf counter data.
#
# Usage:
# ./run_forwarding_bench.sh [--duration 60] [--rate 200] [--connections 2] [--label baseline]
#
# The --label flag tags the output files for before/after comparison.
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PX4_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)"
BUILD_DIR="$SCRIPT_DIR/build_bench"
RESULTS_DIR="$PX4_DIR/bench_results"
DURATION=60
RATE=200
CONNECTIONS=2
LABEL="run"
while [[ $# -gt 0 ]]; do
case "$1" in
--duration) DURATION="$2"; shift 2 ;;
--rate) RATE="$2"; shift 2 ;;
--connections) CONNECTIONS="$2"; shift 2 ;;
--label) LABEL="$2"; shift 2 ;;
--help|-h)
echo "Usage: $0 [--duration N] [--rate N] [--connections N] [--label NAME]"
exit 0
;;
*) echo "Unknown arg: $1"; exit 1 ;;
esac
done
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
RESULT_PREFIX="${RESULTS_DIR}/${LABEL}_${TIMESTAMP}"
mkdir -p "$RESULTS_DIR"
echo "=== Step 1: Build PX4 SITL SIH ==="
cd "$PX4_DIR"
make px4_sitl_sih 2>&1 | tail -5
echo ""
echo "=== Step 2: Build benchmark tool ==="
mkdir -p "$BUILD_DIR"
cd "$BUILD_DIR"
# Symlink the standalone CMakeLists and source into build dir
ln -sf "$SCRIPT_DIR/CMakeLists_bench.txt" CMakeLists.txt
ln -sf "$SCRIPT_DIR/test_mavlink_forwarding_bench.cpp" .
cmake -DCMAKE_BUILD_TYPE=Release .
make -j$(nproc 2>/dev/null || sysctl -n hw.ncpu)
echo ""
echo "=== Step 3: Start PX4 SITL SIH ==="
cd "$PX4_DIR"
# Start PX4 in background, capture PID
PX4_SIM_MODEL=sihsim_quadx $PX4_DIR/build/px4_sitl_sih/bin/px4 \
-s $PX4_DIR/ROMFS/px4fmu_common/init.d-posix/rcS \
-w $PX4_DIR/build/px4_sitl_sih/rootfs \
$PX4_DIR/build/px4_sitl_sih/etc \
> "${RESULT_PREFIX}_px4.log" 2>&1 &
PX4_PID=$!
echo "PX4 started (PID=$PX4_PID), waiting 10s for boot..."
sleep 10
# Verify PX4 is running
if ! kill -0 "$PX4_PID" 2>/dev/null; then
echo "ERROR: PX4 failed to start. Check ${RESULT_PREFIX}_px4.log"
exit 1
fi
echo ""
echo "=== Step 4: Run benchmark ==="
echo " Duration: ${DURATION}s"
echo " Rate: ${RATE} Hz/connection"
echo " Connections: ${CONNECTIONS}"
echo ""
"$BUILD_DIR/mavlink_forwarding_bench" \
--duration "$DURATION" \
--rate "$RATE" \
--connections "$CONNECTIONS" \
--report "${RESULT_PREFIX}_sender.csv" \
2>&1 | tee "${RESULT_PREFIX}_bench.log"
echo ""
echo "=== Step 5: Collect perf counters ==="
# Send 'perf' command to PX4 via mavlink shell (or direct if available)
sleep 2
echo ""
echo "=== Step 6: Stop PX4 and extract perf data ==="
kill -INT "$PX4_PID" 2>/dev/null || true
sleep 3
# PX4 dumps perf counters on exit in some configurations.
# Extract perf lines from the log.
if grep -q "mavlink: fwd_msg" "${RESULT_PREFIX}_px4.log"; then
echo "Perf counters from PX4 log:"
grep -E "mavlink: (fwd_|comp_seen|unsigned_cb)" "${RESULT_PREFIX}_px4.log" | tee "${RESULT_PREFIX}_perf.txt"
else
echo "NOTE: Perf counters not found in PX4 log."
echo "Run 'perf' in the PX4 console before stopping to capture data."
echo ""
echo "Manual workflow:"
echo " 1. Terminal 1: PX4_SIM_MODEL=sihsim_quadx make px4_sitl_sih"
echo " 2. Terminal 2: $BUILD_DIR/mavlink_forwarding_bench --duration $DURATION --rate $RATE --connections $CONNECTIONS"
echo " 3. Terminal 1 (PX4 console): perf"
echo " 4. Copy the perf output to ${RESULT_PREFIX}_perf.txt"
fi
echo ""
echo "=== Results saved ==="
echo " PX4 log: ${RESULT_PREFIX}_px4.log"
echo " Bench log: ${RESULT_PREFIX}_bench.log"
echo " Sender CSV: ${RESULT_PREFIX}_sender.csv"
echo " Perf data: ${RESULT_PREFIX}_perf.txt (if available)"
echo ""
echo "To compare runs:"
echo " diff <baseline>_perf.txt <after_fix>_perf.txt"
@@ -0,0 +1,380 @@
/****************************************************************************
*
* Copyright (c) 2026 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
/**
* @file test_mavlink_forwarding_bench.cpp
*
* MAVLink forwarding mutex benchmark.
* Sends high-rate broadcast messages to multiple PX4 MAVLink instances
* to measure the cost of mavlink_module_mutex in the forwarding path.
*
* Usage:
* 1. Start PX4 SITL: make px4_sitl_sih
* 2. Build and run this benchmark (see CMakeLists.txt)
* 3. After the run completes, dump perf counters in the PX4 console:
* > perf
*
* The benchmark connects to two UDP ports (GCS + offboard) and floods
* broadcast DEBUG_FLOAT_ARRAY messages. Since target_system=0 (broadcast),
* every message hits forward_message() and gets forwarded to all other
* instances, creating real mutex contention between receiver threads.
*/
#include <mavsdk/mavsdk.h>
#include <mavsdk/plugins/mavlink_passthrough/mavlink_passthrough.h>
#include <mavsdk/plugins/telemetry/telemetry.h>
#include <atomic>
#include <chrono>
#include <cinttypes>
#include <cmath>
#include <csignal>
#include <cstring>
#include <iostream>
#include <thread>
#include <vector>
using namespace mavsdk;
using namespace std::chrono;
using namespace std::chrono_literals;
static std::atomic<bool> g_stop{false};
static void signal_handler(int /*signum*/)
{
g_stop.store(true);
}
struct SenderStats {
std::atomic<uint64_t> messages_sent{0};
std::atomic<uint64_t> send_errors{0};
};
// Send broadcast DEBUG_FLOAT_ARRAY at the given rate on a single connection.
// Broadcast (sysid=0) ensures the message always enters forward_message().
static void sender_thread(MavlinkPassthrough &passthrough,
int rate_hz,
SenderStats &stats)
{
const auto interval = microseconds(1'000'000 / rate_hz);
auto next_send = steady_clock::now();
while (!g_stop.load()) {
// DEBUG_FLOAT_ARRAY is a good choice: variable-length, commonly supported,
// and has no side effects on PX4 when received.
// No target fields, so it defaults to broadcast forwarding in PX4.
auto result = passthrough.queue_message(
[](MavlinkAddress mavlink_address, uint8_t channel) {
mavlink_message_t msg{};
mavlink_msg_debug_float_array_pack_chan(
mavlink_address.system_id,
mavlink_address.component_id,
channel,
&msg,
0, // time_usec
"bench", // name
0, // array_id
nullptr // data (empty)
);
return msg;
});
if (result == MavlinkPassthrough::Result::Success) {
stats.messages_sent.fetch_add(1, std::memory_order_relaxed);
} else {
stats.send_errors.fetch_add(1, std::memory_order_relaxed);
}
next_send += interval;
std::this_thread::sleep_until(next_send);
}
}
static void print_usage(const char *bin)
{
std::cout << "Usage: " << bin << " [options]\n"
<< " --duration <sec> Test duration in seconds (default: 60)\n"
<< " --rate <hz> Messages per second per connection (default: 200)\n"
<< " --connections <n> Number of UDP connections (default: 2)\n"
<< " --gcs-port <port> GCS UDP port (default: 14550)\n"
<< " --api-port <port> Offboard/API UDP port (default: 14540)\n"
<< " --report <file> Write CSV report to file (default: stdout)\n"
<< std::endl;
}
int main(int argc, char *argv[])
{
int duration_sec = 60;
int rate_hz = 200;
int num_connections = 2;
int gcs_port = 14550;
int api_port = 14540;
std::string report_file;
for (int i = 1; i < argc; ++i) {
std::string arg(argv[i]);
if (arg == "--duration" && i + 1 < argc) {
duration_sec = std::atoi(argv[++i]);
} else if (arg == "--rate" && i + 1 < argc) {
rate_hz = std::atoi(argv[++i]);
} else if (arg == "--connections" && i + 1 < argc) {
num_connections = std::atoi(argv[++i]);
} else if (arg == "--gcs-port" && i + 1 < argc) {
gcs_port = std::atoi(argv[++i]);
} else if (arg == "--api-port" && i + 1 < argc) {
api_port = std::atoi(argv[++i]);
} else if (arg == "--report" && i + 1 < argc) {
report_file = argv[++i];
} else if (arg == "--help" || arg == "-h") {
print_usage(argv[0]);
return 0;
} else {
std::cerr << "Unknown argument: " << arg << std::endl;
print_usage(argv[0]);
return 1;
}
}
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
std::cout << "=== MAVLink Forwarding Mutex Benchmark ===" << std::endl;
std::cout << "Duration: " << duration_sec << "s" << std::endl;
std::cout << "Rate: " << rate_hz << " Hz per connection" << std::endl;
std::cout << "Connections: " << num_connections << std::endl;
std::cout << std::endl;
// Build list of UDP ports to connect to
std::vector<int> ports;
if (num_connections >= 1) {
ports.push_back(api_port); // Offboard/API link
}
if (num_connections >= 2) {
ports.push_back(gcs_port); // GCS link
}
// Additional connections use sequential ports after api_port
for (int i = 2; i < num_connections; ++i) {
ports.push_back(api_port + i);
}
// Create MAVSDK instances and connect
std::vector<std::unique_ptr<Mavsdk>> mavsdks;
std::vector<MavlinkPassthrough *> passthroughs;
for (size_t i = 0; i < ports.size(); ++i) {
auto config = Mavsdk::Configuration(
static_cast<uint8_t>(245 + i), // unique sysid per connection
MAV_COMP_ID_MISSIONPLANNER,
false // not autopilot
);
auto mavsdk = std::make_unique<Mavsdk>(config);
std::string url = "udp://:" + std::to_string(ports[i]);
std::cout << "Connecting to " << url << " (sysid=" << (245 + i) << ")..." << std::endl;
auto result = mavsdk->add_any_connection(url);
if (result != ConnectionResult::Success) {
std::cerr << "Connection failed: " << url << std::endl;
return 1;
}
// Wait for system discovery
std::cout << " Waiting for system..." << std::flush;
bool found = false;
for (int t = 0; t < 50; ++t) { // 5 second timeout
if (!mavsdk->systems().empty()) {
found = true;
break;
}
std::this_thread::sleep_for(100ms);
}
if (!found) {
std::cerr << "\n Timeout waiting for system on port " << ports[i] << std::endl;
return 1;
}
std::cout << " OK" << std::endl;
auto system = mavsdk->systems().front();
auto *passthrough = new MavlinkPassthrough(system);
passthroughs.push_back(passthrough);
mavsdks.push_back(std::move(mavsdk));
}
std::cout << std::endl;
std::cout << "Starting " << passthroughs.size() << " sender threads at " << rate_hz << " Hz each..." << std::endl;
std::cout << "Total expected rate: " << (rate_hz * static_cast<int>(passthroughs.size())) << " msg/s" << std::endl;
std::cout << std::endl;
// Launch sender threads
std::vector<SenderStats> stats(passthroughs.size());
std::vector<std::thread> threads;
auto start_time = steady_clock::now();
for (size_t i = 0; i < passthroughs.size(); ++i) {
threads.emplace_back(sender_thread, std::ref(*passthroughs[i]), rate_hz, std::ref(stats[i]));
}
// Progress reporting
auto report_interval = 5s;
auto next_report = start_time + report_interval;
while (!g_stop.load()) {
auto now = steady_clock::now();
if (now >= start_time + seconds(duration_sec)) {
break;
}
if (now >= next_report) {
auto elapsed = duration_cast<seconds>(now - start_time).count();
uint64_t total_sent = 0;
uint64_t total_errors = 0;
for (auto &s : stats) {
total_sent += s.messages_sent.load(std::memory_order_relaxed);
total_errors += s.send_errors.load(std::memory_order_relaxed);
}
double actual_rate = static_cast<double>(total_sent) / static_cast<double>(elapsed);
std::cout << "[" << elapsed << "s] sent=" << total_sent
<< " errors=" << total_errors
<< " rate=" << static_cast<int>(actual_rate) << " msg/s"
<< std::endl;
next_report += report_interval;
}
std::this_thread::sleep_for(100ms);
}
// Stop senders
g_stop.store(true);
for (auto &t : threads) {
t.join();
}
auto end_time = steady_clock::now();
double total_elapsed = duration_cast<milliseconds>(end_time - start_time).count() / 1000.0;
// Final report
std::cout << std::endl;
std::cout << "=== Results ===" << std::endl;
std::cout << "Elapsed: " << total_elapsed << "s" << std::endl;
uint64_t grand_total_sent = 0;
uint64_t grand_total_errors = 0;
for (size_t i = 0; i < stats.size(); ++i) {
uint64_t sent = stats[i].messages_sent.load();
uint64_t errors = stats[i].send_errors.load();
double rate = static_cast<double>(sent) / total_elapsed;
std::cout << " Connection " << i << " (port " << ports[i] << "): "
<< "sent=" << sent
<< " errors=" << errors
<< " rate=" << static_cast<int>(rate) << " msg/s"
<< std::endl;
grand_total_sent += sent;
grand_total_errors += errors;
}
double grand_rate = static_cast<double>(grand_total_sent) / total_elapsed;
std::cout << " Total: sent=" << grand_total_sent
<< " errors=" << grand_total_errors
<< " rate=" << static_cast<int>(grand_rate) << " msg/s"
<< std::endl;
std::cout << std::endl;
std::cout << "Now run 'perf' in the PX4 console to dump forwarding perf counters." << std::endl;
std::cout << "Look for:" << std::endl;
std::cout << " mavlink: fwd_msg total -- total forward_message() time" << std::endl;
std::cout << " mavlink: fwd_msg lock -- time under mavlink_module_mutex" << std::endl;
std::cout << " mavlink: comp_seen total -- static component_was_seen() time" << std::endl;
std::cout << " mavlink: unsigned_cb -- accept_unsigned_callback time" << std::endl;
// CSV report output
FILE *out = stdout;
if (!report_file.empty()) {
out = fopen(report_file.c_str(), "w");
if (!out) {
std::cerr << "Failed to open report file: " << report_file << std::endl;
out = stdout;
}
}
fprintf(out, "connection,port,sysid,messages_sent,send_errors,duration_s,rate_hz_requested,rate_hz_actual\n");
for (size_t i = 0; i < stats.size(); ++i) {
uint64_t sent = stats[i].messages_sent.load();
uint64_t errors = stats[i].send_errors.load();
double rate = static_cast<double>(sent) / total_elapsed;
fprintf(out, "%zu,%d,%d,%" PRIu64 ",%" PRIu64 ",%.1f,%d,%.1f\n",
i, ports[i], static_cast<int>(245 + i),
sent, errors, total_elapsed, rate_hz, rate);
}
if (out != stdout) {
fclose(out);
std::cout << "CSV report written to: " << report_file << std::endl;
}
// Cleanup
for (auto *p : passthroughs) {
delete p;
}
return 0;
}