Merge remote-tracking branch 'upstream/sensor/throttle-with-priority-nan-specialize' into integration

This commit is contained in:
J. Nick Koston
2026-04-18 06:32:18 -05:00
4 changed files with 275 additions and 2 deletions
+105
View File
@@ -1,6 +1,8 @@
from __future__ import annotations
import ast
from collections.abc import Callable
from dataclasses import dataclass, field
from functools import cache
import hashlib
import json
@@ -139,6 +141,109 @@ def get_component_test_files(
return list(tests_dir.glob("test.*.yaml"))
@dataclass(frozen=True)
class ComponentMetadata:
"""Statically-parsed AUTO_LOAD and CONFLICTS_WITH declarations."""
auto_load: frozenset[str] = field(default_factory=frozenset)
conflicts_with: frozenset[str] = field(default_factory=frozenset)
@cache
def parse_component_metadata(name: str) -> ComponentMetadata:
"""Return the AUTO_LOAD / CONFLICTS_WITH declarations for a component.
Parses the component's ``esphome/components/<name>/__init__.py`` statically.
Callable forms (``def AUTO_LOAD():``) require runtime imports and are
reported as empty -- safe for conflict detection since they cannot be
evaluated without executing the module.
"""
init_file = Path(root_path) / ESPHOME_COMPONENTS_PATH / name / "__init__.py"
if not init_file.exists():
return ComponentMetadata()
try:
tree = ast.parse(init_file.read_text())
except (OSError, SyntaxError):
return ComponentMetadata()
fields: dict[str, frozenset[str]] = {
"AUTO_LOAD": frozenset(),
"CONFLICTS_WITH": frozenset(),
}
for node in tree.body:
if not isinstance(node, ast.Assign) or not isinstance(node.value, ast.List):
continue
for target in node.targets:
if not isinstance(target, ast.Name) or target.id not in fields:
continue
fields[target.id] = frozenset(
e.value
for e in node.value.elts
if isinstance(e, ast.Constant) and isinstance(e.value, str)
)
return ComponentMetadata(
auto_load=fields["AUTO_LOAD"],
conflicts_with=fields["CONFLICTS_WITH"],
)
@dataclass
class _ConflictWalk:
loaded: set[str]
rejects: set[str]
def split_conflicting_groups(
grouped_components: dict[tuple[str, str], list[str]],
) -> dict[tuple[str, str], list[str]]:
"""Split groups so components declaring mutual CONFLICTS_WITH end up in separate builds.
A conflict propagates through AUTO_LOAD: if X declares CONFLICTS_WITH=[Y]
and Z auto-loads Y, then X and Z conflict (e.g. bme680_bsec vs.
bme68x_bsec2_i2c which auto-loads bme68x_bsec2). Only components that
appear in the batch (and their AUTO_LOAD closures) are parsed. The
conflict relation is treated as symmetric even when only one side
declares it (e.g. ethernet rejects wifi but wifi does not declare the
reverse).
"""
batch = {c for comps in grouped_components.values() for c in comps}
walks: dict[str, _ConflictWalk] = {}
for comp in batch:
walk = _ConflictWalk(loaded={comp}, rejects=set())
stack = [comp]
while stack:
metadata = parse_component_metadata(stack.pop())
walk.rejects |= metadata.conflicts_with
new = metadata.auto_load - walk.loaded
walk.loaded |= new
stack.extend(new)
walks[comp] = walk
def conflicts(a: str, b: str) -> bool:
wa, wb = walks[a], walks[b]
return not wa.rejects.isdisjoint(wb.loaded) or not wb.rejects.isdisjoint(
wa.loaded
)
result: dict[tuple[str, str], list[str]] = {}
for (platform, signature), components in grouped_components.items():
buckets: list[list[str]] = []
for comp in components:
for bucket in buckets:
if not any(conflicts(comp, other) for other in bucket):
bucket.append(comp)
break
else:
buckets.append([comp])
if len(buckets) == 1:
result[(platform, signature)] = buckets[0]
continue
for index, bucket in enumerate(buckets):
key = signature if index == 0 else f"{signature}__conflict{index}"
result[(platform, key)] = bucket
return result
def styled(color: str | tuple[str, ...], msg: str, reset: bool = True) -> str:
prefix = "".join(color) if isinstance(color, tuple) else color
suffix = colorama.Style.RESET_ALL if reset else ""
+6 -1
View File
@@ -28,7 +28,7 @@ from script.analyze_component_buses import (
create_grouping_signature,
merge_compatible_bus_groups,
)
from script.helpers import get_component_test_files
from script.helpers import get_component_test_files, split_conflicting_groups
# Weighting for batch creation
# Isolated components can't be grouped/merged, so they count as 10x
@@ -145,6 +145,11 @@ def create_intelligent_batches(
# improving the efficiency of test_build_components.py grouping
signature_groups = merge_compatible_bus_groups(signature_groups)
# Split groups containing mutually-incompatible components (CONFLICTS_WITH).
# Without this, batch weighting assumes the group is one build when it will
# actually be split into two at build time -- throwing off CI distribution.
signature_groups = split_conflicting_groups(signature_groups)
# Create batches by keeping signature groups together
# Components with the same signature stay in the same batches
batches = []
+8 -1
View File
@@ -39,7 +39,7 @@ from script.analyze_component_buses import (
merge_compatible_bus_groups,
uses_local_file_references,
)
from script.helpers import get_component_test_files
from script.helpers import get_component_test_files, split_conflicting_groups
from script.merge_component_configs import merge_component_configs
@@ -675,6 +675,13 @@ def run_grouped_component_tests(
# as long as they don't have conflicting configurations for the same bus type
grouped_components = merge_compatible_bus_groups(grouped_components)
# Split groups that contain components declaring CONFLICTS_WITH each other.
# The bus-level merge above only considers shared bus configs; components
# with the same bus signature (e.g. both I2C) can still be mutually
# incompatible (e.g. bme680_bsec vs. bme68x_bsec2_i2c which auto-loads
# bme68x_bsec2). Those must end up in separate builds.
grouped_components = split_conflicting_groups(grouped_components)
# Print detailed grouping plan
print("\nGrouping Plan:")
print("-" * 80)
+156
View File
@@ -1468,3 +1468,159 @@ def test_cache_miss_corrupted_json(
result = helpers.create_components_graph()
# Should handle corruption gracefully and rebuild
assert result == {}
# ---------------------------------------------------------------------------
# parse_component_metadata / split_conflicting_groups
# ---------------------------------------------------------------------------
@pytest.fixture
def fake_components(tmp_path: Path) -> Path:
"""Create a fake esphome/components/ tree and return the repo root.
Component layout (tested against split_conflicting_groups):
alpha -- CONFLICTS_WITH=["beta"]
beta -- CONFLICTS_WITH=["alpha"]
beta_variant -- AUTO_LOAD=["beta"]
gamma -- (no metadata)
one_sided -- CONFLICTS_WITH=["plain"] (plain does not reject back)
plain -- no CONFLICTS_WITH
callable_auto -- AUTO_LOAD is a function (not a list literal) -> ignored
broken -- __init__.py has a SyntaxError
"""
components = tmp_path / "esphome" / "components"
components.mkdir(parents=True)
def write(name: str, body: str) -> None:
(components / name).mkdir()
(components / name / "__init__.py").write_text(body)
write("alpha", 'CONFLICTS_WITH = ["beta"]\n')
write("beta", 'CONFLICTS_WITH = ["alpha"]\n')
write("beta_variant", 'AUTO_LOAD = ["beta"]\n')
write("gamma", "")
write("one_sided", 'CONFLICTS_WITH = ["plain"]\n')
write("plain", "")
write("callable_auto", "def AUTO_LOAD():\n return ['beta']\n")
write("broken", "this is not valid python !!!")
helpers.parse_component_metadata.cache_clear()
return tmp_path
def test_parse_component_metadata_list_literals(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
meta = helpers.parse_component_metadata("alpha")
assert meta.conflicts_with == frozenset({"beta"})
assert meta.auto_load == frozenset()
variant = helpers.parse_component_metadata("beta_variant")
assert variant.auto_load == frozenset({"beta"})
assert variant.conflicts_with == frozenset()
def test_parse_component_metadata_missing_empty_and_callable(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
# Unknown component -> empty metadata, not an error.
unknown = helpers.parse_component_metadata("does_not_exist")
assert unknown == helpers.ComponentMetadata()
# Empty __init__.py -> empty metadata.
assert helpers.parse_component_metadata("gamma") == helpers.ComponentMetadata()
# Callable AUTO_LOAD cannot be statically evaluated -> empty.
callable_meta = helpers.parse_component_metadata("callable_auto")
assert callable_meta.auto_load == frozenset()
# SyntaxError in __init__.py must not raise.
assert helpers.parse_component_metadata("broken") == helpers.ComponentMetadata()
def test_split_conflicting_groups_splits_direct_conflict(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
result = helpers.split_conflicting_groups(
{("esp32", "i2c"): ["alpha", "beta", "gamma"]}
)
# alpha and beta must end up in different buckets; gamma has no conflicts.
buckets = list(result.values())
assert any("alpha" in b for b in buckets)
assert any("beta" in b for b in buckets)
for bucket in buckets:
assert not ({"alpha", "beta"} <= set(bucket))
# Gamma sticks with whichever bucket it landed in first (alpha's).
all_members = {c for b in buckets for c in b}
assert all_members == {"alpha", "beta", "gamma"}
def test_split_conflicting_groups_propagates_through_auto_load(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
"""A component that AUTO_LOADs a conflicting one must also be split out."""
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
result = helpers.split_conflicting_groups(
{("esp32", "i2c"): ["alpha", "beta_variant"]}
)
buckets = list(result.values())
for bucket in buckets:
assert not ({"alpha", "beta_variant"} <= set(bucket))
assert sum(len(b) for b in buckets) == 2
def test_split_conflicting_groups_symmetric_one_sided_declaration(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
"""If only one side declares CONFLICTS_WITH, the pair must still be split."""
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
result = helpers.split_conflicting_groups(
{("esp32", "i2c"): ["one_sided", "plain"]}
)
buckets = list(result.values())
for bucket in buckets:
assert not ({"one_sided", "plain"} <= set(bucket))
def test_split_conflicting_groups_preserves_non_conflicting_group(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
original = {("esp32", "i2c"): ["alpha", "gamma", "plain"]}
result = helpers.split_conflicting_groups(original)
# All three are mutually compatible -- the group must not be split.
assert result == original
def test_split_conflicting_groups_preserves_original_signature_for_first_bucket(
fake_components: Path, monkeypatch: MonkeyPatch
) -> None:
"""When a group is split, the first bucket keeps the original signature key."""
monkeypatch.setattr(helpers, "root_path", str(fake_components))
helpers.parse_component_metadata.cache_clear()
result = helpers.split_conflicting_groups({("esp32", "i2c"): ["alpha", "beta"]})
keys = set(result.keys())
assert ("esp32", "i2c") in keys
# One additional bucket with a disambiguated signature.
extra = keys - {("esp32", "i2c")}
assert len(extra) == 1
platform, signature = next(iter(extra))
assert platform == "esp32"
assert signature.startswith("i2c__conflict")