[tests] Fix flaky uart_mock integration tests (#14476)

This commit is contained in:
J. Nick Koston
2026-03-04 15:51:51 -10:00
committed by GitHub
parent c0143ac6d6
commit 5df4fd0a27
13 changed files with 420 additions and 499 deletions
@@ -43,6 +43,7 @@ CONF_INJECT_RX = "inject_rx"
CONF_EXPECT_TX = "expect_tx"
CONF_PERIODIC_RX = "periodic_rx"
CONF_ON_TX = "on_tx"
CONF_AUTO_START = "auto_start"
UART_PARITY_OPTIONS = {
"NONE": uart.UARTParityOptions.UART_CONFIG_PARITY_NONE,
@@ -95,6 +96,7 @@ CONFIG_SCHEMA = cv.Schema(
cv.Optional(CONF_INJECTIONS, default=[]): cv.ensure_list(INJECTION_SCHEMA),
cv.Optional(CONF_RESPONSES, default=[]): cv.ensure_list(RESPONSE_SCHEMA),
cv.Optional(CONF_PERIODIC_RX, default=[]): cv.ensure_list(PERIODIC_RX_SCHEMA),
cv.Optional(CONF_AUTO_START, default=True): cv.boolean,
cv.Optional(CONF_ON_TX): automation.validate_automation(
{
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(MockUartTXTrigger),
@@ -138,6 +140,9 @@ async def to_code(config):
cg.add(var.set_data_bits(config[CONF_DATA_BITS]))
cg.add(var.set_parity(config[CONF_PARITY]))
if not config[CONF_AUTO_START]:
cg.add(var.set_auto_start(False))
for injection in config[CONF_INJECTIONS]:
rx_data = injection[CONF_INJECT_RX]
delay_ms = injection[CONF_DELAY]
@@ -16,17 +16,21 @@ void MockUartComponent::setup() {
}
void MockUartComponent::loop() {
uint32_t now = App.get_loop_component_start_time();
// Initialize scenario start time on first loop() call, after all components have
// finished setup(). This prevents injection delays from being consumed during setup.
if (!this->loop_started_) {
this->loop_started_ = true;
this->scenario_start_ms_ = now;
this->cumulative_delay_ms_ = 0;
ESP_LOGD(TAG, "Scenario started at %u ms", now);
if (this->auto_start_) {
this->start_scenario();
} else {
ESP_LOGD(TAG, "Scenario waiting for manual start");
}
}
if (!this->scenario_active_) {
return;
}
uint32_t now = App.get_loop_component_start_time();
// Process at most ONE timed injection per loop iteration.
// This ensures each injection is in a separate loop cycle, giving the consuming
// component (e.g., LD2410) a chance to process each batch independently.
@@ -50,6 +54,19 @@ void MockUartComponent::loop() {
}
}
void MockUartComponent::start_scenario() {
uint32_t now = App.get_loop_component_start_time();
this->scenario_active_ = true;
this->scenario_start_ms_ = now;
this->cumulative_delay_ms_ = 0;
this->injection_index_ = 0;
this->tx_buffer_.clear();
for (auto &periodic : this->periodic_rx_) {
periodic.last_inject_ms = now;
}
ESP_LOGD(TAG, "Scenario started at %u ms", now);
}
void MockUartComponent::dump_config() {
ESP_LOGCONFIG(TAG,
"Mock UART Component:\n"
@@ -78,10 +95,12 @@ void MockUartComponent::write_array(const uint8_t *data, size_t len) {
}
#endif
this->try_match_response_();
if (this->scenario_active_) {
this->try_match_response_();
}
// This directly calls a tx_hook (lambda) as an alternative to the simpler match_response mechanism.
if (this->tx_hook_) {
if (this->tx_hook_ && this->scenario_active_) {
std::vector<uint8_t> buf(data, data + len);
this->tx_hook_(buf);
}
@@ -37,6 +37,8 @@ class MockUartComponent : public uart::UARTComponent, public Component {
void add_response(const std::vector<uint8_t> &expect_tx, const std::vector<uint8_t> &inject_rx);
void add_periodic_rx(const std::vector<uint8_t> &data, uint32_t interval_ms);
void start_scenario();
void set_auto_start(bool auto_start) { this->auto_start_ = auto_start; }
void set_tx_hook(std::function<void(const std::vector<uint8_t> &)> &&cb) { this->tx_hook_ = std::move(cb); }
void inject_to_rx_buffer(const std::vector<uint8_t> &data);
void inject_to_rx_buffer(const uint8_t *data, size_t len);
@@ -55,6 +57,8 @@ class MockUartComponent : public uart::UARTComponent, public Component {
uint32_t scenario_start_ms_{0};
uint32_t cumulative_delay_ms_{0};
bool loop_started_{false};
bool auto_start_{true};
bool scenario_active_{false};
// TX-triggered responses
struct Response {
@@ -20,6 +20,7 @@ uart:
uart_mock:
id: mock_uart
baud_rate: 256000
auto_start: false
injections:
# Phase 1 (t=100ms): Valid LD2410 normal mode data frame - happy path
# The buffer is clean at this point, so this frame should parse correctly.
@@ -143,3 +144,10 @@ binary_sensor:
name: "Has Moving Target"
has_still_target:
name: "Has Still Target"
button:
- platform: template
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: 'id(mock_uart).start_scenario();'
@@ -19,6 +19,7 @@ uart:
uart_mock:
id: mock_uart
baud_rate: 256000
auto_start: false
injections:
# Phase 1 (t=100ms): Valid LD2410 engineering mode data frame
# Captured from a real Screek Human Presence Sensor 1U with LD2410 firmware 2.4.x
@@ -154,3 +155,10 @@ binary_sensor:
name: "Has Still Target"
out_pin_presence_status:
name: "Out Pin Presence"
button:
- platform: template
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: 'id(mock_uart).start_scenario();'
@@ -20,6 +20,7 @@ uart:
uart_mock:
id: mock_uart
baud_rate: 256000
auto_start: false
injections:
# Phase 1 (t=100ms): Valid LD2412 normal mode data frame - happy path
# The buffer is clean at this point, so this frame should parse correctly.
@@ -169,3 +170,10 @@ binary_sensor:
name: "Has Still Target"
filters:
- settle: 50ms
button:
- platform: template
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: 'id(mock_uart).start_scenario();'
@@ -19,6 +19,7 @@ uart:
uart_mock:
id: mock_uart
baud_rate: 256000
auto_start: false
injections:
# Phase 1 (t=100ms): Valid LD2412 engineering mode data frame
#
@@ -211,3 +212,10 @@ binary_sensor:
name: "Has Still Target"
filters:
- settle: 50ms
button:
- platform: template
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: 'id(mock_uart).start_scenario();'
@@ -22,6 +22,7 @@ uart_mock:
baud_rate: 9600
rx_full_threshold: 120
rx_timeout: 2
auto_start: false
debug:
responses:
- expect_tx: [0x01, 0x03, 0x00, 0x03, 0x00, 0x01, 0x74, 0x0A] # Read holding register 1 on device 1
@@ -38,3 +39,10 @@ sensor:
name: "basic_register"
address: 0x03
register_type: holding
button:
- platform: template
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: 'id(virtual_uart_dev).start_scenario();'
@@ -22,6 +22,7 @@ uart_mock:
baud_rate: 9600
rx_full_threshold: 120
rx_timeout: 2
auto_start: false
debug:
on_tx:
- then:
@@ -52,3 +53,10 @@ sensor:
phase_a:
voltage:
name: sdm_voltage
button:
- platform: template
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: 'id(virtual_uart_dev).start_scenario();'
+95 -1
View File
@@ -3,10 +3,17 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable
import logging
from typing import TypeVar
from aioesphomeapi import ButtonInfo, EntityInfo, EntityState
from aioesphomeapi import (
BinarySensorState,
ButtonInfo,
EntityInfo,
EntityState,
SensorState,
)
_LOGGER = logging.getLogger(__name__)
@@ -234,3 +241,90 @@ class InitialStateHelper:
asyncio.TimeoutError: If initial states aren't received within timeout
"""
await asyncio.wait_for(self._initial_states_received, timeout=timeout)
class SensorStateCollector:
"""Collects sensor and binary sensor state updates and provides wait helpers.
Usage:
collector = SensorStateCollector(
sensor_names=["moving_distance", "still_distance"],
binary_sensor_names=["has_target"],
)
# Use collector.on_state as the callback (or wrap it)
client.subscribe_states(helper.on_state_wrapper(collector.on_state))
# Wait for all sensors to have at least one value
await collector.wait_for_all(timeout=3.0)
# Access collected states
assert collector.sensor_states["moving_distance"][0] == approx(100.0)
"""
def __init__(
self,
sensor_names: list[str],
binary_sensor_names: list[str] | None = None,
entities: list[EntityInfo] | None = None,
) -> None:
self.sensor_states: dict[str, list[float]] = {name: [] for name in sensor_names}
self.binary_states: dict[str, list[bool]] = {
name: [] for name in (binary_sensor_names or [])
}
self._key_to_sensor: dict[int, str] = {}
self._waiters: list[tuple[Callable[[], bool], asyncio.Future[bool]]] = []
if entities is not None:
self.build_key_mapping(entities)
def build_key_mapping(self, entities: list[EntityInfo]) -> None:
"""Build key-to-name mapping from entities. Sorted by descending length."""
all_names = list(self.sensor_states.keys()) + list(self.binary_states.keys())
all_names.sort(key=len, reverse=True)
self._key_to_sensor = build_key_to_entity_mapping(entities, all_names)
def on_state(self, state: EntityState) -> None:
"""Process a state update."""
if isinstance(state, SensorState) and not state.missing_state:
sensor_name = self._key_to_sensor.get(state.key)
if sensor_name and sensor_name in self.sensor_states:
self.sensor_states[sensor_name].append(state.state)
self._check_waiters()
elif isinstance(state, BinarySensorState):
sensor_name = self._key_to_sensor.get(state.key)
if sensor_name and sensor_name in self.binary_states:
self.binary_states[sensor_name].append(state.state)
self._check_waiters()
def _check_waiters(self) -> None:
"""Check all pending waiters and resolve any whose condition is met."""
for condition, future in self._waiters:
if not future.done() and condition():
future.set_result(True)
def _all_have_values(self) -> bool:
"""Check if all sensor and binary sensor lists have at least one value."""
return all(len(v) >= 1 for v in self.sensor_states.values()) and all(
len(v) >= 1 for v in self.binary_states.values()
)
async def wait_for_all(self, timeout: float = 3.0) -> None:
"""Wait until all sensors and binary sensors have at least one value."""
if self._all_have_values():
return
future: asyncio.Future[bool] = asyncio.get_running_loop().create_future()
self._waiters.append((self._all_have_values, future))
await asyncio.wait_for(future, timeout=timeout)
def add_waiter(self, condition: Callable[[], bool]) -> asyncio.Future[bool]:
"""Add a custom waiter that resolves when condition returns True.
Returns:
A future that resolves when the condition is met.
"""
future: asyncio.Future[bool] = asyncio.get_running_loop().create_future()
if condition():
future.set_result(True)
else:
self._waiters.append((condition, future))
return future
+113 -240
View File
@@ -21,16 +21,10 @@ from __future__ import annotations
import asyncio
from pathlib import Path
from aioesphomeapi import (
BinarySensorInfo,
BinarySensorState,
EntityState,
SensorInfo,
SensorState,
)
from aioesphomeapi import ButtonInfo
import pytest
from .state_utils import InitialStateHelper, build_key_to_entity_mapping, find_entity
from .state_utils import InitialStateHelper, SensorStateCollector, find_entity
from .types import APIClientConnectedFactory, RunCompiledFunction
@@ -64,100 +58,65 @@ async def test_uart_mock_ld2410(
if "uart_mock" in line and "TX " in line:
tx_log_lines.append(line)
# Track sensor state updates (after initial state is swallowed)
sensor_states: dict[str, list[float]] = {
"moving_distance": [],
"still_distance": [],
"moving_energy": [],
"still_energy": [],
"detection_distance": [],
}
binary_states: dict[str, list[bool]] = {
"has_target": [],
"has_moving_target": [],
"has_still_target": [],
}
collector = SensorStateCollector(
sensor_names=[
"moving_distance",
"still_distance",
"moving_energy",
"still_energy",
"detection_distance",
],
binary_sensor_names=[
"has_target",
"has_moving_target",
"has_still_target",
],
)
# Signal when we see recovery frame values
recovery_received = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, SensorState) and not state.missing_state:
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in sensor_states:
sensor_states[sensor_name].append(state.state)
# Check if this is the recovery frame (moving_distance = 50)
if (
sensor_name == "moving_distance"
and state.state == pytest.approx(50.0)
and not recovery_received.done()
):
recovery_received.set_result(True)
elif isinstance(state, BinarySensorState):
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in binary_states:
binary_states[sensor_name].append(state.state)
recovery_received = collector.add_waiter(
lambda: pytest.approx(50.0) in collector.sensor_states["moving_distance"]
)
async with (
run_compiled(yaml_config, line_callback=line_callback),
api_client_connected() as client,
):
entities, _ = await client.list_entities_services()
# Build key mappings for all sensor types
all_names = list(sensor_states.keys()) + list(binary_states.keys())
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
collector.build_key_mapping(entities)
# Set up initial state helper
initial_state_helper = InitialStateHelper(entities)
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
client.subscribe_states(
initial_state_helper.on_state_wrapper(collector.on_state)
)
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Phase 1 values are in the initial states (swallowed by InitialStateHelper).
# Verify them via initial_states dict.
moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo)
assert moving_dist_entity is not None
initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key)
assert initial_moving is not None and isinstance(initial_moving, SensorState)
assert initial_moving.state == pytest.approx(100.0), (
f"Initial moving distance should be 100, got {initial_moving.state}"
)
# Start the UART mock scenario now that we're subscribed
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
still_dist_entity = find_entity(entities, "still_distance", SensorInfo)
assert still_dist_entity is not None
initial_still = initial_state_helper.initial_states.get(still_dist_entity.key)
assert initial_still is not None and isinstance(initial_still, SensorState)
assert initial_still.state == pytest.approx(120.0), (
f"Initial still distance should be 120, got {initial_still.state}"
)
# Wait for Phase 1 - all sensors and binary sensors have at least one value
try:
await collector.wait_for_all(timeout=3.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for Phase 1 frame. Received:\n"
f" sensor_states: {collector.sensor_states}\n"
f" binary_states: {collector.binary_states}"
)
moving_energy_entity = find_entity(entities, "moving_energy", SensorInfo)
assert moving_energy_entity is not None
initial_me = initial_state_helper.initial_states.get(moving_energy_entity.key)
assert initial_me is not None and isinstance(initial_me, SensorState)
assert initial_me.state == pytest.approx(50.0), (
f"Initial moving energy should be 50, got {initial_me.state}"
)
still_energy_entity = find_entity(entities, "still_energy", SensorInfo)
assert still_energy_entity is not None
initial_se = initial_state_helper.initial_states.get(still_energy_entity.key)
assert initial_se is not None and isinstance(initial_se, SensorState)
assert initial_se.state == pytest.approx(25.0), (
f"Initial still energy should be 25, got {initial_se.state}"
)
detect_dist_entity = find_entity(entities, "detection_distance", SensorInfo)
assert detect_dist_entity is not None
initial_dd = initial_state_helper.initial_states.get(detect_dist_entity.key)
assert initial_dd is not None and isinstance(initial_dd, SensorState)
assert initial_dd.state == pytest.approx(300.0), (
f"Initial detection distance should be 300, got {initial_dd.state}"
)
# Phase 1 values: moving=100, still=120, energy=50/25, detect=300
assert collector.sensor_states["moving_distance"][0] == pytest.approx(100.0)
assert collector.sensor_states["still_distance"][0] == pytest.approx(120.0)
assert collector.sensor_states["moving_energy"][0] == pytest.approx(50.0)
assert collector.sensor_states["still_energy"][0] == pytest.approx(25.0)
assert collector.sensor_states["detection_distance"][0] == pytest.approx(300.0)
# Wait for the recovery frame (Phase 5) to be parsed
# This proves the component survived garbage + truncated + overflow
@@ -165,12 +124,8 @@ async def test_uart_mock_ld2410(
await asyncio.wait_for(recovery_received, timeout=15.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for recovery frame. Received sensor states:\n"
f" moving_distance: {sensor_states['moving_distance']}\n"
f" still_distance: {sensor_states['still_distance']}\n"
f" moving_energy: {sensor_states['moving_energy']}\n"
f" still_energy: {sensor_states['still_energy']}\n"
f" detection_distance: {sensor_states['detection_distance']}"
f"Timeout waiting for recovery frame. Received:\n"
f" sensor_states: {collector.sensor_states}"
)
# Verify overflow warning was logged
@@ -183,67 +138,36 @@ async def test_uart_mock_ld2410(
# A5 (MAC), AB (distance res), AE (light), 61 (params), FE (config off)
assert len(tx_log_lines) > 0, "Expected TX log lines from uart_mock"
tx_data = " ".join(tx_log_lines)
# Verify command frame header appears (FD:FC:FB:FA)
assert "FD:FC:FB:FA" in tx_data, (
"Expected LD2410 command frame header FD:FC:FB:FA in TX log"
)
# Verify command frame footer appears (04:03:02:01)
assert "04:03:02:01" in tx_data, (
"Expected LD2410 command frame footer 04:03:02:01 in TX log"
)
# Recovery frame values (Phase 5, after overflow)
assert len(sensor_states["moving_distance"]) >= 1, (
f"Expected recovery moving_distance, got: {sensor_states['moving_distance']}"
)
# Find the recovery value (moving_distance = 50)
recovery_values = [
v for v in sensor_states["moving_distance"] if v == pytest.approx(50.0)
]
assert len(recovery_values) >= 1, (
f"Expected moving_distance=50 in recovery, got: {sensor_states['moving_distance']}"
)
# Recovery frame: moving=50, still=75, energy=100/80, detect=127
recovery_idx = next(
i
for i, v in enumerate(sensor_states["moving_distance"])
for i, v in enumerate(collector.sensor_states["moving_distance"])
if v == pytest.approx(50.0)
)
assert sensor_states["still_distance"][recovery_idx] == pytest.approx(75.0), (
f"Recovery still distance should be 75, got {sensor_states['still_distance'][recovery_idx]}"
assert collector.sensor_states["still_distance"][recovery_idx] == pytest.approx(
75.0
)
assert sensor_states["moving_energy"][recovery_idx] == pytest.approx(100.0), (
f"Recovery moving energy should be 100, got {sensor_states['moving_energy'][recovery_idx]}"
assert collector.sensor_states["moving_energy"][recovery_idx] == pytest.approx(
100.0
)
assert sensor_states["still_energy"][recovery_idx] == pytest.approx(80.0), (
f"Recovery still energy should be 80, got {sensor_states['still_energy'][recovery_idx]}"
)
assert sensor_states["detection_distance"][recovery_idx] == pytest.approx(
127.0
), (
f"Recovery detection distance should be 127, got {sensor_states['detection_distance'][recovery_idx]}"
assert collector.sensor_states["still_energy"][recovery_idx] == pytest.approx(
80.0
)
assert collector.sensor_states["detection_distance"][
recovery_idx
] == pytest.approx(127.0)
# Verify binary sensors detected targets
# Binary sensors could be in initial states or forwarded states
has_target_entity = find_entity(entities, "has_target", BinarySensorInfo)
assert has_target_entity is not None
initial_ht = initial_state_helper.initial_states.get(has_target_entity.key)
assert initial_ht is not None and isinstance(initial_ht, BinarySensorState)
assert initial_ht.state is True, "Has target should be True"
has_moving_entity = find_entity(entities, "has_moving_target", BinarySensorInfo)
assert has_moving_entity is not None
initial_hm = initial_state_helper.initial_states.get(has_moving_entity.key)
assert initial_hm is not None and isinstance(initial_hm, BinarySensorState)
assert initial_hm.state is True, "Has moving target should be True"
has_still_entity = find_entity(entities, "has_still_target", BinarySensorInfo)
assert has_still_entity is not None
initial_hs = initial_state_helper.initial_states.get(has_still_entity.key)
assert initial_hs is not None and isinstance(initial_hs, BinarySensorState)
assert initial_hs.state is True, "Has still target should be True"
# Verify binary sensors detected targets (from Phase 1 frame)
assert collector.binary_states["has_target"][0] is True
assert collector.binary_states["has_moving_target"][0] is True
assert collector.binary_states["has_still_target"][0] is True
@pytest.mark.asyncio
@@ -260,133 +184,82 @@ async def test_uart_mock_ld2410_engineering(
"EXTERNAL_COMPONENT_PATH", external_components_path
)
loop = asyncio.get_running_loop()
collector = SensorStateCollector(
sensor_names=[
"moving_distance",
"still_distance",
"moving_energy",
"still_energy",
"detection_distance",
"light",
"gate_0_move_energy",
"gate_1_move_energy",
"gate_2_move_energy",
"gate_0_still_energy",
"gate_1_still_energy",
"gate_2_still_energy",
],
binary_sensor_names=[
"has_target",
"has_moving_target",
"has_still_target",
"out_pin_presence",
],
)
# Track sensor state updates (after initial state is swallowed)
sensor_states: dict[str, list[float]] = {
"moving_distance": [],
"still_distance": [],
"moving_energy": [],
"still_energy": [],
"detection_distance": [],
"light": [],
"gate_0_move_energy": [],
"gate_1_move_energy": [],
"gate_2_move_energy": [],
"gate_0_still_energy": [],
"gate_1_still_energy": [],
"gate_2_still_energy": [],
}
binary_states: dict[str, list[bool]] = {
"has_target": [],
"has_moving_target": [],
"has_still_target": [],
"out_pin_presence": [],
}
# Signal when we see Phase 3 frame (still_distance = 291)
phase3_received = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, SensorState) and not state.missing_state:
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in sensor_states:
sensor_states[sensor_name].append(state.state)
if (
sensor_name == "still_distance"
and state.state == pytest.approx(291.0)
and not phase3_received.done()
):
phase3_received.set_result(True)
elif isinstance(state, BinarySensorState):
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in binary_states:
binary_states[sensor_name].append(state.state)
# Signal when we see Phase 3 frame values
phase3_received = collector.add_waiter(
lambda: pytest.approx(291.0) in collector.sensor_states["still_distance"]
)
async with (
run_compiled(yaml_config),
api_client_connected() as client,
):
entities, _ = await client.list_entities_services()
all_names = list(sensor_states.keys()) + list(binary_states.keys())
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
collector.build_key_mapping(entities)
initial_state_helper = InitialStateHelper(entities)
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
client.subscribe_states(
initial_state_helper.on_state_wrapper(collector.on_state)
)
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Phase 1 initial values (engineering mode frame):
# Start the UART mock scenario now that we're subscribed
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
# Wait for Phase 1 - all sensors and binary sensors have at least one value
try:
await collector.wait_for_all(timeout=3.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for Phase 1 frame. Received:\n"
f" sensor_states: {collector.sensor_states}\n"
f" binary_states: {collector.binary_states}"
)
# Phase 1 values (engineering mode frame):
# moving=30, energy=100, still=30, energy=100, detect=0
moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo)
assert moving_dist_entity is not None
initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key)
assert initial_moving is not None and isinstance(initial_moving, SensorState)
assert initial_moving.state == pytest.approx(30.0), (
f"Initial moving distance should be 30, got {initial_moving.state}"
)
still_dist_entity = find_entity(entities, "still_distance", SensorInfo)
assert still_dist_entity is not None
initial_still = initial_state_helper.initial_states.get(still_dist_entity.key)
assert initial_still is not None and isinstance(initial_still, SensorState)
assert initial_still.state == pytest.approx(30.0), (
f"Initial still distance should be 30, got {initial_still.state}"
)
# Verify engineering mode sensors from initial state
# Gate 0 moving energy = 0x64 = 100
gate0_move_entity = find_entity(entities, "gate_0_move_energy", SensorInfo)
assert gate0_move_entity is not None
initial_g0m = initial_state_helper.initial_states.get(gate0_move_entity.key)
assert initial_g0m is not None and isinstance(initial_g0m, SensorState)
assert initial_g0m.state == pytest.approx(100.0), (
f"Gate 0 move energy should be 100, got {initial_g0m.state}"
)
# Gate 1 moving energy = 0x41 = 65
gate1_move_entity = find_entity(entities, "gate_1_move_energy", SensorInfo)
assert gate1_move_entity is not None
initial_g1m = initial_state_helper.initial_states.get(gate1_move_entity.key)
assert initial_g1m is not None and isinstance(initial_g1m, SensorState)
assert initial_g1m.state == pytest.approx(65.0), (
f"Gate 1 move energy should be 65, got {initial_g1m.state}"
)
# Light sensor = 0x57 = 87
light_entity = find_entity(entities, "light", SensorInfo)
assert light_entity is not None
initial_light = initial_state_helper.initial_states.get(light_entity.key)
assert initial_light is not None and isinstance(initial_light, SensorState)
assert initial_light.state == pytest.approx(87.0), (
f"Light sensor should be 87, got {initial_light.state}"
)
# Out pin presence = 0x01 = True
out_pin_entity = find_entity(entities, "out_pin_presence", BinarySensorInfo)
assert out_pin_entity is not None
initial_out = initial_state_helper.initial_states.get(out_pin_entity.key)
assert initial_out is not None and isinstance(initial_out, BinarySensorState)
assert initial_out.state is True, "Out pin presence should be True"
assert collector.sensor_states["moving_distance"][0] == pytest.approx(30.0)
assert collector.sensor_states["still_distance"][0] == pytest.approx(30.0)
assert collector.sensor_states["gate_0_move_energy"][0] == pytest.approx(100.0)
assert collector.sensor_states["gate_1_move_energy"][0] == pytest.approx(65.0)
assert collector.sensor_states["light"][0] == pytest.approx(87.0)
assert collector.binary_states["out_pin_presence"][0] is True
# Wait for Phase 3 frame (still_distance = 291cm, multi-byte)
try:
await asyncio.wait_for(phase3_received, timeout=15.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for Phase 3 frame. Received sensor states:\n"
f" still_distance: {sensor_states['still_distance']}\n"
f" moving_distance: {sensor_states['moving_distance']}"
f"Timeout waiting for Phase 3 frame. Received:\n"
f" still_distance: {collector.sensor_states['still_distance']}"
)
# Phase 3: still distance = 0x0123 = 291cm (multi-byte distance test)
phase3_still = [
v for v in sensor_states["still_distance"] if v == pytest.approx(291.0)
]
assert len(phase3_still) >= 1, (
f"Expected still_distance=291, got: {sensor_states['still_distance']}"
)
assert pytest.approx(291.0) in collector.sensor_states["still_distance"]
+115 -247
View File
@@ -21,16 +21,10 @@ from __future__ import annotations
import asyncio
from pathlib import Path
from aioesphomeapi import (
BinarySensorInfo,
BinarySensorState,
EntityState,
SensorInfo,
SensorState,
)
from aioesphomeapi import ButtonInfo
import pytest
from .state_utils import InitialStateHelper, build_key_to_entity_mapping, find_entity
from .state_utils import InitialStateHelper, SensorStateCollector, find_entity
from .types import APIClientConnectedFactory, RunCompiledFunction
@@ -64,104 +58,65 @@ async def test_uart_mock_ld2412(
if "uart_mock" in line and "TX " in line:
tx_log_lines.append(line)
# Track sensor state updates (after initial state is swallowed)
sensor_states: dict[str, list[float]] = {
"moving_distance": [],
"still_distance": [],
"moving_energy": [],
"still_energy": [],
"detection_distance": [],
}
binary_states: dict[str, list[bool]] = {
"has_target": [],
"has_moving_target": [],
"has_still_target": [],
}
collector = SensorStateCollector(
sensor_names=[
"moving_distance",
"still_distance",
"moving_energy",
"still_energy",
"detection_distance",
],
binary_sensor_names=[
"has_target",
"has_moving_target",
"has_still_target",
],
)
# Signal when we see recovery frame values
recovery_received = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, SensorState) and not state.missing_state:
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in sensor_states:
sensor_states[sensor_name].append(state.state)
# Check if this is the recovery frame (moving_distance = 50)
if (
sensor_name == "moving_distance"
and state.state == pytest.approx(50.0)
and not recovery_received.done()
):
recovery_received.set_result(True)
elif isinstance(state, BinarySensorState):
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in binary_states:
binary_states[sensor_name].append(state.state)
recovery_received = collector.add_waiter(
lambda: pytest.approx(50.0) in collector.sensor_states["moving_distance"]
)
async with (
run_compiled(yaml_config, line_callback=line_callback),
api_client_connected() as client,
):
entities, _ = await client.list_entities_services()
# Build key mappings for all sensor types
all_names = list(sensor_states.keys()) + list(binary_states.keys())
# Sort by descending length to avoid substring collisions
# (e.g., "still_energy" matching "gate_0_still_energy")
all_names.sort(key=len, reverse=True)
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
collector.build_key_mapping(entities)
# Set up initial state helper
initial_state_helper = InitialStateHelper(entities)
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
client.subscribe_states(
initial_state_helper.on_state_wrapper(collector.on_state)
)
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Phase 1 values are in the initial states (swallowed by InitialStateHelper).
# Verify them via initial_states dict.
moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo)
assert moving_dist_entity is not None
initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key)
assert initial_moving is not None and isinstance(initial_moving, SensorState)
assert initial_moving.state == pytest.approx(100.0), (
f"Initial moving distance should be 100, got {initial_moving.state}"
)
# Start the UART mock scenario now that we're subscribed
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
still_dist_entity = find_entity(entities, "still_distance", SensorInfo)
assert still_dist_entity is not None
initial_still = initial_state_helper.initial_states.get(still_dist_entity.key)
assert initial_still is not None and isinstance(initial_still, SensorState)
assert initial_still.state == pytest.approx(120.0), (
f"Initial still distance should be 120, got {initial_still.state}"
)
# Wait for Phase 1 - all sensors and binary sensors have at least one value
try:
await collector.wait_for_all(timeout=3.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for Phase 1 frame. Received:\n"
f" sensor_states: {collector.sensor_states}\n"
f" binary_states: {collector.binary_states}"
)
moving_energy_entity = find_entity(entities, "moving_energy", SensorInfo)
assert moving_energy_entity is not None
initial_me = initial_state_helper.initial_states.get(moving_energy_entity.key)
assert initial_me is not None and isinstance(initial_me, SensorState)
assert initial_me.state == pytest.approx(50.0), (
f"Initial moving energy should be 50, got {initial_me.state}"
)
still_energy_entity = find_entity(entities, "still_energy", SensorInfo)
assert still_energy_entity is not None
initial_se = initial_state_helper.initial_states.get(still_energy_entity.key)
assert initial_se is not None and isinstance(initial_se, SensorState)
assert initial_se.state == pytest.approx(25.0), (
f"Initial still energy should be 25, got {initial_se.state}"
)
# LD2412 detection_distance = moving_distance when MOVE_BITMASK is set
detect_dist_entity = find_entity(entities, "detection_distance", SensorInfo)
assert detect_dist_entity is not None
initial_dd = initial_state_helper.initial_states.get(detect_dist_entity.key)
assert initial_dd is not None and isinstance(initial_dd, SensorState)
assert initial_dd.state == pytest.approx(100.0), (
f"Initial detection distance should be 100, got {initial_dd.state}"
)
# Phase 1 values: moving=100, still=120, energy=50/25, detect=100
assert collector.sensor_states["moving_distance"][0] == pytest.approx(100.0)
assert collector.sensor_states["still_distance"][0] == pytest.approx(120.0)
assert collector.sensor_states["moving_energy"][0] == pytest.approx(50.0)
assert collector.sensor_states["still_energy"][0] == pytest.approx(25.0)
assert collector.sensor_states["detection_distance"][0] == pytest.approx(100.0)
# Wait for the recovery frame (Phase 5) to be parsed
# This proves the component survived garbage + truncated + overflow
@@ -169,12 +124,8 @@ async def test_uart_mock_ld2412(
await asyncio.wait_for(recovery_received, timeout=3.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for recovery frame. Received sensor states:\n"
f" moving_distance: {sensor_states['moving_distance']}\n"
f" still_distance: {sensor_states['still_distance']}\n"
f" moving_energy: {sensor_states['moving_energy']}\n"
f" still_energy: {sensor_states['still_energy']}\n"
f" detection_distance: {sensor_states['detection_distance']}"
f"Timeout waiting for recovery frame. Received:\n"
f" sensor_states: {collector.sensor_states}"
)
# Verify overflow warning was logged
@@ -185,67 +136,36 @@ async def test_uart_mock_ld2412(
# Verify LD2412 sent setup commands (TX logging)
assert len(tx_log_lines) > 0, "Expected TX log lines from uart_mock"
tx_data = " ".join(tx_log_lines)
# Verify command frame header appears (FD:FC:FB:FA)
assert "FD:FC:FB:FA" in tx_data, (
"Expected LD2412 command frame header FD:FC:FB:FA in TX log"
)
# Verify command frame footer appears (04:03:02:01)
assert "04:03:02:01" in tx_data, (
"Expected LD2412 command frame footer 04:03:02:01 in TX log"
)
# Recovery frame values (Phase 5, after overflow)
assert len(sensor_states["moving_distance"]) >= 1, (
f"Expected recovery moving_distance, got: {sensor_states['moving_distance']}"
)
# Find the recovery value (moving_distance = 50)
recovery_values = [
v for v in sensor_states["moving_distance"] if v == pytest.approx(50.0)
]
assert len(recovery_values) >= 1, (
f"Expected moving_distance=50 in recovery, got: {sensor_states['moving_distance']}"
)
# Recovery frame: moving=50, still=75, energy=100/80, detect=50
recovery_idx = next(
i
for i, v in enumerate(sensor_states["moving_distance"])
for i, v in enumerate(collector.sensor_states["moving_distance"])
if v == pytest.approx(50.0)
)
assert sensor_states["still_distance"][recovery_idx] == pytest.approx(75.0), (
f"Recovery still distance should be 75, got {sensor_states['still_distance'][recovery_idx]}"
assert collector.sensor_states["still_distance"][recovery_idx] == pytest.approx(
75.0
)
assert sensor_states["moving_energy"][recovery_idx] == pytest.approx(100.0), (
f"Recovery moving energy should be 100, got {sensor_states['moving_energy'][recovery_idx]}"
assert collector.sensor_states["moving_energy"][recovery_idx] == pytest.approx(
100.0
)
assert sensor_states["still_energy"][recovery_idx] == pytest.approx(80.0), (
f"Recovery still energy should be 80, got {sensor_states['still_energy'][recovery_idx]}"
)
# LD2412 detection_distance = moving_distance when MOVE_BITMASK set
assert sensor_states["detection_distance"][recovery_idx] == pytest.approx(
50.0
), (
f"Recovery detection distance should be 50, got {sensor_states['detection_distance'][recovery_idx]}"
assert collector.sensor_states["still_energy"][recovery_idx] == pytest.approx(
80.0
)
assert collector.sensor_states["detection_distance"][
recovery_idx
] == pytest.approx(50.0)
# Verify binary sensors detected targets
has_target_entity = find_entity(entities, "has_target", BinarySensorInfo)
assert has_target_entity is not None
initial_ht = initial_state_helper.initial_states.get(has_target_entity.key)
assert initial_ht is not None and isinstance(initial_ht, BinarySensorState)
assert initial_ht.state is True, "Has target should be True"
has_moving_entity = find_entity(entities, "has_moving_target", BinarySensorInfo)
assert has_moving_entity is not None
initial_hm = initial_state_helper.initial_states.get(has_moving_entity.key)
assert initial_hm is not None and isinstance(initial_hm, BinarySensorState)
assert initial_hm.state is True, "Has moving target should be True"
has_still_entity = find_entity(entities, "has_still_target", BinarySensorInfo)
assert has_still_entity is not None
initial_hs = initial_state_helper.initial_states.get(has_still_entity.key)
assert initial_hs is not None and isinstance(initial_hs, BinarySensorState)
assert initial_hs.state is True, "Has still target should be True"
# Verify binary sensors detected targets (from Phase 1 frame)
assert collector.binary_states["has_target"][0] is True
assert collector.binary_states["has_moving_target"][0] is True
assert collector.binary_states["has_still_target"][0] is True
@pytest.mark.asyncio
@@ -262,120 +182,75 @@ async def test_uart_mock_ld2412_engineering(
"EXTERNAL_COMPONENT_PATH", external_components_path
)
loop = asyncio.get_running_loop()
# Track sensor state updates (after initial state is swallowed)
sensor_states: dict[str, list[float]] = {
"moving_distance": [],
"still_distance": [],
"moving_energy": [],
"still_energy": [],
"detection_distance": [],
"light": [],
"gate_0_move_energy": [],
"gate_1_move_energy": [],
"gate_2_move_energy": [],
"gate_0_still_energy": [],
"gate_1_still_energy": [],
"gate_2_still_energy": [],
}
binary_states: dict[str, list[bool]] = {
"has_target": [],
"has_moving_target": [],
"has_still_target": [],
}
collector = SensorStateCollector(
sensor_names=[
"moving_distance",
"still_distance",
"moving_energy",
"still_energy",
"detection_distance",
"light",
"gate_0_move_energy",
"gate_1_move_energy",
"gate_2_move_energy",
"gate_0_still_energy",
"gate_1_still_energy",
"gate_2_still_energy",
],
binary_sensor_names=[
"has_target",
"has_moving_target",
"has_still_target",
],
)
# Signal when we see Phase 3 frame values
phase3_still_received = loop.create_future()
phase3_detect_received = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, SensorState) and not state.missing_state:
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in sensor_states:
sensor_states[sensor_name].append(state.state)
if (
sensor_name == "still_distance"
and state.state == pytest.approx(291.0)
and not phase3_still_received.done()
):
phase3_still_received.set_result(True)
if (
sensor_name == "detection_distance"
and state.state == pytest.approx(291.0)
and not phase3_detect_received.done()
):
phase3_detect_received.set_result(True)
elif isinstance(state, BinarySensorState):
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in binary_states:
binary_states[sensor_name].append(state.state)
phase3_still_received = collector.add_waiter(
lambda: pytest.approx(291.0) in collector.sensor_states["still_distance"]
)
phase3_detect_received = collector.add_waiter(
lambda: pytest.approx(291.0) in collector.sensor_states["detection_distance"]
)
async with (
run_compiled(yaml_config),
api_client_connected() as client,
):
entities, _ = await client.list_entities_services()
all_names = list(sensor_states.keys()) + list(binary_states.keys())
# Sort by descending length to avoid substring collisions
# (e.g., "still_energy" matching "gate_0_still_energy")
all_names.sort(key=len, reverse=True)
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
collector.build_key_mapping(entities)
initial_state_helper = InitialStateHelper(entities)
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
client.subscribe_states(
initial_state_helper.on_state_wrapper(collector.on_state)
)
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Phase 1 initial values (engineering mode frame):
# Start the UART mock scenario now that we're subscribed
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
# Wait for Phase 1 - all sensors and binary sensors have at least one value
try:
await collector.wait_for_all(timeout=3.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for Phase 1 frame. Received:\n"
f" sensor_states: {collector.sensor_states}\n"
f" binary_states: {collector.binary_states}"
)
# Phase 1 values (engineering mode frame):
# moving=30, energy=100, still=30, energy=100, detect=30
moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo)
assert moving_dist_entity is not None
initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key)
assert initial_moving is not None and isinstance(initial_moving, SensorState)
assert initial_moving.state == pytest.approx(30.0), (
f"Initial moving distance should be 30, got {initial_moving.state}"
)
still_dist_entity = find_entity(entities, "still_distance", SensorInfo)
assert still_dist_entity is not None
initial_still = initial_state_helper.initial_states.get(still_dist_entity.key)
assert initial_still is not None and isinstance(initial_still, SensorState)
assert initial_still.state == pytest.approx(30.0), (
f"Initial still distance should be 30, got {initial_still.state}"
)
# Verify engineering mode sensors from initial state
# Gate 0 moving energy = 0x64 = 100
gate0_move_entity = find_entity(entities, "gate_0_move_energy", SensorInfo)
assert gate0_move_entity is not None
initial_g0m = initial_state_helper.initial_states.get(gate0_move_entity.key)
assert initial_g0m is not None and isinstance(initial_g0m, SensorState)
assert initial_g0m.state == pytest.approx(100.0), (
f"Gate 0 move energy should be 100, got {initial_g0m.state}"
)
# Gate 1 moving energy = 0x41 = 65
gate1_move_entity = find_entity(entities, "gate_1_move_energy", SensorInfo)
assert gate1_move_entity is not None
initial_g1m = initial_state_helper.initial_states.get(gate1_move_entity.key)
assert initial_g1m is not None and isinstance(initial_g1m, SensorState)
assert initial_g1m.state == pytest.approx(65.0), (
f"Gate 1 move energy should be 65, got {initial_g1m.state}"
)
# Light sensor = 0x57 = 87
light_entity = find_entity(entities, "light", SensorInfo)
assert light_entity is not None
initial_light = initial_state_helper.initial_states.get(light_entity.key)
assert initial_light is not None and isinstance(initial_light, SensorState)
assert initial_light.state == pytest.approx(87.0), (
f"Light sensor should be 87, got {initial_light.state}"
)
assert collector.sensor_states["moving_distance"][0] == pytest.approx(30.0)
assert collector.sensor_states["still_distance"][0] == pytest.approx(30.0)
assert collector.sensor_states["gate_0_move_energy"][0] == pytest.approx(100.0)
assert collector.sensor_states["gate_1_move_energy"][0] == pytest.approx(65.0)
assert collector.sensor_states["light"][0] == pytest.approx(87.0)
# Wait for Phase 3 frame: still_distance = 291cm (multi-byte)
try:
@@ -383,25 +258,18 @@ async def test_uart_mock_ld2412_engineering(
except TimeoutError:
pytest.fail(
f"Timeout waiting for Phase 3 still_distance. Received:\n"
f" still_distance: {sensor_states['still_distance']}\n"
f" moving_distance: {sensor_states['moving_distance']}"
f" still_distance: {collector.sensor_states['still_distance']}"
)
assert pytest.approx(291.0) in sensor_states["still_distance"], (
f"Expected still_distance=291, got: {sensor_states['still_distance']}"
)
assert pytest.approx(291.0) in collector.sensor_states["still_distance"]
# Wait for Phase 3: detection_distance = 291 (still-only target)
# target_state=0x02 so LD2412 uses still_distance for detection_distance.
# The throttle_with_priority filter may delay this value.
try:
await asyncio.wait_for(phase3_detect_received, timeout=3.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for detection_distance=291 (still-only target). "
f"Received: {sensor_states['detection_distance']}"
f"Timeout waiting for detection_distance=291. "
f"Received: {collector.sensor_states['detection_distance']}"
)
assert pytest.approx(291.0) in sensor_states["detection_distance"], (
f"Expected detection_distance=291, got: {sensor_states['detection_distance']}"
)
assert pytest.approx(291.0) in collector.sensor_states["detection_distance"]
+12 -2
View File
@@ -12,10 +12,10 @@ from __future__ import annotations
import asyncio
from pathlib import Path
from aioesphomeapi import EntityState, SensorState
from aioesphomeapi import ButtonInfo, EntityState, SensorState
import pytest
from .state_utils import InitialStateHelper, build_key_to_entity_mapping
from .state_utils import InitialStateHelper, build_key_to_entity_mapping, find_entity
from .types import APIClientConnectedFactory, RunCompiledFunction
@@ -74,6 +74,11 @@ async def test_uart_mock_modbus(
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Start the UART mock scenario now that we're subscribed
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
# Wait for basic register to be updated with successful parse
try:
await asyncio.wait_for(basic_register_changed, timeout=15.0)
@@ -143,6 +148,11 @@ async def test_uart_mock_modbus_timing(
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Start the UART mock scenario now that we're subscribed
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
# Wait for voltage to be updated with successful parse
try:
await asyncio.wait_for(voltage_changed, timeout=15.0)