diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index db40ede78c6..44fc277ec82 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -454,6 +454,61 @@ void Scheduler::compact_defer_queue_locked_() { // (saves ~156 bytes flash). Erasing from the end is O(1) - no shifting needed. this->defer_queue_.erase(this->defer_queue_.begin() + remaining, this->defer_queue_.end()); } +void HOT Scheduler::process_defer_queue_slow_path_(uint32_t &now) { + // Process defer queue to guarantee FIFO execution order for deferred items. + // Previously, defer() used the heap which gave undefined order for equal timestamps, + // causing race conditions on multi-core systems (ESP32, BK7200). + // With the defer queue: + // - Deferred items (delay=0) go directly to defer_queue_ in set_timer_common_ + // - Items execute in exact order they were deferred (FIFO guarantee) + // - No deferred items exist in to_add_, so processing order doesn't affect correctness + // Single-core platforms don't use this queue and fall back to the heap-based approach. + // + // Note: Items cancelled via cancel_item_locked_() are marked with remove=true but still + // processed here. They are skipped during execution by should_skip_item_(). + // This is intentional - no memory leak occurs. + // + // We use an index (defer_queue_front_) to track the read position instead of calling + // erase() on every pop, which would be O(n). The queue is processed once per loop - + // any items added during processing are left for the next loop iteration. + + // Merge lock acquisitions: instead of separate locks for move-out and recycle (2N+1 total), + // recycle each item after re-acquiring the lock for the next iteration (N+1 total). + // The lock is held across: recycle → loop condition → move-out, then released for execution. + SchedulerItem *item; + + this->lock_.lock(); + // Reset counter and snapshot queue end under lock + this->defer_count_clear_(); + size_t defer_queue_end = this->defer_queue_.size(); + if (this->defer_queue_front_ >= defer_queue_end) { + this->lock_.unlock(); + return; + } + while (this->defer_queue_front_ < defer_queue_end) { + // Take ownership of the item, leaving nullptr in the vector slot. + // This is safe because: + // 1. The vector is only cleaned up by cleanup_defer_queue_locked_() at the end of this function + // 2. Any code iterating defer_queue_ MUST check for nullptr items (see mark_matching_items_removed_locked_) + // 3. The lock protects concurrent access, but the nullptr remains until cleanup + item = this->defer_queue_[this->defer_queue_front_]; + this->defer_queue_[this->defer_queue_front_] = nullptr; + this->defer_queue_front_++; + this->lock_.unlock(); + + // Execute callback without holding lock to prevent deadlocks + // if the callback tries to call defer() again + if (!this->should_skip_item_(item)) { + now = this->execute_item_(item, now); + } + + this->lock_.lock(); + this->recycle_item_main_loop_(item); + } + // Clean up the queue (lock already held from last recycle or initial acquisition) + this->cleanup_defer_queue_locked_(); + this->lock_.unlock(); +} #endif /* not ESPHOME_THREAD_SINGLE */ void HOT Scheduler::call(uint32_t now) { @@ -613,11 +668,7 @@ void HOT Scheduler::call(uint32_t now) { } #endif } -void HOT Scheduler::process_to_add() { - // Fast path: skip lock acquisition when nothing to add. - // Worst case is a one-loop-iteration delay before newly added items are processed. - if (this->to_add_empty_()) - return; +void HOT Scheduler::process_to_add_slow_path_() { LockGuard guard{this->lock_}; for (auto *&it : this->to_add_) { if (is_item_removed_locked_(it)) { @@ -633,13 +684,7 @@ void HOT Scheduler::process_to_add() { this->to_add_.clear(); this->to_add_count_clear_(); } -bool HOT Scheduler::cleanup_() { - // Fast path: if nothing to remove, just check if items exist. - // Uses atomic load on platforms with atomics, falls back to always taking the lock otherwise. - // Worst case is a one-loop-iteration delay in cleanup. - if (this->to_remove_empty_()) - return !this->items_.empty(); - +bool HOT Scheduler::cleanup_slow_path_() { // We must hold the lock for the entire cleanup operation because: // 1. We're modifying items_ (via pop_raw_locked_) which requires exclusive access // 2. We're decrementing to_remove_ which is also modified by other threads diff --git a/esphome/core/scheduler.h b/esphome/core/scheduler.h index e545055fcae..36c853ad176 100644 --- a/esphome/core/scheduler.h +++ b/esphome/core/scheduler.h @@ -131,7 +131,18 @@ class Scheduler { // @param now Fresh timestamp from millis() - must not be stale/cached void call(uint32_t now); - void process_to_add(); + // Move items from to_add_ into the main heap. + // IMPORTANT: This method should only be called from the main thread (loop task). + // Inlined: the fast path (nothing to add) is just an atomic load / empty check. + // The lock-free fast path uses to_add_count_ (atomic) or to_add_.empty() + // (single-threaded). This is safe because the main loop is the only thread + // that reads to_add_ without holding lock_; other threads may read it only + // while holding the mutex (e.g. cancel_item_locked_). + inline void HOT process_to_add() { + if (this->to_add_empty_()) + return; + this->process_to_add_slow_path_(); + } // Name storage type discriminator for SchedulerItem // Used to distinguish between static strings, hashed strings, numeric IDs, and internal numeric IDs @@ -286,7 +297,20 @@ class Scheduler { // Cleanup logically deleted items from the scheduler // Returns true if items remain after cleanup // IMPORTANT: This method should only be called from the main thread (loop task). - bool cleanup_(); + // Inlined: the fast path (nothing to remove) is just an atomic load + empty check. + // Reading items_.empty() without the lock is safe here because only the main + // loop thread structurally modifies items_ (push/pop/erase). Other threads may + // iterate items_ and mark items removed under lock_, but never change the + // vector's size or data pointer. + inline bool HOT cleanup_() { + if (this->to_remove_empty_()) + return !this->items_.empty(); + return this->cleanup_slow_path_(); + } + // Slow path for cleanup_() when there are items to remove - defined in scheduler.cpp + bool cleanup_slow_path_(); + // Slow path for process_to_add() when there are items to merge - defined in scheduler.cpp + void process_to_add_slow_path_(); // Remove and return the front item from the heap as a raw pointer. // Caller takes ownership and must either recycle or delete the item. // IMPORTANT: Caller must hold the scheduler lock before calling this function. @@ -376,68 +400,20 @@ class Scheduler { #endif /* ESPHOME_DEBUG_SCHEDULER */ #ifndef ESPHOME_THREAD_SINGLE - // Helper to process defer queue - inline for performance in hot path - inline void process_defer_queue_(uint32_t &now) { - // Process defer queue first to guarantee FIFO execution order for deferred items. - // Previously, defer() used the heap which gave undefined order for equal timestamps, - // causing race conditions on multi-core systems (ESP32, BK7200). - // With the defer queue: - // - Deferred items (delay=0) go directly to defer_queue_ in set_timer_common_ - // - Items execute in exact order they were deferred (FIFO guarantee) - // - No deferred items exist in to_add_, so processing order doesn't affect correctness - // Single-core platforms don't use this queue and fall back to the heap-based approach. - // - // Note: Items cancelled via cancel_item_locked_() are marked with remove=true but still - // processed here. They are skipped during execution by should_skip_item_(). - // This is intentional - no memory leak occurs. - // - // We use an index (defer_queue_front_) to track the read position instead of calling - // erase() on every pop, which would be O(n). The queue is processed once per loop - - // any items added during processing are left for the next loop iteration. - + // Process defer queue for FIFO execution of deferred items. + // IMPORTANT: This method should only be called from the main thread (loop task). + // Inlined: the fast path (nothing deferred) is just an atomic load check. + inline void HOT process_defer_queue_(uint32_t &now) { // Fast path: nothing to process, avoid lock entirely. // Worst case is a one-loop-iteration delay before newly deferred items are processed. if (this->defer_empty_()) return; - - // Merge lock acquisitions: instead of separate locks for move-out and recycle (2N+1 total), - // recycle each item after re-acquiring the lock for the next iteration (N+1 total). - // The lock is held across: recycle → loop condition → move-out, then released for execution. - SchedulerItem *item; - - this->lock_.lock(); - // Reset counter and snapshot queue end under lock - this->defer_count_clear_(); - size_t defer_queue_end = this->defer_queue_.size(); - if (this->defer_queue_front_ >= defer_queue_end) { - this->lock_.unlock(); - return; - } - while (this->defer_queue_front_ < defer_queue_end) { - // Take ownership of the item, leaving nullptr in the vector slot. - // This is safe because: - // 1. The vector is only cleaned up by cleanup_defer_queue_locked_() at the end of this function - // 2. Any code iterating defer_queue_ MUST check for nullptr items (see mark_matching_items_removed_locked_) - // 3. The lock protects concurrent access, but the nullptr remains until cleanup - item = this->defer_queue_[this->defer_queue_front_]; - this->defer_queue_[this->defer_queue_front_] = nullptr; - this->defer_queue_front_++; - this->lock_.unlock(); - - // Execute callback without holding lock to prevent deadlocks - // if the callback tries to call defer() again - if (!this->should_skip_item_(item)) { - now = this->execute_item_(item, now); - } - - this->lock_.lock(); - this->recycle_item_main_loop_(item); - } - // Clean up the queue (lock already held from last recycle or initial acquisition) - this->cleanup_defer_queue_locked_(); - this->lock_.unlock(); + this->process_defer_queue_slow_path_(now); } + // Slow path for process_defer_queue_() - defined in scheduler.cpp + void process_defer_queue_slow_path_(uint32_t &now); + // Helper to cleanup defer_queue_ after processing. // Keeps the common clear() path inline, outlines the rare compaction to keep // cold code out of the hot instruction cache lines.