[packet_transport] Minimise heap allocations (#14482)

This commit is contained in:
Clyde Stubbs
2026-03-05 14:24:01 +11:00
committed by GitHub
parent 0ff5270632
commit f5c37bf486
6 changed files with 659 additions and 37 deletions
@@ -0,0 +1,98 @@
#pragma once
#include <cstdint>
#include <cstring>
#include <cstdio>
#include <vector>
#include <gtest/gtest.h>
#include "esphome/components/packet_transport/packet_transport.h"
namespace esphome::packet_transport::testing {
// Protocol constants mirrored from packet_transport.cpp for test packet construction.
static constexpr uint16_t MAGIC_NUMBER = 0x4553;
static constexpr uint16_t MAGIC_PING = 0x5048;
// Concrete testable implementation of PacketTransport.
// Captures sent packets and exposes protected members for verification.
//
// Sensor round-trip tests require USE_SENSOR / USE_BINARY_SENSOR to be defined,
// which happens when 'sensor' and 'binary_sensor' components are in the build.
// Run with --all or include those components to enable the full test suite.
class TestablePacketTransport : public PacketTransport {
public:
using PacketTransport::add_key_;
using PacketTransport::data_;
using PacketTransport::encryption_key_;
using PacketTransport::flush_;
using PacketTransport::header_;
using PacketTransport::increment_code_;
using PacketTransport::init_data_;
using PacketTransport::is_encrypted_;
using PacketTransport::is_provider_;
using PacketTransport::name_;
using PacketTransport::ping_key_;
using PacketTransport::ping_keys_;
using PacketTransport::ping_pong_enable_;
using PacketTransport::ping_pong_recyle_time_;
using PacketTransport::process_;
using PacketTransport::providers_;
using PacketTransport::rolling_code_;
using PacketTransport::rolling_code_enable_;
using PacketTransport::send_data_;
using PacketTransport::updated_;
#ifdef USE_SENSOR
using PacketTransport::add_data_;
using PacketTransport::remote_sensors_;
using PacketTransport::sensors_;
#endif
#ifdef USE_BINARY_SENSOR
using PacketTransport::add_binary_data_;
using PacketTransport::binary_sensors_;
using PacketTransport::remote_binary_sensors_;
#endif
// NOTE: std::vector is used here for test convenience. For production code,
// consider using StaticVector or FixedVector from esphome/core/helpers.h instead.
mutable std::vector<std::vector<uint8_t>> sent_packets;
size_t max_packet_size{512};
bool send_enabled{true};
void send_packet(const std::vector<uint8_t> &buf) const override { this->sent_packets.push_back(buf); }
size_t get_max_packet_size() override { return this->max_packet_size; }
bool should_send() override { return this->send_enabled; }
/// Build the packet header for testing without requiring App or global_preferences.
void init_for_test(const char *name) {
this->name_ = name;
this->header_.clear();
// MAGIC_NUMBER as uint16_t little-endian
this->header_.push_back(MAGIC_NUMBER & 0xFF);
this->header_.push_back((MAGIC_NUMBER >> 8) & 0xFF);
// Length-prefixed hostname
auto len = strlen(name);
this->header_.push_back(static_cast<uint8_t>(len));
for (size_t i = 0; i < len; i++)
this->header_.push_back(name[i]);
// Pad to 4-byte boundary
while (this->header_.size() & 0x3)
this->header_.push_back(0);
}
};
/// Build a MAGIC_PING packet for testing add_key_ / ping-pong flows.
inline std::vector<uint8_t> build_ping_packet(const char *hostname, uint32_t key) {
std::vector<uint8_t> packet;
packet.push_back(MAGIC_PING & 0xFF);
packet.push_back((MAGIC_PING >> 8) & 0xFF);
auto len = strlen(hostname);
packet.push_back(static_cast<uint8_t>(len));
for (size_t i = 0; i < len; i++)
packet.push_back(hostname[i]);
packet.push_back(key & 0xFF);
packet.push_back((key >> 8) & 0xFF);
packet.push_back((key >> 16) & 0xFF);
packet.push_back((key >> 24) & 0xFF);
return packet;
}
} // namespace esphome::packet_transport::testing
@@ -0,0 +1,11 @@
# Extra component configuration required by C++ unit tests.
# Loaded by cpp_unit_test.py and merged into the test build config
# before validation, so that platform defines (USE_SENSOR, etc.) are generated.
sensor:
- platform: template
id: test_cpp_sensor
binary_sensor:
- platform: template
id: test_cpp_binary_sensor
@@ -0,0 +1,445 @@
#include "common.h"
namespace esphome::packet_transport::testing {
// --- Configuration setter tests ---
TEST(PacketTransportTest, SetIsProvider) {
TestablePacketTransport transport;
transport.set_is_provider(true);
EXPECT_TRUE(transport.is_provider_);
}
TEST(PacketTransportTest, SetEncryptionKey) {
TestablePacketTransport transport;
std::vector<uint8_t> key(32, 0xAB);
transport.set_encryption_key(key);
EXPECT_EQ(transport.encryption_key_, key);
EXPECT_TRUE(transport.is_encrypted_());
}
TEST(PacketTransportTest, NoEncryptionByDefault) {
TestablePacketTransport transport;
EXPECT_FALSE(transport.is_encrypted_());
}
TEST(PacketTransportTest, SetRollingCodeEnable) {
TestablePacketTransport transport;
transport.set_rolling_code_enable(true);
EXPECT_TRUE(transport.rolling_code_enable_);
}
TEST(PacketTransportTest, SetPingPongEnable) {
TestablePacketTransport transport;
transport.set_ping_pong_enable(true);
EXPECT_TRUE(transport.ping_pong_enable_);
}
TEST(PacketTransportTest, SetPingPongRecycleTime) {
TestablePacketTransport transport;
transport.set_ping_pong_recycle_time(600);
EXPECT_EQ(transport.ping_pong_recyle_time_, 600u);
}
// --- Provider management ---
TEST(PacketTransportTest, AddProvider) {
TestablePacketTransport transport;
transport.add_provider("host1");
EXPECT_TRUE(transport.providers_.contains("host1"));
EXPECT_EQ(transport.providers_.size(), 1u);
}
TEST(PacketTransportTest, AddProviderDuplicate) {
TestablePacketTransport transport;
transport.add_provider("host1");
transport.add_provider("host1");
EXPECT_EQ(transport.providers_.size(), 1u);
}
TEST(PacketTransportTest, SetProviderEncryption) {
TestablePacketTransport transport;
transport.add_provider("host1");
std::vector<uint8_t> key(32, 0xCD);
transport.set_provider_encryption("host1", key);
EXPECT_EQ(transport.providers_["host1"].encryption_key, key);
}
// --- Sensor management (requires USE_SENSOR / USE_BINARY_SENSOR) ---
#ifdef USE_SENSOR
TEST(PacketTransportTest, AddSensor) {
TestablePacketTransport transport;
sensor::Sensor s;
transport.add_sensor("temp", &s);
ASSERT_EQ(transport.sensors_.size(), 1u);
EXPECT_STREQ(transport.sensors_[0].id, "temp");
EXPECT_EQ(transport.sensors_[0].sensor, &s);
EXPECT_TRUE(transport.sensors_[0].updated);
}
TEST(PacketTransportTest, AddRemoteSensor) {
TestablePacketTransport transport;
sensor::Sensor s;
transport.add_remote_sensor("host1", "remote_temp", &s);
EXPECT_TRUE(transport.providers_.contains("host1"));
EXPECT_EQ(transport.remote_sensors_["host1"]["remote_temp"], &s);
}
#endif
#ifdef USE_BINARY_SENSOR
TEST(PacketTransportTest, AddBinarySensor) {
TestablePacketTransport transport;
binary_sensor::BinarySensor bs;
transport.add_binary_sensor("motion", &bs);
ASSERT_EQ(transport.binary_sensors_.size(), 1u);
EXPECT_STREQ(transport.binary_sensors_[0].id, "motion");
EXPECT_EQ(transport.binary_sensors_[0].sensor, &bs);
}
TEST(PacketTransportTest, AddRemoteBinarySensor) {
TestablePacketTransport transport;
binary_sensor::BinarySensor bs;
transport.add_remote_binary_sensor("host1", "remote_motion", &bs);
EXPECT_TRUE(transport.providers_.contains("host1"));
EXPECT_EQ(transport.remote_binary_sensors_["host1"]["remote_motion"], &bs);
}
#endif
// --- Unencrypted round-trip tests (require USE_SENSOR / USE_BINARY_SENSOR) ---
#ifdef USE_SENSOR
TEST(PacketTransportTest, UnencryptedSensorRoundTrip) {
// Encoder
TestablePacketTransport encoder;
encoder.init_for_test("sender");
sensor::Sensor local_sensor;
local_sensor.state = 42.5f;
encoder.add_sensor("temp", &local_sensor);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
// Decoder
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f; // sentinel
decoder.add_remote_sensor("sender", "temp", &remote_sensor);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, 42.5f);
}
#endif
#ifdef USE_BINARY_SENSOR
TEST(PacketTransportTest, UnencryptedBinarySensorRoundTrip) {
TestablePacketTransport encoder;
encoder.init_for_test("sender");
binary_sensor::BinarySensor local_bs;
local_bs.state = true;
encoder.add_binary_sensor("motion", &local_bs);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
binary_sensor::BinarySensor remote_bs;
decoder.add_remote_binary_sensor("sender", "motion", &remote_bs);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_TRUE(remote_bs.state);
}
#endif
#if defined(USE_SENSOR) && defined(USE_BINARY_SENSOR)
TEST(PacketTransportTest, MultipleSensorsRoundTrip) {
TestablePacketTransport encoder;
encoder.init_for_test("sender");
sensor::Sensor s1, s2;
s1.state = 10.0f;
s2.state = 20.0f;
encoder.add_sensor("s1", &s1);
encoder.add_sensor("s2", &s2);
binary_sensor::BinarySensor bs1;
bs1.state = true;
encoder.add_binary_sensor("bs1", &bs1);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor rs1, rs2;
binary_sensor::BinarySensor rbs1;
rs1.state = -999.0f;
rs2.state = -999.0f;
decoder.add_remote_sensor("sender", "s1", &rs1);
decoder.add_remote_sensor("sender", "s2", &rs2);
decoder.add_remote_binary_sensor("sender", "bs1", &rbs1);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(rs1.state, 10.0f);
EXPECT_FLOAT_EQ(rs2.state, 20.0f);
EXPECT_TRUE(rbs1.state);
}
#endif
// --- Encrypted round-trip ---
#ifdef USE_SENSOR
TEST(PacketTransportTest, EncryptedSensorRoundTrip) {
std::vector<uint8_t> key(32);
for (int i = 0; i < 32; i++)
key[i] = i;
TestablePacketTransport encoder;
encoder.init_for_test("sender");
encoder.set_encryption_key(key);
sensor::Sensor local_sensor;
local_sensor.state = 99.9f;
encoder.add_sensor("temp", &local_sensor);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f;
decoder.add_remote_sensor("sender", "temp", &remote_sensor);
decoder.set_provider_encryption("sender", key);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, 99.9f);
}
// --- Selective send ---
TEST(PacketTransportTest, SendDataOnlyUpdated) {
TestablePacketTransport encoder;
encoder.init_for_test("sender");
sensor::Sensor s1, s2;
s1.state = 1.0f;
s2.state = 2.0f;
encoder.add_sensor("s1", &s1);
encoder.add_sensor("s2", &s2);
// Mark s1 as not updated, only s2 as updated
encoder.sensors_[0].updated = false;
encoder.sensors_[1].updated = true;
encoder.send_data_(false);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor rs1, rs2;
rs1.state = -999.0f;
rs2.state = -999.0f;
decoder.add_remote_sensor("sender", "s1", &rs1);
decoder.add_remote_sensor("sender", "s2", &rs2);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(rs1.state, -999.0f); // not updated, not sent
EXPECT_FLOAT_EQ(rs2.state, 2.0f); // updated, sent
}
#endif
// --- Ping key tests ---
TEST(PacketTransportTest, PingKeyStoredWhenEncrypted) {
TestablePacketTransport transport;
transport.init_for_test("receiver");
transport.set_encryption_key(std::vector<uint8_t>(32, 0xAA));
auto ping = build_ping_packet("requester", 0xDEADBEEF);
transport.process_({ping.data(), ping.size()});
ASSERT_EQ(transport.ping_keys_.size(), 1u);
EXPECT_EQ(transport.ping_keys_["requester"], 0xDEADBEEFu);
}
TEST(PacketTransportTest, PingKeyIgnoredWhenNotEncrypted) {
TestablePacketTransport transport;
transport.init_for_test("receiver");
// No encryption key — add_key_ should be a no-op
auto ping = build_ping_packet("requester", 0xDEADBEEF);
transport.process_({ping.data(), ping.size()});
EXPECT_TRUE(transport.ping_keys_.empty());
}
TEST(PacketTransportTest, PingKeyUpdatedOnRepeat) {
TestablePacketTransport transport;
transport.init_for_test("receiver");
transport.set_encryption_key(std::vector<uint8_t>(32, 0xAA));
auto ping1 = build_ping_packet("host1", 0x1111);
transport.process_({ping1.data(), ping1.size()});
EXPECT_EQ(transport.ping_keys_["host1"], 0x1111u);
// Same host, new key value — should update in place
auto ping2 = build_ping_packet("host1", 0x2222);
transport.process_({ping2.data(), ping2.size()});
EXPECT_EQ(transport.ping_keys_.size(), 1u);
EXPECT_EQ(transport.ping_keys_["host1"], 0x2222u);
}
TEST(PacketTransportTest, PingKeyMaxLimit) {
TestablePacketTransport transport;
transport.init_for_test("receiver");
transport.set_encryption_key(std::vector<uint8_t>(32, 0xAA));
// Fill to MAX_PING_KEYS (4)
for (int i = 0; i < 4; i++) {
char name[16];
snprintf(name, sizeof(name), "host%d", i);
auto ping = build_ping_packet(name, 0x1000 + i);
transport.process_({ping.data(), ping.size()});
}
EXPECT_EQ(transport.ping_keys_.size(), 4u);
// 5th key should be discarded
auto ping = build_ping_packet("host4", 0x9999);
transport.process_({ping.data(), ping.size()});
EXPECT_EQ(transport.ping_keys_.size(), 4u);
EXPECT_FALSE(transport.ping_keys_.contains("host4"));
}
#ifdef USE_SENSOR
TEST(PacketTransportTest, PingKeyIncludedInTransmittedPacket) {
std::vector<uint8_t> key(32, 0xBB);
// Responder: encrypted, owns a sensor
TestablePacketTransport responder;
responder.init_for_test("responder");
responder.set_encryption_key(key);
sensor::Sensor local_sensor;
local_sensor.state = 77.7f;
responder.add_sensor("temp", &local_sensor);
// Requester sends a MAGIC_PING that the responder processes
auto ping = build_ping_packet("requester", 0xDEADBEEF);
responder.process_({ping.data(), ping.size()});
ASSERT_EQ(responder.ping_keys_.size(), 1u);
// Responder sends sensor data — ping key should be embedded
responder.send_data_(true);
ASSERT_EQ(responder.sent_packets.size(), 1u);
// Requester: encrypted provider, ping-pong enabled, expects key 0xDEADBEEF
TestablePacketTransport requester;
requester.init_for_test("requester");
requester.set_ping_pong_enable(true);
requester.ping_key_ = 0xDEADBEEF;
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f;
requester.add_remote_sensor("responder", "temp", &remote_sensor);
requester.set_provider_encryption("responder", key);
// The requester decrypts the packet and finds its ping key echoed back,
// which gates the sensor data — if the key is missing, data is blocked.
auto &packet = responder.sent_packets[0];
requester.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, 77.7f);
}
TEST(PacketTransportTest, MissingPingKeyBlocksSensorData) {
std::vector<uint8_t> key(32, 0xBB);
// Responder sends data WITHOUT receiving any MAGIC_PING first — no ping keys
TestablePacketTransport responder;
responder.init_for_test("responder");
responder.set_encryption_key(key);
sensor::Sensor local_sensor;
local_sensor.state = 77.7f;
responder.add_sensor("temp", &local_sensor);
responder.send_data_(true);
ASSERT_EQ(responder.sent_packets.size(), 1u);
// Requester with ping-pong enabled expects a key that isn't in the packet
TestablePacketTransport requester;
requester.init_for_test("requester");
requester.set_ping_pong_enable(true);
requester.ping_key_ = 0xDEADBEEF;
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f;
requester.add_remote_sensor("responder", "temp", &remote_sensor);
requester.set_provider_encryption("responder", key);
auto &packet = responder.sent_packets[0];
requester.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, -999.0f); // blocked — ping key not found
}
#endif
// --- Process error handling ---
TEST(PacketTransportTest, ProcessShortBuffer) {
TestablePacketTransport transport;
transport.init_for_test("receiver");
uint8_t buf[] = {0x53};
// Too short for a magic number - should return safely
transport.process_({buf, 1});
}
TEST(PacketTransportTest, ProcessBadMagic) {
TestablePacketTransport transport;
transport.init_for_test("receiver");
uint8_t buf[] = {0xFF, 0xFF, 0x00, 0x00};
// Wrong magic - should return safely
transport.process_({buf, sizeof(buf)});
}
TEST(PacketTransportTest, ProcessOwnHostname) {
TestablePacketTransport transport;
transport.init_for_test("myself");
// Build a packet from "myself" using a separate encoder
TestablePacketTransport fake_sender;
fake_sender.init_for_test("myself");
fake_sender.send_data_(true);
ASSERT_EQ(fake_sender.sent_packets.size(), 1u);
auto &packet = fake_sender.sent_packets[0];
// Should be silently ignored because hostname matches our own
transport.process_({packet.data(), packet.size()});
}
TEST(PacketTransportTest, ProcessUnknownHostname) {
TestablePacketTransport transport;
transport.init_for_test("receiver");
// No providers registered - "unknown" will not be found
TestablePacketTransport sender;
sender.init_for_test("unknown");
sender.send_data_(true);
ASSERT_EQ(sender.sent_packets.size(), 1u);
auto &packet = sender.sent_packets[0];
// Should return safely without crash
transport.process_({packet.data(), packet.size()});
}
// --- Send disabled ---
TEST(PacketTransportTest, NoSendWhenDisabled) {
TestablePacketTransport transport;
transport.init_for_test("sender");
transport.send_enabled = false;
transport.send_data_(true);
EXPECT_TRUE(transport.sent_packets.empty());
}
} // namespace esphome::packet_transport::testing