diff --git a/platforms/posix/src/px4/common/px4_daemon/client.cpp b/platforms/posix/src/px4/common/px4_daemon/client.cpp index 42613f3302..08b4dac515 100644 --- a/platforms/posix/src/px4/common/px4_daemon/client.cpp +++ b/platforms/posix/src/px4/common/px4_daemon/client.cpp @@ -47,12 +47,15 @@ #include #ifdef __PX4_WINDOWS #include +#include #else #include #endif #include #include +#include +#include #include #include "client.h" @@ -60,9 +63,19 @@ namespace px4_daemon { -#ifdef __PX4_WINDOWS namespace { + +#ifdef __PX4_WINDOWS +// SHUT_WR is the POSIX value of the half-close-send constant; on Winsock, +// the equivalent is SD_SEND. Use a portable alias so the relay code below is +// platform-neutral. +static constexpr int kShutWr = SD_SEND; +#else +static constexpr int kShutWr = SHUT_WR; +#endif + +#ifdef __PX4_WINDOWS bool ends_with_exe_suffix(const std::string &arg) { if (arg.size() < 4) { @@ -75,11 +88,73 @@ bool ends_with_exe_suffix(const std::string &arg) || suffix == ".exE" || suffix == ".EXe" || suffix == ".eXE" || suffix == ".ExE"; } -} // namespace #endif +// Reads up to `n` bytes from this process's standard input. Returns 0 on EOF +// and -1 on error. Wraps the platform-specific stdin-read primitive so the +// forwarding loop below stays platform-neutral. +static ssize_t read_local_stdin(char *buf, size_t n) +{ +#ifdef __PX4_WINDOWS + HANDLE stdin_h = GetStdHandle(STD_INPUT_HANDLE); + + if (stdin_h == INVALID_HANDLE_VALUE || stdin_h == nullptr) { + return -1; + } + + DWORD got = 0; + + if (!ReadFile(stdin_h, buf, (DWORD)n, &got, nullptr)) { + return -1; + } + + return (ssize_t)got; +#else + return read(STDIN_FILENO, buf, n); +#endif +} + +// Worker that reads bytes from this process's stdin and forwards them over +// the socket so the daemon can hand them to the running module's stdin pipe. +// Stops on stdin EOF (half-closes the socket's send side, signalling EOF to +// the daemon) or on any send() error (typically the daemon already closed +// the connection because the command finished). +static void stdin_forward_loop(socket_handle_t fd) +{ + char buffer[256]; + + while (true) { + const ssize_t bytes_read = read_local_stdin(buffer, sizeof(buffer)); + + if (bytes_read <= 0) { + break; + } + + const char *buf = buffer; + ssize_t remaining = bytes_read; + + while (remaining > 0) { + const int n_sent = send(fd, buf, (int)remaining, 0); + + if (n_sent <= 0) { + // Daemon closed the connection (command finished). Done. + return; + } + + buf += n_sent; + remaining -= n_sent; + } + } + + // Local stdin reached EOF — half-close the send side so the daemon's + // stdin relay sees EOF on its recv() and drains the module's stdin pipe. + shutdown(fd, kShutWr); +} + +} // namespace + Client::Client(int instance_id) : - _fd(-1), + _fd(invalid_socket_handle), _instance_id(instance_id) {} @@ -91,7 +166,7 @@ Client::process_args(const int argc, const char **argv) _fd = socket(AF_INET, SOCK_STREAM, 0); - if (_fd < 0) { + if (_fd == invalid_socket_handle) { PX4_ERR("error creating socket"); return -1; } @@ -105,6 +180,15 @@ Client::process_args(const int argc, const char **argv) PX4_ERR("error connecting to 127.0.0.1:%u: %s", port, strerror(errno)); return -1; } + + // No SO_RCVTIMEO: match POSIX/AF_UNIX behaviour where the client blocks + // in recv() until the daemon either replies or shuts the connection + // down. A short timeout here (the previous 5 s default) would fire + // before commands that legitimately take longer to return — most + // notably `commander takeoff`, whose lockstep wait_for_vehicle_command_reply + // can sit idle for the full 5 s acknowledgement window followed by a + // second 5 s wait for the arming reply. Letting the OS deliver an EOF + // or RST from the server is the correct termination signal here. #else std::string sock_path = get_socket_path(_instance_id); @@ -132,6 +216,18 @@ Client::process_args(const int argc, const char **argv) return -3; } + // Forward this process's stdin to the daemon while we wait for the + // command's stdout. The thread is detached and will exit when the local + // stdin closes or the daemon hangs up the socket — process exit reaps it. + // Best-effort: commands that don't read stdin are unaffected; interactive + // ones simply won't see keystrokes if thread creation fails. + try { + std::thread(stdin_forward_loop, _fd).detach(); + + } catch (const std::system_error &) { + // Non-fatal: stdin relay was best-effort. + } + return _listen(); } @@ -198,7 +294,26 @@ Client::_listen() int n_read = recv(_fd, buffer + n_buffer_used, sizeof buffer - n_buffer_used, 0); if (n_read < 0) { +#ifdef __PX4_WINDOWS + const int wsa_err = WSAGetLastError(); + + // WSAECONNRESET / WSAESHUTDOWN can fire instead of a clean + // recv() == 0 if the daemon's closesocket() races our recv() + // (the OS already delivered the bytes — TCP just abandoned the + // graceful FIN). Treat it as end-of-stream so we still pick up + // the {0, retval} sentinel that was already buffered. + if (wsa_err == WSAECONNRESET || wsa_err == WSAESHUTDOWN) { + if (n_buffer_used == 2) { + return buffer[1]; + } + + return -1; + } + + PX4_ERR("unable to read from socket: WSA error = %d", wsa_err); +#else PX4_ERR("unable to read from socket"); +#endif return -1; } else if (n_read == 0) { @@ -236,8 +351,12 @@ Client::_listen() Client::~Client() { - if (_fd >= 0) { + if (_fd != invalid_socket_handle) { +#ifdef __PX4_WINDOWS + closesocket(_fd); +#else close(_fd); +#endif } } diff --git a/platforms/posix/src/px4/common/px4_daemon/client.h b/platforms/posix/src/px4/common/px4_daemon/client.h index b7883af3eb..ed45bff4d6 100644 --- a/platforms/posix/src/px4/common/px4_daemon/client.h +++ b/platforms/posix/src/px4/common/px4_daemon/client.h @@ -64,7 +64,7 @@ public: Client(Client &&other) : _fd(other._fd), _instance_id(other._instance_id) { // Steal the fd from the moved-from client. - other._fd = -1; + other._fd = invalid_socket_handle; } /** @@ -80,7 +80,7 @@ private: int _send_cmds(const int argc, const char **argv); int _listen(); - int _fd; + socket_handle_t _fd; int _instance_id; ///< instance ID for running multiple instances of the px4 server }; diff --git a/platforms/posix/src/px4/common/px4_daemon/server.cpp b/platforms/posix/src/px4/common/px4_daemon/server.cpp index f3ca71751d..472a4c2996 100644 --- a/platforms/posix/src/px4/common/px4_daemon/server.cpp +++ b/platforms/posix/src/px4/common/px4_daemon/server.cpp @@ -44,6 +44,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -51,6 +54,7 @@ #include #ifdef __PX4_WINDOWS #include +#include #else #include #endif @@ -68,26 +72,68 @@ namespace { struct ClientThreadArgs { - int client_fd; + socket_handle_t client_fd; FILE *thread_stdout; #ifdef __PX4_WINDOWS int stdout_read_fd; #endif }; +// Platform-neutral wrappers for the half-close-send and stdin-pipe-create +// primitives so the relay code below can stay free of #ifdefs on the hot path. #ifdef __PX4_WINDOWS -struct StdoutRelayArgs { - int stdout_read_fd; - int client_fd; -}; +static constexpr int kShutWr = SD_SEND; +#else +static constexpr int kShutWr = SHUT_WR; +#endif -static void *stdout_relay_trampoline(void *arg) +// Returns {read_fd, write_fd} for an anonymous pipe, or {-1, -1} on failure. +// Wraps CreatePipe()+_open_osfhandle() on Windows and pipe() on POSIX. +static std::pair create_relay_pipe() +{ +#ifdef __PX4_WINDOWS + HANDLE r = nullptr; + HANDLE w = nullptr; + + if (!CreatePipe(&r, &w, nullptr, 0)) { + return {-1, -1}; + } + + int rfd = _open_osfhandle(reinterpret_cast(r), O_RDONLY); + int wfd = _open_osfhandle(reinterpret_cast(w), 0); + + if (rfd < 0 || wfd < 0) { + if (rfd >= 0) { ::close(rfd); } else { CloseHandle(r); } + + if (wfd >= 0) { ::close(wfd); } else { CloseHandle(w); } + + return {-1, -1}; + } + + return {rfd, wfd}; +#else + int fds[2]; + + if (pipe(fds) != 0) { + return {-1, -1}; + } + + return {fds[0], fds[1]}; +#endif +} + +#ifdef __PX4_WINDOWS +// Forwards bytes from the per-client anonymous stdout pipe to the client +// socket. Used only on Windows because Winsock SOCKETs are not CRT file +// descriptors and therefore cannot back a FILE* directly via fdopen() — the +// daemon writes to a CRT pipe and this thread copies it onto the socket. +// On POSIX the socket is fdopen()'d directly and no relay is needed. +static void stdout_relay_loop(int stdout_read_fd, socket_handle_t client_fd) { - StdoutRelayArgs *relay_args = static_cast(arg); char buffer[1024]; while (true) { - const ssize_t n_read = read(relay_args->stdout_read_fd, buffer, sizeof(buffer)); + const ssize_t n_read = read(stdout_read_fd, buffer, sizeof(buffer)); if (n_read <= 0) { break; @@ -97,12 +143,11 @@ static void *stdout_relay_trampoline(void *arg) ssize_t remaining = n_read; while (remaining > 0) { - const int n_sent = send((SOCKET)relay_args->client_fd, buf, (int)remaining, 0); + const int n_sent = send(client_fd, buf, (int)remaining, 0); if (n_sent <= 0) { - close(relay_args->stdout_read_fd); - delete relay_args; - return nullptr; + ::close(stdout_read_fd); + return; } buf += n_sent; @@ -110,16 +155,66 @@ static void *stdout_relay_trampoline(void *arg) } } - close(relay_args->stdout_read_fd); - delete relay_args; - return nullptr; + ::close(stdout_read_fd); } #endif // __PX4_WINDOWS -static ssize_t socket_read(int fd, char *buffer, size_t buffer_size) +// Forwards bytes that arrive on the client socket after the cmd terminator +// into the daemon's anonymous stdin pipe. The pipe's read-end is dup2'd onto +// STDIN_FILENO for the duration of Pxh::process_line(), so the running module +// sees keystrokes typed in the client's terminal. +static void stdin_relay_loop(socket_handle_t client_fd, int stdin_write_fd, std::string prefill) +{ + auto write_all = [&](const char *buf, ssize_t n) -> bool { + while (n > 0) { + const int n_written = ::write(stdin_write_fd, buf, (unsigned)n); + + if (n_written <= 0) { + return false; + } + + buf += n_written; + n -= n_written; + } + + return true; + }; + + if (!prefill.empty()) { + if (!write_all(prefill.data(), (ssize_t)prefill.size())) { + ::close(stdin_write_fd); + return; + } + } + + char buffer[1024]; + + while (true) { + const int n_read = recv(client_fd, buffer, sizeof(buffer), 0); + + if (n_read <= 0) { + break; + } + + if (!write_all(buffer, n_read)) { + break; + } + } + + // EOF/error: close the pipe write end so the module sees EOF on stdin. + ::close(stdin_write_fd); +} + +// Process-wide mutex serializing the STDIN_FILENO redirect. Concurrent commands +// take turns owning the live console-stdin replacement; only one can hold it at +// a time. Interactive use is single-client, so contention is negligible in +// practice. +static pthread_mutex_t s_stdin_redirect_mutex = PTHREAD_MUTEX_INITIALIZER; + +static ssize_t socket_read(socket_handle_t fd, char *buffer, size_t buffer_size) { #ifdef __PX4_WINDOWS - const int n_read = recv((SOCKET)fd, buffer, (int)buffer_size, 0); + const int n_read = recv(fd, buffer, (int)buffer_size, 0); if (n_read == SOCKET_ERROR) { return -1; @@ -142,15 +237,29 @@ static bool is_shutdown_command(const std::string &cmd) const std::size_t command_end = cmd.find_first_of(whitespace, command_start); const std::size_t command_length = (command_end == std::string::npos) ? std::string::npos : command_end - command_start; - return cmd.compare(command_start, command_length, "shutdown") == 0; + + if (cmd.compare(command_start, command_length, "shutdown") != 0) { + return false; + } + + // Only treat the bare "shutdown" command as the daemon-teardown shortcut. + // If there are any positional arguments (e.g. "shutdown status"), fall + // through to the normal command path so shutdown_main can reject them via + // its usage banner -- otherwise an accidental "shutdown " would + // silently kill the daemon. + if (command_end == std::string::npos) { + return true; + } + + return cmd.find_first_not_of(whitespace, command_end) == std::string::npos; } } // namespace #ifdef __PX4_WINDOWS -static SOCKET poll_socket(int fd) +static SOCKET poll_socket(socket_handle_t fd) { - return static_cast(fd); + return fd; } #else static int poll_socket(int fd) @@ -181,7 +290,7 @@ Server::start() _fd = socket(AF_INET, SOCK_STREAM, 0); - if (_fd < 0) { + if (_fd == invalid_socket_handle) { PX4_ERR("error creating socket"); return -1; } @@ -282,9 +391,9 @@ Server::_server_main() continue; } - const int client = accept(_fd, nullptr, nullptr); + const socket_handle_t client = accept(_fd, nullptr, nullptr); - if (client == -1) { + if (client == invalid_socket_handle) { PX4_ERR("failed to accept client: %s", strerror(errno)); continue; } @@ -437,10 +546,13 @@ void { ClientThreadArgs *client_args = static_cast(arg); FILE *out = client_args->thread_stdout; - int fd = client_args->client_fd; + socket_handle_t fd = client_args->client_fd; // Read until the end of the incoming stream. std::string cmd; + // Bytes the cmd-read loop pulled past the terminator. These belong to the + // post-command stdin stream and must be handed to the stdin relay thread. + std::string stdin_prefill; while (true) { size_t n = cmd.size(); @@ -462,8 +574,25 @@ void cmd.resize(n + n_read); - // Command ends in 0x00 (no tty) or 0x01 (tty). - if (!cmd.empty() && cmd.back() < 2) { + // Command ends in 0x00 (no tty) or 0x01 (tty). Scan from the start of + // this read because a single recv() may carry the terminator together + // with a leading slice of the stdin stream the client started sending + // once it had finished writing the cmd buffer. + bool found_terminator = false; + + for (size_t i = n; i < cmd.size(); ++i) { + if ((unsigned char)cmd[i] < 2) { + if (i + 1 < cmd.size()) { + stdin_prefill.assign(cmd, i + 1, cmd.size() - (i + 1)); + } + + cmd.resize(i + 1); + found_terminator = true; + break; + } + } + + if (found_terminator) { break; } } @@ -497,17 +626,75 @@ void } #ifdef __PX4_WINDOWS - StdoutRelayArgs *relay_args = new StdoutRelayArgs {client_args->stdout_read_fd, fd}; - pthread_t relay_thread {}; - const bool relay_started = (pthread_create(&relay_thread, nullptr, stdout_relay_trampoline, relay_args) == 0); + std::thread stdout_relay_thread; + bool stdout_relay_started = false; - if (!relay_started) { + try { + stdout_relay_thread = std::thread(stdout_relay_loop, client_args->stdout_read_fd, fd); + stdout_relay_started = true; + + } catch (const std::system_error &) { PX4_ERR("could not start stdout relay thread"); close(client_args->stdout_read_fd); - delete relay_args; } + #endif + // Redirect STDIN_FILENO onto an anonymous pipe and shovel client-socket + // bytes into it for the duration of process_line(). Modules that read + // STDIN_FILENO (uorb top, mavlink shell, etc.) thereby see keystrokes the + // user types in the px4-* client process. Skip this for the shutdown + // command — it returns immediately and never reads stdin. + int stdin_pipe_read_fd = -1; + int stdin_pipe_write_fd = -1; + int stdin_backup_fd = -1; + std::thread stdin_relay_thread; + bool stdin_relay_started = false; + bool stdin_redirect_active = false; + + if (!is_shutdown_command(cmd)) { + auto pipe_fds = create_relay_pipe(); + + if (pipe_fds.first < 0 || pipe_fds.second < 0) { + PX4_ERR("could not create stdin pipe for module"); + + } else { + stdin_pipe_read_fd = pipe_fds.first; + stdin_pipe_write_fd = pipe_fds.second; + + pthread_mutex_lock(&s_stdin_redirect_mutex); + stdin_backup_fd = dup(STDIN_FILENO); + + if (stdin_backup_fd < 0 || dup2(stdin_pipe_read_fd, STDIN_FILENO) < 0) { + PX4_ERR("could not redirect STDIN_FILENO to the relay pipe"); + + if (stdin_backup_fd >= 0) { + ::close(stdin_backup_fd); + stdin_backup_fd = -1; + } + + ::close(stdin_pipe_read_fd); + ::close(stdin_pipe_write_fd); + stdin_pipe_read_fd = stdin_pipe_write_fd = -1; + pthread_mutex_unlock(&s_stdin_redirect_mutex); + + } else { + stdin_redirect_active = true; + + try { + stdin_relay_thread = std::thread(stdin_relay_loop, fd, stdin_pipe_write_fd, std::move(stdin_prefill)); + stdin_relay_started = true; + + } catch (const std::system_error &) { + PX4_ERR("could not start stdin relay thread"); + ::close(stdin_pipe_write_fd); + stdin_pipe_write_fd = -1; + } + } + } + } + + if (is_shutdown_command(cmd)) { // shutdown exits the daemon process asynchronously. Reply before // requesting shutdown so px4-shutdown gets the normal success marker @@ -519,11 +706,17 @@ void #ifdef __PX4_WINDOWS fclose(out); - if (relay_started) { - pthread_join(relay_thread, nullptr); + if (stdout_relay_started) { + stdout_relay_thread.join(); } - shutdown(fd, SHUT_RDWR); + // kShutWr (SD_SEND) only — never SD_RECEIVE/SHUT_RD: half-closing the + // receive side on Winsock discards the unread receive buffer and + // causes a RST on closesocket(), which races the queued response + // bytes and the peer sees WSAECONNRESET before reading them. A + // half-close-send queues a clean FIN behind any pending sends so the + // client recv() returns 0 after consuming the full reply. + shutdown(fd, kShutWr); closesocket((SOCKET)fd); delete client_args; #else @@ -538,6 +731,27 @@ void // Run the actual command. int retval = Pxh::process_line(cmd, true); + // Tear down the stdin redirect while the socket is still open. We avoid + // shutdown(fd, SHUT_RD/SD_RECEIVE) here: on Winsock, half-closing the + // receive side resets the connection if there is any unread data buffered + // in the kernel, which would race the {0, retval} reply; on POSIX it + // would also discard data the relay had not yet drained. Instead, restore + // STDIN_FILENO and close the pipe read end; the relay thread is detached + // and exits naturally once its recv() on the client socket returns 0 + // (after the closesocket()/_cleanup below) or its write to the now-closed + // pipe fails. + if (stdin_redirect_active) { + dup2(stdin_backup_fd, STDIN_FILENO); + ::close(stdin_backup_fd); + ::close(stdin_pipe_read_fd); + stdin_pipe_read_fd = -1; + pthread_mutex_unlock(&s_stdin_redirect_mutex); + + if (stdin_relay_started) { + stdin_relay_thread.detach(); + } + } + // Report return value. char buf[2] = {0, (char)retval}; @@ -551,11 +765,17 @@ void #ifdef __PX4_WINDOWS fclose(out); - if (relay_started) { - pthread_join(relay_thread, nullptr); + if (stdout_relay_started) { + stdout_relay_thread.join(); } - shutdown(fd, SHUT_RDWR); + // See kShutWr comment in the shutdown-command branch above. On modules + // that return immediately (e.g. ` status` when the module is + // not autostarted, which prints "not running" and returns 1) the entire + // reply is small enough to still be in flight when this thread tears + // down the socket — a full half-close of both directions would make the + // client observe truncation. + shutdown(fd, kShutWr); closesocket((SOCKET)fd); delete client_args; #else diff --git a/platforms/posix/src/px4/common/px4_daemon/server.h b/platforms/posix/src/px4/common/px4_daemon/server.h index 36f347df67..02a2abcff8 100644 --- a/platforms/posix/src/px4/common/px4_daemon/server.h +++ b/platforms/posix/src/px4/common/px4_daemon/server.h @@ -115,7 +115,7 @@ private: int _instance_id; ///< instance ID for running multiple instances of the px4 server - int _fd; + socket_handle_t _fd; static void _pthread_key_destructor(void *arg); diff --git a/platforms/posix/src/px4/common/px4_daemon/sock_protocol.h b/platforms/posix/src/px4/common/px4_daemon/sock_protocol.h index 20136214a9..7c5b9a9edd 100644 --- a/platforms/posix/src/px4/common/px4_daemon/sock_protocol.h +++ b/platforms/posix/src/px4/common/px4_daemon/sock_protocol.h @@ -40,10 +40,17 @@ #include #include +#ifdef __PX4_WINDOWS +#include +#endif + namespace px4_daemon { #ifdef __PX4_WINDOWS +using socket_handle_t = SOCKET; +static constexpr socket_handle_t invalid_socket_handle = INVALID_SOCKET; + // Windows: AF_INET TCP loopback. AF_UNIX was introduced in Windows 10 1803 // (WinSock2) and in principle would work, but Wine (used for SITL CI) did // not support AF_UNIX until 7.x — 6.x still returns WSAEAFNOSUPPORT. TCP @@ -51,6 +58,8 @@ namespace px4_daemon // daemon protocol. uint16_t get_socket_port(int instance_id); #else +using socket_handle_t = int; +static constexpr socket_handle_t invalid_socket_handle = -1; std::string get_socket_path(int instance_id); #endif