[cpptests] support testing platform components (#13075)

Co-authored-by: J. Nick Koston <nick@home-assistant.io>
Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Javier Peletier
2026-03-10 03:41:02 +01:00
committed by GitHub
parent 00f809f5f0
commit e82f0f4432
11 changed files with 467 additions and 330 deletions
+15
View File
@@ -615,6 +615,10 @@ class EsphomeCore:
self.address_cache: AddressCache | None = None
# Cached config hash (computed lazily)
self._config_hash: int | None = None
# True if compiling for C++ unit tests
self.cpp_testing = False
# Allowlist of components whose to_code should run during C++ testing
self.cpp_testing_codegen: set[str] = set()
def reset(self):
from esphome.pins import PIN_SCHEMA_REGISTRY
@@ -644,6 +648,8 @@ class EsphomeCore:
self.current_component = None
self.address_cache = None
self._config_hash = None
self.cpp_testing = False
self.cpp_testing_codegen = set()
PIN_SCHEMA_REGISTRY.reset()
@contextmanager
@@ -987,6 +993,15 @@ class EsphomeCore:
"""
self.platform_counts[platform_name] += 1
def testing_ensure_platform_registered(self, platform_name: str) -> None:
"""Ensure a platform has at least one entity registered for testing.
Used during C++ test builds to guarantee USE_* defines are emitted
without needing a real component variable.
"""
if not self.platform_counts[platform_name]:
self.platform_counts[platform_name] = 1
def register_controller(self) -> None:
"""Track registration of a Controller for ControllerRegistry StaticVector sizing."""
controller_count = self.data.setdefault(KEY_CONTROLLER_REGISTRY_COUNT, 0)
+5
View File
@@ -71,6 +71,11 @@ class ComponentManifest:
@property
def to_code(self) -> Callable[[Any], None] | None:
if CORE.cpp_testing:
# During C++ testing, only run to_code for allowlisted components
name = self.module.__package__.rsplit(".", 1)[-1]
if name not in CORE.cpp_testing_codegen:
return None
return getattr(self.module, "to_code", None)
@property
+95 -58
View File
@@ -10,9 +10,10 @@ from helpers import get_all_components, get_all_dependencies, root_path
from esphome.__main__ import command_compile, parse_args
from esphome.config import validate_config
from esphome.const import CONF_PLATFORM
from esphome.core import CORE
from esphome.loader import get_component
from esphome.platformio_api import get_idedata
from esphome.yaml_util import load_yaml
# This must coincide with the version in /platformio.ini
PLATFORMIO_GOOGLE_TEST_LIB = "google/googletest@^1.15.2"
@@ -20,6 +21,13 @@ PLATFORMIO_GOOGLE_TEST_LIB = "google/googletest@^1.15.2"
# Path to /tests/components
COMPONENTS_TESTS_DIR: Path = Path(root_path) / "tests" / "components"
# Components whose to_code should run during C++ test builds.
# Most components don't need code generation for tests; only these
# essential ones (platform setup, logging, core config) are needed.
# Note: "core" is the esphome core config module (esphome/core/config.py),
# which registers under package name "core" not "esphome".
CPP_TESTING_CODEGEN_COMPONENTS = {"core", "host", "logger"}
def hash_components(components: list[str]) -> str:
key = ",".join(components)
@@ -30,12 +38,14 @@ def filter_components_without_tests(components: list[str]) -> list[str]:
"""Filter out components that do not have a corresponding test file.
This is done by checking if the component's directory contains at
least a .cpp file.
least a .cpp or .h file.
"""
filtered_components: list[str] = []
for component in components:
test_dir = COMPONENTS_TESTS_DIR / component
if test_dir.is_dir() and any(test_dir.glob("*.cpp")):
if test_dir.is_dir() and (
any(test_dir.glob("*.cpp")) or any(test_dir.glob("*.h"))
):
filtered_components.append(component)
else:
print(
@@ -45,38 +55,6 @@ def filter_components_without_tests(components: list[str]) -> list[str]:
return filtered_components
# Name of optional per-component YAML config merged into the test build
# before validation so that platform defines (USE_SENSOR, etc.) are generated.
CPP_TEST_CONFIG_FILE = "cpp_test.yaml"
def load_component_test_configs(components: list[str]) -> dict:
"""Load cpp_test.yaml files from test component directories.
These configs are merged into the base test config *before* validation
so that entity registration runs during code generation, which causes
the corresponding USE_* defines to be emitted.
"""
merged: dict = {}
for component in components:
config_file = COMPONENTS_TESTS_DIR / component / CPP_TEST_CONFIG_FILE
if not config_file.exists():
continue
component_config = load_yaml(config_file)
if not component_config:
continue
for key, value in component_config.items():
if (
key in merged
and isinstance(merged[key], list)
and isinstance(value, list)
):
merged[key].extend(value)
else:
merged[key] = value
return merged
def create_test_config(config_name: str, includes: list[str]) -> dict:
"""Create ESPHome test configuration for C++ unit tests.
@@ -113,11 +91,52 @@ def create_test_config(config_name: str, includes: list[str]) -> dict:
}
def get_platform_components(components: list[str]) -> list[str]:
"""Discover platform sub-components referenced by test directory structure.
For each component being tested, any sub-directory named after a platform
domain (e.g. ``sensor``, ``binary_sensor``) is treated as a request to
include that ``<domain>.<component>`` platform in the build. The sub-
directory must name a valid platform domain; anything else raises an error
so that typos are caught early.
Returns:
List of ``"domain.component"`` strings, one per discovered sub-directory.
"""
platform_components: list[str] = []
for component in components:
test_dir = COMPONENTS_TESTS_DIR / component
if not test_dir.is_dir():
continue
# Each sub-directory name is expected to be a platform domain
# (e.g. tests/components/bthome/sensor/ → sensor.bthome).
for domain_dir in test_dir.iterdir():
if not domain_dir.is_dir():
continue
domain = domain_dir.name
domain_module = get_component(domain)
if domain_module is None or not domain_module.is_platform_component:
raise ValueError(
f"Component tests for '{component}' reference non-existing or invalid domain '{domain}'"
f" in its directory structure. See ({COMPONENTS_TESTS_DIR / component / domain})."
)
platform_components.append(f"{domain}.{component}")
return platform_components
# Exit codes for run_tests
EXIT_OK = 0
EXIT_SKIPPED = 1
EXIT_COMPILE_ERROR = 2
EXIT_CONFIG_ERROR = 3
EXIT_NO_EXECUTABLE = 4
def run_tests(selected_components: list[str]) -> int:
# Skip tests on Windows
if os.name == "nt":
print("Skipping esphome tests on Windows", file=sys.stderr)
return 1
return EXIT_SKIPPED
# Remove components that do not have tests
components = filter_components_without_tests(selected_components)
@@ -127,45 +146,63 @@ def run_tests(selected_components: list[str]) -> int:
"No components specified or no tests found for the specified components.",
file=sys.stderr,
)
return 0
return EXIT_OK
components = sorted(components)
# Obtain possible dependencies for the requested components.
# Always include 'time' because USE_TIME_TIMEZONE is defined as a build flag,
# which causes core/time.h to include components/time/posix_tz.h.
components_with_dependencies = sorted(
get_all_dependencies(set(components) | {"time"})
)
# Build a list of include folders, one folder per component containing tests.
# A special replacement main.cpp is located in /tests/components/main.cpp
# Build a list of include folders relative to COMPONENTS_TESTS_DIR. These folders will
# be added along with their subfolders.
# "main.cpp" is a special entry that points to /tests/components/main.cpp,
# which provides a custom test runner entry-point replacing the default one.
# Each remaining entry is a component folder whose *.cpp files are compiled.
includes: list[str] = ["main.cpp"] + components
# Obtain a list of platform components to be tested:
try:
platform_components = get_platform_components(components)
except ValueError as e:
print(f"Error obtaining platform components: {e}")
return EXIT_CONFIG_ERROR
components = sorted(components + platform_components)
# Create a unique name for this config based on the actual components being tested
# to maximize cache during testing
config_name: str = "cpptests-" + hash_components(components)
config = create_test_config(config_name, includes)
# Obtain possible dependencies for the requested components.
# Always include 'time' because USE_TIME_TIMEZONE is defined as a build flag,
# which causes core/time.h to include components/time/posix_tz.h.
components_with_dependencies: list[str] = sorted(
get_all_dependencies(set(components) | {"time"}, cpp_testing=True)
)
# Merge component-specific test configs (e.g. sensor instances) before
# validation so that entity registration and USE_* defines work.
extra_config = load_component_test_configs(components)
config.update(extra_config)
config = create_test_config(config_name, includes)
CORE.config_path = COMPONENTS_TESTS_DIR / "dummy.yaml"
CORE.dashboard = None
CORE.cpp_testing = True
CORE.cpp_testing_codegen = CPP_TESTING_CODEGEN_COMPONENTS
# Validate config will expand the above with defaults:
config = validate_config(config, {})
# Add all components and dependencies to the base configuration after validation, so their files
# are added to the build. Use setdefault to avoid overwriting entries that were
# already validated (e.g. sensor instances from cpp_test.yaml).
for key in components_with_dependencies:
config.setdefault(key, {})
# are added to the build.
for component_name in components_with_dependencies:
if "." in component_name:
# Format is always "domain.component" (exactly one dot),
# as produced by get_platform_components().
domain, component = component_name.split(".", maxsplit=1)
domain_list = config.setdefault(domain, [])
CORE.testing_ensure_platform_registered(domain)
domain_list.append({CONF_PLATFORM: component})
else:
config.setdefault(component_name, [])
print(f"Testing components: {', '.join(components)}")
dependencies = set(components_with_dependencies) - set(components)
deps_str = ", ".join(dependencies) if dependencies else "None"
print(f"Testing components: {', '.join(components)}. Dependencies: {deps_str}")
CORE.config = config
args = parse_args(["program", "compile", str(CORE.config_path)])
try:
@@ -178,13 +215,13 @@ def run_tests(selected_components: list[str]) -> int:
print(
f"Error compiling unit tests for {', '.join(components)}. Check path. : {e}"
)
return 2
return EXIT_COMPILE_ERROR
# After a successful compilation, locate the executable and run it:
idedata = get_idedata(config)
if idedata is None:
print("Cannot find executable")
return 1
return EXIT_NO_EXECUTABLE
program_path: str = idedata.raw["prog_path"]
run_cmd: list[str] = [program_path]
+13 -2
View File
@@ -15,6 +15,8 @@ from typing import Any
import colorama
from esphome.loader import get_platform
root_path = os.path.abspath(os.path.normpath(os.path.join(__file__, "..", "..")))
basepath = os.path.join(root_path, "esphome")
temp_folder = os.path.join(root_path, ".temp")
@@ -624,11 +626,15 @@ def get_usable_cpu_count() -> int:
)
def get_all_dependencies(component_names: set[str]) -> set[str]:
def get_all_dependencies(
component_names: set[str], cpp_testing: bool = False
) -> set[str]:
"""Get all dependencies for a set of components.
Args:
component_names: Set of component names to get dependencies for
cpp_testing: If True, set CORE.cpp_testing so AUTO_LOAD callables that
conditionally include testing-only dependencies work correctly
Returns:
Set of all components including dependencies and auto-loaded components
@@ -646,6 +652,7 @@ def get_all_dependencies(component_names: set[str]) -> set[str]:
# Reset CORE to ensure clean state
CORE.reset()
CORE.cpp_testing = cpp_testing
# Set up fake config path for component loading
root = Path(__file__).parent.parent
@@ -660,7 +667,11 @@ def get_all_dependencies(component_names: set[str]) -> set[str]:
new_components: set[str] = set()
for comp_name in all_components:
comp = get_component(comp_name)
if "." in comp_name:
domain, platform = comp_name.split(".", maxsplit=1)
comp = get_platform(domain, platform)
else:
comp = get_component(comp_name)
if not comp:
continue
+13
View File
@@ -7,10 +7,23 @@
testing binaries that combine many components. By convention, this unique namespace is `esphome::component::testing`
(where "component" is the component under test), for example: `esphome::uart::testing`.
### Platform components
For components that expose to a platform component, create a folder under your component test folder with the platform component name, e.g. `binary_sensor` and
include the relevant `.cpp` and `.h` test files there.
### Override component code generation for testing
When generating code for testing, ESPHome won't invoke the component's `to_code` function, since most components do not
need to generate configuration code for testing.
If you do need to generate code to for example configure compilation flags or add libraries,
add the component name to the `CPP_TESTING_CODEGEN_COMPONENTS` allowlist in `script/cpp_unit_test.py`.
## Running component unit tests
(from the repository root)
```bash
./script/cpp_unit_test.py component1 component2 ...
```
@@ -0,0 +1,77 @@
#include "../common.h"
namespace esphome::packet_transport::testing {
TEST(PacketTransportBinarySensorTest, AddBinarySensor) {
TestablePacketTransport transport;
binary_sensor::BinarySensor bs;
transport.add_binary_sensor("motion", &bs);
ASSERT_EQ(transport.binary_sensors_.size(), 1u);
EXPECT_STREQ(transport.binary_sensors_[0].id, "motion");
EXPECT_EQ(transport.binary_sensors_[0].sensor, &bs);
}
TEST(PacketTransportBinarySensorTest, AddRemoteBinarySensor) {
TestablePacketTransport transport;
binary_sensor::BinarySensor bs;
transport.add_remote_binary_sensor("host1", "remote_motion", &bs);
EXPECT_TRUE(transport.providers_.contains("host1"));
EXPECT_EQ(transport.remote_binary_sensors_["host1"]["remote_motion"], &bs);
}
TEST(PacketTransportBinarySensorTest, UnencryptedBinarySensorRoundTrip) {
TestablePacketTransport encoder;
encoder.init_for_test("sender");
binary_sensor::BinarySensor local_bs;
local_bs.state = true;
encoder.add_binary_sensor("motion", &local_bs);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
binary_sensor::BinarySensor remote_bs;
decoder.add_remote_binary_sensor("sender", "motion", &remote_bs);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_TRUE(remote_bs.state);
}
TEST(PacketTransportBinarySensorTest, MultipleSensorsRoundTrip) {
TestablePacketTransport encoder;
encoder.init_for_test("sender");
sensor::Sensor s1, s2;
s1.state = 10.0f;
s2.state = 20.0f;
encoder.add_sensor("s1", &s1);
encoder.add_sensor("s2", &s2);
binary_sensor::BinarySensor bs1;
bs1.state = true;
encoder.add_binary_sensor("bs1", &bs1);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor rs1, rs2;
binary_sensor::BinarySensor rbs1;
rs1.state = -999.0f;
rs2.state = -999.0f;
decoder.add_remote_sensor("sender", "s1", &rs1);
decoder.add_remote_sensor("sender", "s2", &rs2);
decoder.add_remote_binary_sensor("sender", "bs1", &rbs1);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(rs1.state, 10.0f);
EXPECT_FLOAT_EQ(rs2.state, 20.0f);
EXPECT_TRUE(rbs1.state);
}
} // namespace esphome::packet_transport::testing
@@ -1,11 +0,0 @@
# Extra component configuration required by C++ unit tests.
# Loaded by cpp_unit_test.py and merged into the test build config
# before validation, so that platform defines (USE_SENSOR, etc.) are generated.
sensor:
- platform: template
id: test_cpp_sensor
binary_sensor:
- platform: template
id: test_cpp_binary_sensor
@@ -65,198 +65,6 @@ TEST(PacketTransportTest, SetProviderEncryption) {
EXPECT_EQ(transport.providers_["host1"].encryption_key, key);
}
// --- Sensor management (requires USE_SENSOR / USE_BINARY_SENSOR) ---
#ifdef USE_SENSOR
TEST(PacketTransportTest, AddSensor) {
TestablePacketTransport transport;
sensor::Sensor s;
transport.add_sensor("temp", &s);
ASSERT_EQ(transport.sensors_.size(), 1u);
EXPECT_STREQ(transport.sensors_[0].id, "temp");
EXPECT_EQ(transport.sensors_[0].sensor, &s);
EXPECT_TRUE(transport.sensors_[0].updated);
}
TEST(PacketTransportTest, AddRemoteSensor) {
TestablePacketTransport transport;
sensor::Sensor s;
transport.add_remote_sensor("host1", "remote_temp", &s);
EXPECT_TRUE(transport.providers_.contains("host1"));
EXPECT_EQ(transport.remote_sensors_["host1"]["remote_temp"], &s);
}
#endif
#ifdef USE_BINARY_SENSOR
TEST(PacketTransportTest, AddBinarySensor) {
TestablePacketTransport transport;
binary_sensor::BinarySensor bs;
transport.add_binary_sensor("motion", &bs);
ASSERT_EQ(transport.binary_sensors_.size(), 1u);
EXPECT_STREQ(transport.binary_sensors_[0].id, "motion");
EXPECT_EQ(transport.binary_sensors_[0].sensor, &bs);
}
TEST(PacketTransportTest, AddRemoteBinarySensor) {
TestablePacketTransport transport;
binary_sensor::BinarySensor bs;
transport.add_remote_binary_sensor("host1", "remote_motion", &bs);
EXPECT_TRUE(transport.providers_.contains("host1"));
EXPECT_EQ(transport.remote_binary_sensors_["host1"]["remote_motion"], &bs);
}
#endif
// --- Unencrypted round-trip tests (require USE_SENSOR / USE_BINARY_SENSOR) ---
#ifdef USE_SENSOR
TEST(PacketTransportTest, UnencryptedSensorRoundTrip) {
// Encoder
TestablePacketTransport encoder;
encoder.init_for_test("sender");
sensor::Sensor local_sensor;
local_sensor.state = 42.5f;
encoder.add_sensor("temp", &local_sensor);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
// Decoder
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f; // sentinel
decoder.add_remote_sensor("sender", "temp", &remote_sensor);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, 42.5f);
}
#endif
#ifdef USE_BINARY_SENSOR
TEST(PacketTransportTest, UnencryptedBinarySensorRoundTrip) {
TestablePacketTransport encoder;
encoder.init_for_test("sender");
binary_sensor::BinarySensor local_bs;
local_bs.state = true;
encoder.add_binary_sensor("motion", &local_bs);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
binary_sensor::BinarySensor remote_bs;
decoder.add_remote_binary_sensor("sender", "motion", &remote_bs);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_TRUE(remote_bs.state);
}
#endif
#if defined(USE_SENSOR) && defined(USE_BINARY_SENSOR)
TEST(PacketTransportTest, MultipleSensorsRoundTrip) {
TestablePacketTransport encoder;
encoder.init_for_test("sender");
sensor::Sensor s1, s2;
s1.state = 10.0f;
s2.state = 20.0f;
encoder.add_sensor("s1", &s1);
encoder.add_sensor("s2", &s2);
binary_sensor::BinarySensor bs1;
bs1.state = true;
encoder.add_binary_sensor("bs1", &bs1);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor rs1, rs2;
binary_sensor::BinarySensor rbs1;
rs1.state = -999.0f;
rs2.state = -999.0f;
decoder.add_remote_sensor("sender", "s1", &rs1);
decoder.add_remote_sensor("sender", "s2", &rs2);
decoder.add_remote_binary_sensor("sender", "bs1", &rbs1);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(rs1.state, 10.0f);
EXPECT_FLOAT_EQ(rs2.state, 20.0f);
EXPECT_TRUE(rbs1.state);
}
#endif
// --- Encrypted round-trip ---
#ifdef USE_SENSOR
TEST(PacketTransportTest, EncryptedSensorRoundTrip) {
std::vector<uint8_t> key(32);
for (int i = 0; i < 32; i++)
key[i] = i;
TestablePacketTransport encoder;
encoder.init_for_test("sender");
encoder.set_encryption_key(key);
sensor::Sensor local_sensor;
local_sensor.state = 99.9f;
encoder.add_sensor("temp", &local_sensor);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f;
decoder.add_remote_sensor("sender", "temp", &remote_sensor);
decoder.set_provider_encryption("sender", key);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, 99.9f);
}
// --- Selective send ---
TEST(PacketTransportTest, SendDataOnlyUpdated) {
TestablePacketTransport encoder;
encoder.init_for_test("sender");
sensor::Sensor s1, s2;
s1.state = 1.0f;
s2.state = 2.0f;
encoder.add_sensor("s1", &s1);
encoder.add_sensor("s2", &s2);
// Mark s1 as not updated, only s2 as updated
encoder.sensors_[0].updated = false;
encoder.sensors_[1].updated = true;
encoder.send_data_(false);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor rs1, rs2;
rs1.state = -999.0f;
rs2.state = -999.0f;
decoder.add_remote_sensor("sender", "s1", &rs1);
decoder.add_remote_sensor("sender", "s2", &rs2);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(rs1.state, -999.0f); // not updated, not sent
EXPECT_FLOAT_EQ(rs2.state, 2.0f); // updated, sent
}
#endif
// --- Ping key tests ---
TEST(PacketTransportTest, PingKeyStoredWhenEncrypted) {
@@ -319,73 +127,6 @@ TEST(PacketTransportTest, PingKeyMaxLimit) {
EXPECT_FALSE(transport.ping_keys_.contains("host4"));
}
#ifdef USE_SENSOR
TEST(PacketTransportTest, PingKeyIncludedInTransmittedPacket) {
std::vector<uint8_t> key(32, 0xBB);
// Responder: encrypted, owns a sensor
TestablePacketTransport responder;
responder.init_for_test("responder");
responder.set_encryption_key(key);
sensor::Sensor local_sensor;
local_sensor.state = 77.7f;
responder.add_sensor("temp", &local_sensor);
// Requester sends a MAGIC_PING that the responder processes
auto ping = build_ping_packet("requester", 0xDEADBEEF);
responder.process_({ping.data(), ping.size()});
ASSERT_EQ(responder.ping_keys_.size(), 1u);
// Responder sends sensor data — ping key should be embedded
responder.send_data_(true);
ASSERT_EQ(responder.sent_packets.size(), 1u);
// Requester: encrypted provider, ping-pong enabled, expects key 0xDEADBEEF
TestablePacketTransport requester;
requester.init_for_test("requester");
requester.set_ping_pong_enable(true);
requester.ping_key_ = 0xDEADBEEF;
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f;
requester.add_remote_sensor("responder", "temp", &remote_sensor);
requester.set_provider_encryption("responder", key);
// The requester decrypts the packet and finds its ping key echoed back,
// which gates the sensor data — if the key is missing, data is blocked.
auto &packet = responder.sent_packets[0];
requester.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, 77.7f);
}
TEST(PacketTransportTest, MissingPingKeyBlocksSensorData) {
std::vector<uint8_t> key(32, 0xBB);
// Responder sends data WITHOUT receiving any MAGIC_PING first — no ping keys
TestablePacketTransport responder;
responder.init_for_test("responder");
responder.set_encryption_key(key);
sensor::Sensor local_sensor;
local_sensor.state = 77.7f;
responder.add_sensor("temp", &local_sensor);
responder.send_data_(true);
ASSERT_EQ(responder.sent_packets.size(), 1u);
// Requester with ping-pong enabled expects a key that isn't in the packet
TestablePacketTransport requester;
requester.init_for_test("requester");
requester.set_ping_pong_enable(true);
requester.ping_key_ = 0xDEADBEEF;
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f;
requester.add_remote_sensor("responder", "temp", &remote_sensor);
requester.set_provider_encryption("responder", key);
auto &packet = responder.sent_packets[0];
requester.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, -999.0f); // blocked — ping key not found
}
#endif
// --- Process error handling ---
TEST(PacketTransportTest, ProcessShortBuffer) {
@@ -0,0 +1,170 @@
#include "../common.h"
namespace esphome::packet_transport::testing {
TEST(PacketTransportSensorTest, AddSensor) {
TestablePacketTransport transport;
sensor::Sensor s;
transport.add_sensor("temp", &s);
ASSERT_EQ(transport.sensors_.size(), 1u);
EXPECT_STREQ(transport.sensors_[0].id, "temp");
EXPECT_EQ(transport.sensors_[0].sensor, &s);
EXPECT_TRUE(transport.sensors_[0].updated);
}
TEST(PacketTransportSensorTest, AddRemoteSensor) {
TestablePacketTransport transport;
sensor::Sensor s;
transport.add_remote_sensor("host1", "remote_temp", &s);
EXPECT_TRUE(transport.providers_.contains("host1"));
EXPECT_EQ(transport.remote_sensors_["host1"]["remote_temp"], &s);
}
TEST(PacketTransportSensorTest, UnencryptedSensorRoundTrip) {
// Encoder
TestablePacketTransport encoder;
encoder.init_for_test("sender");
sensor::Sensor local_sensor;
local_sensor.state = 42.5f;
encoder.add_sensor("temp", &local_sensor);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
// Decoder
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f; // sentinel
decoder.add_remote_sensor("sender", "temp", &remote_sensor);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, 42.5f);
}
TEST(PacketTransportSensorTest, EncryptedSensorRoundTrip) {
std::vector<uint8_t> key(32);
for (int i = 0; i < 32; i++)
key[i] = i;
TestablePacketTransport encoder;
encoder.init_for_test("sender");
encoder.set_encryption_key(key);
sensor::Sensor local_sensor;
local_sensor.state = 99.9f;
encoder.add_sensor("temp", &local_sensor);
encoder.send_data_(true);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f;
decoder.add_remote_sensor("sender", "temp", &remote_sensor);
decoder.set_provider_encryption("sender", key);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, 99.9f);
}
TEST(PacketTransportSensorTest, SendDataOnlyUpdated) {
TestablePacketTransport encoder;
encoder.init_for_test("sender");
sensor::Sensor s1, s2;
s1.state = 1.0f;
s2.state = 2.0f;
encoder.add_sensor("s1", &s1);
encoder.add_sensor("s2", &s2);
// Mark s1 as not updated, only s2 as updated
encoder.sensors_[0].updated = false;
encoder.sensors_[1].updated = true;
encoder.send_data_(false);
ASSERT_EQ(encoder.sent_packets.size(), 1u);
TestablePacketTransport decoder;
decoder.init_for_test("receiver");
sensor::Sensor rs1, rs2;
rs1.state = -999.0f;
rs2.state = -999.0f;
decoder.add_remote_sensor("sender", "s1", &rs1);
decoder.add_remote_sensor("sender", "s2", &rs2);
auto &packet = encoder.sent_packets[0];
decoder.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(rs1.state, -999.0f); // not updated, not sent
EXPECT_FLOAT_EQ(rs2.state, 2.0f); // updated, sent
}
TEST(PacketTransportSensorTest, PingKeyIncludedInTransmittedPacket) {
std::vector<uint8_t> key(32, 0xBB);
// Responder: encrypted, owns a sensor
TestablePacketTransport responder;
responder.init_for_test("responder");
responder.set_encryption_key(key);
sensor::Sensor local_sensor;
local_sensor.state = 77.7f;
responder.add_sensor("temp", &local_sensor);
// Requester sends a MAGIC_PING that the responder processes
auto ping = build_ping_packet("requester", 0xDEADBEEF);
responder.process_({ping.data(), ping.size()});
ASSERT_EQ(responder.ping_keys_.size(), 1u);
// Responder sends sensor data — ping key should be embedded
responder.send_data_(true);
ASSERT_EQ(responder.sent_packets.size(), 1u);
// Requester: encrypted provider, ping-pong enabled, expects key 0xDEADBEEF
TestablePacketTransport requester;
requester.init_for_test("requester");
requester.set_ping_pong_enable(true);
requester.ping_key_ = 0xDEADBEEF;
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f;
requester.add_remote_sensor("responder", "temp", &remote_sensor);
requester.set_provider_encryption("responder", key);
// The requester decrypts the packet and finds its ping key echoed back,
// which gates the sensor data — if the key is missing, data is blocked.
auto &packet = responder.sent_packets[0];
requester.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, 77.7f);
}
TEST(PacketTransportSensorTest, MissingPingKeyBlocksSensorData) {
std::vector<uint8_t> key(32, 0xBB);
// Responder sends data WITHOUT receiving any MAGIC_PING first — no ping keys
TestablePacketTransport responder;
responder.init_for_test("responder");
responder.set_encryption_key(key);
sensor::Sensor local_sensor;
local_sensor.state = 77.7f;
responder.add_sensor("temp", &local_sensor);
responder.send_data_(true);
ASSERT_EQ(responder.sent_packets.size(), 1u);
// Requester with ping-pong enabled expects a key that isn't in the packet
TestablePacketTransport requester;
requester.init_for_test("requester");
requester.set_ping_pong_enable(true);
requester.ping_key_ = 0xDEADBEEF;
sensor::Sensor remote_sensor;
remote_sensor.state = -999.0f;
requester.add_remote_sensor("responder", "temp", &remote_sensor);
requester.set_provider_encryption("responder", key);
auto &packet = responder.sent_packets[0];
requester.process_({packet.data(), packet.size()});
EXPECT_FLOAT_EQ(remote_sensor.state, -999.0f); // blocked — ping key not found
}
} // namespace esphome::packet_transport::testing
+67
View File
@@ -1027,6 +1027,73 @@ def test_get_all_dependencies_empty_set() -> None:
assert result == set()
def test_get_all_dependencies_platform_component() -> None:
"""Platform components (domain.component) are looked up via get_platform,
not get_component."""
platform_comp = Mock()
platform_comp.dependencies = []
platform_comp.auto_load = []
with (
patch("esphome.loader.get_component") as mock_get_component,
patch("helpers.get_platform") as mock_get_platform,
):
mock_get_platform.return_value = platform_comp
mock_get_component.return_value = None
result = helpers.get_all_dependencies({"sensor.bthome"})
mock_get_platform.assert_called_once_with("sensor", "bthome")
mock_get_component.assert_not_called()
assert result == {"sensor.bthome"}
def test_get_all_dependencies_platform_component_with_dependencies() -> None:
"""Dependencies of a platform component are resolved transitively."""
platform_comp = Mock()
platform_comp.dependencies = ["sensor"]
platform_comp.auto_load = []
sensor_comp = Mock()
sensor_comp.dependencies = []
sensor_comp.auto_load = []
with (
patch("esphome.loader.get_component") as mock_get_component,
patch("helpers.get_platform") as mock_get_platform,
):
mock_get_platform.return_value = platform_comp
mock_get_component.side_effect = lambda name: (
sensor_comp if name == "sensor" else None
)
result = helpers.get_all_dependencies({"sensor.bthome"})
assert result == {"sensor.bthome", "sensor"}
def test_get_all_dependencies_cpp_testing_flag() -> None:
"""cpp_testing=True propagates to CORE.cpp_testing during resolution."""
from esphome.core import CORE
with (
patch("esphome.loader.get_component") as mock_get_component,
patch("esphome.loader.get_platform"),
):
observed: list[bool] = []
def capturing_get_component(name: str):
observed.append(CORE.cpp_testing)
mock_get_component.side_effect = capturing_get_component
helpers.get_all_dependencies({"some_comp"}, cpp_testing=True)
assert observed and all(observed), (
"CORE.cpp_testing should be True during resolution"
)
def test_get_components_from_integration_fixtures() -> None:
"""Test extraction of components from fixture YAML files."""
yaml_content = {
+12
View File
@@ -841,6 +841,18 @@ class TestEsphomeCore:
assert "WiFi" in target.platformio_libraries
def test_testing_ensure_platform_registered__sets_count(self, target):
"""Test testing_ensure_platform_registered sets count to 1 for new platform."""
assert target.platform_counts["sensor"] == 0
target.testing_ensure_platform_registered("sensor")
assert target.platform_counts["sensor"] == 1
def test_testing_ensure_platform_registered__does_not_overwrite(self, target):
"""Test testing_ensure_platform_registered preserves existing count."""
target.platform_counts["sensor"] = 3
target.testing_ensure_platform_registered("sensor")
assert target.platform_counts["sensor"] == 3
def test_add_library__extracts_short_name_from_path(self, target):
"""Test add_library extracts short name from library paths like owner/lib."""
target.data[const.KEY_CORE] = {