[ci] Move conflict-aware grouping helpers into script/helpers.py with tests

This commit is contained in:
J. Nick Koston
2026-04-18 06:26:03 -05:00
parent 698dbbdcb5
commit 897dd7cd5e
4 changed files with 263 additions and 87 deletions
+1 -85
View File
@@ -24,8 +24,7 @@ Example output:
from __future__ import annotations
import argparse
import ast
from functools import cache, lru_cache
from functools import lru_cache
import json
from pathlib import Path
import re
@@ -43,9 +42,6 @@ from esphome.config_helpers import Extend, Remove
# Path to common bus configs
COMMON_BUS_PATH = Path("tests/test_build_components/common")
# Path to esphome component sources (used to parse AUTO_LOAD / CONFLICTS_WITH metadata)
COMPONENTS_SRC_PATH = Path("esphome/components")
# Package dependencies - maps packages to the packages they include
# When a component uses a package on the left, it automatically gets
# the packages on the right as well
@@ -515,86 +511,6 @@ def merge_compatible_bus_groups(
return merged_groups
@cache
def _parse_component_init(name: str) -> tuple[frozenset[str], frozenset[str]]:
"""Return (AUTO_LOAD, CONFLICTS_WITH) list-literal values for a single component."""
init_file = COMPONENTS_SRC_PATH / name / "__init__.py"
result: dict[str, frozenset[str]] = {
"AUTO_LOAD": frozenset(),
"CONFLICTS_WITH": frozenset(),
}
if not init_file.exists():
return result["AUTO_LOAD"], result["CONFLICTS_WITH"]
try:
tree = ast.parse(init_file.read_text())
except (OSError, SyntaxError):
return result["AUTO_LOAD"], result["CONFLICTS_WITH"]
for node in tree.body:
if not isinstance(node, ast.Assign) or not isinstance(node.value, ast.List):
continue
for target in node.targets:
if not isinstance(target, ast.Name) or target.id not in result:
continue
result[target.id] = frozenset(
e.value
for e in node.value.elts
if isinstance(e, ast.Constant) and isinstance(e.value, str)
)
return result["AUTO_LOAD"], result["CONFLICTS_WITH"]
def split_conflicting_groups(
grouped_components: dict[tuple[str, str], list[str]],
) -> dict[tuple[str, str], list[str]]:
"""Split groups so that components declaring mutual CONFLICTS_WITH end up in separate builds.
A conflict propagates through AUTO_LOAD: if X declares CONFLICTS_WITH=[Y]
and Z auto-loads Y, then X and Z conflict (e.g. bme680_bsec vs.
bme68x_bsec2_i2c which auto-loads bme68x_bsec2). Only components that
appear in the batch (and their AUTO_LOAD closures) are parsed.
"""
batch = {c for comps in grouped_components.values() for c in comps}
# Walk each batch component's AUTO_LOAD chain once, collecting the full
# loaded-alongside set and the names it rejects via CONFLICTS_WITH. Two
# components conflict when either one's rejects intersect the other's
# loaded set (relation must be symmetric -- e.g. ethernet rejects wifi
# but wifi does not reject ethernet).
walks: dict[str, tuple[set[str], set[str]]] = {}
for comp in batch:
loaded, rejects, stack = {comp}, set(), [comp]
while stack:
auto_load, conflicts_with = _parse_component_init(stack.pop())
rejects |= conflicts_with
new = auto_load - loaded
loaded |= new
stack.extend(new)
walks[comp] = (loaded, rejects)
def conflicts(a: str, b: str) -> bool:
loaded_a, rejects_a = walks[a]
loaded_b, rejects_b = walks[b]
return not rejects_a.isdisjoint(loaded_b) or not rejects_b.isdisjoint(loaded_a)
result: dict[tuple[str, str], list[str]] = {}
for (platform, signature), components in grouped_components.items():
buckets: list[list[str]] = []
for comp in components:
for bucket in buckets:
if not any(conflicts(comp, other) for other in bucket):
bucket.append(comp)
break
else:
buckets.append([comp])
if len(buckets) == 1:
result[(platform, signature)] = buckets[0]
continue
for index, bucket in enumerate(buckets):
key = signature if index == 0 else f"{signature}__conflict{index}"
result[(platform, key)] = bucket
return result
def create_grouping_signature(
platform_buses: dict[str, list[str]], platform: str
) -> str:
+105
View File
@@ -1,6 +1,8 @@
from __future__ import annotations
import ast
from collections.abc import Callable
from dataclasses import dataclass, field
from functools import cache
import hashlib
import json
@@ -139,6 +141,109 @@ def get_component_test_files(
return list(tests_dir.glob("test.*.yaml"))
@dataclass(frozen=True)
class ComponentMetadata:
"""Statically-parsed AUTO_LOAD and CONFLICTS_WITH declarations."""
auto_load: frozenset[str] = field(default_factory=frozenset)
conflicts_with: frozenset[str] = field(default_factory=frozenset)
@cache
def parse_component_metadata(name: str) -> ComponentMetadata:
"""Return the AUTO_LOAD / CONFLICTS_WITH declarations for a component.
Parses the component's ``esphome/components/<name>/__init__.py`` statically.
Callable forms (``def AUTO_LOAD():``) require runtime imports and are
reported as empty -- safe for conflict detection since they cannot be
evaluated without executing the module.
"""
init_file = Path(root_path) / ESPHOME_COMPONENTS_PATH / name / "__init__.py"
if not init_file.exists():
return ComponentMetadata()
try:
tree = ast.parse(init_file.read_text())
except (OSError, SyntaxError):
return ComponentMetadata()
fields: dict[str, frozenset[str]] = {
"AUTO_LOAD": frozenset(),
"CONFLICTS_WITH": frozenset(),
}
for node in tree.body:
if not isinstance(node, ast.Assign) or not isinstance(node.value, ast.List):
continue
for target in node.targets:
if not isinstance(target, ast.Name) or target.id not in fields:
continue
fields[target.id] = frozenset(
e.value
for e in node.value.elts
if isinstance(e, ast.Constant) and isinstance(e.value, str)
)
return ComponentMetadata(
auto_load=fields["AUTO_LOAD"],
conflicts_with=fields["CONFLICTS_WITH"],
)
@dataclass
class _ConflictWalk:
loaded: set[str]
rejects: set[str]
def split_conflicting_groups(
grouped_components: dict[tuple[str, str], list[str]],
) -> dict[tuple[str, str], list[str]]:
"""Split groups so components declaring mutual CONFLICTS_WITH end up in separate builds.
A conflict propagates through AUTO_LOAD: if X declares CONFLICTS_WITH=[Y]
and Z auto-loads Y, then X and Z conflict (e.g. bme680_bsec vs.
bme68x_bsec2_i2c which auto-loads bme68x_bsec2). Only components that
appear in the batch (and their AUTO_LOAD closures) are parsed. The
conflict relation is treated as symmetric even when only one side
declares it (e.g. ethernet rejects wifi but wifi does not declare the
reverse).
"""
batch = {c for comps in grouped_components.values() for c in comps}
walks: dict[str, _ConflictWalk] = {}
for comp in batch:
walk = _ConflictWalk(loaded={comp}, rejects=set())
stack = [comp]
while stack:
metadata = parse_component_metadata(stack.pop())
walk.rejects |= metadata.conflicts_with
new = metadata.auto_load - walk.loaded
walk.loaded |= new
stack.extend(new)
walks[comp] = walk
def conflicts(a: str, b: str) -> bool:
wa, wb = walks[a], walks[b]
return not wa.rejects.isdisjoint(wb.loaded) or not wb.rejects.isdisjoint(
wa.loaded
)
result: dict[tuple[str, str], list[str]] = {}
for (platform, signature), components in grouped_components.items():
buckets: list[list[str]] = []
for comp in components:
for bucket in buckets:
if not any(conflicts(comp, other) for other in bucket):
bucket.append(comp)
break
else:
buckets.append([comp])
if len(buckets) == 1:
result[(platform, signature)] = buckets[0]
continue
for index, bucket in enumerate(buckets):
key = signature if index == 0 else f"{signature}__conflict{index}"
result[(platform, key)] = bucket
return result
def styled(color: str | tuple[str, ...], msg: str, reset: bool = True) -> str:
prefix = "".join(color) if isinstance(color, tuple) else color
suffix = colorama.Style.RESET_ALL if reset else ""
+1 -2
View File
@@ -37,10 +37,9 @@ from script.analyze_component_buses import (
create_grouping_signature,
is_platform_component,
merge_compatible_bus_groups,
split_conflicting_groups,
uses_local_file_references,
)
from script.helpers import get_component_test_files
from script.helpers import get_component_test_files, split_conflicting_groups
from script.merge_component_configs import merge_component_configs
+156
View File
@@ -1468,3 +1468,159 @@ def test_cache_miss_corrupted_json(
result = helpers.create_components_graph()
# Should handle corruption gracefully and rebuild
assert result == {}
# ---------------------------------------------------------------------------
# parse_component_metadata / split_conflicting_groups
# ---------------------------------------------------------------------------
@pytest.fixture
def fake_components(tmp_path: Path) -> Path:
"""Create a fake esphome/components/ tree and return the repo root.
Component layout (tested against split_conflicting_groups):
alpha -- CONFLICTS_WITH=["beta"]
beta -- CONFLICTS_WITH=["alpha"]
beta_variant -- AUTO_LOAD=["beta"]
gamma -- (no metadata)
one_sided -- CONFLICTS_WITH=["plain"] (plain does not reject back)
plain -- no CONFLICTS_WITH
callable_auto -- AUTO_LOAD is a function (not a list literal) -> ignored
broken -- __init__.py has a SyntaxError
"""
components = tmp_path / "esphome" / "components"
components.mkdir(parents=True)
def write(name: str, body: str) -> None:
(components / name).mkdir()
(components / name / "__init__.py").write_text(body)
write("alpha", 'CONFLICTS_WITH = ["beta"]\n')
write("beta", 'CONFLICTS_WITH = ["alpha"]\n')
write("beta_variant", 'AUTO_LOAD = ["beta"]\n')
write("gamma", "")
write("one_sided", 'CONFLICTS_WITH = ["plain"]\n')
write("plain", "")
write("callable_auto", "def AUTO_LOAD():\n return ['beta']\n")
write("broken", "this is not valid python !!!")
helpers.parse_component_metadata.cache_clear()
return tmp_path
def test_parse_component_metadata_list_literals(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
meta = helpers.parse_component_metadata("alpha")
assert meta.conflicts_with == frozenset({"beta"})
assert meta.auto_load == frozenset()
variant = helpers.parse_component_metadata("beta_variant")
assert variant.auto_load == frozenset({"beta"})
assert variant.conflicts_with == frozenset()
def test_parse_component_metadata_missing_empty_and_callable(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
# Unknown component -> empty metadata, not an error.
unknown = helpers.parse_component_metadata("does_not_exist")
assert unknown == helpers.ComponentMetadata()
# Empty __init__.py -> empty metadata.
assert helpers.parse_component_metadata("gamma") == helpers.ComponentMetadata()
# Callable AUTO_LOAD cannot be statically evaluated -> empty.
callable_meta = helpers.parse_component_metadata("callable_auto")
assert callable_meta.auto_load == frozenset()
# SyntaxError in __init__.py must not raise.
assert helpers.parse_component_metadata("broken") == helpers.ComponentMetadata()
def test_split_conflicting_groups_splits_direct_conflict(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
result = helpers.split_conflicting_groups(
{("esp32", "i2c"): ["alpha", "beta", "gamma"]}
)
# alpha and beta must end up in different buckets; gamma has no conflicts.
buckets = list(result.values())
assert any("alpha" in b for b in buckets)
assert any("beta" in b for b in buckets)
for bucket in buckets:
assert not ({"alpha", "beta"} <= set(bucket))
# Gamma sticks with whichever bucket it landed in first (alpha's).
all_members = {c for b in buckets for c in b}
assert all_members == {"alpha", "beta", "gamma"}
def test_split_conflicting_groups_propagates_through_auto_load(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
"""A component that AUTO_LOADs a conflicting one must also be split out."""
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
result = helpers.split_conflicting_groups(
{("esp32", "i2c"): ["alpha", "beta_variant"]}
)
buckets = list(result.values())
for bucket in buckets:
assert not ({"alpha", "beta_variant"} <= set(bucket))
assert sum(len(b) for b in buckets) == 2
def test_split_conflicting_groups_symmetric_one_sided_declaration(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
"""If only one side declares CONFLICTS_WITH, the pair must still be split."""
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
result = helpers.split_conflicting_groups(
{("esp32", "i2c"): ["one_sided", "plain"]}
)
buckets = list(result.values())
for bucket in buckets:
assert not ({"one_sided", "plain"} <= set(bucket))
def test_split_conflicting_groups_preserves_non_conflicting_group(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
original = {("esp32", "i2c"): ["alpha", "gamma", "plain"]}
result = helpers.split_conflicting_groups(original)
# All three are mutually compatible -- the group must not be split.
assert result == original
def test_split_conflicting_groups_preserves_original_signature_for_first_bucket(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
"""When a group is split, the first bucket keeps the original signature key."""
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
result = helpers.split_conflicting_groups({("esp32", "i2c"): ["alpha", "beta"]})
keys = set(result.keys())
assert ("esp32", "i2c") in keys
# One additional bucket with a disambiguated signature.
extra = keys - {("esp32", "i2c")}
assert len(extra) == 1
platform, signature = next(iter(extra))
assert platform == "esp32"
assert signature.startswith("i2c__conflict")