From baa6d5f96b85ff28f34af1a718e5bbe71bef3e2f Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 24 Apr 2026 08:11:47 -0500 Subject: [PATCH] [web_server_idf] Fix cross-thread race on SSE session state (#15967) --- .../web_server_idf/web_server_idf.cpp | 70 ++++++++++++++----- .../web_server_idf/web_server_idf.h | 16 ++++- 2 files changed, 65 insertions(+), 21 deletions(-) diff --git a/esphome/components/web_server_idf/web_server_idf.cpp b/esphome/components/web_server_idf/web_server_idf.cpp index 8f464ae912..e1d3e4bf34 100644 --- a/esphome/components/web_server_idf/web_server_idf.cpp +++ b/esphome/components/web_server_idf/web_server_idf.cpp @@ -472,24 +472,36 @@ void AsyncResponseStream::printf(const char *fmt, ...) { #ifdef USE_WEBSERVER AsyncEventSource::~AsyncEventSource() { - for (auto *ses : this->sessions_) { - delete ses; // NOLINT(cppcoreguidelines-owning-memory) + LockGuard guard{this->pending_mutex_}; + for (auto *vec : {&this->sessions_, &this->pending_sessions_}) { + for (auto *ses : *vec) { + delete ses; // NOLINT(cppcoreguidelines-owning-memory) + } } } void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) { + // Httpd task: set up the live httpd_req_t and park the session; main loop does the rest. // NOLINTNEXTLINE(cppcoreguidelines-owning-memory,clang-analyzer-cplusplus.NewDeleteLeaks) auto *rsp = new AsyncEventSourceResponse(request, this, this->web_server_); - if (this->on_connect_) { - this->on_connect_(rsp); + { + LockGuard guard{this->pending_mutex_}; + this->pending_sessions_.push_back(rsp); + this->has_pending_sessions_.store(true, std::memory_order_release); } - this->sessions_.push_back(rsp); - // Wake up WebServer::loop() to drain deferred event queues for this client. - // Safe from httpd task context via the pending_enable_loop_ flag. this->web_server_->enable_loop_soon_any_context(); } +// clang-analyzer traces a false-positive leak path from loop() through +// adopt_pending_sessions_main_loop_() into start_session_main_loop_() and +// finally ArduinoJson. Suppress along the entire in-our-code call chain. +// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks) bool AsyncEventSource::loop() { + // Fast path: one atomic load per tick. Slow path is out-of-line on connect. + if (this->has_pending_sessions_.load(std::memory_order_acquire)) { + this->adopt_pending_sessions_main_loop_(); + } + // Clean up dead sessions safely // This follows the ESP-IDF pattern where free_ctx marks resources as dead // and the main loop handles the actual cleanup to avoid race conditions @@ -497,7 +509,7 @@ bool AsyncEventSource::loop() { auto *ses = this->sessions_[i]; // If the session has a dead socket (marked by destroy callback) if (ses->fd_.load() == 0) { - ESP_LOGD(TAG, "Removing dead event source session"); + // destroy() already logged the close with the fd; don't double-log here. delete ses; // NOLINT(cppcoreguidelines-owning-memory) // Remove by swapping with last element (O(1) removal, order doesn't matter for sessions) this->sessions_[i] = this->sessions_.back(); @@ -510,6 +522,30 @@ bool AsyncEventSource::loop() { return !this->sessions_.empty(); } +void AsyncEventSource::adopt_pending_sessions_main_loop_() { + std::vector incoming; + { + LockGuard guard{this->pending_mutex_}; + incoming.swap(this->pending_sessions_); + this->has_pending_sessions_.store(false, std::memory_order_relaxed); + } + for (auto *rsp : incoming) { + // Already disconnected? Drop it; skip on_connect_/session start on a dead session. + if (rsp->fd_.load() == 0) { + delete rsp; // NOLINT(cppcoreguidelines-owning-memory) + continue; + } + this->sessions_.push_back(rsp); + // Prime first so on_connect_ observes a session that has already sent its + // initial ping/config/sorting_groups, matching the pre-refactor ordering. + rsp->start_session_main_loop_(); + if (this->on_connect_) { + this->on_connect_(rsp); + } + } +} +// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks) + void AsyncEventSource::try_send_nodefer(const char *message, const char *event, uint32_t id, uint32_t reconnect) { for (auto *ses : this->sessions_) { if (ses->fd_.load() != 0) { // Skip dead sessions @@ -534,6 +570,7 @@ AsyncEventSourceResponse::AsyncEventSourceResponse(const AsyncWebServerRequest * esphome::web_server_idf::AsyncEventSource *server, esphome::web_server::WebServer *ws) : server_(server), web_server_(ws), entities_iterator_(ws, server) { + // Httpd task only. start_session_main_loop_() handles event_buffer_ / iterator setup. httpd_req_t *req = *request; httpd_resp_set_status(req, HTTPD_200); @@ -555,21 +592,23 @@ AsyncEventSourceResponse::AsyncEventSourceResponse(const AsyncWebServerRequest * // Use non-blocking send to prevent watchdog timeouts when TCP buffers are full httpd_sess_set_send_override(this->hd_, this->fd_.load(), nonblocking_send); +} - // Configure reconnect timeout and send config - // this should always go through since the tcp send buffer is empty on connect +// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks) false positive with ArduinoJson +void AsyncEventSourceResponse::start_session_main_loop_() { + auto *ws = this->web_server_; + + // tcp send buffer is empty on connect, so these should always go through auto message = ws->get_config_json(); this->try_send_nodefer(message.c_str(), "ping", millis(), 30000); #ifdef USE_WEBSERVER_SORTING for (auto &group : ws->sorting_groups_) { - // NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks) false positive with ArduinoJson json::JsonBuilder builder; JsonObject root = builder.root(); root["name"] = group.second.name; root["sorting_weight"] = group.second.weight; message = builder.serialize(); - // NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks) // a (very) large number of these should be able to be queued initially without defer // since the only thing in the send buffer at this point is the initial ping/config @@ -578,13 +617,8 @@ AsyncEventSourceResponse::AsyncEventSourceResponse(const AsyncWebServerRequest * #endif this->entities_iterator_.begin(ws->include_internal_); - - // just dump them all up-front and take advantage of the deferred queue - // on second thought that takes too long, but leaving the commented code here for debug purposes - // while(!this->entities_iterator_.completed()) { - // this->entities_iterator_.advance(); - //} } +// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks) void AsyncEventSourceResponse::destroy(void *ptr) { auto *rsp = static_cast(ptr); diff --git a/esphome/components/web_server_idf/web_server_idf.h b/esphome/components/web_server_idf/web_server_idf.h index f2931fb507..cdb58c2f04 100644 --- a/esphome/components/web_server_idf/web_server_idf.h +++ b/esphome/components/web_server_idf/web_server_idf.h @@ -299,6 +299,9 @@ class AsyncEventSourceResponse { AsyncEventSourceResponse(const AsyncWebServerRequest *request, esphome::web_server_idf::AsyncEventSource *server, esphome::web_server::WebServer *ws); + // Main-loop only: sends initial ping/config/sorting_groups, starts entity iterator. + void start_session_main_loop_(); + void deq_push_back_with_dedup_(void *source, message_generator_t *message_generator); void process_deferred_queue_(); void process_buffer_(); @@ -335,6 +338,8 @@ class AsyncEventSource : public AsyncWebHandler { } // NOLINTNEXTLINE(readability-identifier-naming) void handleRequest(AsyncWebServerRequest *request) override; + // Callback runs on the main loop (not the httpd task) after the session's + // initial ping/config/sorting_groups have been sent. // NOLINTNEXTLINE(readability-identifier-naming) void onConnect(connect_handler_t &&cb) { this->on_connect_ = std::move(cb); } @@ -347,13 +352,18 @@ class AsyncEventSource : public AsyncWebHandler { size_t count() const { return this->sessions_.size(); } protected: + // Cold path: move sessions from pending_sessions_ into sessions_ and greet each one. + void __attribute__((noinline, cold)) adopt_pending_sessions_main_loop_(); + std::string url_; - // Use vector instead of set: SSE sessions are typically 1-5 connections (browsers, dashboards). - // Linear search is faster than red-black tree overhead for this small dataset. - // Only operations needed: add session, remove session, iterate sessions - no need for sorted order. + // Main-loop only. Vector: SSE sessions are 1-5 connections, linear search beats set. std::vector sessions_; + // Httpd-task intake; guarded by pending_mutex_, gated by has_pending_sessions_. + std::vector pending_sessions_; + Mutex pending_mutex_; connect_handler_t on_connect_{}; esphome::web_server::WebServer *web_server_; + std::atomic has_pending_sessions_{false}; }; #endif // USE_WEBSERVER