mirror of
https://github.com/esphome/esphome.git
synced 2026-06-01 01:19:45 +08:00
[api] Add send_sensor_state benchmarks (#15352)
This commit is contained in:
@@ -44,10 +44,22 @@ static constexpr size_t MAX_INITIAL_PER_BATCH = 34; // For clients >= AP
|
|||||||
static_assert(MAX_MESSAGES_PER_BATCH >= MAX_INITIAL_PER_BATCH,
|
static_assert(MAX_MESSAGES_PER_BATCH >= MAX_INITIAL_PER_BATCH,
|
||||||
"MAX_MESSAGES_PER_BATCH must be >= MAX_INITIAL_PER_BATCH");
|
"MAX_MESSAGES_PER_BATCH must be >= MAX_INITIAL_PER_BATCH");
|
||||||
|
|
||||||
|
#ifdef USE_BENCHMARK
|
||||||
|
class APIConnection;
|
||||||
|
void bench_enable_immediate_send(APIConnection *conn);
|
||||||
|
void bench_clear_batch(APIConnection *conn);
|
||||||
|
void bench_process_batch(APIConnection *conn);
|
||||||
|
#endif
|
||||||
|
|
||||||
class APIConnection final : public APIServerConnectionBase {
|
class APIConnection final : public APIServerConnectionBase {
|
||||||
public:
|
public:
|
||||||
friend class APIServer;
|
friend class APIServer;
|
||||||
friend class ListEntitiesIterator;
|
friend class ListEntitiesIterator;
|
||||||
|
#ifdef USE_BENCHMARK
|
||||||
|
friend void bench_enable_immediate_send(APIConnection *conn);
|
||||||
|
friend void bench_clear_batch(APIConnection *conn);
|
||||||
|
friend void bench_process_batch(APIConnection *conn);
|
||||||
|
#endif
|
||||||
APIConnection(std::unique_ptr<socket::Socket> socket, APIServer *parent);
|
APIConnection(std::unique_ptr<socket::Socket> socket, APIServer *parent);
|
||||||
~APIConnection();
|
~APIConnection();
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,67 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <netinet/tcp.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
#include "esphome/components/socket/socket.h"
|
||||||
|
|
||||||
|
namespace esphome::api::benchmarks {
|
||||||
|
|
||||||
|
// Helper to drain accumulated data from the read side of a socket
|
||||||
|
// to prevent the write side from blocking.
|
||||||
|
inline void drain_socket(int fd) {
|
||||||
|
char buf[65536];
|
||||||
|
while (::read(fd, buf, sizeof(buf)) > 0) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a TCP loopback socket pair. Returns the write-side Socket
|
||||||
|
// (wrapped for ESPHome) and the raw read-side fd for draining.
|
||||||
|
// Both ends are non-blocking with 16MB buffers.
|
||||||
|
inline std::pair<std::unique_ptr<socket::Socket>, int> create_tcp_loopback() {
|
||||||
|
// Create a TCP listener on loopback
|
||||||
|
int listen_fd = ::socket(AF_INET, SOCK_STREAM, 0);
|
||||||
|
int opt = 1;
|
||||||
|
::setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
|
||||||
|
|
||||||
|
struct sockaddr_in addr {};
|
||||||
|
addr.sin_family = AF_INET;
|
||||||
|
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
||||||
|
addr.sin_port = 0; // OS-assigned port
|
||||||
|
::bind(listen_fd, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr));
|
||||||
|
::listen(listen_fd, 1);
|
||||||
|
|
||||||
|
// Get the assigned port
|
||||||
|
socklen_t addr_len = sizeof(addr);
|
||||||
|
::getsockname(listen_fd, reinterpret_cast<struct sockaddr *>(&addr), &addr_len);
|
||||||
|
|
||||||
|
// Connect from client side
|
||||||
|
int write_fd = ::socket(AF_INET, SOCK_STREAM, 0);
|
||||||
|
::connect(write_fd, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr));
|
||||||
|
|
||||||
|
// Accept on server side (this is our read fd)
|
||||||
|
int read_fd = ::accept(listen_fd, nullptr, nullptr);
|
||||||
|
::close(listen_fd);
|
||||||
|
|
||||||
|
// Make both ends non-blocking
|
||||||
|
int flags = ::fcntl(write_fd, F_GETFL, 0);
|
||||||
|
::fcntl(write_fd, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
flags = ::fcntl(read_fd, F_GETFL, 0);
|
||||||
|
::fcntl(read_fd, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
|
||||||
|
// Use large socket buffers so benchmarks never hit WOULD_BLOCK
|
||||||
|
// during a single outer iteration (2000 × ~15B messages = ~30KB).
|
||||||
|
int bufsize = 16 * 1024 * 1024;
|
||||||
|
::setsockopt(write_fd, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
|
||||||
|
::setsockopt(read_fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
|
||||||
|
|
||||||
|
return {std::make_unique<socket::Socket>(write_fd), read_fd};
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace esphome::api::benchmarks
|
||||||
@@ -2,12 +2,9 @@
|
|||||||
#ifdef USE_API_PLAINTEXT
|
#ifdef USE_API_PLAINTEXT
|
||||||
|
|
||||||
#include <benchmark/benchmark.h>
|
#include <benchmark/benchmark.h>
|
||||||
#include <fcntl.h>
|
|
||||||
#include <netinet/in.h>
|
|
||||||
#include <netinet/tcp.h>
|
|
||||||
#include <sys/socket.h>
|
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "bench_helpers.h"
|
||||||
#include "esphome/components/api/api_frame_helper_plaintext.h"
|
#include "esphome/components/api/api_frame_helper_plaintext.h"
|
||||||
#include "esphome/components/api/api_pb2.h"
|
#include "esphome/components/api/api_pb2.h"
|
||||||
#include "esphome/components/api/api_buffer.h"
|
#include "esphome/components/api/api_buffer.h"
|
||||||
@@ -16,57 +13,12 @@ namespace esphome::api::benchmarks {
|
|||||||
|
|
||||||
static constexpr int kInnerIterations = 2000;
|
static constexpr int kInnerIterations = 2000;
|
||||||
|
|
||||||
// Helper to drain accumulated data from the read side of a socket
|
|
||||||
// to prevent the write side from blocking.
|
|
||||||
static void drain_socket(int fd) {
|
|
||||||
char buf[65536];
|
|
||||||
while (::read(fd, buf, sizeof(buf)) > 0) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper to create a TCP loopback connection with an APIPlaintextFrameHelper
|
// Helper to create a TCP loopback connection with an APIPlaintextFrameHelper
|
||||||
// on the write end. Returns the helper and the read-side fd.
|
// on the write end. Returns the helper and the read-side fd.
|
||||||
// Uses real TCP sockets so TCP_NODELAY succeeds during init().
|
|
||||||
static std::pair<std::unique_ptr<APIPlaintextFrameHelper>, int> create_plaintext_helper() {
|
static std::pair<std::unique_ptr<APIPlaintextFrameHelper>, int> create_plaintext_helper() {
|
||||||
// Create a TCP listener on loopback
|
auto [sock, read_fd] = create_tcp_loopback();
|
||||||
int listen_fd = ::socket(AF_INET, SOCK_STREAM, 0);
|
|
||||||
int opt = 1;
|
|
||||||
::setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
|
|
||||||
|
|
||||||
struct sockaddr_in addr {};
|
|
||||||
addr.sin_family = AF_INET;
|
|
||||||
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
|
||||||
addr.sin_port = 0; // OS-assigned port
|
|
||||||
::bind(listen_fd, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr));
|
|
||||||
::listen(listen_fd, 1);
|
|
||||||
|
|
||||||
// Get the assigned port
|
|
||||||
socklen_t addr_len = sizeof(addr);
|
|
||||||
::getsockname(listen_fd, reinterpret_cast<struct sockaddr *>(&addr), &addr_len);
|
|
||||||
|
|
||||||
// Connect from client side
|
|
||||||
int write_fd = ::socket(AF_INET, SOCK_STREAM, 0);
|
|
||||||
::connect(write_fd, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr));
|
|
||||||
|
|
||||||
// Accept on server side (this is our read fd)
|
|
||||||
int read_fd = ::accept(listen_fd, nullptr, nullptr);
|
|
||||||
::close(listen_fd);
|
|
||||||
|
|
||||||
// Make both ends non-blocking
|
|
||||||
int flags = ::fcntl(write_fd, F_GETFL, 0);
|
|
||||||
::fcntl(write_fd, F_SETFL, flags | O_NONBLOCK);
|
|
||||||
flags = ::fcntl(read_fd, F_GETFL, 0);
|
|
||||||
::fcntl(read_fd, F_SETFL, flags | O_NONBLOCK);
|
|
||||||
|
|
||||||
// Increase socket buffer sizes to reduce drain frequency
|
|
||||||
int bufsize = 1024 * 1024;
|
|
||||||
::setsockopt(write_fd, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
|
|
||||||
::setsockopt(read_fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
|
|
||||||
|
|
||||||
auto sock = std::make_unique<socket::Socket>(write_fd);
|
|
||||||
auto helper = std::make_unique<APIPlaintextFrameHelper>(std::move(sock));
|
auto helper = std::make_unique<APIPlaintextFrameHelper>(std::move(sock));
|
||||||
helper->init();
|
helper->init();
|
||||||
|
|
||||||
return {std::move(helper), read_fd};
|
return {std::move(helper), read_fd};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -97,9 +49,6 @@ static void PlaintextFrame_WriteSensorState(benchmark::State &state) {
|
|||||||
msg.encode(writer);
|
msg.encode(writer);
|
||||||
|
|
||||||
helper->write_protobuf_packet(SensorStateResponse::MESSAGE_TYPE, writer);
|
helper->write_protobuf_packet(SensorStateResponse::MESSAGE_TYPE, writer);
|
||||||
|
|
||||||
if ((i & 0xFF) == 0)
|
|
||||||
drain_socket(read_fd);
|
|
||||||
}
|
}
|
||||||
drain_socket(read_fd);
|
drain_socket(read_fd);
|
||||||
benchmark::DoNotOptimize(helper.get());
|
benchmark::DoNotOptimize(helper.get());
|
||||||
@@ -144,9 +93,6 @@ static void PlaintextFrame_WriteBatch5(benchmark::State &state) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
helper->write_protobuf_messages(ProtoWriteBuffer(&buffer, 0), std::span<const MessageInfo>(messages, 5));
|
helper->write_protobuf_messages(ProtoWriteBuffer(&buffer, 0), std::span<const MessageInfo>(messages, 5));
|
||||||
|
|
||||||
if ((i & 0xFF) == 0)
|
|
||||||
drain_socket(read_fd);
|
|
||||||
}
|
}
|
||||||
drain_socket(read_fd);
|
drain_socket(read_fd);
|
||||||
benchmark::DoNotOptimize(helper.get());
|
benchmark::DoNotOptimize(helper.get());
|
||||||
|
|||||||
@@ -0,0 +1,191 @@
|
|||||||
|
#include "esphome/core/defines.h"
|
||||||
|
#if defined(USE_API_PLAINTEXT) && defined(USE_SENSOR)
|
||||||
|
|
||||||
|
#include <benchmark/benchmark.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "bench_helpers.h"
|
||||||
|
#include "esphome/components/api/api_connection.h"
|
||||||
|
#include "esphome/components/api/api_server.h"
|
||||||
|
#include "esphome/components/sensor/sensor.h"
|
||||||
|
|
||||||
|
namespace esphome::api {
|
||||||
|
|
||||||
|
// Friend functions declared in APIConnection for benchmark access.
|
||||||
|
void bench_enable_immediate_send(APIConnection *conn) { conn->flags_.should_try_send_immediately = true; }
|
||||||
|
void bench_clear_batch(APIConnection *conn) { conn->clear_batch_(); }
|
||||||
|
void bench_process_batch(APIConnection *conn) { conn->process_batch_(); }
|
||||||
|
|
||||||
|
} // namespace esphome::api
|
||||||
|
|
||||||
|
namespace esphome::api::benchmarks {
|
||||||
|
|
||||||
|
static constexpr int kInnerIterations = 2000;
|
||||||
|
|
||||||
|
// Helper to create a TCP loopback connection with an APIConnection.
|
||||||
|
// Returns the connection and the read-side fd for draining.
|
||||||
|
static std::pair<std::unique_ptr<APIConnection>, int> create_api_connection() {
|
||||||
|
auto [sock, read_fd] = create_tcp_loopback();
|
||||||
|
auto conn = std::make_unique<APIConnection>(std::move(sock), global_api_server);
|
||||||
|
conn->start();
|
||||||
|
return {std::move(conn), read_fd};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test subclass to access protected configure_entity_() for benchmark setup.
|
||||||
|
class TestSensor : public sensor::Sensor {
|
||||||
|
public:
|
||||||
|
void configure(const char *name) { this->configure_entity_(name, 0x12345678, 0); }
|
||||||
|
};
|
||||||
|
|
||||||
|
// --- send_sensor_state: immediate send path ---
|
||||||
|
// Measures: send_message_smart_ → prepare buffer → dispatch_message_ →
|
||||||
|
// try_send_sensor_state → fill key/device_id + proto encode → frame write →
|
||||||
|
// TCP send. This is the per-client cost when batch_delay=0 and initial states
|
||||||
|
// have been sent.
|
||||||
|
|
||||||
|
static void SendSensorState_Immediate(benchmark::State &state) {
|
||||||
|
auto [conn, read_fd] = create_api_connection();
|
||||||
|
bench_enable_immediate_send(conn.get());
|
||||||
|
// batch_delay must be 0 for should_send_immediately_ to return true
|
||||||
|
uint16_t saved_delay = global_api_server->get_batch_delay();
|
||||||
|
global_api_server->set_batch_delay(0);
|
||||||
|
|
||||||
|
TestSensor sensor;
|
||||||
|
sensor.configure("test_sensor");
|
||||||
|
sensor.publish_state(23.5f);
|
||||||
|
|
||||||
|
for (auto _ : state) {
|
||||||
|
for (int i = 0; i < kInnerIterations; i++) {
|
||||||
|
conn->send_sensor_state(&sensor);
|
||||||
|
}
|
||||||
|
drain_socket(read_fd);
|
||||||
|
benchmark::DoNotOptimize(conn.get());
|
||||||
|
}
|
||||||
|
state.SetItemsProcessed(state.iterations() * kInnerIterations);
|
||||||
|
|
||||||
|
global_api_server->set_batch_delay(saved_delay);
|
||||||
|
::close(read_fd);
|
||||||
|
}
|
||||||
|
BENCHMARK(SendSensorState_Immediate);
|
||||||
|
|
||||||
|
// --- send_sensor_state: batch path (cold — first call allocates) ---
|
||||||
|
// Measures: send_message_smart_ → schedule_message_ → deferred batch add.
|
||||||
|
// Includes one-time vector allocation cost.
|
||||||
|
|
||||||
|
static void SendSensorState_Batch_Cold(benchmark::State &state) {
|
||||||
|
auto [conn, read_fd] = create_api_connection();
|
||||||
|
|
||||||
|
TestSensor sensor;
|
||||||
|
sensor.configure("test_sensor");
|
||||||
|
sensor.publish_state(23.5f);
|
||||||
|
|
||||||
|
for (auto _ : state) {
|
||||||
|
for (int i = 0; i < kInnerIterations; i++) {
|
||||||
|
conn->send_sensor_state(&sensor);
|
||||||
|
}
|
||||||
|
benchmark::DoNotOptimize(conn.get());
|
||||||
|
}
|
||||||
|
state.SetItemsProcessed(state.iterations() * kInnerIterations);
|
||||||
|
|
||||||
|
::close(read_fd);
|
||||||
|
}
|
||||||
|
BENCHMARK(SendSensorState_Batch_Cold);
|
||||||
|
|
||||||
|
// --- send_sensor_state: batch path (warm — buffer already allocated) ---
|
||||||
|
// Measures steady-state batch cost after the vector has been allocated
|
||||||
|
// and cleared at least once. This is the typical path during normal
|
||||||
|
// operation after the first batch has been processed.
|
||||||
|
|
||||||
|
static void SendSensorState_Batch_Warm(benchmark::State &state) {
|
||||||
|
auto [conn, read_fd] = create_api_connection();
|
||||||
|
|
||||||
|
TestSensor sensor;
|
||||||
|
sensor.configure("test_sensor");
|
||||||
|
sensor.publish_state(23.5f);
|
||||||
|
|
||||||
|
// Warm up: send once to allocate, then clear to keep capacity
|
||||||
|
conn->send_sensor_state(&sensor);
|
||||||
|
bench_clear_batch(conn.get());
|
||||||
|
|
||||||
|
for (auto _ : state) {
|
||||||
|
for (int i = 0; i < kInnerIterations; i++) {
|
||||||
|
conn->send_sensor_state(&sensor);
|
||||||
|
}
|
||||||
|
benchmark::DoNotOptimize(conn.get());
|
||||||
|
}
|
||||||
|
state.SetItemsProcessed(state.iterations() * kInnerIterations);
|
||||||
|
|
||||||
|
::close(read_fd);
|
||||||
|
}
|
||||||
|
BENCHMARK(SendSensorState_Batch_Warm);
|
||||||
|
|
||||||
|
// --- process_batch_: single sensor state (encode + frame + write) ---
|
||||||
|
// Measures the deferred batch processing path: dispatch_message_ →
|
||||||
|
// try_send_sensor_state → fill + proto encode → send_buffer → frame write.
|
||||||
|
// This is the cost paid on the next loop() after batching.
|
||||||
|
|
||||||
|
static void ProcessBatch_SingleSensor(benchmark::State &state) {
|
||||||
|
auto [conn, read_fd] = create_api_connection();
|
||||||
|
|
||||||
|
TestSensor sensor;
|
||||||
|
sensor.configure("test_sensor");
|
||||||
|
sensor.publish_state(23.5f);
|
||||||
|
|
||||||
|
// Warm up batch vector
|
||||||
|
conn->send_sensor_state(&sensor);
|
||||||
|
bench_process_batch(conn.get());
|
||||||
|
drain_socket(read_fd);
|
||||||
|
|
||||||
|
for (auto _ : state) {
|
||||||
|
for (int i = 0; i < kInnerIterations; i++) {
|
||||||
|
conn->send_sensor_state(&sensor);
|
||||||
|
bench_process_batch(conn.get());
|
||||||
|
}
|
||||||
|
drain_socket(read_fd);
|
||||||
|
benchmark::DoNotOptimize(conn.get());
|
||||||
|
}
|
||||||
|
state.SetItemsProcessed(state.iterations() * kInnerIterations);
|
||||||
|
|
||||||
|
::close(read_fd);
|
||||||
|
}
|
||||||
|
BENCHMARK(ProcessBatch_SingleSensor);
|
||||||
|
|
||||||
|
// --- process_batch_: 5 different sensors ---
|
||||||
|
// Measures batch processing with multiple items queued.
|
||||||
|
// This exercises the multi-message path in process_batch_.
|
||||||
|
|
||||||
|
static void ProcessBatch_5Sensors(benchmark::State &state) {
|
||||||
|
auto [conn, read_fd] = create_api_connection();
|
||||||
|
|
||||||
|
TestSensor sensors[5];
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
char name[20];
|
||||||
|
snprintf(name, sizeof(name), "sensor_%d", i);
|
||||||
|
sensors[i].configure(name);
|
||||||
|
sensors[i].publish_state(23.5f + static_cast<float>(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Warm up batch vector
|
||||||
|
for (auto &s : sensors)
|
||||||
|
conn->send_sensor_state(&s);
|
||||||
|
bench_process_batch(conn.get());
|
||||||
|
drain_socket(read_fd);
|
||||||
|
|
||||||
|
for (auto _ : state) {
|
||||||
|
for (int i = 0; i < kInnerIterations; i++) {
|
||||||
|
for (auto &s : sensors)
|
||||||
|
conn->send_sensor_state(&s);
|
||||||
|
bench_process_batch(conn.get());
|
||||||
|
}
|
||||||
|
drain_socket(read_fd);
|
||||||
|
benchmark::DoNotOptimize(conn.get());
|
||||||
|
}
|
||||||
|
state.SetItemsProcessed(state.iterations() * kInnerIterations);
|
||||||
|
|
||||||
|
::close(read_fd);
|
||||||
|
}
|
||||||
|
BENCHMARK(ProcessBatch_5Sensors);
|
||||||
|
|
||||||
|
} // namespace esphome::api::benchmarks
|
||||||
|
|
||||||
|
#endif // USE_API_PLAINTEXT && USE_SENSOR
|
||||||
Reference in New Issue
Block a user