mirror of
https://github.com/esphome/esphome.git
synced 2026-05-22 01:42:49 +08:00
[light] Validate effect names during config validation instead of codegen (#15107)
This commit is contained in:
@@ -24,6 +24,7 @@ from esphome.const import (
|
||||
CONF_ID,
|
||||
CONF_INITIAL_STATE,
|
||||
CONF_MQTT_ID,
|
||||
CONF_NAME,
|
||||
CONF_ON_STATE,
|
||||
CONF_ON_TURN_OFF,
|
||||
CONF_ON_TURN_ON,
|
||||
@@ -41,6 +42,8 @@ from esphome.const import (
|
||||
from esphome.core import CORE, ID, CoroPriority, HexInt, Lambda, coroutine_with_priority
|
||||
from esphome.core.entity_helpers import entity_duplicate_validator, setup_entity
|
||||
from esphome.cpp_generator import MockObjClass
|
||||
import esphome.final_validate as fv
|
||||
from esphome.types import ConfigType
|
||||
|
||||
from .automation import LIGHT_STATE_SCHEMA
|
||||
from .effects import (
|
||||
@@ -70,9 +73,19 @@ IS_PLATFORM_COMPONENT = True
|
||||
DOMAIN = "light"
|
||||
|
||||
|
||||
@dataclass
|
||||
class EffectRef:
|
||||
"""A pending effect name reference from a light action to validate."""
|
||||
|
||||
light_id: ID
|
||||
effect_name: str
|
||||
component_path: list[str | int] # path_context when the action was validated
|
||||
|
||||
|
||||
@dataclass
|
||||
class LightData:
|
||||
gamma_tables: dict = field(default_factory=dict) # gamma_value -> fwd_arr
|
||||
effect_refs: list[EffectRef] = field(default_factory=list)
|
||||
|
||||
|
||||
def _get_data() -> LightData:
|
||||
@@ -115,6 +128,68 @@ def _get_or_create_gamma_table(gamma_correct):
|
||||
return fwd_arr
|
||||
|
||||
|
||||
def find_effect_index(effects: list, effect_name: str) -> int | None:
|
||||
"""Find the 1-based index of an effect by name (case-insensitive).
|
||||
|
||||
Returns the 1-based index if found, or None if not found.
|
||||
"""
|
||||
effect_name_lower = effect_name.lower()
|
||||
for i, effect_conf in enumerate(effects):
|
||||
key = next(iter(effect_conf))
|
||||
if effect_conf[key][CONF_NAME].lower() == effect_name_lower:
|
||||
return i + 1
|
||||
return None
|
||||
|
||||
|
||||
def available_effects_str(effects: list) -> str:
|
||||
"""Return a comma-separated string of available effect names."""
|
||||
available = [
|
||||
effect_conf[next(iter(effect_conf))][CONF_NAME] for effect_conf in effects
|
||||
]
|
||||
return ", ".join(f"'{name}'" for name in available) if available else "none"
|
||||
|
||||
|
||||
def _final_validate(config: ConfigType) -> ConfigType:
|
||||
"""Validate all recorded effect name references against their target lights.
|
||||
|
||||
This runs once per light platform instance. If no light platform is configured,
|
||||
this never runs — but the ID validator will catch the missing light ID separately.
|
||||
"""
|
||||
data = _get_data()
|
||||
if not data.effect_refs:
|
||||
return config
|
||||
|
||||
# Drain the list so we only validate once even though
|
||||
# FINAL_VALIDATE_SCHEMA runs for each light platform instance.
|
||||
refs = data.effect_refs
|
||||
data.effect_refs = []
|
||||
|
||||
fconf = fv.full_config.get()
|
||||
|
||||
for ref in refs:
|
||||
try:
|
||||
light_path = fconf.get_path_for_id(ref.light_id)[:-1]
|
||||
light_config = fconf.get_config_for_path(light_path)
|
||||
except KeyError:
|
||||
# Light ID not found — ID validation will have already reported this
|
||||
continue
|
||||
|
||||
effects = light_config.get(CONF_EFFECTS, [])
|
||||
|
||||
if find_effect_index(effects, ref.effect_name) is None:
|
||||
raise cv.FinalExternalInvalid(
|
||||
f"Effect '{ref.effect_name}' not found for light "
|
||||
f"'{ref.light_id}'. "
|
||||
f"Available effects: {available_effects_str(effects)}",
|
||||
path=[cv.ROOT_CONFIG_PATH] + ref.component_path,
|
||||
)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
FINAL_VALIDATE_SCHEMA = _final_validate
|
||||
|
||||
|
||||
LightRestoreMode = light_ns.enum("LightRestoreMode")
|
||||
RESTORE_MODES = {
|
||||
"RESTORE_DEFAULT_OFF": LightRestoreMode.LIGHT_RESTORE_DEFAULT_OFF,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from esphome import automation
|
||||
import esphome.codegen as cg
|
||||
from esphome.config import path_context
|
||||
import esphome.config_validation as cv
|
||||
from esphome.const import (
|
||||
CONF_BLUE,
|
||||
@@ -17,7 +18,6 @@ from esphome.const import (
|
||||
CONF_LIMIT_MODE,
|
||||
CONF_MAX_BRIGHTNESS,
|
||||
CONF_MIN_BRIGHTNESS,
|
||||
CONF_NAME,
|
||||
CONF_RANGE_FROM,
|
||||
CONF_RANGE_TO,
|
||||
CONF_RED,
|
||||
@@ -26,7 +26,7 @@ from esphome.const import (
|
||||
CONF_WARM_WHITE,
|
||||
CONF_WHITE,
|
||||
)
|
||||
from esphome.core import CORE, Lambda
|
||||
from esphome.core import CORE, EsphomeError, Lambda
|
||||
from esphome.cpp_generator import LambdaExpression
|
||||
from esphome.types import ConfigType
|
||||
|
||||
@@ -98,6 +98,31 @@ LIGHT_CONTROL_ACTION_SCHEMA = LIGHT_STATE_SCHEMA.extend(
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _record_effect_ref(config: ConfigType) -> ConfigType:
|
||||
"""Record a static effect name reference for later cross-component validation."""
|
||||
if CONF_EFFECT not in config:
|
||||
return config
|
||||
effect = config[CONF_EFFECT]
|
||||
if isinstance(effect, Lambda):
|
||||
return config # Lambda effects resolved at runtime
|
||||
if effect.lower() == "none":
|
||||
return config # "None" is always valid
|
||||
|
||||
from . import EffectRef, _get_data
|
||||
|
||||
_get_data().effect_refs.append(
|
||||
EffectRef(
|
||||
light_id=config[CONF_ID],
|
||||
effect_name=effect,
|
||||
component_path=path_context.get(),
|
||||
)
|
||||
)
|
||||
return config
|
||||
|
||||
|
||||
LIGHT_CONTROL_ACTION_SCHEMA.add_extra(_record_effect_ref)
|
||||
|
||||
LIGHT_TURN_OFF_ACTION_SCHEMA = automation.maybe_simple_id(
|
||||
{
|
||||
cv.Required(CONF_ID): cv.use_id(LightState),
|
||||
@@ -122,18 +147,24 @@ def _resolve_effect_index(config: ConfigType) -> int:
|
||||
Effect index 0 means "None" (no effect). Effects are 1-indexed matching
|
||||
the C++ convention in LightState.
|
||||
"""
|
||||
from . import available_effects_str, find_effect_index
|
||||
|
||||
original_name = config[CONF_EFFECT]
|
||||
effect_name = original_name.lower()
|
||||
if effect_name == "none":
|
||||
if original_name.lower() == "none":
|
||||
return 0
|
||||
light_id = config[CONF_ID]
|
||||
light_path = CORE.config.get_path_for_id(light_id)[:-1]
|
||||
light_config = CORE.config.get_config_for_path(light_path)
|
||||
for i, effect_conf in enumerate(light_config.get(CONF_EFFECTS, [])):
|
||||
key = next(iter(effect_conf))
|
||||
if effect_conf[key][CONF_NAME].lower() == effect_name:
|
||||
return i + 1
|
||||
raise ValueError(f"Effect '{original_name}' not found in light '{light_id}'")
|
||||
effects = light_config.get(CONF_EFFECTS, [])
|
||||
index = find_effect_index(effects, original_name)
|
||||
if index is not None:
|
||||
return index
|
||||
# Should never reach here — effect names are validated during config
|
||||
# validation in FINAL_VALIDATE_SCHEMA. This is a safety net.
|
||||
raise EsphomeError(
|
||||
f"Effect '{original_name}' not found for light '{light_id}'. "
|
||||
f"Available effects: {available_effects_str(effects)}"
|
||||
)
|
||||
|
||||
|
||||
@automation.register_action(
|
||||
|
||||
@@ -0,0 +1,280 @@
|
||||
"""Tests for light effect name validation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
from contextvars import Token
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import config_validation as cv
|
||||
from esphome.components.light import (
|
||||
EffectRef,
|
||||
_final_validate,
|
||||
_get_data,
|
||||
available_effects_str,
|
||||
find_effect_index,
|
||||
)
|
||||
from esphome.components.light.automation import _record_effect_ref
|
||||
from esphome.config import Config, path_context
|
||||
from esphome.const import CONF_EFFECT, CONF_EFFECTS, CONF_ID, CONF_NAME
|
||||
from esphome.core import ID, Lambda
|
||||
import esphome.final_validate as fv
|
||||
from esphome.types import ConfigType
|
||||
|
||||
|
||||
def _make_effects(*names: str) -> list[dict[str, dict[str, str]]]:
|
||||
"""Create a list of effect config dicts from names."""
|
||||
return [{f"effect_{i}": {CONF_NAME: name}} for i, name in enumerate(names)]
|
||||
|
||||
|
||||
# --- find_effect_index ---
|
||||
|
||||
|
||||
def test_find_effect_index_found() -> None:
|
||||
effects = _make_effects("Fast Pulse", "Slow Pulse")
|
||||
assert find_effect_index(effects, "Fast Pulse") == 1
|
||||
assert find_effect_index(effects, "Slow Pulse") == 2
|
||||
|
||||
|
||||
def test_find_effect_index_case_insensitive() -> None:
|
||||
effects = _make_effects("Fast Pulse")
|
||||
assert find_effect_index(effects, "fast pulse") == 1
|
||||
assert find_effect_index(effects, "FAST PULSE") == 1
|
||||
|
||||
|
||||
def test_find_effect_index_not_found() -> None:
|
||||
effects = _make_effects("Fast Pulse", "Slow Pulse")
|
||||
assert find_effect_index(effects, "Missing") is None
|
||||
|
||||
|
||||
def test_find_effect_index_empty() -> None:
|
||||
assert find_effect_index([], "anything") is None
|
||||
|
||||
|
||||
# --- available_effects_str ---
|
||||
|
||||
|
||||
def test_available_effects_str_multiple() -> None:
|
||||
effects = _make_effects("Fast Pulse", "Slow Pulse")
|
||||
assert available_effects_str(effects) == "'Fast Pulse', 'Slow Pulse'"
|
||||
|
||||
|
||||
def test_available_effects_str_single() -> None:
|
||||
effects = _make_effects("Fast Pulse")
|
||||
assert available_effects_str(effects) == "'Fast Pulse'"
|
||||
|
||||
|
||||
def test_available_effects_str_empty() -> None:
|
||||
assert available_effects_str([]) == "none"
|
||||
|
||||
|
||||
# --- _final_validate ---
|
||||
|
||||
|
||||
def _setup_final_validate(
|
||||
effect_refs: list[EffectRef],
|
||||
light_configs: list[ConfigType],
|
||||
declare_ids: list[tuple[ID, list[str | int]]],
|
||||
) -> Token:
|
||||
"""Set up CORE.data and fv.full_config for _final_validate tests."""
|
||||
data = _get_data()
|
||||
data.effect_refs = effect_refs
|
||||
|
||||
full_conf = Config()
|
||||
full_conf["light"] = light_configs
|
||||
for id_, path in declare_ids:
|
||||
full_conf.declare_ids.append((id_, path))
|
||||
|
||||
return fv.full_config.set(full_conf)
|
||||
|
||||
|
||||
def test_final_validate_valid_effect() -> None:
|
||||
"""Valid effect name should not raise."""
|
||||
light_id = ID("led1", is_declaration=True)
|
||||
token = _setup_final_validate(
|
||||
effect_refs=[
|
||||
EffectRef(
|
||||
light_id=light_id, effect_name="Fast Pulse", component_path=["esphome"]
|
||||
),
|
||||
],
|
||||
light_configs=[
|
||||
{CONF_ID: light_id, CONF_EFFECTS: _make_effects("Fast Pulse", "Slow Pulse")}
|
||||
],
|
||||
declare_ids=[(light_id, ["light", 0, CONF_ID])],
|
||||
)
|
||||
try:
|
||||
_final_validate({})
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_final_validate_invalid_effect_raises() -> None:
|
||||
"""Invalid effect name should raise FinalExternalInvalid."""
|
||||
light_id = ID("led1", is_declaration=True)
|
||||
token = _setup_final_validate(
|
||||
effect_refs=[
|
||||
EffectRef(
|
||||
light_id=light_id, effect_name="Nonexistent", component_path=["esphome"]
|
||||
),
|
||||
],
|
||||
light_configs=[
|
||||
{CONF_ID: light_id, CONF_EFFECTS: _make_effects("Fast Pulse", "Slow Pulse")}
|
||||
],
|
||||
declare_ids=[(light_id, ["light", 0, CONF_ID])],
|
||||
)
|
||||
try:
|
||||
with pytest.raises(cv.FinalExternalInvalid, match="Nonexistent"):
|
||||
_final_validate({})
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_final_validate_lists_available_effects() -> None:
|
||||
"""Error message should list available effects."""
|
||||
light_id = ID("led1", is_declaration=True)
|
||||
token = _setup_final_validate(
|
||||
effect_refs=[
|
||||
EffectRef(
|
||||
light_id=light_id, effect_name="Missing", component_path=["esphome"]
|
||||
),
|
||||
],
|
||||
light_configs=[
|
||||
{CONF_ID: light_id, CONF_EFFECTS: _make_effects("Fast Pulse", "Slow Pulse")}
|
||||
],
|
||||
declare_ids=[(light_id, ["light", 0, CONF_ID])],
|
||||
)
|
||||
try:
|
||||
with pytest.raises(cv.FinalExternalInvalid, match="'Fast Pulse', 'Slow Pulse'"):
|
||||
_final_validate({})
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_final_validate_no_effects_on_light() -> None:
|
||||
"""Light with no effects should report 'none' as available."""
|
||||
light_id = ID("led1", is_declaration=True)
|
||||
token = _setup_final_validate(
|
||||
effect_refs=[
|
||||
EffectRef(
|
||||
light_id=light_id, effect_name="Missing", component_path=["esphome"]
|
||||
),
|
||||
],
|
||||
light_configs=[{CONF_ID: light_id}],
|
||||
declare_ids=[(light_id, ["light", 0, CONF_ID])],
|
||||
)
|
||||
try:
|
||||
with pytest.raises(cv.FinalExternalInvalid, match="Available effects: none"):
|
||||
_final_validate({})
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_final_validate_no_refs_is_noop() -> None:
|
||||
"""No stored refs should pass without error."""
|
||||
data = _get_data()
|
||||
data.effect_refs = []
|
||||
_final_validate({})
|
||||
|
||||
|
||||
def test_final_validate_unknown_light_id_skipped() -> None:
|
||||
"""Refs to unknown light IDs should be silently skipped."""
|
||||
data = _get_data()
|
||||
data.effect_refs = [
|
||||
EffectRef(
|
||||
light_id=ID("nonexistent", is_declaration=True),
|
||||
effect_name="Missing",
|
||||
component_path=["esphome"],
|
||||
)
|
||||
]
|
||||
|
||||
full_conf = Config()
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
_final_validate({})
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_final_validate_drains_refs() -> None:
|
||||
"""Refs should be drained after validation to avoid redundant runs."""
|
||||
light_id = ID("led1", is_declaration=True)
|
||||
token = _setup_final_validate(
|
||||
effect_refs=[
|
||||
EffectRef(
|
||||
light_id=light_id, effect_name="Fast Pulse", component_path=["esphome"]
|
||||
),
|
||||
],
|
||||
light_configs=[{CONF_ID: light_id, CONF_EFFECTS: _make_effects("Fast Pulse")}],
|
||||
declare_ids=[(light_id, ["light", 0, CONF_ID])],
|
||||
)
|
||||
try:
|
||||
_final_validate({})
|
||||
assert _get_data().effect_refs == []
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
# --- _record_effect_ref ---
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def _path_ctx() -> Generator[None]:
|
||||
"""Set path_context for _record_effect_ref tests."""
|
||||
token = path_context.set(["esphome"])
|
||||
yield
|
||||
path_context.reset(token)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("_path_ctx")
|
||||
def test_record_effect_ref_static() -> None:
|
||||
"""Static effect name should be recorded."""
|
||||
light_id = ID("led1", is_declaration=True)
|
||||
config: ConfigType = {CONF_ID: light_id, CONF_EFFECT: "Fast Pulse"}
|
||||
result = _record_effect_ref(config)
|
||||
assert result is config
|
||||
data = _get_data()
|
||||
assert len(data.effect_refs) == 1
|
||||
assert data.effect_refs[0].effect_name == "Fast Pulse"
|
||||
assert data.effect_refs[0].light_id is light_id
|
||||
assert data.effect_refs[0].component_path == ["esphome"]
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("_path_ctx")
|
||||
def test_record_effect_ref_skips_lambda() -> None:
|
||||
"""Lambda effect should not be recorded."""
|
||||
config: ConfigType = {
|
||||
CONF_ID: ID("led1", is_declaration=True),
|
||||
CONF_EFFECT: Lambda("return effect;"),
|
||||
}
|
||||
_record_effect_ref(config)
|
||||
assert _get_data().effect_refs == []
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("_path_ctx")
|
||||
def test_record_effect_ref_skips_none() -> None:
|
||||
"""Effect 'None' should not be recorded."""
|
||||
config: ConfigType = {
|
||||
CONF_ID: ID("led1", is_declaration=True),
|
||||
CONF_EFFECT: "None",
|
||||
}
|
||||
_record_effect_ref(config)
|
||||
assert _get_data().effect_refs == []
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("_path_ctx")
|
||||
def test_record_effect_ref_skips_none_case_insensitive() -> None:
|
||||
"""Effect 'none' (lowercase) should not be recorded."""
|
||||
config: ConfigType = {
|
||||
CONF_ID: ID("led1", is_declaration=True),
|
||||
CONF_EFFECT: "none",
|
||||
}
|
||||
_record_effect_ref(config)
|
||||
assert _get_data().effect_refs == []
|
||||
|
||||
|
||||
def test_record_effect_ref_skips_no_effect_key() -> None:
|
||||
"""Config without effect key should be a no-op."""
|
||||
config: ConfigType = {CONF_ID: ID("led1", is_declaration=True)}
|
||||
_record_effect_ref(config)
|
||||
assert _get_data().effect_refs == []
|
||||
Reference in New Issue
Block a user