feat(px4_daemon): relay client stdin to modules

Windows command clients need a cross-platform path for module stdin and stdout that does not rely on POSIX fdopen semantics over sockets. Extend the daemon protocol and client/server plumbing so module invocations can relay input and command output through explicit IPC messages.

This keeps interactive and one-shot module clients responsive when invoked through pxh or px4-*.exe wrappers, and gives the Windows poll shim a pipe-backed descriptor path to wait on instead of treating every fd as a Winsock socket.

Signed-off-by: Nuno Marques <n.marques21@hotmail.com>
This commit is contained in:
Nuno Marques
2026-05-05 21:00:49 -07:00
parent 331baf13c7
commit d1500fda9e
5 changed files with 393 additions and 45 deletions
@@ -47,12 +47,15 @@
#include <sys/stat.h>
#ifdef __PX4_WINDOWS
#include <netinet/in.h>
#include <windows.h>
#else
#include <sys/un.h>
#endif
#include <unistd.h>
#include <string>
#include <system_error>
#include <thread>
#include <px4_platform_common/log.h>
#include "client.h"
@@ -60,9 +63,19 @@
namespace px4_daemon
{
#ifdef __PX4_WINDOWS
namespace
{
#ifdef __PX4_WINDOWS
// SHUT_WR is the POSIX value of the half-close-send constant; on Winsock,
// the equivalent is SD_SEND. Use a portable alias so the relay code below is
// platform-neutral.
static constexpr int kShutWr = SD_SEND;
#else
static constexpr int kShutWr = SHUT_WR;
#endif
#ifdef __PX4_WINDOWS
bool ends_with_exe_suffix(const std::string &arg)
{
if (arg.size() < 4) {
@@ -75,11 +88,73 @@ bool ends_with_exe_suffix(const std::string &arg)
|| suffix == ".exE" || suffix == ".EXe"
|| suffix == ".eXE" || suffix == ".ExE";
}
} // namespace
#endif
// Reads up to `n` bytes from this process's standard input. Returns 0 on EOF
// and -1 on error. Wraps the platform-specific stdin-read primitive so the
// forwarding loop below stays platform-neutral.
static ssize_t read_local_stdin(char *buf, size_t n)
{
#ifdef __PX4_WINDOWS
HANDLE stdin_h = GetStdHandle(STD_INPUT_HANDLE);
if (stdin_h == INVALID_HANDLE_VALUE || stdin_h == nullptr) {
return -1;
}
DWORD got = 0;
if (!ReadFile(stdin_h, buf, (DWORD)n, &got, nullptr)) {
return -1;
}
return (ssize_t)got;
#else
return read(STDIN_FILENO, buf, n);
#endif
}
// Worker that reads bytes from this process's stdin and forwards them over
// the socket so the daemon can hand them to the running module's stdin pipe.
// Stops on stdin EOF (half-closes the socket's send side, signalling EOF to
// the daemon) or on any send() error (typically the daemon already closed
// the connection because the command finished).
static void stdin_forward_loop(socket_handle_t fd)
{
char buffer[256];
while (true) {
const ssize_t bytes_read = read_local_stdin(buffer, sizeof(buffer));
if (bytes_read <= 0) {
break;
}
const char *buf = buffer;
ssize_t remaining = bytes_read;
while (remaining > 0) {
const int n_sent = send(fd, buf, (int)remaining, 0);
if (n_sent <= 0) {
// Daemon closed the connection (command finished). Done.
return;
}
buf += n_sent;
remaining -= n_sent;
}
}
// Local stdin reached EOF — half-close the send side so the daemon's
// stdin relay sees EOF on its recv() and drains the module's stdin pipe.
shutdown(fd, kShutWr);
}
} // namespace
Client::Client(int instance_id) :
_fd(-1),
_fd(invalid_socket_handle),
_instance_id(instance_id)
{}
@@ -91,7 +166,7 @@ Client::process_args(const int argc, const char **argv)
_fd = socket(AF_INET, SOCK_STREAM, 0);
if (_fd < 0) {
if (_fd == invalid_socket_handle) {
PX4_ERR("error creating socket");
return -1;
}
@@ -105,6 +180,15 @@ Client::process_args(const int argc, const char **argv)
PX4_ERR("error connecting to 127.0.0.1:%u: %s", port, strerror(errno));
return -1;
}
// No SO_RCVTIMEO: match POSIX/AF_UNIX behaviour where the client blocks
// in recv() until the daemon either replies or shuts the connection
// down. A short timeout here (the previous 5 s default) would fire
// before commands that legitimately take longer to return — most
// notably `commander takeoff`, whose lockstep wait_for_vehicle_command_reply
// can sit idle for the full 5 s acknowledgement window followed by a
// second 5 s wait for the arming reply. Letting the OS deliver an EOF
// or RST from the server is the correct termination signal here.
#else
std::string sock_path = get_socket_path(_instance_id);
@@ -132,6 +216,18 @@ Client::process_args(const int argc, const char **argv)
return -3;
}
// Forward this process's stdin to the daemon while we wait for the
// command's stdout. The thread is detached and will exit when the local
// stdin closes or the daemon hangs up the socket — process exit reaps it.
// Best-effort: commands that don't read stdin are unaffected; interactive
// ones simply won't see keystrokes if thread creation fails.
try {
std::thread(stdin_forward_loop, _fd).detach();
} catch (const std::system_error &) {
// Non-fatal: stdin relay was best-effort.
}
return _listen();
}
@@ -198,7 +294,26 @@ Client::_listen()
int n_read = recv(_fd, buffer + n_buffer_used, sizeof buffer - n_buffer_used, 0);
if (n_read < 0) {
#ifdef __PX4_WINDOWS
const int wsa_err = WSAGetLastError();
// WSAECONNRESET / WSAESHUTDOWN can fire instead of a clean
// recv() == 0 if the daemon's closesocket() races our recv()
// (the OS already delivered the bytes — TCP just abandoned the
// graceful FIN). Treat it as end-of-stream so we still pick up
// the {0, retval} sentinel that was already buffered.
if (wsa_err == WSAECONNRESET || wsa_err == WSAESHUTDOWN) {
if (n_buffer_used == 2) {
return buffer[1];
}
return -1;
}
PX4_ERR("unable to read from socket: WSA error = %d", wsa_err);
#else
PX4_ERR("unable to read from socket");
#endif
return -1;
} else if (n_read == 0) {
@@ -236,8 +351,12 @@ Client::_listen()
Client::~Client()
{
if (_fd >= 0) {
if (_fd != invalid_socket_handle) {
#ifdef __PX4_WINDOWS
closesocket(_fd);
#else
close(_fd);
#endif
}
}
@@ -64,7 +64,7 @@ public:
Client(Client &&other) : _fd(other._fd), _instance_id(other._instance_id)
{
// Steal the fd from the moved-from client.
other._fd = -1;
other._fd = invalid_socket_handle;
}
/**
@@ -80,7 +80,7 @@ private:
int _send_cmds(const int argc, const char **argv);
int _listen();
int _fd;
socket_handle_t _fd;
int _instance_id; ///< instance ID for running multiple instances of the px4 server
};
@@ -44,6 +44,9 @@
#include <unistd.h>
#include <string.h>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <pthread.h>
#include <poll.h>
#include <sys/stat.h>
@@ -51,6 +54,7 @@
#include <sys/types.h>
#ifdef __PX4_WINDOWS
#include <netinet/in.h>
#include <io.h>
#else
#include <sys/un.h>
#endif
@@ -68,26 +72,68 @@ namespace
{
struct ClientThreadArgs {
int client_fd;
socket_handle_t client_fd;
FILE *thread_stdout;
#ifdef __PX4_WINDOWS
int stdout_read_fd;
#endif
};
// Platform-neutral wrappers for the half-close-send and stdin-pipe-create
// primitives so the relay code below can stay free of #ifdefs on the hot path.
#ifdef __PX4_WINDOWS
struct StdoutRelayArgs {
int stdout_read_fd;
int client_fd;
};
static constexpr int kShutWr = SD_SEND;
#else
static constexpr int kShutWr = SHUT_WR;
#endif
static void *stdout_relay_trampoline(void *arg)
// Returns {read_fd, write_fd} for an anonymous pipe, or {-1, -1} on failure.
// Wraps CreatePipe()+_open_osfhandle() on Windows and pipe() on POSIX.
static std::pair<int, int> create_relay_pipe()
{
#ifdef __PX4_WINDOWS
HANDLE r = nullptr;
HANDLE w = nullptr;
if (!CreatePipe(&r, &w, nullptr, 0)) {
return {-1, -1};
}
int rfd = _open_osfhandle(reinterpret_cast<intptr_t>(r), O_RDONLY);
int wfd = _open_osfhandle(reinterpret_cast<intptr_t>(w), 0);
if (rfd < 0 || wfd < 0) {
if (rfd >= 0) { ::close(rfd); } else { CloseHandle(r); }
if (wfd >= 0) { ::close(wfd); } else { CloseHandle(w); }
return {-1, -1};
}
return {rfd, wfd};
#else
int fds[2];
if (pipe(fds) != 0) {
return {-1, -1};
}
return {fds[0], fds[1]};
#endif
}
#ifdef __PX4_WINDOWS
// Forwards bytes from the per-client anonymous stdout pipe to the client
// socket. Used only on Windows because Winsock SOCKETs are not CRT file
// descriptors and therefore cannot back a FILE* directly via fdopen() — the
// daemon writes to a CRT pipe and this thread copies it onto the socket.
// On POSIX the socket is fdopen()'d directly and no relay is needed.
static void stdout_relay_loop(int stdout_read_fd, socket_handle_t client_fd)
{
StdoutRelayArgs *relay_args = static_cast<StdoutRelayArgs *>(arg);
char buffer[1024];
while (true) {
const ssize_t n_read = read(relay_args->stdout_read_fd, buffer, sizeof(buffer));
const ssize_t n_read = read(stdout_read_fd, buffer, sizeof(buffer));
if (n_read <= 0) {
break;
@@ -97,12 +143,11 @@ static void *stdout_relay_trampoline(void *arg)
ssize_t remaining = n_read;
while (remaining > 0) {
const int n_sent = send((SOCKET)relay_args->client_fd, buf, (int)remaining, 0);
const int n_sent = send(client_fd, buf, (int)remaining, 0);
if (n_sent <= 0) {
close(relay_args->stdout_read_fd);
delete relay_args;
return nullptr;
::close(stdout_read_fd);
return;
}
buf += n_sent;
@@ -110,16 +155,66 @@ static void *stdout_relay_trampoline(void *arg)
}
}
close(relay_args->stdout_read_fd);
delete relay_args;
return nullptr;
::close(stdout_read_fd);
}
#endif // __PX4_WINDOWS
static ssize_t socket_read(int fd, char *buffer, size_t buffer_size)
// Forwards bytes that arrive on the client socket after the cmd terminator
// into the daemon's anonymous stdin pipe. The pipe's read-end is dup2'd onto
// STDIN_FILENO for the duration of Pxh::process_line(), so the running module
// sees keystrokes typed in the client's terminal.
static void stdin_relay_loop(socket_handle_t client_fd, int stdin_write_fd, std::string prefill)
{
auto write_all = [&](const char *buf, ssize_t n) -> bool {
while (n > 0) {
const int n_written = ::write(stdin_write_fd, buf, (unsigned)n);
if (n_written <= 0) {
return false;
}
buf += n_written;
n -= n_written;
}
return true;
};
if (!prefill.empty()) {
if (!write_all(prefill.data(), (ssize_t)prefill.size())) {
::close(stdin_write_fd);
return;
}
}
char buffer[1024];
while (true) {
const int n_read = recv(client_fd, buffer, sizeof(buffer), 0);
if (n_read <= 0) {
break;
}
if (!write_all(buffer, n_read)) {
break;
}
}
// EOF/error: close the pipe write end so the module sees EOF on stdin.
::close(stdin_write_fd);
}
// Process-wide mutex serializing the STDIN_FILENO redirect. Concurrent commands
// take turns owning the live console-stdin replacement; only one can hold it at
// a time. Interactive use is single-client, so contention is negligible in
// practice.
static pthread_mutex_t s_stdin_redirect_mutex = PTHREAD_MUTEX_INITIALIZER;
static ssize_t socket_read(socket_handle_t fd, char *buffer, size_t buffer_size)
{
#ifdef __PX4_WINDOWS
const int n_read = recv((SOCKET)fd, buffer, (int)buffer_size, 0);
const int n_read = recv(fd, buffer, (int)buffer_size, 0);
if (n_read == SOCKET_ERROR) {
return -1;
@@ -142,15 +237,29 @@ static bool is_shutdown_command(const std::string &cmd)
const std::size_t command_end = cmd.find_first_of(whitespace, command_start);
const std::size_t command_length = (command_end == std::string::npos) ? std::string::npos : command_end - command_start;
return cmd.compare(command_start, command_length, "shutdown") == 0;
if (cmd.compare(command_start, command_length, "shutdown") != 0) {
return false;
}
// Only treat the bare "shutdown" command as the daemon-teardown shortcut.
// If there are any positional arguments (e.g. "shutdown status"), fall
// through to the normal command path so shutdown_main can reject them via
// its usage banner -- otherwise an accidental "shutdown <subcommand>" would
// silently kill the daemon.
if (command_end == std::string::npos) {
return true;
}
return cmd.find_first_not_of(whitespace, command_end) == std::string::npos;
}
} // namespace
#ifdef __PX4_WINDOWS
static SOCKET poll_socket(int fd)
static SOCKET poll_socket(socket_handle_t fd)
{
return static_cast<SOCKET>(fd);
return fd;
}
#else
static int poll_socket(int fd)
@@ -181,7 +290,7 @@ Server::start()
_fd = socket(AF_INET, SOCK_STREAM, 0);
if (_fd < 0) {
if (_fd == invalid_socket_handle) {
PX4_ERR("error creating socket");
return -1;
}
@@ -282,9 +391,9 @@ Server::_server_main()
continue;
}
const int client = accept(_fd, nullptr, nullptr);
const socket_handle_t client = accept(_fd, nullptr, nullptr);
if (client == -1) {
if (client == invalid_socket_handle) {
PX4_ERR("failed to accept client: %s", strerror(errno));
continue;
}
@@ -437,10 +546,13 @@ void
{
ClientThreadArgs *client_args = static_cast<ClientThreadArgs *>(arg);
FILE *out = client_args->thread_stdout;
int fd = client_args->client_fd;
socket_handle_t fd = client_args->client_fd;
// Read until the end of the incoming stream.
std::string cmd;
// Bytes the cmd-read loop pulled past the terminator. These belong to the
// post-command stdin stream and must be handed to the stdin relay thread.
std::string stdin_prefill;
while (true) {
size_t n = cmd.size();
@@ -462,8 +574,25 @@ void
cmd.resize(n + n_read);
// Command ends in 0x00 (no tty) or 0x01 (tty).
if (!cmd.empty() && cmd.back() < 2) {
// Command ends in 0x00 (no tty) or 0x01 (tty). Scan from the start of
// this read because a single recv() may carry the terminator together
// with a leading slice of the stdin stream the client started sending
// once it had finished writing the cmd buffer.
bool found_terminator = false;
for (size_t i = n; i < cmd.size(); ++i) {
if ((unsigned char)cmd[i] < 2) {
if (i + 1 < cmd.size()) {
stdin_prefill.assign(cmd, i + 1, cmd.size() - (i + 1));
}
cmd.resize(i + 1);
found_terminator = true;
break;
}
}
if (found_terminator) {
break;
}
}
@@ -497,17 +626,75 @@ void
}
#ifdef __PX4_WINDOWS
StdoutRelayArgs *relay_args = new StdoutRelayArgs {client_args->stdout_read_fd, fd};
pthread_t relay_thread {};
const bool relay_started = (pthread_create(&relay_thread, nullptr, stdout_relay_trampoline, relay_args) == 0);
std::thread stdout_relay_thread;
bool stdout_relay_started = false;
if (!relay_started) {
try {
stdout_relay_thread = std::thread(stdout_relay_loop, client_args->stdout_read_fd, fd);
stdout_relay_started = true;
} catch (const std::system_error &) {
PX4_ERR("could not start stdout relay thread");
close(client_args->stdout_read_fd);
delete relay_args;
}
#endif
// Redirect STDIN_FILENO onto an anonymous pipe and shovel client-socket
// bytes into it for the duration of process_line(). Modules that read
// STDIN_FILENO (uorb top, mavlink shell, etc.) thereby see keystrokes the
// user types in the px4-* client process. Skip this for the shutdown
// command — it returns immediately and never reads stdin.
int stdin_pipe_read_fd = -1;
int stdin_pipe_write_fd = -1;
int stdin_backup_fd = -1;
std::thread stdin_relay_thread;
bool stdin_relay_started = false;
bool stdin_redirect_active = false;
if (!is_shutdown_command(cmd)) {
auto pipe_fds = create_relay_pipe();
if (pipe_fds.first < 0 || pipe_fds.second < 0) {
PX4_ERR("could not create stdin pipe for module");
} else {
stdin_pipe_read_fd = pipe_fds.first;
stdin_pipe_write_fd = pipe_fds.second;
pthread_mutex_lock(&s_stdin_redirect_mutex);
stdin_backup_fd = dup(STDIN_FILENO);
if (stdin_backup_fd < 0 || dup2(stdin_pipe_read_fd, STDIN_FILENO) < 0) {
PX4_ERR("could not redirect STDIN_FILENO to the relay pipe");
if (stdin_backup_fd >= 0) {
::close(stdin_backup_fd);
stdin_backup_fd = -1;
}
::close(stdin_pipe_read_fd);
::close(stdin_pipe_write_fd);
stdin_pipe_read_fd = stdin_pipe_write_fd = -1;
pthread_mutex_unlock(&s_stdin_redirect_mutex);
} else {
stdin_redirect_active = true;
try {
stdin_relay_thread = std::thread(stdin_relay_loop, fd, stdin_pipe_write_fd, std::move(stdin_prefill));
stdin_relay_started = true;
} catch (const std::system_error &) {
PX4_ERR("could not start stdin relay thread");
::close(stdin_pipe_write_fd);
stdin_pipe_write_fd = -1;
}
}
}
}
if (is_shutdown_command(cmd)) {
// shutdown exits the daemon process asynchronously. Reply before
// requesting shutdown so px4-shutdown gets the normal success marker
@@ -519,11 +706,17 @@ void
#ifdef __PX4_WINDOWS
fclose(out);
if (relay_started) {
pthread_join(relay_thread, nullptr);
if (stdout_relay_started) {
stdout_relay_thread.join();
}
shutdown(fd, SHUT_RDWR);
// kShutWr (SD_SEND) only — never SD_RECEIVE/SHUT_RD: half-closing the
// receive side on Winsock discards the unread receive buffer and
// causes a RST on closesocket(), which races the queued response
// bytes and the peer sees WSAECONNRESET before reading them. A
// half-close-send queues a clean FIN behind any pending sends so the
// client recv() returns 0 after consuming the full reply.
shutdown(fd, kShutWr);
closesocket((SOCKET)fd);
delete client_args;
#else
@@ -538,6 +731,27 @@ void
// Run the actual command.
int retval = Pxh::process_line(cmd, true);
// Tear down the stdin redirect while the socket is still open. We avoid
// shutdown(fd, SHUT_RD/SD_RECEIVE) here: on Winsock, half-closing the
// receive side resets the connection if there is any unread data buffered
// in the kernel, which would race the {0, retval} reply; on POSIX it
// would also discard data the relay had not yet drained. Instead, restore
// STDIN_FILENO and close the pipe read end; the relay thread is detached
// and exits naturally once its recv() on the client socket returns 0
// (after the closesocket()/_cleanup below) or its write to the now-closed
// pipe fails.
if (stdin_redirect_active) {
dup2(stdin_backup_fd, STDIN_FILENO);
::close(stdin_backup_fd);
::close(stdin_pipe_read_fd);
stdin_pipe_read_fd = -1;
pthread_mutex_unlock(&s_stdin_redirect_mutex);
if (stdin_relay_started) {
stdin_relay_thread.detach();
}
}
// Report return value.
char buf[2] = {0, (char)retval};
@@ -551,11 +765,17 @@ void
#ifdef __PX4_WINDOWS
fclose(out);
if (relay_started) {
pthread_join(relay_thread, nullptr);
if (stdout_relay_started) {
stdout_relay_thread.join();
}
shutdown(fd, SHUT_RDWR);
// See kShutWr comment in the shutdown-command branch above. On modules
// that return immediately (e.g. `<module> status` when the module is
// not autostarted, which prints "not running" and returns 1) the entire
// reply is small enough to still be in flight when this thread tears
// down the socket — a full half-close of both directions would make the
// client observe truncation.
shutdown(fd, kShutWr);
closesocket((SOCKET)fd);
delete client_args;
#else
@@ -115,7 +115,7 @@ private:
int _instance_id; ///< instance ID for running multiple instances of the px4 server
int _fd;
socket_handle_t _fd;
static void _pthread_key_destructor(void *arg);
@@ -40,10 +40,17 @@
#include <cstdint>
#include <string>
#ifdef __PX4_WINDOWS
#include <winsock2.h>
#endif
namespace px4_daemon
{
#ifdef __PX4_WINDOWS
using socket_handle_t = SOCKET;
static constexpr socket_handle_t invalid_socket_handle = INVALID_SOCKET;
// Windows: AF_INET TCP loopback. AF_UNIX was introduced in Windows 10 1803
// (WinSock2) and in principle would work, but Wine (used for SITL CI) did
// not support AF_UNIX until 7.x — 6.x still returns WSAEAFNOSUPPORT. TCP
@@ -51,6 +58,8 @@ namespace px4_daemon
// daemon protocol.
uint16_t get_socket_port(int instance_id);
#else
using socket_handle_t = int;
static constexpr socket_handle_t invalid_socket_handle = -1;
std::string get_socket_path(int instance_id);
#endif