diff --git a/esphome/components/mqtt/mqtt_backend_esp32.cpp b/esphome/components/mqtt/mqtt_backend_esp32.cpp index 5642fd5f7b..ab067c4418 100644 --- a/esphome/components/mqtt/mqtt_backend_esp32.cpp +++ b/esphome/components/mqtt/mqtt_backend_esp32.cpp @@ -82,10 +82,16 @@ bool MQTTBackendESP32::initialize_() { void MQTTBackendESP32::loop() { // process new events // handle only 1 message per loop iteration - if (!mqtt_events_.empty()) { - auto &event = mqtt_events_.front(); - mqtt_event_handler_(event); - mqtt_events_.pop(); + Event *event = this->mqtt_event_queue_.pop(); + if (event != nullptr) { + this->mqtt_event_handler_(*event); + this->mqtt_event_pool_.release(event); + } + + // Log dropped inbound events (check is cheap - single atomic load in common case) + uint16_t inbound_dropped = this->mqtt_event_queue_.get_and_reset_dropped_count(); + if (inbound_dropped > 0) { + ESP_LOGW(TAG, "Dropped %u inbound MQTT events", inbound_dropped); } #if defined(USE_MQTT_IDF_ENQUEUE) @@ -183,10 +189,18 @@ void MQTTBackendESP32::mqtt_event_handler_(const Event &event) { void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { MQTTBackendESP32 *instance = static_cast(handler_args); - // queue event to decouple processing + // queue event to decouple processing from ESP-IDF MQTT task to main loop if (instance) { - auto event = *static_cast(event_data); - instance->mqtt_events_.emplace(event); + auto *event = instance->mqtt_event_pool_.allocate(); + if (event == nullptr) { + // Pool exhausted, drop event (counted via queue's dropped counter) + instance->mqtt_event_queue_.increment_dropped_count(); + return; + } + event->populate(*static_cast(event_data)); + // Push always succeeds: pool is sized to queue capacity (SIZE-1), so if + // allocate() returned non-null, the queue cannot be full. + instance->mqtt_event_queue_.push(event); // Wake main loop immediately to process MQTT event instead of waiting for select() timeout #if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE) @@ -226,14 +240,14 @@ void MQTTBackendESP32::esphome_mqtt_task(void *params) { break; } } - this_mqtt->mqtt_event_pool_.release(elem); + this_mqtt->mqtt_outbound_pool_.release(elem); } } } bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, bool retain, const char *payload, size_t len) { - auto *elem = this->mqtt_event_pool_.allocate(); + auto *elem = this->mqtt_outbound_pool_.allocate(); if (!elem) { // Queue is full - increment counter but don't log immediately. @@ -253,7 +267,7 @@ bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, // Use the helper to allocate and copy data if (!elem->set_data(topic, payload, len)) { // Allocation failed, return elem to pool - this->mqtt_event_pool_.release(elem); + this->mqtt_outbound_pool_.release(elem); // Increment counter without logging to avoid cascade effect during memory pressure this->mqtt_queue_.increment_dropped_count(); return false; diff --git a/esphome/components/mqtt/mqtt_backend_esp32.h b/esphome/components/mqtt/mqtt_backend_esp32.h index 5c4dc413bd..58d1b29b32 100644 --- a/esphome/components/mqtt/mqtt_backend_esp32.h +++ b/esphome/components/mqtt/mqtt_backend_esp32.h @@ -5,7 +5,6 @@ #ifdef USE_ESP32 #include -#include #include #include #include @@ -18,32 +17,39 @@ namespace esphome::mqtt { struct Event { - esp_mqtt_event_id_t event_id; + esp_mqtt_event_id_t event_id{}; std::vector data; - int total_data_len; - int current_data_offset; + int total_data_len{0}; + int current_data_offset{0}; std::string topic; - int msg_id; - bool retain; - int qos; - bool dup; - bool session_present; - esp_mqtt_error_codes_t error_handle; + int msg_id{0}; + bool retain{false}; + int qos{0}; + bool dup{false}; + bool session_present{false}; + esp_mqtt_error_codes_t error_handle{}; - // Construct from esp_mqtt_event_t - // Any pointer values that are unsafe to keep are converted to safe copies - Event(const esp_mqtt_event_t &event) - : event_id(event.event_id), - data(event.data, event.data + event.data_len), - total_data_len(event.total_data_len), - current_data_offset(event.current_data_offset), - topic(event.topic, event.topic_len), - msg_id(event.msg_id), - retain(event.retain), - qos(event.qos), - dup(event.dup), - session_present(event.session_present), - error_handle(*event.error_handle) {} + // Populate from esp_mqtt_event_t + // Copies pointer-based data to owned storage for safe cross-thread transfer + void populate(const esp_mqtt_event_t &event) { + this->event_id = event.event_id; + this->data.assign(event.data, event.data + event.data_len); + this->total_data_len = event.total_data_len; + this->current_data_offset = event.current_data_offset; + this->topic.assign(event.topic, event.topic_len); + this->msg_id = event.msg_id; + this->retain = event.retain; + this->qos = event.qos; + this->dup = event.dup; + this->session_present = event.session_present; + this->error_handle = *event.error_handle; + } + + // Release owned resources for pool reuse (keeps allocated capacity for efficiency) + void release() { + this->data.clear(); + this->topic.clear(); + } }; enum MqttQueueTypeT : uint8_t { @@ -118,7 +124,8 @@ class MQTTBackendESP32 final : public MQTTBackend { static constexpr size_t TASK_STACK_SIZE = 3072; static constexpr size_t TASK_STACK_SIZE_TLS = 4096; // Larger stack for TLS operations static constexpr ssize_t TASK_PRIORITY = 5; - static constexpr uint8_t MQTT_QUEUE_LENGTH = 30; // 30*12 bytes = 360 + static constexpr uint8_t MQTT_QUEUE_LENGTH = 30; // 30*12 bytes = 360 + static constexpr uint8_t MQTT_EVENT_QUEUE_LENGTH = 32; // Inbound events from broker void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; } void set_client_id(const char *client_id) final { this->client_id_ = client_id; } @@ -251,7 +258,8 @@ class MQTTBackendESP32 final : public MQTTBackend { bool skip_cert_cn_check_{false}; #if defined(USE_MQTT_IDF_ENQUEUE) static void esphome_mqtt_task(void *params); - EventPool mqtt_event_pool_; + // Pool sized to queue capacity (SIZE-1) — see mqtt_event_pool_ comment. + EventPool mqtt_outbound_pool_; NotifyingLockFreeQueue mqtt_queue_; TaskHandle_t task_handle_{nullptr}; bool enqueue_(MqttQueueTypeT type, const char *topic, int qos = 0, bool retain = false, const char *payload = NULL, @@ -266,7 +274,14 @@ class MQTTBackendESP32 final : public MQTTBackend { CallbackManager on_message_; CallbackManager on_publish_; std::string cached_topic_; - std::queue mqtt_events_; + // Pool sized to queue capacity (SIZE-1) because LockFreeQueue is a ring + // buffer that holds N-1 elements (one slot distinguishes full from empty). + // This guarantees allocate() returns nullptr before push() can fail, which: + // 1. Prevents leaking a pool slot (the Nth allocate succeeds but push fails) + // 2. Avoids needing release() on the producer path after a failed push(), + // preserving the SPSC contract on the pool's internal free list + EventPool mqtt_event_pool_; + LockFreeQueue mqtt_event_queue_; #if defined(USE_MQTT_IDF_ENQUEUE) uint32_t last_dropped_log_time_{0};