[substitutions] Improve error messages with include stack trace (#15874)

Co-authored-by: J. Nick Koston <nick@home-assistant.io>
This commit is contained in:
Javier Peletier
2026-04-22 03:19:01 +02:00
committed by GitHub
parent b20fedd806
commit 9cebce1b6e
6 changed files with 432 additions and 86 deletions
+45 -19
View File
@@ -42,6 +42,11 @@ DOMAIN = CONF_PACKAGES
# Guard against infinite include chains (e.g. A includes B includes A).
MAX_INCLUDE_DEPTH = 20
PackageCallback = Callable[
[dict | str | yaml_util.IncludeFile, ContextVars | None, yaml_util.DocumentPath],
dict,
]
def is_remote_package(package_config: dict) -> bool:
"""Returns True if the package_config is a remote package definition."""
@@ -281,8 +286,9 @@ def _process_remote_package(config: dict, skip_update: bool = False) -> dict:
def _walk_package_dict(
packages: dict,
callback: Callable[[dict, ContextVars | None], dict],
callback: PackageCallback,
context: ContextVars | None,
path: yaml_util.DocumentPath,
) -> cv.Invalid | None:
"""Iterate a packages dict in reverse priority order, invoking callback on each entry.
@@ -291,7 +297,9 @@ def _walk_package_dict(
for package_name, package_config in reversed(packages.items()):
with cv.prepend_path(package_name):
try:
packages[package_name] = callback(package_config, context)
packages[package_name] = callback(
package_config, context, path + [package_name]
)
except cv.Invalid as err:
return err
return None
@@ -299,20 +307,22 @@ def _walk_package_dict(
def _walk_package_list(
packages: list,
callback: Callable[[dict, ContextVars | None], dict],
callback: PackageCallback,
context: ContextVars | None,
path: yaml_util.DocumentPath,
) -> None:
"""Iterate a packages list in reverse priority order, invoking callback on each entry."""
for idx in reversed(range(len(packages))):
with cv.prepend_path(idx):
packages[idx] = callback(packages[idx], context)
packages[idx] = callback(packages[idx], context, path + [idx])
def _walk_packages(
config: dict,
callback: Callable[[dict, ContextVars | None], dict],
callback: PackageCallback,
context: ContextVars | None = None,
validate_deprecated: bool = True,
path: yaml_util.DocumentPath | None = None,
) -> dict:
"""Walks the packages structure in priority order, invoking ``callback`` on each package definition found.
@@ -323,19 +333,24 @@ def _walk_packages(
if CONF_PACKAGES not in config:
return config
packages = config[CONF_PACKAGES]
packages_path = (path or []) + [CONF_PACKAGES]
with cv.prepend_path(CONF_PACKAGES):
if isinstance(packages, yaml_util.IncludeFile):
# If the packages key is an IncludeFile, resolve it first before processing.
packages, _ = resolve_include(packages, [], context, strict_undefined=False)
packages = resolve_include(
packages, packages_path, context, strict_undefined=False
)
if not isinstance(packages, (dict, list)):
raise cv.Invalid(
f"Packages must be a key to value mapping or list, got {type(packages)} instead"
)
if not isinstance(packages, dict):
_walk_package_list(packages, callback, context)
elif (result := _walk_package_dict(packages, callback, context)) is not None:
_walk_package_list(packages, callback, context, packages_path)
elif (
result := _walk_package_dict(packages, callback, context, packages_path)
) is not None:
if not validate_deprecated or any(
is_package_definition(v) for v in packages.values()
):
@@ -344,14 +359,18 @@ def _walk_packages(
# This block can be removed once the single-package
# deprecation period (2026.7.0) is over.
config[CONF_PACKAGES] = [packages]
return _walk_packages(deprecate_single_package(config), callback, context)
return _walk_packages(
deprecate_single_package(config), callback, context, path=path
)
config[CONF_PACKAGES] = packages
return config
def _substitute_package_definition(
package_config: dict | str, context_vars: ContextVars | None
package_config: dict | str,
context_vars: ContextVars | None,
path: yaml_util.DocumentPath | None = None,
) -> dict | str:
"""Substitute variables in a package definition string or remote package dict.
@@ -369,12 +388,12 @@ def _substitute_package_definition(
errors: ErrList = []
package_config = substitute(
item=package_config,
path=[],
path=path or [],
parent_context=context_vars or ContextVars(),
strict_undefined=False,
errors=errors,
)
raise_first_undefined(errors, package_config, "package definition")
raise_first_undefined(errors, "package definition")
return package_config
@@ -432,6 +451,7 @@ class _PackageProcessor:
self,
package_config: dict | str | yaml_util.IncludeFile,
context_vars: ContextVars | None,
path: yaml_util.DocumentPath,
) -> dict:
"""Resolve a package definition to a concrete ``dict`` and fetch remote packages.
@@ -454,15 +474,15 @@ class _PackageProcessor:
"""
for _ in range(MAX_INCLUDE_DEPTH):
if isinstance(package_config, yaml_util.IncludeFile):
package_config, _ = resolve_include(
package_config = resolve_include(
package_config,
[],
path,
context_vars or ContextVars(),
strict_undefined=False,
)
package_config = _substitute_package_definition(
package_config, context_vars
package_config, context_vars, path
)
package_config = PACKAGE_SCHEMA(package_config)
if isinstance(package_config, dict):
@@ -483,13 +503,16 @@ class _PackageProcessor:
_update_substitutions_context(self.parent_context, subs)
def process_package(
self, package_config: dict | str, context_vars: ContextVars | None
self,
package_config: dict | str,
context_vars: ContextVars | None,
path: yaml_util.DocumentPath,
) -> dict:
"""Resolve a single package and recurse into any nested packages."""
from_remote = isinstance(package_config, dict) and is_remote_package(
package_config
)
package_config = self.resolve_package(package_config, context_vars)
package_config = self.resolve_package(package_config, context_vars, path)
self.collect_substitutions(package_config)
if CONF_PACKAGES not in package_config:
@@ -509,6 +532,7 @@ class _PackageProcessor:
self.process_package,
context_vars,
validate_deprecated=not from_remote,
path=path,
)
@@ -565,11 +589,13 @@ def merge_packages(config: dict) -> dict:
merge_list: list[dict] = []
def process_package_callback(
package_config: dict, context: ContextVars | None
package_config: dict,
context: ContextVars | None,
path: yaml_util.DocumentPath | None = None,
) -> dict:
"""This will be called for each package found in the config."""
merge_list.append(package_config)
return _walk_packages(package_config, process_package_callback)
return _walk_packages(package_config, process_package_callback, path=path)
_walk_packages(config, process_package_callback, validate_deprecated=False)
# Merge all packages into the main config:
+29 -54
View File
@@ -11,9 +11,11 @@ from esphome.types import ConfigType
from esphome.util import OrderedDict
from esphome.yaml_util import (
ConfigContext,
DocumentPath,
ESPHomeDataBase,
ESPLiteralValue,
IncludeFile,
format_path,
make_data_base,
)
@@ -23,8 +25,8 @@ CODEOWNERS = ["@esphome/core"]
_LOGGER = logging.getLogger(__name__)
ContextVars = ChainMap[str, Any]
SubstitutionPath = list[int | str]
ErrList = list[tuple[UndefinedError, SubstitutionPath, Any]]
ErrList = list[tuple[UndefinedError, DocumentPath, Any]]
# Module-level instance is safe: context_vars is passed per-call, and context_trace
# is stack-saved/restored within expand(). Not thread-safe — only use from one thread.
jinja = Jinja()
@@ -32,16 +34,13 @@ jinja = Jinja()
def raise_first_undefined(
errors: ErrList,
source: Any,
context_label: str,
) -> None:
"""If *errors* is non-empty, raise ``cv.Invalid`` for the first undefined variable.
The raised error names the missing variable, the path walked into *source*
(for nested dicts, e.g. ``url`` or ``ref``), and the YAML source location
when *source* carries one. Only the first error is surfaced; the user will
re-run after fixing it and any remaining undefined variables will be
reported then.
The raised error names the missing variable and its location in the include
stack. Only the first error is surfaced; the user will re-run after fixing it
and any remaining undefined variables will be reported then.
``context_label`` is the noun describing where the undefined variable
appeared (e.g. ``"package definition"``).
@@ -57,26 +56,8 @@ def raise_first_undefined(
for e, p_path, _ in errors[1:]
)
_LOGGER.debug("Additional undefined variables in %s: %s", context_label, extras)
# Prefer the location of the offending scalar (e.g. the `url:` value) over
# the enclosing package-definition dict so the message points at the exact
# line/column that carries the undefined variable.
location_node = (
err_value
if isinstance(err_value, ESPHomeDataBase) and err_value.esp_range is not None
else source
)
location = ""
if (
isinstance(location_node, ESPHomeDataBase)
and location_node.esp_range is not None
):
mark = location_node.esp_range.start_mark
# DocumentLocation.line/column are 0-based (from the YAML Mark). Render
# as 1-based to match config.line_info() and editor line numbering.
location = f" (in {mark.document} {mark.line + 1}:{mark.column + 1})"
field = f" at '{'->'.join(str(p) for p in err_path)}'" if err_path else ""
raise cv.Invalid(
f"Undefined variable in {context_label}{field}: {err.message}{location}"
f"Undefined variable in {context_label}: {err.message}\n{format_path(err_path, err_value)}"
)
@@ -145,7 +126,7 @@ def _resolve_var(name: str, context_vars: ContextVars) -> Any:
def _handle_undefined(
err: UndefinedError,
path: SubstitutionPath,
path: DocumentPath,
value: Any,
strict_undefined: bool,
errors: ErrList | None,
@@ -163,7 +144,7 @@ def _handle_undefined(
def _expand_substitutions(
value: str,
path: SubstitutionPath,
path: DocumentPath,
context_vars: ContextVars,
strict_undefined: bool,
errors: ErrList | None,
@@ -236,7 +217,7 @@ def _expand_substitutions(
f"\nEvaluation stack: (most recent evaluation last)"
f"\n{err.stack_trace_str()}"
f"\nRelevant context:\n{err.context_trace_str()}"
f"\nSee {'->'.join(str(x) for x in path)}",
f"\n{format_path(path, orig_value)}",
path,
) from err
else:
@@ -345,15 +326,13 @@ def push_context(
def resolve_include(
include: IncludeFile,
path: list[int | str],
path: DocumentPath,
context_vars: ContextVars,
strict_undefined: bool = True,
errors: ErrList | None = None,
) -> tuple[Any, str]:
) -> Any:
"""Resolve an include, substituting the filename if needed.
Returns the loaded content and the resolved filename.
Note: no path-traversal validation is performed on the resolved filename.
A substitution that resolves to an absolute path will bypass the parent
directory (Path.__truediv__ ignores the left operand for absolute paths).
@@ -361,44 +340,44 @@ def resolve_include(
values (including command-line substitutions), so path restrictions are
an explicit non-goal here.
"""
original = str(include.file)
original = include.file
original_str = str(original)
filename = str(
_expand_substitutions(
original, path + ["file"], context_vars, strict_undefined, errors
original_str, path + ["file"], context_vars, strict_undefined, errors
)
)
if filename != original:
substituted = filename != original_str
if substituted:
include = IncludeFile(
include.parent_file, filename, include.vars, include.yaml_loader
)
try:
return include.load(), filename
return include.load()
except esphome.core.EsphomeError as err:
resolved = f" (expanded from '{original}')" if substituted else ""
raise cv.Invalid(
f"Error including file '{filename}': {err}",
f"Error including file '{filename}'{resolved}: {err}"
f"\n{format_path(path, original)}",
path + [f"<{filename}>"],
) from err
def _substitute_include(
include: IncludeFile,
path: list[int | str],
path: DocumentPath,
context_vars: ContextVars,
strict_undefined: bool,
errors: ErrList | None,
) -> Any:
"""Resolve an include and substitute its content."""
content, filename = resolve_include(
include, path, context_vars, strict_undefined, errors
)
return substitute(
content, path + [f"<{filename}>"], context_vars, strict_undefined, errors
)
content = resolve_include(include, path, context_vars, strict_undefined, errors)
return substitute(content, path, context_vars, strict_undefined, errors)
def substitute(
item: Any,
path: SubstitutionPath,
path: DocumentPath,
parent_context: ContextVars,
strict_undefined: bool,
errors: ErrList | None = None,
@@ -451,16 +430,12 @@ def _warn_unresolved_variables(errors: ErrList) -> None:
for err, path, expression in errors:
if "password" in path:
continue
location: str = "->".join(str(x) for x in path)
if isinstance(expression, ESPHomeDataBase) and expression.esp_range is not None:
location += f" in {str(expression.esp_range.start_mark)}"
_LOGGER.warning(
"The string '%s' looks like an expression,"
" but could not resolve all the variables: %s (see %s)",
" but could not resolve all the variables: %s\n%s",
expression,
err.message,
location,
format_path(path, expression),
)
@@ -479,7 +454,7 @@ def resolve_substitutions_block(
# Single-shot resolution — matches ``_walk_packages`` for the
# ``packages: !include`` entry point. Chained includes (an include that
# itself loads another ``!include`` at the top level) are not supported.
substitutions, _ = resolve_include(
substitutions = resolve_include(
substitutions,
[],
ContextVars(command_line_substitutions or {}),
+119
View File
@@ -48,6 +48,8 @@ _SECRET_VALUES = {}
# Not thread-safe — config processing is single-threaded today.
_load_listeners: list[Callable[[Path], None]] = []
DocumentPath = list[str | int]
@contextmanager
def track_yaml_loads() -> Generator[list[Path]]:
@@ -679,6 +681,123 @@ def is_secret(value):
return None
def _path_doc(item: Any) -> str | None:
"""Return the source document name if *item* carries location info."""
if isinstance(item, ESPHomeDataBase) and (r := item.esp_range) is not None:
return r.start_mark.document
return None
def _fmt_mark(loc: Any) -> str:
"""Render a DocumentLocation as a 1-based 'file line:col' string."""
return f"{loc.document} {loc.line + 1}:{loc.column + 1}"
def _obj_loc(obj: Any) -> str:
"""Return formatted source location for *obj*, or '' if it has none."""
if isinstance(obj, ESPHomeDataBase) and (r := obj.esp_range) is not None:
return _fmt_mark(r.start_mark)
return ""
def _fmt_segment(seg: list) -> str:
"""Format a path segment, rendering integers as [n] subscripts."""
parts: list[str] = []
for item in seg:
if isinstance(item, int):
if parts:
parts[-1] = f"{parts[-1]}[{item}]"
else:
parts.append(f"[{item}]")
else:
parts.append(str(item))
return "->".join(parts)
def _split_into_frames(
path: DocumentPath,
) -> list[tuple[list, str]]:
"""Group *path* into per-file frames at include boundaries.
A "frame" is the slice of the path that belongs to one source document.
Each path item is either:
* a **located key** — has an ``ESPHomeDataBase`` source mark; this is
what tells us which document owns the surrounding keys.
* an **integer** — a list subscript; always attaches to the open frame
(renders as ``foo[3]`` on the previous name).
* an **unlocated string** — a key with no source mark (e.g. constants
like ``CONF_PACKAGES``); it describes the parent of the *next* file,
so it migrates to the next frame when the document changes.
Returns a list of ``(items, "file line:col")`` tuples in walk order
(outermost frame first).
"""
frames: list[tuple[list, str]] = []
open_frame: list = []
next_frame_keys: list = [] # unlocated strings buffered for the next frame
open_doc: str | None = None
open_loc = ""
for item in path:
doc = _path_doc(item)
if doc is None:
# Ints subscript the open frame's last name; everything else
# (strings, or leading ints with no open frame) is buffered for
# the next frame.
if isinstance(item, int) and open_doc is not None:
open_frame.append(item)
else:
next_frame_keys.append(item)
continue
if open_doc is not None and doc != open_doc:
# Crossed an include boundary: close the open frame.
frames.append((open_frame, open_loc))
open_frame = []
open_frame.extend(next_frame_keys)
next_frame_keys.clear()
open_frame.append(item)
open_doc = doc
open_loc = _fmt_mark(item.esp_range.start_mark)
if open_doc is not None:
# Trailing buffered keys belong to the innermost (last) frame.
open_frame.extend(next_frame_keys)
frames.append((open_frame, open_loc))
return frames
def format_path(path: DocumentPath, current_obj: Any) -> str:
"""Build a human-readable include stack from a config path.
Each YAML key in *path* that carries an ``ESPHomeDataBase`` ``esp_range``
reveals which file it came from. When the source document changes between
consecutive such keys, that is an include boundary. The path is split
into per-file frames and formatted innermost-first, e.g.::
In: packages->roam in common/package/wifi.yaml 26:10
Included from packages->net in common/hardware.yaml 44:2
Included from packages->device in my_project.yaml 11:2
The innermost ``In:`` line uses the location from *current_obj* when
available (the value that triggered the error) for extra precision.
"""
frames = _split_into_frames(path)
obj_loc = _obj_loc(current_obj)
if not frames:
# No source info anywhere in the path: render as a flat path,
# using current_obj's location if it happens to have one.
suffix = f" in {obj_loc}" if obj_loc else ""
return f"In: {_fmt_segment(path)}{suffix}"
inner_seg, inner_loc = frames[-1]
lines = [f"In: {_fmt_segment(inner_seg)} in {obj_loc or inner_loc}"]
for seg, loc in reversed(frames[:-1]):
lines.append(f" Included from {_fmt_segment(seg)} in {loc}")
return "\n".join(lines)
class ESPHomeDumper(yaml.SafeDumper):
def represent_mapping(self, tag, mapping, flow_style=None):
value = []
@@ -46,7 +46,7 @@ from esphome.const import (
)
from esphome.core import CORE
from esphome.util import OrderedDict
from esphome.yaml_util import IncludeFile, add_context, load_yaml
from esphome.yaml_util import DocumentPath, IncludeFile, add_context, load_yaml
# Test strings
TEST_DEVICE_NAME = "test_device_name"
@@ -1113,7 +1113,7 @@ def test_packages_include_file_resolves_to_list(mock_resolve_include) -> None:
"""When packages: is an IncludeFile that resolves to a list, it is processed correctly."""
include_file = MagicMock(spec=IncludeFile)
package_content = {CONF_WIFI: {CONF_SSID: TEST_PACKAGE_WIFI_SSID}}
mock_resolve_include.return_value = ([package_content], None)
mock_resolve_include.return_value = [package_content]
config = {CONF_PACKAGES: include_file}
result = do_packages_pass(config)
@@ -1127,7 +1127,7 @@ def test_packages_include_file_resolves_to_dict(mock_resolve_include) -> None:
"""When packages: is an IncludeFile that resolves to a dict, it is processed correctly."""
include_file = MagicMock(spec=IncludeFile)
package_content = {CONF_WIFI: {CONF_SSID: TEST_PACKAGE_WIFI_SSID}}
mock_resolve_include.return_value = ({"network": package_content}, None)
mock_resolve_include.return_value = {"network": package_content}
config = {CONF_PACKAGES: include_file}
result = do_packages_pass(config)
@@ -1142,7 +1142,7 @@ def test_packages_include_file_resolves_to_invalid_type_raises(
) -> None:
"""When packages: is an IncludeFile that resolves to an invalid type, cv.Invalid is raised."""
include_file = MagicMock(spec=IncludeFile)
mock_resolve_include.return_value = ("not_a_dict_or_list", None)
mock_resolve_include.return_value = "not_a_dict_or_list"
config = {CONF_PACKAGES: include_file}
with pytest.raises(
@@ -1215,7 +1215,9 @@ def test_named_dict_with_include_files_no_false_deprecation_warning(
call_count = 0
def failing_callback(package_config: dict, context: object) -> dict:
def failing_callback(
package_config: dict, context: object, path: DocumentPath | None = None
) -> dict:
nonlocal call_count
call_count += 1
if call_count == 1:
@@ -1251,7 +1253,9 @@ def test_validate_deprecated_false_raises_directly(
call_count = 0
def failing_callback(package_config: dict, context: object) -> dict:
def failing_callback(
package_config: dict, context: object, path: DocumentPath | None = None
) -> dict:
nonlocal call_count
call_count += 1
if call_count == 1:
@@ -1283,7 +1287,9 @@ def test_error_on_first_declared_package_still_detected() -> None:
call_count = 0
def fail_on_last(package_config: dict, context: object) -> dict:
def fail_on_last(
package_config: dict, context: object, path: DocumentPath | None = None
) -> dict:
nonlocal call_count
call_count += 1
# Reverse iteration: third_pkg (1), second_pkg (2), first_pkg (3)
@@ -1312,7 +1318,9 @@ def test_deprecated_single_package_fallback_still_works(
attempt = 0
def fail_then_succeed(package_config: dict, context: object) -> dict:
def fail_then_succeed(
package_config: dict, context: object, path: DocumentPath | None = None
) -> dict:
nonlocal attempt
attempt += 1
if attempt == 1:
+43 -4
View File
@@ -659,7 +659,7 @@ def test_resolve_package_max_depth_exceeded(tmp_path: Path) -> None:
cv.Invalid,
match=f"Maximum include nesting depth \\({MAX_INCLUDE_DEPTH}\\) exceeded",
):
processor.resolve_package(package_config, substitutions.ContextVars())
processor.resolve_package(package_config, substitutions.ContextVars(), [])
def test_include_filename_substitution_undefined_var(tmp_path: Path) -> None:
@@ -690,7 +690,7 @@ def test_raise_first_undefined_logs_extras_at_debug(
caplog.at_level(logging.DEBUG, logger="esphome.components.substitutions"),
pytest.raises(cv.Invalid) as exc_info,
):
substitutions.raise_first_undefined(errors, None, "package definition")
substitutions.raise_first_undefined(errors, "package definition")
# First error is surfaced as the cv.Invalid message.
raised = str(exc_info.value)
@@ -706,7 +706,7 @@ def test_raise_first_undefined_logs_extras_at_debug(
def test_raise_first_undefined_noop_on_empty() -> None:
"""An empty errors list is a no-op — no exception, no log."""
substitutions.raise_first_undefined([], None, "package definition")
substitutions.raise_first_undefined([], "package definition")
def test_do_substitution_pass_included_substitutions_must_be_mapping(
@@ -778,4 +778,43 @@ def test_resolve_package_undefined_var_in_include_filename(tmp_path: Path) -> No
)
processor = _PackageProcessor({}, None, False)
with pytest.raises(cv.Invalid, match="unresolved substitutions"):
processor.resolve_package(package_config, substitutions.ContextVars())
processor.resolve_package(package_config, substitutions.ContextVars(), [])
def test_resolve_include_error_shows_expanded_from_when_substituted(
tmp_path: Path,
) -> None:
"""When a substituted filename fails to load, the error includes '(expanded from ...)'."""
parent = tmp_path / "main.yaml"
parent.write_text("")
def failing_loader(_path: Path) -> None:
raise EsphomeError("File not found")
include = yaml_util.IncludeFile(parent, "${device}.yaml", None, failing_loader)
context = substitutions.ContextVars({"device": "my_device"})
with pytest.raises(cv.Invalid) as exc_info:
substitutions.resolve_include(include, [], context)
msg = str(exc_info.value)
assert "my_device.yaml" in msg
assert "expanded from '${device}.yaml'" in msg
def test_resolve_include_error_no_expanded_from_for_literal_filename(
tmp_path: Path,
) -> None:
"""When a literal filename fails to load, the error has no 'expanded from' clause."""
parent = tmp_path / "main.yaml"
parent.write_text("")
def failing_loader(_path: Path) -> None:
raise EsphomeError("File not found")
include = yaml_util.IncludeFile(parent, "literal.yaml", None, failing_loader)
with pytest.raises(cv.Invalid) as exc_info:
substitutions.resolve_include(include, [], substitutions.ContextVars())
assert "expanded from" not in str(exc_info.value)
+180 -1
View File
@@ -9,8 +9,9 @@ from esphome import core, yaml_util
from esphome.components import substitutions
from esphome.config_helpers import Extend, Remove
import esphome.config_validation as cv
from esphome.core import EsphomeError
from esphome.core import DocumentLocation, DocumentRange, EsphomeError
from esphome.util import OrderedDict
from esphome.yaml_util import ESPHomeDataBase, format_path, make_data_base
@pytest.fixture(autouse=True)
@@ -712,3 +713,181 @@ def test_yaml_merge_chain_include_depth_exceeded() -> None:
yaml_text = "base:\n <<: !include loop.yaml\n"
with pytest.raises(EsphomeError, match="Maximum include chain depth"):
yaml_util.parse_yaml(parent, io.StringIO(yaml_text), self_referencing_loader)
def _located(value, doc: str, line: int, col: int):
"""Return *value* wrapped with a fake ESPHomeDataBase source location."""
loc = DocumentLocation(doc, line, col)
obj = make_data_base(value)
if isinstance(obj, ESPHomeDataBase):
obj._esp_range = DocumentRange(loc, loc)
return obj
def test_format_path_no_location_info_returns_flat_path():
"""Plain path items with no esp_range produce a simple flat 'In:' line."""
result = format_path(["wifi", "ssid"], None)
assert result == "In: wifi->ssid"
def test_format_path_no_location_info_current_obj_adds_file():
"""When path has no location but current_obj does, its location is shown."""
obj = _located("${var}", "main.yaml", 5, 10)
result = format_path(["wifi", "ssid"], obj)
assert result == "In: wifi->ssid in main.yaml 6:11"
def test_format_path_single_frame_no_include_boundary():
"""All located keys from the same document → single 'In:' line, no 'Included from'."""
path = ["packages", _located("pkg1", "root.yaml", 5, 2)]
result = format_path(path, None)
assert result.startswith("In: packages->pkg1 in root.yaml 6:3")
assert "Included from" not in result
def test_format_path_two_frames_shows_included_from():
"""Keys from two different documents produce 'In:' + one 'Included from' line."""
path = [
"packages",
_located("device", "root.yaml", 10, 2),
"packages",
_located("inner", "hardware.yaml", 3, 2),
]
result = format_path(path, None)
assert "In: packages->inner in hardware.yaml 4:3" in result
assert "Included from packages->device in root.yaml 11:3" in result
def test_format_path_three_frames_full_include_stack():
"""Three document levels produce two 'Included from' lines in correct order."""
path = [
"packages",
_located("device", "root.yaml", 10, 2),
"packages",
_located("_wifi_", "hardware.yaml", 43, 2),
"packages",
_located("_roam_", "wifi.yaml", 25, 2),
]
result = format_path(path, None)
lines = result.splitlines()
assert lines[0].startswith("In: packages->_roam_ in wifi.yaml")
assert lines[1].startswith(" Included from packages->_wifi_ in hardware.yaml")
assert lines[2].startswith(" Included from packages->device in root.yaml")
def test_format_path_current_obj_overrides_innermost_location():
"""current_obj's esp_range replaces the key's column for the 'In:' line."""
path = ["packages", _located("pkg1", "root.yaml", 5, 2)]
# Value (the expression) sits at column 10, not column 2 like the key
value = _located("${undefined}", "root.yaml", 5, 10)
result = format_path(path, value)
assert "6:11" in result
assert "6:3" not in result
def test_format_path_empty_path_with_no_location():
"""Empty path with no location info returns 'In: '."""
result = format_path([], None)
assert result == "In: "
def test_format_path_integer_path_items_formatted_as_subscript():
"""Integer indices are rendered as [n] subscripts in the flat fallback."""
result = format_path(["packages", 0], None)
assert result == "In: packages[0]"
def test_format_path_integer_list_index_attached_to_previous_frame():
"""A list index between two include boundaries attaches to the outer frame."""
path = [
"packages",
_located("packages", "main.yaml", 5, 0),
0,
_located("packages", "level1.yaml", 2, 0),
0,
_located("esphome", "level2.yaml", 0, 0),
_located("name", "level2.yaml", 1, 8),
]
result = format_path(path, None)
lines = result.splitlines()
assert lines[0].startswith("In: esphome->name in level2.yaml")
assert "packages[0]" in lines[1] and "level1.yaml" in lines[1]
assert "packages[0]" in lines[2] and "main.yaml" in lines[2]
def test_format_path_trailing_unlocated_string_after_located_key():
"""Plain string keys after the last located key must still appear in output."""
path = [_located("packages", "main.yaml", 5, 0), "sub", "key"]
result = format_path(path, None)
assert result == "In: packages->sub->key in main.yaml 6:1"
def test_format_path_trailing_unlocated_int_attaches_to_current_frame():
"""Trailing ints attach to the open frame's last key (subscript), strings
buffer until end-of-path and then flush behind."""
path = [_located("packages", "main.yaml", 5, 0), 0, "sub"]
result = format_path(path, None)
# Int attaches to 'packages' as [0] subscript; trailing 'sub' is flushed
# at end and appears after.
assert result == "In: packages[0]->sub in main.yaml 6:1"
def test_format_path_only_trailing_unlocated_strings_are_preserved():
"""Trailing pending items must not be silently dropped after the last frame."""
path = [
_located("packages", "main.yaml", 5, 0),
_located("inner", "hardware.yaml", 3, 0),
"tail1",
"tail2",
]
result = format_path(path, None)
lines = result.splitlines()
assert lines[0] == "In: inner->tail1->tail2 in hardware.yaml 4:1"
assert lines[1] == " Included from packages in main.yaml 6:1"
def test_format_path_leading_int_with_no_current_doc_goes_to_pending():
"""An int before any located key is buffered and shown in the first frame."""
path = [0, _located("name", "main.yaml", 1, 0)]
result = format_path(path, None)
# Leading ints have no preceding name to subscript onto, so they render
# as bare [n] in the formatted segment.
assert result == "In: [0]->name in main.yaml 2:1"
def test_format_path_only_unlocated_int_returns_flat_fallback():
"""Path with only an int and no location info renders via the flat fallback."""
result = format_path([0], None)
assert result == "In: [0]"
def test_format_path_current_obj_in_different_doc_than_innermost_frame():
"""current_obj's location is preferred even when its document differs from the frame's."""
path = [_located("packages", "root.yaml", 1, 0)]
value = _located("${var}", "other.yaml", 9, 4)
result = format_path(path, value)
# Innermost line uses current_obj's mark (other.yaml 10:5), not the key's.
assert result == "In: packages in other.yaml 10:5"
def test_format_path_current_obj_without_location_falls_back_to_key():
"""An ESPHomeDataBase current_obj with no esp_range falls back to the key's location."""
class _NoRange(ESPHomeDataBase, str):
pass
obj = _NoRange.__new__(_NoRange, "value")
str.__init__(obj)
# No _esp_range set on this instance.
assert obj.esp_range is None
path = [_located("packages", "main.yaml", 5, 2)]
result = format_path(path, obj)
assert result == "In: packages in main.yaml 6:3"
def test_format_path_empty_path_with_located_current_obj():
"""An empty path with a located current_obj still surfaces the location."""
obj = _located("${var}", "main.yaml", 0, 0)
result = format_path([], obj)
assert result == "In: in main.yaml 1:1"