fix(lockstep): stabilize Windows SITL timing

Wake scheduler waits through Windows-safe primitives, gate lockstep work-queue progress to Windows, and avoid startup stalls before simulator time is available.

Keep SIH and commander shell commands responsive at higher speed factors, including Wine-specific sleep behavior that avoids clock-poll spin overhead without changing native Windows paths.
This commit is contained in:
Nuno Marques
2026-05-11 10:31:52 -07:00
parent 3e37a74173
commit 3b70c33d7e
15 changed files with 739 additions and 32 deletions
@@ -192,6 +192,21 @@ void WorkQueue::Run()
#if defined(ENABLE_LOCKSTEP_SCHEDULER)
#if defined(__PX4_WINDOWS)
// Windows-only: winpthreads occasionally drops pthread_cond_broadcast
// wakes, so explicitly hand back lockstep progress after each Run()
// pass to keep the simulator advancing even if the broadcast that
// would have unblocked the lockstep barrier got lost. Linux POSIX
// condition variables deliver every broadcast; calling progress
// here on Linux risks signalling readiness before the queue has
// actually drained the items the simulator clock is gating on.
if (_lockstep_component != -1) {
px4_lockstep_progress(_lockstep_component);
}
#endif // __PX4_WINDOWS
if (_q.empty()) {
px4_lockstep_unregister_component(_lockstep_component);
_lockstep_component = -1;
@@ -49,6 +49,11 @@
#include <limits.h>
#include <string.h>
#if defined(__PX4_POSIX)
#include <pthread.h>
#include <unistd.h>
#endif
using namespace time_literals;
namespace px4
@@ -274,8 +279,8 @@ WorkQueueManagerRun(int, char **)
#elif defined(__PX4_POSIX)
// On posix system , the desired stacksize round to the nearest multiplier of the system pagesize
// It is a requirement of the pthread_attr_setstacksize* function
const unsigned int page_size = sysconf(_SC_PAGESIZE);
const size_t stacksize_adj = math::max((int)PTHREAD_STACK_MIN, PX4_STACK_ADJUSTED(wq->stacksize));
const size_t page_size = static_cast<size_t>(sysconf(_SC_PAGESIZE));
const size_t stacksize_adj = math::max(static_cast<size_t>(PTHREAD_STACK_MIN), static_cast<size_t>(PX4_STACK_ADJUSTED(wq->stacksize)));
const size_t stacksize = (stacksize_adj + page_size - (stacksize_adj % page_size));
#endif
@@ -304,6 +309,37 @@ WorkQueueManagerRun(int, char **)
PX4_ERR("getting sched param for %s failed (%i)", wq->name, ret_getschedparam);
}
#if defined(__PX4_WINDOWS)
// Windows-only: winpthreads on MinGW does not allow SCHED_FIFO
// for unprivileged threads, so fall back to SCHED_OTHER and
// clamp the priority into that policy's valid range. The Linux
// path keeps main's byte-exact behavior (try SCHED_FIFO and let
// pthread_create end up at the kernel default if it isn't
// privileged) — without that, every WQ on a regular Linux
// CI runner ends up at SCHED_OTHER priority 0, which subtly
// changes producer/consumer ordering in lockstep SITL.
int sched_policy = SCHED_FIFO;
int ret_setschedpolicy = pthread_attr_setschedpolicy(&attr, sched_policy);
if (ret_setschedpolicy != 0) {
sched_policy = SCHED_OTHER;
ret_setschedpolicy = pthread_attr_setschedpolicy(&attr, sched_policy);
}
if (ret_setschedpolicy != 0) {
PX4_ERR("failed to set sched policy (%i)", ret_setschedpolicy);
}
const int max_prio = sched_get_priority_max(sched_policy);
const int min_prio = sched_get_priority_min(sched_policy);
int effective_prio = sched_priority;
if (effective_prio > max_prio) { effective_prio = max_prio; }
if (effective_prio < min_prio) { effective_prio = min_prio; }
param.sched_priority = effective_prio;
#else
// schedule policy FIFO
int ret_setschedpolicy = pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
@@ -313,6 +349,7 @@ WorkQueueManagerRun(int, char **)
// priority
param.sched_priority = sched_priority;
#endif
int ret_setschedparam = pthread_attr_setschedparam(&attr, &param);
if (ret_setschedparam != 0) {
+11 -1
View File
@@ -106,6 +106,7 @@
int hrt_work_queue(struct work_s *work, worker_t worker, void *arg, uint32_t delay)
{
struct wqueue_s *wqueue = &g_hrt_work;
const int wake_worker = (px4_getpid() != wqueue->pid);
/* First, initialize the work structure */
@@ -124,9 +125,18 @@ int hrt_work_queue(struct work_s *work, worker_t worker, void *arg, uint32_t del
dq_addlast(&work->dq, &wqueue->q);
if (px4_getpid() != wqueue->pid) { /* only need to wake up if called from a different thread */
if (wake_worker) { /* only need to wake up if called from a different thread */
//wqueue->pid == own task? -> don't signal
#if defined(__PX4_WINDOWS)
int sem_val;
if (px4_sem_getvalue(&_hrt_work_signal, &sem_val) == 0 && sem_val <= 0) {
px4_sem_post(&_hrt_work_signal);
}
#else
px4_task_kill(wqueue->pid, SIGCONT); /* Wake up the worker thread */
#endif
}
hrt_work_unlock();
+14 -3
View File
@@ -73,6 +73,9 @@ struct wqueue_s g_hrt_work;
* Private Variables
****************************************************************************/
px4_sem_t _hrt_work_lock;
#if defined(__PX4_WINDOWS)
px4_sem_t _hrt_work_signal;
#endif
/****************************************************************************
* Private Functions
@@ -223,13 +226,17 @@ static void hrt_work_process(void)
}
/* Wait awhile to check the work list. We will wait here until either
* the time elapses or until we are awakened by a signal.
* the time elapses or until a newly queued item wakes us.
*/
hrt_work_unlock();
/* might sleep less if a signal received and new item was queued */
//PX4_INFO("Sleeping for %u usec", next);
#if defined(__PX4_WINDOWS)
struct timespec ts;
abstime_to_ts(&ts, hrt_absolute_time() + next);
px4_sem_timedwait(&_hrt_work_signal, &ts);
#else
px4_usleep(next);
#endif
}
/****************************************************************************
@@ -286,6 +293,10 @@ static int work_hrtthread(int argc, char *argv[])
void hrt_work_queue_init(void)
{
px4_sem_init(&_hrt_work_lock, 0, 1);
#if defined(__PX4_WINDOWS)
px4_sem_init(&_hrt_work_signal, 0, 0);
px4_sem_setprotocol(&_hrt_work_signal, SEM_PRIO_NONE);
#endif
memset(&g_hrt_work, 0, sizeof(g_hrt_work));
// Create high priority worker thread
+3
View File
@@ -41,6 +41,9 @@
__BEGIN_DECLS
extern px4_sem_t _hrt_work_lock;
#if defined(__PX4_WINDOWS)
extern px4_sem_t _hrt_work_signal;
#endif
extern struct wqueue_s g_hrt_work;
void hrt_work_queue_init(void);
@@ -533,4 +533,21 @@ void px4_lockstep_wait_for_components()
{
lockstep_scheduler.components().wait_for_components();
}
#elif defined(__PX4_WINDOWS)
int px4_usleep(useconds_t usec)
{
// MinGW's usleep/nanosleep round sub-millisecond waits down to an
// immediate return under Wine. HRT uses those tiny waits to yield between
// timer checks, so force at least one millisecond for non-zero sleeps.
if (usec > 0 && usec < 1000) {
usec = 1000;
}
return system_usleep(usec);
}
unsigned int px4_sleep(unsigned int seconds)
{
return system_sleep(seconds);
}
#endif
@@ -38,15 +38,26 @@
#include <vector>
#include <memory>
#include <atomic>
#if defined(__PX4_WINDOWS) || defined(_WIN32)
#ifndef PX4_WINDOWS_PTHREAD_LOCKSTEP_BRIDGE
#define PX4_WINDOWS_PTHREAD_LOCKSTEP_BRIDGE 1
#endif
#endif
#include <pthread.h>
#include <unistd.h>
#if defined(__PX4_WINDOWS) || defined(_WIN32)
#include <windows.h>
#endif
#include "lockstep_components.h"
class LockstepScheduler
{
public:
LockstepScheduler(bool no_cleanup_on_destroy = false) : _components(no_cleanup_on_destroy) {}
LockstepScheduler(bool no_cleanup_on_destroy = false);
~LockstepScheduler();
void set_absolute_time(uint64_t time_us);
@@ -56,8 +67,22 @@ public:
LockstepComponents &components() { return _components; }
#if defined(__PX4_WINDOWS) || defined(_WIN32)
static void notify_pthread_condition(pthread_cond_t *cond, bool broadcast);
#endif
private:
struct TimedWait {
#if defined(__PX4_WINDOWS) || defined(_WIN32)
// Windows: use native CONDITION_VARIABLE + SRWLOCK for the producer
// signaling path. winpthreads is known to occasionally drop
// pthread_cond_broadcast wakes; WakeAllConditionVariable is kernel-
// managed and never loses a wake. SRWLOCK and CONDITION_VARIABLE are
// trivially zero-initialised and have no destroy step (no kernel
// handle to leak).
SRWLOCK wait_lock{SRWLOCK_INIT};
CONDITION_VARIABLE wait_cond{CONDITION_VARIABLE_INIT};
#endif
~TimedWait()
{
if (!done) {
@@ -87,7 +112,10 @@ private:
pthread_mutex_t *passed_lock{nullptr};
uint64_t time_us{0};
bool timeout{false};
std::atomic<bool> done{false};
#if defined(__PX4_WINDOWS) || defined(_WIN32)
bool signaled {false};
#endif
std::atomic<bool> done {false};
std::atomic<bool> removed{true};
TimedWait *next{nullptr}; ///< linked list
@@ -95,6 +123,10 @@ private:
LockstepComponents _components;
#if defined(__PX4_WINDOWS) || defined(_WIN32)
bool notify_pthread_condition_locked(pthread_cond_t *cond, bool broadcast);
#endif
std::atomic<uint64_t> _time_us{0};
TimedWait *_timed_waits{nullptr}; ///< head of linked list
@@ -40,7 +40,45 @@
#include <drivers/drv_hrt.h>
#include <px4_platform_common/log.h>
#include <px4_platform_common/tasks.h>
#include <px4_platform_common/time.h>
#include <limits.h>
#include <stdlib.h>
#if defined(__PX4_WINDOWS)
#include <px4_windows/platform.h>
static uint64_t lockstep_wall_time_us()
{
if (px4_windows_running_under_wine()) {
return px4_windows_wine_monotonic_time_us();
}
timespec ts{};
system_clock_gettime(CLOCK_MONOTONIC, &ts);
return (static_cast<uint64_t>(ts.tv_sec) * 1000000ULL) + (static_cast<uint64_t>(ts.tv_nsec) / 1000ULL);
}
static uint64_t lockstep_component_wait_budget_us()
{
static const uint64_t wait_budget_us = []() -> uint64_t {
const char *speed_factor_env = getenv("PX4_SIM_SPEED_FACTOR");
if (speed_factor_env != nullptr)
{
const double speed_factor = strtod(speed_factor_env, nullptr);
if (speed_factor > 0.) {
const uint64_t budget = static_cast<uint64_t>(1000. / speed_factor);
return budget < 10ULL ? 10ULL : (budget > 100ULL ? 100ULL : budget);
}
}
return 100ULL;
}();
return wait_budget_us;
}
#endif // __PX4_WINDOWS
LockstepComponents::LockstepComponents(bool no_cleanup_on_destroy)
: _no_cleanup_on_destroy(no_cleanup_on_destroy)
@@ -128,5 +166,41 @@ void LockstepComponents::wait_for_components()
return;
}
#if defined(__PX4_WINDOWS)
if (px4_windows_running_under_wine()) {
(void)px4_sem_trywait(&_components_sem);
return;
}
// Windows-only: winpthreads occasionally drops pthread_cond_broadcast
// wakes, which can leave the producer blocked here forever. Use a
// short wall-time watchdog so a lost wake caps barrier latency at
// ~100 us instead of stalling the simulator. Linux uses the original
// blocking wait below because POSIX condition variables on Linux
// reliably deliver every broadcast and the blocking path preserves
// strict producer/consumer ordering at startup.
const uint64_t wait_start_us = lockstep_wall_time_us();
const uint64_t max_wait_us = lockstep_component_wait_budget_us();
unsigned spin_count = 0;
while (px4_sem_trywait(&_components_sem) != 0) {
if (_components_used_bitset == 0) {
return;
}
if ((lockstep_wall_time_us() - wait_start_us) >= max_wait_us) {
return;
}
if (++spin_count % 16 == 0) {
sched_yield();
}
}
#else
while (px4_sem_wait(&_components_sem) != 0) {}
#endif
}
@@ -35,8 +35,62 @@
#include <px4_platform_common/log.h>
#if defined(__PX4_WINDOWS) || defined(_WIN32)
#include <windows.h>
#endif
#if defined(__PX4_WINDOWS) || defined(_WIN32)
namespace
{
std::mutex &scheduler_registry_mutex()
{
static std::mutex mutex;
return mutex;
}
std::vector<LockstepScheduler *> &scheduler_registry()
{
static std::vector<LockstepScheduler *> schedulers;
return schedulers;
}
void px4_lockstep_pthread_cond_notify(pthread_cond_t *cond, int broadcast)
{
LockstepScheduler::notify_pthread_condition(cond, broadcast != 0);
}
}
#endif
LockstepScheduler::LockstepScheduler(bool no_cleanup_on_destroy) :
_components(no_cleanup_on_destroy)
{
#if defined(__PX4_WINDOWS) || defined(_WIN32)
std::lock_guard<std::mutex> guard(scheduler_registry_mutex());
scheduler_registry().push_back(this);
#if defined(_MSC_VER)
px4_pthread_cond_set_notify_callback(px4_lockstep_pthread_cond_notify);
#endif
#endif
}
LockstepScheduler::~LockstepScheduler()
{
#if defined(__PX4_WINDOWS) || defined(_WIN32)
{
std::lock_guard<std::mutex> guard(scheduler_registry_mutex());
auto &schedulers = scheduler_registry();
for (auto it = schedulers.begin(); it != schedulers.end(); ++it)
{
if (*it == this) {
schedulers.erase(it);
break;
}
}
}
#endif
// cleanup the linked list
std::unique_lock<std::mutex> lock_timed_waits(_timed_waits_mutex);
@@ -83,10 +137,21 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
!timed_wait->timeout) {
// We are abusing the condition here to signal that the time
// has passed.
#if defined(__PX4_WINDOWS) || defined(_WIN32)
// Use the per-waiter Win32 CONDITION_VARIABLE/SRWLOCK
// pair so the wake is kernel-managed and never dropped.
// Avoid taking the user pthread mutex (winpthreads): it is
// the broadcast that races and gets lost on that path.
AcquireSRWLockExclusive(&timed_wait->wait_lock);
timed_wait->timeout = true;
WakeAllConditionVariable(&timed_wait->wait_cond);
ReleaseSRWLockExclusive(&timed_wait->wait_lock);
#else
pthread_mutex_lock(timed_wait->passed_lock);
timed_wait->timeout = true;
pthread_cond_broadcast(timed_wait->passed_cond);
pthread_mutex_unlock(timed_wait->passed_lock);
#endif
}
timed_wait_prev = timed_wait;
@@ -97,6 +162,66 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
}
}
#if defined(__PX4_WINDOWS) || defined(_WIN32)
void LockstepScheduler::notify_pthread_condition(pthread_cond_t *cond, bool broadcast)
{
std::lock_guard<std::mutex> guard(scheduler_registry_mutex());
for (LockstepScheduler *scheduler : scheduler_registry()) {
if (scheduler->notify_pthread_condition_locked(cond, broadcast) && !broadcast) {
break;
}
}
}
bool LockstepScheduler::notify_pthread_condition_locked(pthread_cond_t *cond, bool broadcast)
{
bool notified = false;
std::lock_guard<std::mutex> lock_timed_waits(_timed_waits_mutex);
for (TimedWait *timed_wait = _timed_waits; timed_wait; timed_wait = timed_wait->next) {
if (timed_wait->done || timed_wait->passed_cond != cond ||
timed_wait->timeout || timed_wait->signaled) {
continue;
}
AcquireSRWLockExclusive(&timed_wait->wait_lock);
timed_wait->signaled = true;
WakeAllConditionVariable(&timed_wait->wait_cond);
ReleaseSRWLockExclusive(&timed_wait->wait_lock);
notified = true;
if (!broadcast) {
break;
}
}
return notified;
}
extern "C" int px4_lockstep_pthread_cond_signal(pthread_cond_t *cond)
{
const int result = (pthread_cond_signal)(cond);
if (result == 0) {
px4_lockstep_pthread_cond_notify(cond, 0);
}
return result;
}
extern "C" int px4_lockstep_pthread_cond_broadcast(pthread_cond_t *cond)
{
const int result = (pthread_cond_broadcast)(cond);
if (result == 0) {
px4_lockstep_pthread_cond_notify(cond, 1);
}
return result;
}
#endif
int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *lock, uint64_t time_us)
{
// A TimedWait object might still be in timed_waits_ after we return, so its lifetime needs to be
@@ -114,6 +239,9 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc
timed_wait.passed_cond = cond;
timed_wait.passed_lock = lock;
timed_wait.timeout = false;
#if defined(__PX4_WINDOWS) || defined(_WIN32)
timed_wait.signaled = false;
#endif
timed_wait.done = false;
// Add to linked list if not removed yet (otherwise just re-use the object)
@@ -124,7 +252,62 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc
}
}
int result = pthread_cond_wait(cond, lock);
// On most pthread implementations pthread_cond_wait here would
// suffice — set_absolute_time broadcasts when our target is reached.
// winpthreads (MinGW), however, occasionally drops the broadcast
// when it races the waiter entering the wait, which leaves the
// thread blocked forever.
//
// On Windows we therefore wait on a per-waiter native Win32
// CONDITION_VARIABLE + SRWLOCK pair that the producer signals via
// WakeAllConditionVariable (kernel-managed, never lost). The 500 ms
// SleepConditionVariableSRW timeout is kept as a defensive watchdog
// (under correct signaling it never fires).
int result = 0;
#if defined(__PX4_WINDOWS) || defined(_WIN32)
// pthread cond_timedwait contract: the user's `lock` is released for
// the duration of the wait and re-acquired before return. Mirror that
// while we sleep on the per-waiter Win32 primitive instead.
pthread_mutex_unlock(lock);
AcquireSRWLockExclusive(&timed_wait.wait_lock);
while (!timed_wait.timeout && !timed_wait.signaled) {
BOOL slept = SleepConditionVariableSRW(&timed_wait.wait_cond,
&timed_wait.wait_lock,
500, 0);
if (timed_wait.timeout || timed_wait.signaled) {
break;
}
if (!slept && GetLastError() == ERROR_TIMEOUT) {
// Watchdog fired; re-check _time_us in case the
// producer signalled before we entered the wait. With
// native Win32 signaling this branch should never
// flip the flag, but it's kept as belt-and-braces.
ReleaseSRWLockExclusive(&timed_wait.wait_lock);
_timed_waits_mutex.lock();
if (time_us <= _time_us) {
timed_wait.timeout = true;
}
_timed_waits_mutex.unlock();
AcquireSRWLockExclusive(&timed_wait.wait_lock);
continue;
}
// Spurious wakeup: loop to re-check.
}
ReleaseSRWLockExclusive(&timed_wait.wait_lock);
pthread_mutex_lock(lock);
#else
result = pthread_cond_wait(cond, lock);
#endif
const bool timeout = timed_wait.timeout;
@@ -155,8 +338,23 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc
int LockstepScheduler::usleep_until(uint64_t time_us)
{
#if defined(__PX4_WINDOWS) || defined(_WIN32)
// On winpthreads PTHREAD_*_INITIALIZER triggers lazy allocation of
// the underlying kernel object; without an explicit *_destroy() the
// auto-storage variants leak on every tick, which at high speed
// factors quickly accumulates. Keep a thread-local pair that is
// created once per thread and destroyed at thread exit.
static thread_local pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static thread_local pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#else
// Linux glibc treats PTHREAD_*_INITIALIZER as a static struct
// initializer with no kernel-object allocation, so per-call
// auto-storage matches main and avoids the cross-call cond
// reuse semantics that interact badly with the LockstepScheduler
// linked list when a thread re-enters usleep_until().
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#endif
pthread_mutex_lock(&lock);
@@ -31,6 +31,9 @@ public:
ls.set_absolute_time(ls.get_absolute_time());
_thread.join();
}
bool done() const { return _done; }
private:
void execute()
{
@@ -121,6 +124,54 @@ void test_locked_semaphore_getting_unlocked()
pthread_cond_destroy(&cond);
}
void test_condition_signal_wakes_without_time_advance()
{
pthread_cond_t cond;
pthread_cond_init(&cond, nullptr);
pthread_mutex_t lock;
pthread_mutex_init(&lock, nullptr);
LockstepScheduler ls;
ls.set_absolute_time(some_time_us);
std::atomic<bool> waiting{false};
std::atomic<int> result{EINVAL};
TestThread thread([&]() {
pthread_mutex_lock(&lock);
waiting = true;
result = ls.cond_timedwait(&cond, &lock, some_time_us + 1000000);
pthread_mutex_unlock(&lock);
});
while (!waiting) {
std::this_thread::yield();
}
pthread_mutex_lock(&lock);
EXPECT_EQ(pthread_cond_signal(&cond), 0);
pthread_mutex_unlock(&lock);
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(1);
while (!thread.done() && std::chrono::steady_clock::now() < deadline) {
std::this_thread::yield();
}
EXPECT_TRUE(thread.done());
if (!thread.done()) {
ls.set_absolute_time(some_time_us + 1000001);
}
thread.join(ls);
EXPECT_EQ(result, 0);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
class TestCase
{
public:
@@ -314,6 +365,7 @@ TEST(LockstepScheduler, All)
test_absolute_time();
test_condition_timing_out();
test_locked_semaphore_getting_unlocked();
test_condition_signal_wakes_without_time_advance();
test_usleep();
test_multiple_semaphores_waiting();
}
+5 -2
View File
@@ -149,9 +149,12 @@ static inline hrt_abstime ts_to_abstime(const struct timespec *ts)
*/
static inline void abstime_to_ts(struct timespec *ts, hrt_abstime abstime)
{
ts->tv_sec = (typeof(ts->tv_sec))(abstime / 1000000);
/* POSIX guarantees `time_t tv_sec` and `long tv_nsec`. Use explicit
* casts instead of GCC's `typeof` so this header parses under MSVC C,
* which lacks `typeof` outside the C23 mode we don't enable. */
ts->tv_sec = (time_t)(abstime / 1000000);
abstime -= (hrt_abstime)(ts->tv_sec) * 1000000;
ts->tv_nsec = (typeof(ts->tv_nsec))(abstime * 1000);
ts->tv_nsec = (long)(abstime * 1000);
}
/**
+119 -6
View File
@@ -37,6 +37,15 @@
#include <dataman_client/DatamanClient.hpp>
#if defined(ENABLE_LOCKSTEP_SCHEDULER) && defined(__PX4_WINDOWS)
static hrt_abstime dataman_wall_time_us()
{
timespec ts{};
system_clock_gettime(CLOCK_MONOTONIC, &ts);
return (static_cast<hrt_abstime>(ts.tv_sec) * 1000000ULL) + (static_cast<hrt_abstime>(ts.tv_nsec) / 1000ULL);
}
#endif
DatamanClient::DatamanClient()
{
_sync_perf = perf_alloc(PC_ELAPSED, "DatamanClient: sync");
@@ -55,6 +64,13 @@ DatamanClient::DatamanClient()
_fds.fd = _dataman_response_sub;
_fds.events = POLLIN;
#if defined(ENABLE_LOCKSTEP_SCHEDULER) && defined(__PX4_WINDOWS)
// Windows lockstep build only: defer client-ID acquisition
// until the first sync/async call, because hrt_absolute_time()
// is not yet advancing when the constructor runs and the
// GET_ID round-trip would stall startup.
(void)response;
#else
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request;
@@ -71,6 +87,8 @@ DatamanClient::DatamanClient()
} else {
PX4_ERR("Failed to get client ID!");
}
#endif
}
}
@@ -86,6 +104,72 @@ DatamanClient::~DatamanClient()
bool DatamanClient::syncHandler(const dataman_request_s &request, dataman_response_s &response,
const hrt_abstime &start_time, hrt_abstime timeout)
{
#if defined(ENABLE_LOCKSTEP_SCHEDULER) && defined(__PX4_WINDOWS)
// Windows lockstep: hrt_absolute_time() does not advance until the
// shim attaches to the simulator clock, so use the OS monotonic
// clock here and poll the orb queue with a short usleep instead of
// px4_poll(), which blocks against the lockstep scheduler.
(void)start_time;
bool response_received = false;
const hrt_abstime sync_start_time = dataman_wall_time_us();
hrt_abstime last_request_time = sync_start_time;
hrt_abstime time_elapsed = 0;
const unsigned max_iterations = (timeout / 1000) + 1;
unsigned iterations = 0;
perf_begin(_sync_perf);
_dataman_request_pub.publish(request);
while (!response_received && (time_elapsed < timeout)) {
if (++iterations > max_iterations) {
break;
}
bool updated = false;
orb_check(_dataman_response_sub, &updated);
if (updated) {
orb_copy(ORB_ID(dataman_response), _dataman_response_sub, &response);
if (response.client_id == request.client_id) {
if ((response.request_type == request.request_type) &&
(response.item == request.item) &&
(response.index == request.index)) {
response_received = true;
break;
}
} else if (request.client_id == CLIENT_ID_NOT_SET) {
// validate timestamp from response.data
if (0 == memcmp(&(request.timestamp), &(response.data), sizeof(hrt_abstime))) {
response_received = true;
break;
}
}
}
const hrt_abstime now = dataman_wall_time_us();
if (now - last_request_time >= 100_ms) {
_dataman_request_pub.publish(request);
last_request_time = now;
}
system_usleep(1000);
time_elapsed = dataman_wall_time_us() - sync_start_time;
}
perf_end(_sync_perf);
if (!response_received) {
PX4_ERR("timeout after %" PRIu32 " ms!", static_cast<uint32_t>(timeout / 1000));
}
return response_received;
#else
bool response_received = false;
int32_t ret = 0;
hrt_abstime time_elapsed = hrt_elapsed_time(&start_time);
@@ -144,11 +228,40 @@ bool DatamanClient::syncHandler(const dataman_request_s &request, dataman_respon
}
return response_received;
#endif
}
bool DatamanClient::updateClientId(hrt_abstime timeout)
{
if (_client_id != CLIENT_ID_NOT_SET) {
return true;
}
if (_dataman_response_sub < 0) {
return false;
}
dataman_response_s response{};
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request{};
request.timestamp = timestamp;
request.request_type = DM_GET_ID;
request.client_id = CLIENT_ID_NOT_SET;
const bool success = syncHandler(request, response, timestamp, timeout);
if (success && (response.client_id > CLIENT_ID_NOT_SET)) {
_client_id = response.client_id;
return true;
}
return false;
}
bool DatamanClient::readSync(dm_item_t item, uint32_t index, uint8_t *buffer, uint32_t length, hrt_abstime timeout)
{
if (_client_id == CLIENT_ID_NOT_SET) {
if (_client_id == CLIENT_ID_NOT_SET && !updateClientId(timeout)) {
return false;
}
@@ -188,7 +301,7 @@ bool DatamanClient::readSync(dm_item_t item, uint32_t index, uint8_t *buffer, ui
bool DatamanClient::writeSync(dm_item_t item, uint32_t index, uint8_t *buffer, uint32_t length, hrt_abstime timeout)
{
if (_client_id == CLIENT_ID_NOT_SET) {
if (_client_id == CLIENT_ID_NOT_SET && !updateClientId(timeout)) {
return false;
}
@@ -227,7 +340,7 @@ bool DatamanClient::writeSync(dm_item_t item, uint32_t index, uint8_t *buffer, u
bool DatamanClient::clearSync(dm_item_t item, hrt_abstime timeout)
{
if (_client_id == CLIENT_ID_NOT_SET) {
if (_client_id == CLIENT_ID_NOT_SET && !updateClientId(timeout)) {
return false;
}
@@ -257,7 +370,7 @@ bool DatamanClient::clearSync(dm_item_t item, hrt_abstime timeout)
bool DatamanClient::readAsync(dm_item_t item, uint32_t index, uint8_t *buffer, uint32_t length)
{
if (_client_id == CLIENT_ID_NOT_SET) {
if (_client_id == CLIENT_ID_NOT_SET && !updateClientId(5000_ms)) {
return false;
}
@@ -299,7 +412,7 @@ bool DatamanClient::readAsync(dm_item_t item, uint32_t index, uint8_t *buffer, u
bool DatamanClient::writeAsync(dm_item_t item, uint32_t index, uint8_t *buffer, uint32_t length)
{
if (_client_id == CLIENT_ID_NOT_SET) {
if (_client_id == CLIENT_ID_NOT_SET && !updateClientId(5000_ms)) {
return false;
}
@@ -343,7 +456,7 @@ bool DatamanClient::writeAsync(dm_item_t item, uint32_t index, uint8_t *buffer,
bool DatamanClient::clearAsync(dm_item_t item)
{
if (_client_id == CLIENT_ID_NOT_SET) {
if (_client_id == CLIENT_ID_NOT_SET && !updateClientId(5000_ms)) {
return false;
}
+1
View File
@@ -173,6 +173,7 @@ private:
/* Synchronous response/request handler */
bool syncHandler(const dataman_request_s &request, dataman_response_s &response,
const hrt_abstime &start_time, hrt_abstime timeout);
bool updateClientId(hrt_abstime timeout);
State _state{State::Idle};
Request _active_request{};
+65 -4
View File
@@ -191,6 +191,21 @@ static bool send_vehicle_command(const uint32_t cmd, const float param1 = NAN, c
static bool wait_for_vehicle_command_reply(const uint32_t cmd,
uORB::SubscriptionData<vehicle_command_ack_s> &vehicle_command_ack_sub)
{
#if defined(ENABLE_LOCKSTEP_SCHEDULER)
static constexpr unsigned wait_interval_us = 10000;
static constexpr unsigned max_wait_iterations = 500; // 5 s wall time
for (unsigned i = 0; i < max_wait_iterations; ++i) {
if (vehicle_command_ack_sub.update()) {
if (vehicle_command_ack_sub.get().command == cmd) {
return vehicle_command_ack_sub.get().result == vehicle_command_ack_s::VEHICLE_CMD_RESULT_ACCEPTED;
}
}
system_usleep(wait_interval_us);
}
#else
hrt_abstime start = hrt_absolute_time();
while (hrt_absolute_time() - start < 100_ms) {
@@ -203,9 +218,23 @@ static bool wait_for_vehicle_command_reply(const uint32_t cmd,
px4_usleep(10000);
}
#endif
return false;
}
#if defined(ENABLE_LOCKSTEP_SCHEDULER)
static bool send_vehicle_command_and_wait_for_reply(const uint32_t cmd, const float param1 = NAN, const float param2 = NAN,
const float param3 = NAN, const float param4 = NAN, const double param5 = static_cast<double>(NAN),
const double param6 = static_cast<double>(NAN), const float param7 = NAN)
{
uORB::SubscriptionData<vehicle_command_ack_s> vehicle_command_ack_sub{ORB_ID(vehicle_command_ack)};
return send_vehicle_command(cmd, param1, param2, param3, param4, param5, param6, param7)
&& wait_for_vehicle_command_reply(cmd, vehicle_command_ack_sub);
}
#endif
static bool broadcast_vehicle_command(const uint32_t cmd, const float param1 = NAN, const float param2 = NAN,
const float param3 = NAN, const float param4 = NAN, const double param5 = static_cast<double>(NAN),
const double param6 = static_cast<double>(NAN), const float param7 = NAN)
@@ -314,6 +343,7 @@ int Commander::custom_command(int argc, char *argv[])
} else {
PX4_ERR("missing argument");
return 1;
}
}
@@ -339,7 +369,7 @@ int Commander::custom_command(int argc, char *argv[])
send_vehicle_command(vehicle_command_s::VEHICLE_CMD_DO_SET_SAFETY_SWITCH_STATE, vehicle_command_s::SAFETY_OFF);
} else {
PX4_ERR("invlaid argument, use [on|off]");
PX4_ERR("invalid argument, use [on|off]");
return 1;
}
@@ -354,9 +384,15 @@ int Commander::custom_command(int argc, char *argv[])
param2 = 21196.f;
}
#if defined(ENABLE_LOCKSTEP_SCHEDULER)
send_vehicle_command_and_wait_for_reply(vehicle_command_s::VEHICLE_CMD_COMPONENT_ARM_DISARM,
static_cast<float>(vehicle_command_s::ARMING_ACTION_ARM),
param2);
#else
send_vehicle_command(vehicle_command_s::VEHICLE_CMD_COMPONENT_ARM_DISARM,
static_cast<float>(vehicle_command_s::ARMING_ACTION_ARM),
param2);
#endif
return 0;
}
@@ -369,9 +405,15 @@ int Commander::custom_command(int argc, char *argv[])
param2 = 21196.f;
}
#if defined(ENABLE_LOCKSTEP_SCHEDULER)
send_vehicle_command_and_wait_for_reply(vehicle_command_s::VEHICLE_CMD_COMPONENT_ARM_DISARM,
static_cast<float>(vehicle_command_s::ARMING_ACTION_DISARM),
param2);
#else
send_vehicle_command(vehicle_command_s::VEHICLE_CMD_COMPONENT_ARM_DISARM,
static_cast<float>(vehicle_command_s::ARMING_ACTION_DISARM),
param2);
#endif
return 0;
}
@@ -382,16 +424,26 @@ int Commander::custom_command(int argc, char *argv[])
send_vehicle_command(vehicle_command_s::VEHICLE_CMD_NAV_TAKEOFF);
if (wait_for_vehicle_command_reply(vehicle_command_s::VEHICLE_CMD_NAV_TAKEOFF, vehicle_command_ack_sub)) {
#if defined(ENABLE_LOCKSTEP_SCHEDULER)
send_vehicle_command_and_wait_for_reply(vehicle_command_s::VEHICLE_CMD_COMPONENT_ARM_DISARM,
static_cast<float>(vehicle_command_s::ARMING_ACTION_ARM),
0.f);
#else
send_vehicle_command(vehicle_command_s::VEHICLE_CMD_COMPONENT_ARM_DISARM,
static_cast<float>(vehicle_command_s::ARMING_ACTION_ARM),
0.f);
#endif
}
return 0;
}
if (!strcmp(argv[0], "land")) {
#if defined(ENABLE_LOCKSTEP_SCHEDULER)
send_vehicle_command_and_wait_for_reply(vehicle_command_s::VEHICLE_CMD_NAV_LAND);
#else
send_vehicle_command(vehicle_command_s::VEHICLE_CMD_NAV_LAND);
#endif
return 0;
}
@@ -466,12 +518,14 @@ int Commander::custom_command(int argc, char *argv[])
} else {
PX4_ERR("argument %s unsupported.", argv[1]);
return 1;
}
return 0;
} else {
PX4_ERR("missing argument");
return 1;
}
}
@@ -502,7 +556,7 @@ int Commander::custom_command(int argc, char *argv[])
} else {
PX4_ERR("missing argument");
return 0;
return 1;
}
}
@@ -517,7 +571,7 @@ int Commander::custom_command(int argc, char *argv[])
} else {
PX4_ERR("missing argument");
return 0;
return 1;
}
}
@@ -910,7 +964,14 @@ Commander::handle_command(const vehicle_command_s &cmd)
desired_nav_state = vehicle_status_s::NAVIGATION_STATE_AUTO_PRECLAND;
break;
case PX4_CUSTOM_SUB_MODE_EXTERNAL1...PX4_CUSTOM_SUB_MODE_EXTERNAL8:
case PX4_CUSTOM_SUB_MODE_EXTERNAL1:
case PX4_CUSTOM_SUB_MODE_EXTERNAL2:
case PX4_CUSTOM_SUB_MODE_EXTERNAL3:
case PX4_CUSTOM_SUB_MODE_EXTERNAL4:
case PX4_CUSTOM_SUB_MODE_EXTERNAL5:
case PX4_CUSTOM_SUB_MODE_EXTERNAL6:
case PX4_CUSTOM_SUB_MODE_EXTERNAL7:
case PX4_CUSTOM_SUB_MODE_EXTERNAL8:
desired_nav_state = vehicle_status_s::NAVIGATION_STATE_EXTERNAL1 + (custom_sub_mode - PX4_CUSTOM_SUB_MODE_EXTERNAL1);
break;
+91 -11
View File
@@ -45,10 +45,20 @@
#include <px4_platform_common/getopt.h>
#include <px4_platform_common/log.h>
#include <px4_platform_common/tasks.h>
#include <px4_platform_common/time.h>
#include <drivers/drv_pwm_output.h> // to get PWM flags
#include <lib/drivers/device/Device.hpp>
#if defined(__PX4_WINDOWS) || defined(_WIN32)
#include <windows.h>
#endif
#if defined(__PX4_WINDOWS)
#include <px4_windows/platform.h>
#endif
using namespace math;
using namespace matrix;
using namespace time_literals;
@@ -69,6 +79,19 @@ Sih::~Sih()
void Sih::run()
{
#if defined(__PX4_WINDOWS) || defined(_WIN32)
// SIH is the lockstep producer: it advances simulated time and
// signals every consumer waiting on the next sim_time tick. On
// Windows the default thread priority lets unrelated host
// background threads (Defender, Search, telemetry) preempt this
// loop, which shows up as sim/wall drift under load. Raise the
// producer to ABOVE_NORMAL so the OS scheduler keeps it on-CPU
// long enough to publish the next tick. ABOVE_NORMAL is enough
// to outrank typical host noise without starving driver/IO
// threads, which TIME_CRITICAL or HIGHEST risk on a busy host.
(void)SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
#endif
_px4_accel.set_temperature(T1_C);
_px4_gyro.set_temperature(T1_C);
@@ -92,12 +115,70 @@ void Sih::run()
}
#if defined(ENABLE_LOCKSTEP_SCHEDULER)
// Get current timestamp in microseconds
static uint64_t micros()
static uint64_t wall_time_us()
{
#if defined(__PX4_WINDOWS)
if (px4_windows_running_under_wine()) {
return px4_windows_wine_monotonic_time_us();
}
struct timeval t;
gettimeofday(&t, nullptr);
return t.tv_sec * ((uint64_t)1000000) + t.tv_usec;
return t.tv_sec * static_cast<uint64_t>(1000000) + t.tv_usec;
#else
timespec ts {};
system_clock_gettime(CLOCK_MONOTONIC, &ts);
return (static_cast<uint64_t>(ts.tv_sec) * 1000000ULL) + (static_cast<uint64_t>(ts.tv_nsec) / 1000ULL);
#endif
}
static void lockstep_wall_sleep(int sleep_time_us)
{
if (sleep_time_us <= 0) {
return;
}
// Avoid coarse sub-millisecond sleeps limiting high lockstep speed factors.
if (sleep_time_us <= 1000) {
const uint64_t sleep_until_us = wall_time_us() + sleep_time_us;
#if defined(__PX4_WINDOWS)
if (px4_windows_running_under_wine()) {
px4_windows_wine_spin_until_us(sleep_until_us);
} else {
while (wall_time_us() < sleep_until_us) {
sched_yield();
}
}
#else
while (true) {
const uint64_t now_us = wall_time_us();
if (now_us >= sleep_until_us) {
break;
}
if (sleep_until_us - now_us > 100) {
sched_yield();
}
}
#endif
} else {
system_usleep(sleep_time_us);
}
}
void Sih::lockstep_loop()
@@ -126,7 +207,7 @@ void Sih::lockstep_loop()
uint64_t pre_compute_wall_time_us;
while (!should_exit()) {
pre_compute_wall_time_us = micros();
pre_compute_wall_time_us = wall_time_us();
perf_count(_loop_interval_perf);
_current_simulation_time_us += sim_interval_us;
@@ -144,18 +225,18 @@ void Sih::lockstep_loop()
if (_last_actuator_output_time <= 0) {
PX4_DEBUG("SIH starting up - no lockstep yet");
current_wall_time_us = micros();
current_wall_time_us = wall_time_us();
sleep_time = math::max(0, sim_interval_us - (int)(current_wall_time_us - pre_compute_wall_time_us));
} else {
px4_lockstep_wait_for_components();
current_wall_time_us = micros();
current_wall_time_us = wall_time_us();
sleep_time = math::max(0, rt_interval_us - (int)(current_wall_time_us - pre_compute_wall_time_us));
}
_achieved_speedup = 0.99f * _achieved_speedup + 0.01f * ((float)sim_interval_us / (float)(
current_wall_time_us - pre_compute_wall_time_us + sleep_time));
usleep(sleep_time);
lockstep_wall_sleep(sleep_time);
}
}
#endif
@@ -204,7 +285,8 @@ void Sih::sensor_step()
parameters_updated();
}
perf_begin(_loop_perf);
// Note: _loop_perf is wrapped by the calling loop (realtime_loop / lockstep_loop),
// do not call perf_begin/perf_end here or the counter is double-counted per tick.
const hrt_abstime now = hrt_absolute_time();
const float dt = (now - _last_run) * 1e-6f;
@@ -240,8 +322,6 @@ void Sih::sensor_step()
}
publish_ground_truth(now);
perf_end(_loop_perf);
}
void Sih::parameters_updated()
@@ -313,7 +393,7 @@ void Sih::parameters_updated()
_I(1, 2) = _I(2, 1) = _sih_iyz.get();
// guards against too small determinants
_Im1 = 100.0f * inv(static_cast<typeof _I>(100.0f * _I));
_Im1 = 100.0f * inv(static_cast<decltype(_I)>(100.0f * _I));
_distance_snsr_min = _sih_distance_snsr_min.get();
_distance_snsr_max = _sih_distance_snsr_max.get();