[mqtt] Fix data race on inbound event queue (#14891)

Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com>
This commit is contained in:
J. Nick Koston
2026-03-17 13:49:24 -10:00
committed by GitHub
parent 62f9bc79c4
commit 342020e1d3
2 changed files with 66 additions and 37 deletions
+24 -10
View File
@@ -82,10 +82,16 @@ bool MQTTBackendESP32::initialize_() {
void MQTTBackendESP32::loop() { void MQTTBackendESP32::loop() {
// process new events // process new events
// handle only 1 message per loop iteration // handle only 1 message per loop iteration
if (!mqtt_events_.empty()) { Event *event = this->mqtt_event_queue_.pop();
auto &event = mqtt_events_.front(); if (event != nullptr) {
mqtt_event_handler_(event); this->mqtt_event_handler_(*event);
mqtt_events_.pop(); this->mqtt_event_pool_.release(event);
}
// Log dropped inbound events (check is cheap - single atomic load in common case)
uint16_t inbound_dropped = this->mqtt_event_queue_.get_and_reset_dropped_count();
if (inbound_dropped > 0) {
ESP_LOGW(TAG, "Dropped %u inbound MQTT events", inbound_dropped);
} }
#if defined(USE_MQTT_IDF_ENQUEUE) #if defined(USE_MQTT_IDF_ENQUEUE)
@@ -183,10 +189,18 @@ void MQTTBackendESP32::mqtt_event_handler_(const Event &event) {
void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id,
void *event_data) { void *event_data) {
MQTTBackendESP32 *instance = static_cast<MQTTBackendESP32 *>(handler_args); MQTTBackendESP32 *instance = static_cast<MQTTBackendESP32 *>(handler_args);
// queue event to decouple processing // queue event to decouple processing from ESP-IDF MQTT task to main loop
if (instance) { if (instance) {
auto event = *static_cast<esp_mqtt_event_t *>(event_data); auto *event = instance->mqtt_event_pool_.allocate();
instance->mqtt_events_.emplace(event); if (event == nullptr) {
// Pool exhausted, drop event (counted via queue's dropped counter)
instance->mqtt_event_queue_.increment_dropped_count();
return;
}
event->populate(*static_cast<esp_mqtt_event_t *>(event_data));
// Push always succeeds: pool is sized to queue capacity (SIZE-1), so if
// allocate() returned non-null, the queue cannot be full.
instance->mqtt_event_queue_.push(event);
// Wake main loop immediately to process MQTT event instead of waiting for select() timeout // Wake main loop immediately to process MQTT event instead of waiting for select() timeout
#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE) #if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
@@ -226,14 +240,14 @@ void MQTTBackendESP32::esphome_mqtt_task(void *params) {
break; break;
} }
} }
this_mqtt->mqtt_event_pool_.release(elem); this_mqtt->mqtt_outbound_pool_.release(elem);
} }
} }
} }
bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, bool retain, const char *payload, bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, bool retain, const char *payload,
size_t len) { size_t len) {
auto *elem = this->mqtt_event_pool_.allocate(); auto *elem = this->mqtt_outbound_pool_.allocate();
if (!elem) { if (!elem) {
// Queue is full - increment counter but don't log immediately. // Queue is full - increment counter but don't log immediately.
@@ -253,7 +267,7 @@ bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos,
// Use the helper to allocate and copy data // Use the helper to allocate and copy data
if (!elem->set_data(topic, payload, len)) { if (!elem->set_data(topic, payload, len)) {
// Allocation failed, return elem to pool // Allocation failed, return elem to pool
this->mqtt_event_pool_.release(elem); this->mqtt_outbound_pool_.release(elem);
// Increment counter without logging to avoid cascade effect during memory pressure // Increment counter without logging to avoid cascade effect during memory pressure
this->mqtt_queue_.increment_dropped_count(); this->mqtt_queue_.increment_dropped_count();
return false; return false;
+42 -27
View File
@@ -5,7 +5,6 @@
#ifdef USE_ESP32 #ifdef USE_ESP32
#include <string> #include <string>
#include <queue>
#include <cstring> #include <cstring>
#include <mqtt_client.h> #include <mqtt_client.h>
#include <freertos/FreeRTOS.h> #include <freertos/FreeRTOS.h>
@@ -18,32 +17,39 @@
namespace esphome::mqtt { namespace esphome::mqtt {
struct Event { struct Event {
esp_mqtt_event_id_t event_id; esp_mqtt_event_id_t event_id{};
std::vector<char> data; std::vector<char> data;
int total_data_len; int total_data_len{0};
int current_data_offset; int current_data_offset{0};
std::string topic; std::string topic;
int msg_id; int msg_id{0};
bool retain; bool retain{false};
int qos; int qos{0};
bool dup; bool dup{false};
bool session_present; bool session_present{false};
esp_mqtt_error_codes_t error_handle; esp_mqtt_error_codes_t error_handle{};
// Construct from esp_mqtt_event_t // Populate from esp_mqtt_event_t
// Any pointer values that are unsafe to keep are converted to safe copies // Copies pointer-based data to owned storage for safe cross-thread transfer
Event(const esp_mqtt_event_t &event) void populate(const esp_mqtt_event_t &event) {
: event_id(event.event_id), this->event_id = event.event_id;
data(event.data, event.data + event.data_len), this->data.assign(event.data, event.data + event.data_len);
total_data_len(event.total_data_len), this->total_data_len = event.total_data_len;
current_data_offset(event.current_data_offset), this->current_data_offset = event.current_data_offset;
topic(event.topic, event.topic_len), this->topic.assign(event.topic, event.topic_len);
msg_id(event.msg_id), this->msg_id = event.msg_id;
retain(event.retain), this->retain = event.retain;
qos(event.qos), this->qos = event.qos;
dup(event.dup), this->dup = event.dup;
session_present(event.session_present), this->session_present = event.session_present;
error_handle(*event.error_handle) {} this->error_handle = *event.error_handle;
}
// Release owned resources for pool reuse (keeps allocated capacity for efficiency)
void release() {
this->data.clear();
this->topic.clear();
}
}; };
enum MqttQueueTypeT : uint8_t { enum MqttQueueTypeT : uint8_t {
@@ -118,7 +124,8 @@ class MQTTBackendESP32 final : public MQTTBackend {
static constexpr size_t TASK_STACK_SIZE = 3072; static constexpr size_t TASK_STACK_SIZE = 3072;
static constexpr size_t TASK_STACK_SIZE_TLS = 4096; // Larger stack for TLS operations static constexpr size_t TASK_STACK_SIZE_TLS = 4096; // Larger stack for TLS operations
static constexpr ssize_t TASK_PRIORITY = 5; static constexpr ssize_t TASK_PRIORITY = 5;
static constexpr uint8_t MQTT_QUEUE_LENGTH = 30; // 30*12 bytes = 360 static constexpr uint8_t MQTT_QUEUE_LENGTH = 30; // 30*12 bytes = 360
static constexpr uint8_t MQTT_EVENT_QUEUE_LENGTH = 32; // Inbound events from broker
void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; } void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; }
void set_client_id(const char *client_id) final { this->client_id_ = client_id; } void set_client_id(const char *client_id) final { this->client_id_ = client_id; }
@@ -251,7 +258,8 @@ class MQTTBackendESP32 final : public MQTTBackend {
bool skip_cert_cn_check_{false}; bool skip_cert_cn_check_{false};
#if defined(USE_MQTT_IDF_ENQUEUE) #if defined(USE_MQTT_IDF_ENQUEUE)
static void esphome_mqtt_task(void *params); static void esphome_mqtt_task(void *params);
EventPool<struct QueueElement, MQTT_QUEUE_LENGTH> mqtt_event_pool_; // Pool sized to queue capacity (SIZE-1) — see mqtt_event_pool_ comment.
EventPool<struct QueueElement, MQTT_QUEUE_LENGTH - 1> mqtt_outbound_pool_;
NotifyingLockFreeQueue<struct QueueElement, MQTT_QUEUE_LENGTH> mqtt_queue_; NotifyingLockFreeQueue<struct QueueElement, MQTT_QUEUE_LENGTH> mqtt_queue_;
TaskHandle_t task_handle_{nullptr}; TaskHandle_t task_handle_{nullptr};
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos = 0, bool retain = false, const char *payload = NULL, bool enqueue_(MqttQueueTypeT type, const char *topic, int qos = 0, bool retain = false, const char *payload = NULL,
@@ -266,7 +274,14 @@ class MQTTBackendESP32 final : public MQTTBackend {
CallbackManager<on_message_callback_t> on_message_; CallbackManager<on_message_callback_t> on_message_;
CallbackManager<on_publish_user_callback_t> on_publish_; CallbackManager<on_publish_user_callback_t> on_publish_;
std::string cached_topic_; std::string cached_topic_;
std::queue<Event> mqtt_events_; // Pool sized to queue capacity (SIZE-1) because LockFreeQueue<T,N> is a ring
// buffer that holds N-1 elements (one slot distinguishes full from empty).
// This guarantees allocate() returns nullptr before push() can fail, which:
// 1. Prevents leaking a pool slot (the Nth allocate succeeds but push fails)
// 2. Avoids needing release() on the producer path after a failed push(),
// preserving the SPSC contract on the pool's internal free list
EventPool<Event, MQTT_EVENT_QUEUE_LENGTH - 1> mqtt_event_pool_;
LockFreeQueue<Event, MQTT_EVENT_QUEUE_LENGTH> mqtt_event_queue_;
#if defined(USE_MQTT_IDF_ENQUEUE) #if defined(USE_MQTT_IDF_ENQUEUE)
uint32_t last_dropped_log_time_{0}; uint32_t last_dropped_log_time_{0};