diff --git a/script/analyze_component_buses.py b/script/analyze_component_buses.py index ae90da91f3a..17af7af5771 100755 --- a/script/analyze_component_buses.py +++ b/script/analyze_component_buses.py @@ -24,8 +24,7 @@ Example output: from __future__ import annotations import argparse -import ast -from functools import cache, lru_cache +from functools import lru_cache import json from pathlib import Path import re @@ -43,9 +42,6 @@ from esphome.config_helpers import Extend, Remove # Path to common bus configs COMMON_BUS_PATH = Path("tests/test_build_components/common") -# Path to esphome component sources (used to parse AUTO_LOAD / CONFLICTS_WITH metadata) -COMPONENTS_SRC_PATH = Path("esphome/components") - # Package dependencies - maps packages to the packages they include # When a component uses a package on the left, it automatically gets # the packages on the right as well @@ -515,86 +511,6 @@ def merge_compatible_bus_groups( return merged_groups -@cache -def _parse_component_init(name: str) -> tuple[frozenset[str], frozenset[str]]: - """Return (AUTO_LOAD, CONFLICTS_WITH) list-literal values for a single component.""" - init_file = COMPONENTS_SRC_PATH / name / "__init__.py" - result: dict[str, frozenset[str]] = { - "AUTO_LOAD": frozenset(), - "CONFLICTS_WITH": frozenset(), - } - if not init_file.exists(): - return result["AUTO_LOAD"], result["CONFLICTS_WITH"] - try: - tree = ast.parse(init_file.read_text()) - except (OSError, SyntaxError): - return result["AUTO_LOAD"], result["CONFLICTS_WITH"] - for node in tree.body: - if not isinstance(node, ast.Assign) or not isinstance(node.value, ast.List): - continue - for target in node.targets: - if not isinstance(target, ast.Name) or target.id not in result: - continue - result[target.id] = frozenset( - e.value - for e in node.value.elts - if isinstance(e, ast.Constant) and isinstance(e.value, str) - ) - return result["AUTO_LOAD"], result["CONFLICTS_WITH"] - - -def split_conflicting_groups( - grouped_components: dict[tuple[str, str], list[str]], -) -> dict[tuple[str, str], list[str]]: - """Split groups so that components declaring mutual CONFLICTS_WITH end up in separate builds. - - A conflict propagates through AUTO_LOAD: if X declares CONFLICTS_WITH=[Y] - and Z auto-loads Y, then X and Z conflict (e.g. bme680_bsec vs. - bme68x_bsec2_i2c which auto-loads bme68x_bsec2). Only components that - appear in the batch (and their AUTO_LOAD closures) are parsed. - """ - batch = {c for comps in grouped_components.values() for c in comps} - - # Walk each batch component's AUTO_LOAD chain once, collecting the full - # loaded-alongside set and the names it rejects via CONFLICTS_WITH. Two - # components conflict when either one's rejects intersect the other's - # loaded set (relation must be symmetric -- e.g. ethernet rejects wifi - # but wifi does not reject ethernet). - walks: dict[str, tuple[set[str], set[str]]] = {} - for comp in batch: - loaded, rejects, stack = {comp}, set(), [comp] - while stack: - auto_load, conflicts_with = _parse_component_init(stack.pop()) - rejects |= conflicts_with - new = auto_load - loaded - loaded |= new - stack.extend(new) - walks[comp] = (loaded, rejects) - - def conflicts(a: str, b: str) -> bool: - loaded_a, rejects_a = walks[a] - loaded_b, rejects_b = walks[b] - return not rejects_a.isdisjoint(loaded_b) or not rejects_b.isdisjoint(loaded_a) - - result: dict[tuple[str, str], list[str]] = {} - for (platform, signature), components in grouped_components.items(): - buckets: list[list[str]] = [] - for comp in components: - for bucket in buckets: - if not any(conflicts(comp, other) for other in bucket): - bucket.append(comp) - break - else: - buckets.append([comp]) - if len(buckets) == 1: - result[(platform, signature)] = buckets[0] - continue - for index, bucket in enumerate(buckets): - key = signature if index == 0 else f"{signature}__conflict{index}" - result[(platform, key)] = bucket - return result - - def create_grouping_signature( platform_buses: dict[str, list[str]], platform: str ) -> str: diff --git a/script/helpers.py b/script/helpers.py index 9f5ea7894cd..8eff38ce927 100644 --- a/script/helpers.py +++ b/script/helpers.py @@ -1,6 +1,8 @@ from __future__ import annotations +import ast from collections.abc import Callable +from dataclasses import dataclass, field from functools import cache import hashlib import json @@ -139,6 +141,109 @@ def get_component_test_files( return list(tests_dir.glob("test.*.yaml")) +@dataclass(frozen=True) +class ComponentMetadata: + """Statically-parsed AUTO_LOAD and CONFLICTS_WITH declarations.""" + + auto_load: frozenset[str] = field(default_factory=frozenset) + conflicts_with: frozenset[str] = field(default_factory=frozenset) + + +@cache +def parse_component_metadata(name: str) -> ComponentMetadata: + """Return the AUTO_LOAD / CONFLICTS_WITH declarations for a component. + + Parses the component's ``esphome/components//__init__.py`` statically. + Callable forms (``def AUTO_LOAD():``) require runtime imports and are + reported as empty -- safe for conflict detection since they cannot be + evaluated without executing the module. + """ + init_file = Path(root_path) / ESPHOME_COMPONENTS_PATH / name / "__init__.py" + if not init_file.exists(): + return ComponentMetadata() + try: + tree = ast.parse(init_file.read_text()) + except (OSError, SyntaxError): + return ComponentMetadata() + fields: dict[str, frozenset[str]] = { + "AUTO_LOAD": frozenset(), + "CONFLICTS_WITH": frozenset(), + } + for node in tree.body: + if not isinstance(node, ast.Assign) or not isinstance(node.value, ast.List): + continue + for target in node.targets: + if not isinstance(target, ast.Name) or target.id not in fields: + continue + fields[target.id] = frozenset( + e.value + for e in node.value.elts + if isinstance(e, ast.Constant) and isinstance(e.value, str) + ) + return ComponentMetadata( + auto_load=fields["AUTO_LOAD"], + conflicts_with=fields["CONFLICTS_WITH"], + ) + + +@dataclass +class _ConflictWalk: + loaded: set[str] + rejects: set[str] + + +def split_conflicting_groups( + grouped_components: dict[tuple[str, str], list[str]], +) -> dict[tuple[str, str], list[str]]: + """Split groups so components declaring mutual CONFLICTS_WITH end up in separate builds. + + A conflict propagates through AUTO_LOAD: if X declares CONFLICTS_WITH=[Y] + and Z auto-loads Y, then X and Z conflict (e.g. bme680_bsec vs. + bme68x_bsec2_i2c which auto-loads bme68x_bsec2). Only components that + appear in the batch (and their AUTO_LOAD closures) are parsed. The + conflict relation is treated as symmetric even when only one side + declares it (e.g. ethernet rejects wifi but wifi does not declare the + reverse). + """ + batch = {c for comps in grouped_components.values() for c in comps} + + walks: dict[str, _ConflictWalk] = {} + for comp in batch: + walk = _ConflictWalk(loaded={comp}, rejects=set()) + stack = [comp] + while stack: + metadata = parse_component_metadata(stack.pop()) + walk.rejects |= metadata.conflicts_with + new = metadata.auto_load - walk.loaded + walk.loaded |= new + stack.extend(new) + walks[comp] = walk + + def conflicts(a: str, b: str) -> bool: + wa, wb = walks[a], walks[b] + return not wa.rejects.isdisjoint(wb.loaded) or not wb.rejects.isdisjoint( + wa.loaded + ) + + result: dict[tuple[str, str], list[str]] = {} + for (platform, signature), components in grouped_components.items(): + buckets: list[list[str]] = [] + for comp in components: + for bucket in buckets: + if not any(conflicts(comp, other) for other in bucket): + bucket.append(comp) + break + else: + buckets.append([comp]) + if len(buckets) == 1: + result[(platform, signature)] = buckets[0] + continue + for index, bucket in enumerate(buckets): + key = signature if index == 0 else f"{signature}__conflict{index}" + result[(platform, key)] = bucket + return result + + def styled(color: str | tuple[str, ...], msg: str, reset: bool = True) -> str: prefix = "".join(color) if isinstance(color, tuple) else color suffix = colorama.Style.RESET_ALL if reset else "" diff --git a/script/test_build_components.py b/script/test_build_components.py index fa5a02155af..82d05f78b23 100755 --- a/script/test_build_components.py +++ b/script/test_build_components.py @@ -37,10 +37,9 @@ from script.analyze_component_buses import ( create_grouping_signature, is_platform_component, merge_compatible_bus_groups, - split_conflicting_groups, uses_local_file_references, ) -from script.helpers import get_component_test_files +from script.helpers import get_component_test_files, split_conflicting_groups from script.merge_component_configs import merge_component_configs diff --git a/tests/script/test_helpers.py b/tests/script/test_helpers.py index 948aabaa669..db0d2908f47 100644 --- a/tests/script/test_helpers.py +++ b/tests/script/test_helpers.py @@ -1468,3 +1468,159 @@ def test_cache_miss_corrupted_json( result = helpers.create_components_graph() # Should handle corruption gracefully and rebuild assert result == {} + + +# --------------------------------------------------------------------------- +# parse_component_metadata / split_conflicting_groups +# --------------------------------------------------------------------------- + + +@pytest.fixture +def fake_components(tmp_path: Path) -> Path: + """Create a fake esphome/components/ tree and return the repo root. + + Component layout (tested against split_conflicting_groups): + + alpha -- CONFLICTS_WITH=["beta"] + beta -- CONFLICTS_WITH=["alpha"] + beta_variant -- AUTO_LOAD=["beta"] + gamma -- (no metadata) + one_sided -- CONFLICTS_WITH=["plain"] (plain does not reject back) + plain -- no CONFLICTS_WITH + callable_auto -- AUTO_LOAD is a function (not a list literal) -> ignored + broken -- __init__.py has a SyntaxError + """ + components = tmp_path / "esphome" / "components" + components.mkdir(parents=True) + + def write(name: str, body: str) -> None: + (components / name).mkdir() + (components / name / "__init__.py").write_text(body) + + write("alpha", 'CONFLICTS_WITH = ["beta"]\n') + write("beta", 'CONFLICTS_WITH = ["alpha"]\n') + write("beta_variant", 'AUTO_LOAD = ["beta"]\n') + write("gamma", "") + write("one_sided", 'CONFLICTS_WITH = ["plain"]\n') + write("plain", "") + write("callable_auto", "def AUTO_LOAD():\n return ['beta']\n") + write("broken", "this is not valid python !!!") + helpers.parse_component_metadata.cache_clear() + return tmp_path + + +def test_parse_component_metadata_list_literals( + fake_components: Path, monkeypatch: MonkeyPatch +) -> None: + monkeypatch.setattr(helpers, "root_path", str(fake_components)) + helpers.parse_component_metadata.cache_clear() + + meta = helpers.parse_component_metadata("alpha") + assert meta.conflicts_with == frozenset({"beta"}) + assert meta.auto_load == frozenset() + + variant = helpers.parse_component_metadata("beta_variant") + assert variant.auto_load == frozenset({"beta"}) + assert variant.conflicts_with == frozenset() + + +def test_parse_component_metadata_missing_empty_and_callable( + fake_components: Path, monkeypatch: MonkeyPatch +) -> None: + monkeypatch.setattr(helpers, "root_path", str(fake_components)) + helpers.parse_component_metadata.cache_clear() + + # Unknown component -> empty metadata, not an error. + unknown = helpers.parse_component_metadata("does_not_exist") + assert unknown == helpers.ComponentMetadata() + + # Empty __init__.py -> empty metadata. + assert helpers.parse_component_metadata("gamma") == helpers.ComponentMetadata() + + # Callable AUTO_LOAD cannot be statically evaluated -> empty. + callable_meta = helpers.parse_component_metadata("callable_auto") + assert callable_meta.auto_load == frozenset() + + # SyntaxError in __init__.py must not raise. + assert helpers.parse_component_metadata("broken") == helpers.ComponentMetadata() + + +def test_split_conflicting_groups_splits_direct_conflict( + fake_components: Path, monkeypatch: MonkeyPatch +) -> None: + monkeypatch.setattr(helpers, "root_path", str(fake_components)) + helpers.parse_component_metadata.cache_clear() + + result = helpers.split_conflicting_groups( + {("esp32", "i2c"): ["alpha", "beta", "gamma"]} + ) + # alpha and beta must end up in different buckets; gamma has no conflicts. + buckets = list(result.values()) + assert any("alpha" in b for b in buckets) + assert any("beta" in b for b in buckets) + for bucket in buckets: + assert not ({"alpha", "beta"} <= set(bucket)) + # Gamma sticks with whichever bucket it landed in first (alpha's). + all_members = {c for b in buckets for c in b} + assert all_members == {"alpha", "beta", "gamma"} + + +def test_split_conflicting_groups_propagates_through_auto_load( + fake_components: Path, monkeypatch: MonkeyPatch +) -> None: + """A component that AUTO_LOADs a conflicting one must also be split out.""" + monkeypatch.setattr(helpers, "root_path", str(fake_components)) + helpers.parse_component_metadata.cache_clear() + + result = helpers.split_conflicting_groups( + {("esp32", "i2c"): ["alpha", "beta_variant"]} + ) + buckets = list(result.values()) + for bucket in buckets: + assert not ({"alpha", "beta_variant"} <= set(bucket)) + assert sum(len(b) for b in buckets) == 2 + + +def test_split_conflicting_groups_symmetric_one_sided_declaration( + fake_components: Path, monkeypatch: MonkeyPatch +) -> None: + """If only one side declares CONFLICTS_WITH, the pair must still be split.""" + monkeypatch.setattr(helpers, "root_path", str(fake_components)) + helpers.parse_component_metadata.cache_clear() + + result = helpers.split_conflicting_groups( + {("esp32", "i2c"): ["one_sided", "plain"]} + ) + buckets = list(result.values()) + for bucket in buckets: + assert not ({"one_sided", "plain"} <= set(bucket)) + + +def test_split_conflicting_groups_preserves_non_conflicting_group( + fake_components: Path, monkeypatch: MonkeyPatch +) -> None: + monkeypatch.setattr(helpers, "root_path", str(fake_components)) + helpers.parse_component_metadata.cache_clear() + + original = {("esp32", "i2c"): ["alpha", "gamma", "plain"]} + result = helpers.split_conflicting_groups(original) + # All three are mutually compatible -- the group must not be split. + assert result == original + + +def test_split_conflicting_groups_preserves_original_signature_for_first_bucket( + fake_components: Path, monkeypatch: MonkeyPatch +) -> None: + """When a group is split, the first bucket keeps the original signature key.""" + monkeypatch.setattr(helpers, "root_path", str(fake_components)) + helpers.parse_component_metadata.cache_clear() + + result = helpers.split_conflicting_groups({("esp32", "i2c"): ["alpha", "beta"]}) + keys = set(result.keys()) + assert ("esp32", "i2c") in keys + # One additional bucket with a disambiguated signature. + extra = keys - {("esp32", "i2c")} + assert len(extra) == 1 + platform, signature = next(iter(extra)) + assert platform == "esp32" + assert signature.startswith("i2c__conflict")