[sensor] Use FixedRingBuffer in SlidingWindowFilter, add window_size limit (#14736)

This commit is contained in:
J. Nick Koston
2026-03-13 10:17:40 -10:00
committed by GitHub
parent f41aa8b18c
commit a6c08576be
4 changed files with 171 additions and 61 deletions
+17 -17
View File
@@ -403,9 +403,9 @@ async def filter_out_filter_to_code(config, filter_id):
QUANTILE_SCHEMA = cv.All(
cv.Schema(
{
cv.Optional(CONF_WINDOW_SIZE, default=5): cv.positive_not_null_int,
cv.Optional(CONF_SEND_EVERY, default=5): cv.positive_not_null_int,
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.positive_not_null_int,
cv.Optional(CONF_WINDOW_SIZE, default=5): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_EVERY, default=5): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.int_range(min=1, max=65535),
cv.Optional(CONF_QUANTILE, default=0.9): cv.zero_to_one_float,
}
),
@@ -427,9 +427,9 @@ async def quantile_filter_to_code(config, filter_id):
MEDIAN_SCHEMA = cv.All(
cv.Schema(
{
cv.Optional(CONF_WINDOW_SIZE, default=5): cv.positive_not_null_int,
cv.Optional(CONF_SEND_EVERY, default=5): cv.positive_not_null_int,
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.positive_not_null_int,
cv.Optional(CONF_WINDOW_SIZE, default=5): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_EVERY, default=5): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.int_range(min=1, max=65535),
}
),
validate_send_first_at,
@@ -449,9 +449,9 @@ async def median_filter_to_code(config, filter_id):
MIN_SCHEMA = cv.All(
cv.Schema(
{
cv.Optional(CONF_WINDOW_SIZE, default=5): cv.positive_not_null_int,
cv.Optional(CONF_SEND_EVERY, default=5): cv.positive_not_null_int,
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.positive_not_null_int,
cv.Optional(CONF_WINDOW_SIZE, default=5): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_EVERY, default=5): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.int_range(min=1, max=65535),
}
),
validate_send_first_at,
@@ -483,9 +483,9 @@ async def min_filter_to_code(config, filter_id):
MAX_SCHEMA = cv.All(
cv.Schema(
{
cv.Optional(CONF_WINDOW_SIZE, default=5): cv.positive_not_null_int,
cv.Optional(CONF_SEND_EVERY, default=5): cv.positive_not_null_int,
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.positive_not_null_int,
cv.Optional(CONF_WINDOW_SIZE, default=5): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_EVERY, default=5): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.int_range(min=1, max=65535),
}
),
validate_send_first_at,
@@ -509,9 +509,9 @@ async def max_filter_to_code(config, filter_id):
SLIDING_AVERAGE_SCHEMA = cv.All(
cv.Schema(
{
cv.Optional(CONF_WINDOW_SIZE, default=15): cv.positive_not_null_int,
cv.Optional(CONF_SEND_EVERY, default=15): cv.positive_not_null_int,
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.positive_not_null_int,
cv.Optional(CONF_WINDOW_SIZE, default=15): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_EVERY, default=15): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.int_range(min=1, max=65535),
}
),
validate_send_first_at,
@@ -540,8 +540,8 @@ EXPONENTIAL_AVERAGE_SCHEMA = cv.All(
cv.Schema(
{
cv.Optional(CONF_ALPHA, default=0.1): cv.positive_float,
cv.Optional(CONF_SEND_EVERY, default=15): cv.positive_not_null_int,
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.positive_not_null_int,
cv.Optional(CONF_SEND_EVERY, default=15): cv.int_range(min=1, max=65535),
cv.Optional(CONF_SEND_FIRST_AT, default=1): cv.int_range(min=1, max=65535),
}
),
validate_send_first_at,
+10 -24
View File
@@ -41,26 +41,14 @@ void Filter::initialize(Sensor *parent, Filter *next) {
}
// SlidingWindowFilter
SlidingWindowFilter::SlidingWindowFilter(size_t window_size, size_t send_every, size_t send_first_at)
: window_size_(window_size), send_every_(send_every), send_at_(send_every - send_first_at) {
// Allocate ring buffer once at initialization
SlidingWindowFilter::SlidingWindowFilter(uint16_t window_size, uint16_t send_every, uint16_t send_first_at)
: send_every_(send_every), send_at_(send_every - send_first_at) {
this->window_.init(window_size);
}
optional<float> SlidingWindowFilter::new_value(float value) {
// Add value to ring buffer
if (this->window_count_ < this->window_size_) {
// Buffer not yet full - just append
this->window_.push_back(value);
this->window_count_++;
} else {
// Buffer full - overwrite oldest value (ring buffer)
this->window_[this->window_head_] = value;
this->window_head_++;
if (this->window_head_ >= this->window_size_) {
this->window_head_ = 0;
}
}
// Add value to ring buffer (overwrites oldest when full)
this->window_.push_overwrite(value);
// Check if we should send a result
if (++this->send_at_ >= this->send_every_) {
@@ -77,9 +65,8 @@ FixedVector<float> SortedWindowFilter::get_window_values_() {
// Copy window without NaN values using FixedVector (no heap allocation)
// Returns unsorted values - caller will use std::nth_element for partial sorting as needed
FixedVector<float> values;
values.init(this->window_count_);
for (size_t i = 0; i < this->window_count_; i++) {
float v = this->window_[i];
values.init(this->window_.size());
for (float v : this->window_) {
if (!std::isnan(v)) {
values.push_back(v);
}
@@ -150,8 +137,7 @@ float MaxFilter::compute_result() { return this->find_extremum_<std::greater<flo
float SlidingWindowMovingAverageFilter::compute_result() {
float sum = 0;
size_t valid_count = 0;
for (size_t i = 0; i < this->window_count_; i++) {
float v = this->window_[i];
for (float v : this->window_) {
if (!std::isnan(v)) {
sum += v;
valid_count++;
@@ -161,7 +147,7 @@ float SlidingWindowMovingAverageFilter::compute_result() {
}
// ExponentialMovingAverageFilter
ExponentialMovingAverageFilter::ExponentialMovingAverageFilter(float alpha, size_t send_every, size_t send_first_at)
ExponentialMovingAverageFilter::ExponentialMovingAverageFilter(float alpha, uint16_t send_every, uint16_t send_first_at)
: alpha_(alpha), send_every_(send_every), send_at_(send_every - send_first_at) {}
optional<float> ExponentialMovingAverageFilter::new_value(float value) {
if (!std::isnan(value)) {
@@ -183,7 +169,7 @@ optional<float> ExponentialMovingAverageFilter::new_value(float value) {
}
return {};
}
void ExponentialMovingAverageFilter::set_send_every(size_t send_every) { this->send_every_ = send_every; }
void ExponentialMovingAverageFilter::set_send_every(uint16_t send_every) { this->send_every_ = send_every; }
void ExponentialMovingAverageFilter::set_alpha(float alpha) { this->alpha_ = alpha; }
// ThrottleAverageFilter
@@ -511,7 +497,7 @@ optional<float> ToNTCTemperatureFilter::new_value(float value) {
}
// StreamingFilter (base class)
StreamingFilter::StreamingFilter(size_t window_size, size_t send_first_at)
StreamingFilter::StreamingFilter(uint16_t window_size, uint16_t send_first_at)
: window_size_(window_size), send_first_at_(send_first_at) {}
optional<float> StreamingFilter::new_value(float value) {
+14 -19
View File
@@ -52,7 +52,7 @@ class Filter {
*/
class SlidingWindowFilter : public Filter {
public:
SlidingWindowFilter(size_t window_size, size_t send_every, size_t send_first_at);
SlidingWindowFilter(uint16_t window_size, uint16_t send_every, uint16_t send_first_at);
optional<float> new_value(float value) final;
@@ -60,14 +60,10 @@ class SlidingWindowFilter : public Filter {
/// Called by new_value() to compute the filtered result from the current window
virtual float compute_result() = 0;
/// Access the sliding window values (ring buffer implementation)
/// Use: for (size_t i = 0; i < window_count_; i++) { float val = window_[i]; }
FixedVector<float> window_;
size_t window_head_{0}; ///< Index where next value will be written
size_t window_count_{0}; ///< Number of valid values in window (0 to window_size_)
size_t window_size_; ///< Maximum window size
size_t send_every_; ///< Send result every N values
size_t send_at_; ///< Counter for send_every
/// Sliding window ring buffer - automatically overwrites oldest values when full
FixedRingBuffer<float> window_;
uint16_t send_every_; ///< Send result every N values
uint16_t send_at_; ///< Counter for send_every
};
/** Base class for Min/Max filters.
@@ -84,8 +80,7 @@ class MinMaxFilter : public SlidingWindowFilter {
template<typename Compare> float find_extremum_() {
float result = NAN;
Compare comp;
for (size_t i = 0; i < this->window_count_; i++) {
float v = this->window_[i];
for (float v : this->window_) {
if (!std::isnan(v)) {
result = std::isnan(result) ? v : (comp(v, result) ? v : result);
}
@@ -239,18 +234,18 @@ class SlidingWindowMovingAverageFilter : public SlidingWindowFilter {
*/
class ExponentialMovingAverageFilter : public Filter {
public:
ExponentialMovingAverageFilter(float alpha, size_t send_every, size_t send_first_at);
ExponentialMovingAverageFilter(float alpha, uint16_t send_every, uint16_t send_first_at);
optional<float> new_value(float value) override;
void set_send_every(size_t send_every);
void set_send_every(uint16_t send_every);
void set_alpha(float alpha);
protected:
float accumulator_{NAN};
float alpha_;
size_t send_every_;
size_t send_at_;
uint16_t send_every_;
uint16_t send_at_;
bool first_value_{true};
};
@@ -570,7 +565,7 @@ class ToNTCTemperatureFilter : public Filter {
*/
class StreamingFilter : public Filter {
public:
StreamingFilter(size_t window_size, size_t send_first_at);
StreamingFilter(uint16_t window_size, uint16_t send_first_at);
optional<float> new_value(float value) final;
@@ -584,9 +579,9 @@ class StreamingFilter : public Filter {
/// Called by new_value() to reset internal state after sending a result
virtual void reset_batch() = 0;
size_t window_size_;
size_t count_{0};
size_t send_first_at_;
uint16_t window_size_;
uint16_t count_{0};
uint16_t send_first_at_;
bool first_send_{true};
};
+130 -1
View File
@@ -301,7 +301,7 @@ template<typename T, size_t N> class StaticVector {
/// Not thread-safe. All access (push/pop/iteration) must occur from a single
/// context, or the caller must provide external synchronization.
template<typename T, size_t N> class StaticRingBuffer {
using index_type = std::conditional_t<(N <= 255), uint8_t, uint16_t>;
using index_type = std::conditional_t<(N <= std::numeric_limits<uint8_t>::max()), uint8_t, uint16_t>;
public:
class Iterator {
@@ -356,6 +356,13 @@ template<typename T, size_t N> class StaticRingBuffer {
index_type size() const { return this->count_; }
bool empty() const { return this->count_ == 0; }
/// Clear all elements (reset to empty)
void clear() {
this->head_ = 0;
this->tail_ = 0;
this->count_ = 0;
}
Iterator begin() { return Iterator(this, 0); }
Iterator end() { return Iterator(this, this->count_); }
ConstIterator begin() const { return ConstIterator(this, 0); }
@@ -368,6 +375,128 @@ template<typename T, size_t N> class StaticRingBuffer {
index_type count_{0};
};
/// Fixed-capacity circular buffer - allocates once at runtime, never reallocates.
/// Runtime-sized equivalent of StaticRingBuffer - use when capacity is only known at initialization.
/// Supports FIFO push/pop and iteration over queued elements.
/// Not thread-safe.
template<typename T, size_t MAX_CAPACITY = std::numeric_limits<uint16_t>::max()> class FixedRingBuffer {
using index_type = std::conditional_t<
(MAX_CAPACITY <= std::numeric_limits<uint8_t>::max()), uint8_t,
std::conditional_t<(MAX_CAPACITY <= std::numeric_limits<uint16_t>::max()), uint16_t, uint32_t>>;
public:
class Iterator {
public:
Iterator(FixedRingBuffer *buf, index_type pos) : buf_(buf), pos_(pos) {}
T &operator*() { return buf_->data_[(buf_->head_ + pos_) % buf_->capacity_]; }
Iterator &operator++() {
++pos_;
return *this;
}
bool operator!=(const Iterator &other) const { return pos_ != other.pos_; }
private:
FixedRingBuffer *buf_;
index_type pos_;
};
class ConstIterator {
public:
ConstIterator(const FixedRingBuffer *buf, index_type pos) : buf_(buf), pos_(pos) {}
const T &operator*() const { return buf_->data_[(buf_->head_ + pos_) % buf_->capacity_]; }
ConstIterator &operator++() {
++pos_;
return *this;
}
bool operator!=(const ConstIterator &other) const { return pos_ != other.pos_; }
private:
const FixedRingBuffer *buf_;
index_type pos_;
};
FixedRingBuffer() = default;
~FixedRingBuffer() {
if constexpr (std::is_trivial<T>::value) {
::operator delete(this->data_);
} else {
delete[] this->data_;
}
}
// Disable copy
FixedRingBuffer(const FixedRingBuffer &) = delete;
FixedRingBuffer &operator=(const FixedRingBuffer &) = delete;
/// Allocate capacity - can only be called once
void init(index_type capacity) {
if constexpr (std::is_trivial<T>::value) {
// Raw allocation without initialization (elements are written before read)
// NOLINTNEXTLINE(bugprone-sizeof-expression)
this->data_ = static_cast<T *>(::operator new(capacity * sizeof(T)));
} else {
this->data_ = new T[capacity];
}
this->capacity_ = capacity;
}
/// Push a value. Returns false if full.
bool push(const T &value) {
if (this->count_ >= this->capacity_)
return false;
this->data_[this->tail_] = value;
this->tail_ = (this->tail_ + 1) % this->capacity_;
++this->count_;
return true;
}
/// Push a value, overwriting the oldest if full.
void push_overwrite(const T &value) {
this->data_[this->tail_] = value;
this->tail_ = (this->tail_ + 1) % this->capacity_;
if (this->count_ >= this->capacity_) {
// Buffer full - advance head to drop oldest, count stays at capacity
this->head_ = this->tail_;
} else {
++this->count_;
}
}
/// Remove the oldest element.
void pop() {
if (this->count_ > 0) {
this->head_ = (this->head_ + 1) % this->capacity_;
--this->count_;
}
}
T &front() { return this->data_[this->head_]; }
const T &front() const { return this->data_[this->head_]; }
index_type size() const { return this->count_; }
bool empty() const { return this->count_ == 0; }
index_type capacity() const { return this->capacity_; }
bool full() const { return this->count_ == this->capacity_; }
/// Clear all elements (reset to empty, keep capacity)
void clear() {
this->head_ = 0;
this->tail_ = 0;
this->count_ = 0;
}
Iterator begin() { return Iterator(this, 0); }
Iterator end() { return Iterator(this, this->count_); }
ConstIterator begin() const { return ConstIterator(this, 0); }
ConstIterator end() const { return ConstIterator(this, this->count_); }
protected:
T *data_{nullptr};
index_type head_{0};
index_type tail_{0};
index_type count_{0};
index_type capacity_{0};
};
/// Fixed-capacity vector - allocates once at runtime, never reallocates
/// This avoids std::vector template overhead (_M_realloc_insert, _M_default_append)
/// when size is known at initialization but not at compile time