[web_server_idf] Fix cross-thread race on SSE session state (#15967)

This commit is contained in:
J. Nick Koston
2026-04-24 08:11:47 -05:00
committed by GitHub
parent 773b4d887b
commit baa6d5f96b
2 changed files with 65 additions and 21 deletions
@@ -472,24 +472,36 @@ void AsyncResponseStream::printf(const char *fmt, ...) {
#ifdef USE_WEBSERVER #ifdef USE_WEBSERVER
AsyncEventSource::~AsyncEventSource() { AsyncEventSource::~AsyncEventSource() {
for (auto *ses : this->sessions_) { LockGuard guard{this->pending_mutex_};
for (auto *vec : {&this->sessions_, &this->pending_sessions_}) {
for (auto *ses : *vec) {
delete ses; // NOLINT(cppcoreguidelines-owning-memory) delete ses; // NOLINT(cppcoreguidelines-owning-memory)
} }
}
} }
void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) { void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
// Httpd task: set up the live httpd_req_t and park the session; main loop does the rest.
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory,clang-analyzer-cplusplus.NewDeleteLeaks) // NOLINTNEXTLINE(cppcoreguidelines-owning-memory,clang-analyzer-cplusplus.NewDeleteLeaks)
auto *rsp = new AsyncEventSourceResponse(request, this, this->web_server_); auto *rsp = new AsyncEventSourceResponse(request, this, this->web_server_);
if (this->on_connect_) { {
this->on_connect_(rsp); LockGuard guard{this->pending_mutex_};
this->pending_sessions_.push_back(rsp);
this->has_pending_sessions_.store(true, std::memory_order_release);
} }
this->sessions_.push_back(rsp);
// Wake up WebServer::loop() to drain deferred event queues for this client.
// Safe from httpd task context via the pending_enable_loop_ flag.
this->web_server_->enable_loop_soon_any_context(); this->web_server_->enable_loop_soon_any_context();
} }
// clang-analyzer traces a false-positive leak path from loop() through
// adopt_pending_sessions_main_loop_() into start_session_main_loop_() and
// finally ArduinoJson. Suppress along the entire in-our-code call chain.
// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks)
bool AsyncEventSource::loop() { bool AsyncEventSource::loop() {
// Fast path: one atomic load per tick. Slow path is out-of-line on connect.
if (this->has_pending_sessions_.load(std::memory_order_acquire)) {
this->adopt_pending_sessions_main_loop_();
}
// Clean up dead sessions safely // Clean up dead sessions safely
// This follows the ESP-IDF pattern where free_ctx marks resources as dead // This follows the ESP-IDF pattern where free_ctx marks resources as dead
// and the main loop handles the actual cleanup to avoid race conditions // and the main loop handles the actual cleanup to avoid race conditions
@@ -497,7 +509,7 @@ bool AsyncEventSource::loop() {
auto *ses = this->sessions_[i]; auto *ses = this->sessions_[i];
// If the session has a dead socket (marked by destroy callback) // If the session has a dead socket (marked by destroy callback)
if (ses->fd_.load() == 0) { if (ses->fd_.load() == 0) {
ESP_LOGD(TAG, "Removing dead event source session"); // destroy() already logged the close with the fd; don't double-log here.
delete ses; // NOLINT(cppcoreguidelines-owning-memory) delete ses; // NOLINT(cppcoreguidelines-owning-memory)
// Remove by swapping with last element (O(1) removal, order doesn't matter for sessions) // Remove by swapping with last element (O(1) removal, order doesn't matter for sessions)
this->sessions_[i] = this->sessions_.back(); this->sessions_[i] = this->sessions_.back();
@@ -510,6 +522,30 @@ bool AsyncEventSource::loop() {
return !this->sessions_.empty(); return !this->sessions_.empty();
} }
void AsyncEventSource::adopt_pending_sessions_main_loop_() {
std::vector<AsyncEventSourceResponse *> incoming;
{
LockGuard guard{this->pending_mutex_};
incoming.swap(this->pending_sessions_);
this->has_pending_sessions_.store(false, std::memory_order_relaxed);
}
for (auto *rsp : incoming) {
// Already disconnected? Drop it; skip on_connect_/session start on a dead session.
if (rsp->fd_.load() == 0) {
delete rsp; // NOLINT(cppcoreguidelines-owning-memory)
continue;
}
this->sessions_.push_back(rsp);
// Prime first so on_connect_ observes a session that has already sent its
// initial ping/config/sorting_groups, matching the pre-refactor ordering.
rsp->start_session_main_loop_();
if (this->on_connect_) {
this->on_connect_(rsp);
}
}
}
// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks)
void AsyncEventSource::try_send_nodefer(const char *message, const char *event, uint32_t id, uint32_t reconnect) { void AsyncEventSource::try_send_nodefer(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
for (auto *ses : this->sessions_) { for (auto *ses : this->sessions_) {
if (ses->fd_.load() != 0) { // Skip dead sessions if (ses->fd_.load() != 0) { // Skip dead sessions
@@ -534,6 +570,7 @@ AsyncEventSourceResponse::AsyncEventSourceResponse(const AsyncWebServerRequest *
esphome::web_server_idf::AsyncEventSource *server, esphome::web_server_idf::AsyncEventSource *server,
esphome::web_server::WebServer *ws) esphome::web_server::WebServer *ws)
: server_(server), web_server_(ws), entities_iterator_(ws, server) { : server_(server), web_server_(ws), entities_iterator_(ws, server) {
// Httpd task only. start_session_main_loop_() handles event_buffer_ / iterator setup.
httpd_req_t *req = *request; httpd_req_t *req = *request;
httpd_resp_set_status(req, HTTPD_200); httpd_resp_set_status(req, HTTPD_200);
@@ -555,21 +592,23 @@ AsyncEventSourceResponse::AsyncEventSourceResponse(const AsyncWebServerRequest *
// Use non-blocking send to prevent watchdog timeouts when TCP buffers are full // Use non-blocking send to prevent watchdog timeouts when TCP buffers are full
httpd_sess_set_send_override(this->hd_, this->fd_.load(), nonblocking_send); httpd_sess_set_send_override(this->hd_, this->fd_.load(), nonblocking_send);
}
// Configure reconnect timeout and send config // NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks) false positive with ArduinoJson
// this should always go through since the tcp send buffer is empty on connect void AsyncEventSourceResponse::start_session_main_loop_() {
auto *ws = this->web_server_;
// tcp send buffer is empty on connect, so these should always go through
auto message = ws->get_config_json(); auto message = ws->get_config_json();
this->try_send_nodefer(message.c_str(), "ping", millis(), 30000); this->try_send_nodefer(message.c_str(), "ping", millis(), 30000);
#ifdef USE_WEBSERVER_SORTING #ifdef USE_WEBSERVER_SORTING
for (auto &group : ws->sorting_groups_) { for (auto &group : ws->sorting_groups_) {
// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks) false positive with ArduinoJson
json::JsonBuilder builder; json::JsonBuilder builder;
JsonObject root = builder.root(); JsonObject root = builder.root();
root["name"] = group.second.name; root["name"] = group.second.name;
root["sorting_weight"] = group.second.weight; root["sorting_weight"] = group.second.weight;
message = builder.serialize(); message = builder.serialize();
// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks)
// a (very) large number of these should be able to be queued initially without defer // a (very) large number of these should be able to be queued initially without defer
// since the only thing in the send buffer at this point is the initial ping/config // since the only thing in the send buffer at this point is the initial ping/config
@@ -578,13 +617,8 @@ AsyncEventSourceResponse::AsyncEventSourceResponse(const AsyncWebServerRequest *
#endif #endif
this->entities_iterator_.begin(ws->include_internal_); this->entities_iterator_.begin(ws->include_internal_);
// just dump them all up-front and take advantage of the deferred queue
// on second thought that takes too long, but leaving the commented code here for debug purposes
// while(!this->entities_iterator_.completed()) {
// this->entities_iterator_.advance();
//}
} }
// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks)
void AsyncEventSourceResponse::destroy(void *ptr) { void AsyncEventSourceResponse::destroy(void *ptr) {
auto *rsp = static_cast<AsyncEventSourceResponse *>(ptr); auto *rsp = static_cast<AsyncEventSourceResponse *>(ptr);
@@ -299,6 +299,9 @@ class AsyncEventSourceResponse {
AsyncEventSourceResponse(const AsyncWebServerRequest *request, esphome::web_server_idf::AsyncEventSource *server, AsyncEventSourceResponse(const AsyncWebServerRequest *request, esphome::web_server_idf::AsyncEventSource *server,
esphome::web_server::WebServer *ws); esphome::web_server::WebServer *ws);
// Main-loop only: sends initial ping/config/sorting_groups, starts entity iterator.
void start_session_main_loop_();
void deq_push_back_with_dedup_(void *source, message_generator_t *message_generator); void deq_push_back_with_dedup_(void *source, message_generator_t *message_generator);
void process_deferred_queue_(); void process_deferred_queue_();
void process_buffer_(); void process_buffer_();
@@ -335,6 +338,8 @@ class AsyncEventSource : public AsyncWebHandler {
} }
// NOLINTNEXTLINE(readability-identifier-naming) // NOLINTNEXTLINE(readability-identifier-naming)
void handleRequest(AsyncWebServerRequest *request) override; void handleRequest(AsyncWebServerRequest *request) override;
// Callback runs on the main loop (not the httpd task) after the session's
// initial ping/config/sorting_groups have been sent.
// NOLINTNEXTLINE(readability-identifier-naming) // NOLINTNEXTLINE(readability-identifier-naming)
void onConnect(connect_handler_t &&cb) { this->on_connect_ = std::move(cb); } void onConnect(connect_handler_t &&cb) { this->on_connect_ = std::move(cb); }
@@ -347,13 +352,18 @@ class AsyncEventSource : public AsyncWebHandler {
size_t count() const { return this->sessions_.size(); } size_t count() const { return this->sessions_.size(); }
protected: protected:
// Cold path: move sessions from pending_sessions_ into sessions_ and greet each one.
void __attribute__((noinline, cold)) adopt_pending_sessions_main_loop_();
std::string url_; std::string url_;
// Use vector instead of set: SSE sessions are typically 1-5 connections (browsers, dashboards). // Main-loop only. Vector: SSE sessions are 1-5 connections, linear search beats set.
// Linear search is faster than red-black tree overhead for this small dataset.
// Only operations needed: add session, remove session, iterate sessions - no need for sorted order.
std::vector<AsyncEventSourceResponse *> sessions_; std::vector<AsyncEventSourceResponse *> sessions_;
// Httpd-task intake; guarded by pending_mutex_, gated by has_pending_sessions_.
std::vector<AsyncEventSourceResponse *> pending_sessions_;
Mutex pending_mutex_;
connect_handler_t on_connect_{}; connect_handler_t on_connect_{};
esphome::web_server::WebServer *web_server_; esphome::web_server::WebServer *web_server_;
std::atomic<bool> has_pending_sessions_{false};
}; };
#endif // USE_WEBSERVER #endif // USE_WEBSERVER