fix(px4_daemon): support Windows command clients

Use loopback TCP for daemon clients on Windows, strip the .exe suffix from command aliases, and relay command stdout through a CRT pipe instead of fdopen() on WinSock sockets.

The PXH loop now handles Windows console and redirected input explicitly, supports exit/quit commands, and preserves normal shutdown replies so px4-shutdown can return success before the daemon exits.

Signed-off-by: Nuno Marques <n.marques21@hotmail.com>
This commit is contained in:
Nuno Marques
2026-04-27 15:26:07 -07:00
parent 4916c9e0cf
commit 1974fe5945
5 changed files with 533 additions and 12 deletions
@@ -45,7 +45,11 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#ifdef __PX4_WINDOWS
#include <netinet/in.h>
#else
#include <sys/un.h>
#endif
#include <unistd.h>
#include <string>
@@ -56,6 +60,24 @@
namespace px4_daemon
{
#ifdef __PX4_WINDOWS
namespace
{
bool ends_with_exe_suffix(const std::string &arg)
{
if (arg.size() < 4) {
return false;
}
const std::string suffix = arg.substr(arg.size() - 4);
return suffix == ".exe" || suffix == ".EXE"
|| suffix == ".Exe" || suffix == ".eXe"
|| suffix == ".exE" || suffix == ".EXe"
|| suffix == ".eXE" || suffix == ".ExE";
}
} // namespace
#endif
Client::Client(int instance_id) :
_fd(-1),
_instance_id(instance_id)
@@ -64,6 +86,26 @@ Client::Client(int instance_id) :
int
Client::process_args(const int argc, const char **argv)
{
#ifdef __PX4_WINDOWS
const uint16_t port = get_socket_port(_instance_id);
_fd = socket(AF_INET, SOCK_STREAM, 0);
if (_fd < 0) {
PX4_ERR("error creating socket");
return -1;
}
sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons(port);
if (connect(_fd, (sockaddr *)&addr, sizeof(addr)) < 0) {
PX4_ERR("error connecting to 127.0.0.1:%u: %s", port, strerror(errno));
return -1;
}
#else
std::string sock_path = get_socket_path(_instance_id);
_fd = socket(AF_UNIX, SOCK_STREAM, 0);
@@ -81,6 +123,7 @@ Client::process_args(const int argc, const char **argv)
PX4_ERR("error connecting to socket: %s", strerror(errno));
return -1;
}
#endif
int ret = _send_cmds(argc, argv);
@@ -98,7 +141,18 @@ Client::_send_cmds(const int argc, const char **argv)
std::string cmd_buf;
for (int i = 0; i < argc; ++i) {
cmd_buf += argv[i];
std::string arg = argv[i];
#ifdef __PX4_WINDOWS
// Client executables are real .exe files on Windows, not POSIX
// symlinks. The pxh command namespace remains extensionless
// ("commander", "shutdown", ...), so strip only argv[0].
if (i == 0 && ends_with_exe_suffix(arg)) {
arg.resize(arg.size() - 4);
}
#endif
cmd_buf += arg;
if (i + 1 != argc) {
// TODO: Use '\0' as argument separator (and parse this server-side as well),
@@ -114,10 +168,13 @@ Client::_send_cmds(const int argc, const char **argv)
const char *buf = cmd_buf.data();
while (n > 0) {
int n_sent = write(_fd, buf, n);
// send() instead of write() so the same code path works with AF_UNIX
// on POSIX and AF_INET SOCKETs on Windows — write() does not operate
// on WinSock SOCKET handles.
int n_sent = send(_fd, buf, n, 0);
if (n_sent < 0) {
PX4_ERR("write() failed: %s", strerror(errno));
PX4_ERR("send() failed: %s", strerror(errno));
return -1;
}
@@ -138,7 +195,7 @@ Client::_listen()
// by another byte, we don't output it yet, until we know whether it was
// the end of the stream or not.
while (true) {
int n_read = read(_fd, buffer + n_buffer_used, sizeof buffer - n_buffer_used);
int n_read = recv(_fd, buffer + n_buffer_used, sizeof buffer - n_buffer_used, 0);
if (n_read < 0) {
PX4_ERR("unable to read from socket");
@@ -49,12 +49,117 @@
#include <poll.h>
#include <fcntl.h>
#include <unistd.h>
#ifdef __PX4_WINDOWS
#include <conio.h>
#include <windows.h>
#include <px4_windows/platform.h>
#endif
#include "pxh.h"
namespace px4_daemon
{
#ifdef __PX4_WINDOWS
namespace
{
bool read_windows_console_line(std::string &line)
{
HANDLE stdin_h = GetStdHandle(STD_INPUT_HANDLE);
if (stdin_h == INVALID_HANDLE_VALUE || stdin_h == nullptr) {
return false;
}
line.clear();
while (true) {
char buffer[128] {};
DWORD bytes_read = 0;
if (!ReadFile(stdin_h, buffer, sizeof(buffer), &bytes_read, nullptr)) {
return !line.empty();
}
if (bytes_read == 0) {
return !line.empty();
}
for (DWORD i = 0; i < bytes_read; ++i) {
const char c = buffer[i];
if (c == '\r') {
continue;
}
if (c == '\n') {
return true;
}
line.push_back(c);
}
}
}
void flush_pending_windows_stdin()
{
px4_windows_restore_console_modes();
px4_windows_discard_pending_input();
px4_windows_restore_console_modes();
}
bool stdin_is_windows_console()
{
HANDLE stdin_h = GetStdHandle(STD_INPUT_HANDLE);
if (stdin_h == INVALID_HANDLE_VALUE || stdin_h == nullptr) {
return false;
}
DWORD mode = 0;
return GetConsoleMode(stdin_h, &mode) != 0;
}
bool read_windows_redirected_line(std::string &line)
{
line.clear();
while (true) {
char c = '\0';
const ssize_t bytes_read = ::read(STDIN_FILENO, &c, 1);
if (bytes_read <= 0) {
return !line.empty();
}
if (c == '\r') {
continue;
}
if (c == '\n') {
return true;
}
line.push_back(c);
}
}
} // namespace
#endif
#ifdef __PX4_WINDOWS
static SOCKET poll_socket(int fd)
{
return static_cast<SOCKET>(fd);
}
#else
static int poll_socket(int fd)
{
return fd;
}
#endif
apps_map_type Pxh::_apps = {};
Pxh *Pxh::_instance = nullptr;
@@ -126,6 +231,13 @@ int Pxh::process_line(const std::string &line, bool silently_fail)
list_builtins(_apps);
return 0;
} else if (command == "exit" || command == "quit") {
if (_instance) {
_instance->_should_exit = true;
}
return 0;
} else if (command.length() == 0 || command[0] == '#') {
// Do nothing
return 0;
@@ -199,7 +311,7 @@ void Pxh::run_remote_pxh(int remote_in_fd, int remote_out_fd)
// Any data from remote_in_fd will be process as shell commands when an '\n' is received
while (!_should_exit) {
struct pollfd fds[3] { {pipe_stderr, POLLIN}, {pipe_stdout, POLLIN}, {remote_in_fd, POLLIN}};
struct pollfd fds[3] { {poll_socket(pipe_stderr), POLLIN}, {poll_socket(pipe_stdout), POLLIN}, {poll_socket(remote_in_fd), POLLIN}};
if (poll(fds, 3, -1) == -1) {
perror("Mavlink Shell Poll Error");
@@ -301,8 +413,61 @@ void Pxh::run_pxh()
// Only the local_terminal needed for static calls
_instance = this;
_local_terminal = true;
#ifndef __PX4_WINDOWS
_setup_term();
#endif
#ifdef __PX4_WINDOWS
_print_prompt();
const bool stdin_is_tty = isatty(STDIN_FILENO) != 0;
const bool use_console_input = stdin_is_tty && stdin_is_windows_console();
while (!_should_exit) {
std::string line;
bool got_line = false;
if (use_console_input) {
got_line = read_windows_console_line(line);
} else {
got_line = read_windows_redirected_line(line);
}
if (!got_line) {
if (_should_exit) {
flush_pending_windows_stdin();
break;
}
clearerr(stdin);
usleep(10000);
continue;
}
if (_should_exit) {
flush_pending_windows_stdin();
break;
}
_history.try_to_add(line);
_history.reset_to_end();
if (!stdin_is_tty) {
printf("\n");
}
process_line(line, false);
if (_should_exit) {
flush_pending_windows_stdin();
break;
}
_print_prompt();
}
return;
#else
std::string mystr;
int cursor_position = 0; // position of the cursor from right to left
// (0: all the way to the right, mystr.length: all the way to the left)
@@ -310,7 +475,6 @@ void Pxh::run_pxh()
_print_prompt();
while (!_should_exit) {
int c = getchar();
std::string add_string; // string to add at current cursor position
bool update_prompt = true;
@@ -330,6 +494,7 @@ void Pxh::run_pxh()
break;
case '\r': // Windows _getch() reports Enter as CR
case '\n': // user hit enter
_history.try_to_add(mystr);
_history.reset_to_end();
@@ -409,6 +574,7 @@ void Pxh::run_pxh()
}
}
}
#endif
}
void Pxh::stop()
@@ -49,7 +49,11 @@
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/types.h>
#ifdef __PX4_WINDOWS
#include <netinet/in.h>
#else
#include <sys/un.h>
#endif
#include <vector>
#include <px4_platform_common/log.h>
@@ -60,6 +64,101 @@
namespace px4_daemon
{
namespace
{
struct ClientThreadArgs {
int client_fd;
FILE *thread_stdout;
#ifdef __PX4_WINDOWS
int stdout_read_fd;
#endif
};
#ifdef __PX4_WINDOWS
struct StdoutRelayArgs {
int stdout_read_fd;
int client_fd;
};
static void *stdout_relay_trampoline(void *arg)
{
StdoutRelayArgs *relay_args = static_cast<StdoutRelayArgs *>(arg);
char buffer[1024];
while (true) {
const ssize_t n_read = read(relay_args->stdout_read_fd, buffer, sizeof(buffer));
if (n_read <= 0) {
break;
}
const char *buf = buffer;
ssize_t remaining = n_read;
while (remaining > 0) {
const int n_sent = send((SOCKET)relay_args->client_fd, buf, (int)remaining, 0);
if (n_sent <= 0) {
close(relay_args->stdout_read_fd);
delete relay_args;
return nullptr;
}
buf += n_sent;
remaining -= n_sent;
}
}
close(relay_args->stdout_read_fd);
delete relay_args;
return nullptr;
}
#endif // __PX4_WINDOWS
static ssize_t socket_read(int fd, char *buffer, size_t buffer_size)
{
#ifdef __PX4_WINDOWS
const int n_read = recv((SOCKET)fd, buffer, (int)buffer_size, 0);
if (n_read == SOCKET_ERROR) {
return -1;
}
return n_read;
#else
return read(fd, buffer, buffer_size);
#endif
}
static bool is_shutdown_command(const std::string &cmd)
{
const std::string whitespace{" \t\r\n"};
const std::size_t command_start = cmd.find_first_not_of(whitespace);
if (command_start == std::string::npos) {
return false;
}
const std::size_t command_end = cmd.find_first_of(whitespace, command_start);
const std::size_t command_length = (command_end == std::string::npos) ? std::string::npos : command_end - command_start;
return cmd.compare(command_start, command_length, "shutdown") == 0;
}
} // namespace
#ifdef __PX4_WINDOWS
static SOCKET poll_socket(int fd)
{
return static_cast<SOCKET>(fd);
}
#else
static int poll_socket(int fd)
{
return fd;
}
#endif
Server *Server::_instance = nullptr;
Server::Server(int instance_id)
@@ -77,6 +176,34 @@ Server::~Server()
int
Server::start()
{
#ifdef __PX4_WINDOWS
const uint16_t port = get_socket_port(_instance_id);
_fd = socket(AF_INET, SOCK_STREAM, 0);
if (_fd < 0) {
PX4_ERR("error creating socket");
return -1;
}
// Avoid EADDRINUSE when a prior instance left the port in TIME_WAIT.
int opt = 1;
setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&opt, sizeof(opt));
sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons(port);
if (bind(_fd, (sockaddr *)&addr, sizeof(addr)) < 0) {
// Winsock bind() does not set errno on failure — the error code
// lives in WSAGetLastError(). Reading errno returns whatever
// stale value the CRT had (often 0 → "Success"), which is
// useless. Report the winsock code directly.
PX4_ERR("error binding socket 127.0.0.1:%u, WSA error = %d", port, WSAGetLastError());
return -1;
}
#else
std::string sock_path = get_socket_path(_instance_id);
// Delete socket in case it exists already.
@@ -97,6 +224,7 @@ Server::start()
PX4_ERR("error binding socket %s, error = %s", sock_path.c_str(), strerror(errno));
return -1;
}
#endif
if (listen(_fd, 10) < 0) {
PX4_ERR("error listening to socket: %s", strerror(errno));
@@ -136,11 +264,78 @@ Server::_server_main()
return;
}
#ifdef __PX4_WINDOWS
while (true) {
pollfd listen_fd {poll_socket(_fd), POLLIN, 0};
int n_ready = poll(&listen_fd, 1, -1);
if (n_ready < 0) {
if (errno != EINTR) {
PX4_ERR("poll() failed: %s", strerror(errno));
break;
}
continue;
}
if (!(listen_fd.revents & POLLIN)) {
continue;
}
const int client = accept(_fd, nullptr, nullptr);
if (client == -1) {
PX4_ERR("failed to accept client: %s", strerror(errno));
continue;
}
int stdout_pipe[2] {-1, -1};
if (pipe(stdout_pipe) != 0) {
PX4_ERR("could not create stdout pipe for new thread");
closesocket((SOCKET)client);
continue;
}
FILE *thread_stdout = fdopen(stdout_pipe[1], "wb");
if (thread_stdout == nullptr) {
PX4_ERR("could not open stdout pipe for new thread");
close(stdout_pipe[0]);
close(stdout_pipe[1]);
closesocket((SOCKET)client);
continue;
}
// Windows sockets are WinSock handles, not CRT file descriptors, so
// they cannot be used with fdopen()/FILE*. PX4 command output still
// expects FILE*, therefore each client gets a real CRT pipe and the
// handler relays that pipe to the socket after command execution.
setvbuf(thread_stdout, nullptr, _IOLBF, BUFSIZ);
ClientThreadArgs *thread_args = new ClientThreadArgs {client, thread_stdout, stdout_pipe[0]};
pthread_t thread {};
ret = pthread_create(&thread, nullptr, Server::_handle_client, thread_args);
if (ret != 0) {
PX4_ERR("could not start pthread (%i)", ret);
fclose(thread_stdout);
close(stdout_pipe[0]);
closesocket((SOCKET)client);
delete thread_args;
continue;
}
pthread_detach(thread);
}
closesocket((SOCKET)_fd);
#else
// The list of file descriptors to watch.
std::vector<pollfd> poll_fds;
// Watch the listening socket for incoming connections.
poll_fds.push_back(pollfd {_fd, POLLIN, 0});
poll_fds.push_back(pollfd {poll_socket(_fd), POLLIN, 0});
// The list of FILE pointers that we'll need to fclose().
// stdouts[i] corresponds to poll_fds[i+1].
@@ -181,20 +376,22 @@ Server::_server_main()
setvbuf(thread_stdout, nullptr, _IOLBF, BUFSIZ);
// Start a new thread to handle the client.
ClientThreadArgs *thread_args = new ClientThreadArgs {client, thread_stdout};
pthread_t *thread = &_fd_to_thread[client];
ret = pthread_create(thread, nullptr, Server::_handle_client, thread_stdout);
ret = pthread_create(thread, nullptr, Server::_handle_client, thread_args);
if (ret != 0) {
PX4_ERR("could not start pthread (%i)", ret);
_fd_to_thread.erase(client);
fclose(thread_stdout);
delete thread_args;
} else {
// We won't join the thread, so detach to automatically release resources at its end
pthread_detach(*thread);
// Start listening for the client hanging up.
poll_fds.push_back(pollfd {client, POLLHUP, 0});
poll_fds.push_back(pollfd {poll_socket(client), POLLHUP, 0});
// Remember the FILE *, so we can fclose() it later.
stdouts.push_back(thread_stdout);
@@ -227,16 +424,20 @@ Server::_server_main()
_unlock();
}
#ifndef __PX4_WINDOWS
std::string sock_path = get_socket_path(_instance_id);
unlink(sock_path.c_str());
#endif
close(_fd);
#endif // __PX4_WINDOWS
}
void
*Server::_handle_client(void *arg)
{
FILE *out = (FILE *)arg;
int fd = fileno(out);
ClientThreadArgs *client_args = static_cast<ClientThreadArgs *>(arg);
FILE *out = client_args->thread_stdout;
int fd = client_args->client_fd;
// Read until the end of the incoming stream.
std::string cmd;
@@ -244,10 +445,18 @@ void
while (true) {
size_t n = cmd.size();
cmd.resize(n + 1024);
ssize_t n_read = read(fd, &cmd[n], cmd.size() - n);
ssize_t n_read = socket_read(fd, &cmd[n], cmd.size() - n);
if (n_read <= 0) {
#ifdef __PX4_WINDOWS
fclose(out);
close(client_args->stdout_read_fd);
closesocket((SOCKET)fd);
delete client_args;
#else
delete client_args;
_cleanup(fd);
#endif
return nullptr;
}
@@ -260,7 +469,15 @@ void
}
if (cmd.size() < 2) {
#ifdef __PX4_WINDOWS
fclose(out);
close(client_args->stdout_read_fd);
closesocket((SOCKET)fd);
delete client_args;
#else
delete client_args;
_cleanup(fd);
#endif
return nullptr;
}
@@ -279,6 +496,45 @@ void
(void)pthread_setspecific(_instance->_key, (void *)thread_data_ptr);
}
#ifdef __PX4_WINDOWS
StdoutRelayArgs *relay_args = new StdoutRelayArgs {client_args->stdout_read_fd, fd};
pthread_t relay_thread {};
const bool relay_started = (pthread_create(&relay_thread, nullptr, stdout_relay_trampoline, relay_args) == 0);
if (!relay_started) {
PX4_ERR("could not start stdout relay thread");
close(client_args->stdout_read_fd);
delete relay_args;
}
#endif
if (is_shutdown_command(cmd)) {
// shutdown exits the daemon process asynchronously. Reply before
// requesting shutdown so px4-shutdown gets the normal success marker
// instead of racing process teardown and reporting 255.
char buf[2] = {0, 0};
(void)fwrite(buf, sizeof buf, 1, out);
fflush(out);
#ifdef __PX4_WINDOWS
fclose(out);
if (relay_started) {
pthread_join(relay_thread, nullptr);
}
shutdown(fd, SHUT_RDWR);
closesocket((SOCKET)fd);
delete client_args;
#else
delete client_args;
_cleanup(fd);
#endif
(void)Pxh::process_line(cmd, true);
return nullptr;
}
// Run the actual command.
int retval = Pxh::process_line(cmd, true);
@@ -292,7 +548,20 @@ void
// Flush the FILE*'s buffer before we shut down the connection.
fflush(out);
#ifdef __PX4_WINDOWS
fclose(out);
if (relay_started) {
pthread_join(relay_thread, nullptr);
}
shutdown(fd, SHUT_RDWR);
closesocket((SOCKET)fd);
delete client_args;
#else
delete client_args;
_cleanup(fd);
#endif
return nullptr;
}
@@ -38,13 +38,32 @@
#include "sock_protocol.h"
#include <cstdlib>
namespace px4_daemon
{
#ifdef __PX4_WINDOWS
uint16_t get_socket_port(int instance_id)
{
// Keep the local daemon control socket away from the default SITL MAVLink
// UDP ranges (for example 14580 + instance is used by px4-rc.mavlink).
// Override by setting PX4_DAEMON_PORT if the default collides with another
// app embedding PX4.
const char *override_port = std::getenv("PX4_DAEMON_PORT");
if (override_port) {
return static_cast<uint16_t>(std::atoi(override_port) + instance_id);
}
return static_cast<uint16_t>(14680 + instance_id);
}
#else
std::string get_socket_path(int instance_id)
{
// TODO: Use /var/run/px4/$instance/sock (or /var/run/user/$UID/... for non-root).
return "/tmp/px4-sock-" + std::to_string(instance_id);
}
#endif
} // namespace px4_daemon
@@ -37,11 +37,21 @@
*/
#pragma once
#include <cstdint>
#include <string>
namespace px4_daemon
{
#ifdef __PX4_WINDOWS
// Windows: AF_INET TCP loopback. AF_UNIX was introduced in Windows 10 1803
// (WinSock2) and in principle would work, but Wine (used for SITL CI) did
// not support AF_UNIX until 7.x — 6.x still returns WSAEAFNOSUPPORT. TCP
// loopback sidesteps the portability gap without changing the rest of the
// daemon protocol.
uint16_t get_socket_port(int instance_id);
#else
std::string get_socket_path(int instance_id);
#endif
} // namespace px4_daemon