[core] Warn on crystal frequency mismatch during serial upload (#14582)
Some checks failed
CI / Create common environment (push) Has been cancelled
CI / Check pylint (push) Has been cancelled
CI / Run script/ci-custom (push) Has been cancelled
CI / Run pytest (macOS-latest, 3.11) (push) Has been cancelled
CI / Run pytest (macOS-latest, 3.14) (push) Has been cancelled
CI / Run pytest (ubuntu-latest, 3.11) (push) Has been cancelled
CI / Run pytest (ubuntu-latest, 3.13) (push) Has been cancelled
CI / Run pytest (ubuntu-latest, 3.14) (push) Has been cancelled
CI / Run pytest (windows-latest, 3.11) (push) Has been cancelled
CI / Run pytest (windows-latest, 3.14) (push) Has been cancelled
CI / Determine which jobs to run (push) Has been cancelled
CI / Run integration tests (push) Has been cancelled
CI / Run C++ unit tests (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 IDF (push) Has been cancelled
CI / Run script/clang-tidy for ESP8266 (push) Has been cancelled
CI / Run script/clang-tidy for ZEPHYR (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 1/4 (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 2/4 (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 3/4 (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 4/4 (push) Has been cancelled
CI / Test components batch (${{ matrix.components }}) (push) Has been cancelled
CI / pre-commit.ci lite (push) Has been cancelled
CI / Build target branch for memory impact (push) Has been cancelled
CI / Build PR branch for memory impact (push) Has been cancelled
CI / Comment memory impact (push) Has been cancelled
CI / CI Status (push) Has been cancelled
Stale / stale (push) Has been cancelled
Lock closed issues and PRs / lock (push) Has been cancelled
Publish Release / Initialize build (push) Has been cancelled
Publish Release / Build and publish to PyPi (push) Has been cancelled
Publish Release / Build ESPHome amd64 (push) Has been cancelled
Publish Release / Build ESPHome arm64 (push) Has been cancelled
Publish Release / Publish ESPHome docker to dockerhub (push) Has been cancelled
Publish Release / Publish ESPHome docker to ghcr (push) Has been cancelled
Publish Release / Publish ESPHome ha-addon to dockerhub (push) Has been cancelled
Publish Release / Publish ESPHome ha-addon to ghcr (push) Has been cancelled
Publish Release / deploy-ha-addon-repo (push) Has been cancelled
Publish Release / deploy-esphome-schema (push) Has been cancelled
Publish Release / version-notifier (push) Has been cancelled
Synchronise Device Classes from Home Assistant / Sync Device Classes (push) Has been cancelled

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
J. Nick Koston
2026-03-10 10:42:38 -10:00
committed by GitHub
parent 8d988723cd
commit 6356e3def9
4 changed files with 391 additions and 11 deletions

View File

@@ -1,6 +1,7 @@
# PYTHON_ARGCOMPLETE_OK
import argparse
from collections.abc import Callable
from contextlib import suppress
from datetime import datetime
import functools
import getpass
@@ -687,6 +688,47 @@ def _check_and_emit_build_info() -> None:
)
def _get_configured_xtal_freq() -> int | None:
"""Read the configured crystal frequency from the sdkconfig file."""
sdkconfig_path = CORE.relative_build_path(f"sdkconfig.{CORE.name}")
if not sdkconfig_path.is_file():
return None
with suppress(OSError, ValueError):
content = sdkconfig_path.read_text()
for line in content.splitlines():
if line.startswith("CONFIG_XTAL_FREQ="):
return int(line.split("=", 1)[1])
return None
def _make_crystal_freq_callback(
configured_freq: int,
) -> Callable[[str], str | None]:
"""Create a callback that checks esptool crystal frequency output."""
crystal_re = re.compile(r"Crystal frequency:\s+(\d+)\s*MHz")
def check_crystal_line(line: str) -> str | None:
if not (match := crystal_re.search(line)):
return None
detected = int(match.group(1))
if detected == configured_freq:
return None
return (
f"\n\033[33mWARNING: Crystal frequency mismatch! "
f"Device reports {detected}MHz but firmware is configured "
f"for {configured_freq}MHz.\n"
f"UART logging and other clock-dependent features will not "
f"work correctly.\n"
f"Set the correct crystal frequency with sdkconfig_options:\n"
f" esp32:\n"
f" framework:\n"
f" sdkconfig_options:\n"
f" CONFIG_XTAL_FREQ_{detected}: 'y'\033[0m\n\n"
)
return check_crystal_line
def upload_using_esptool(
config: ConfigType, port: str, file: str, speed: int
) -> str | int:
@@ -715,6 +757,14 @@ def upload_using_esptool(
mcu = get_esp32_variant().lower()
line_callbacks: list[Callable[[str], str | None]] = []
if (
CORE.is_esp32
and file is None
and (configured_freq := _get_configured_xtal_freq()) is not None
):
line_callbacks.append(_make_crystal_freq_callback(configured_freq))
def run_esptool(baud_rate):
cmd = [
"esptool",
@@ -739,9 +789,13 @@ def upload_using_esptool(
if os.environ.get("ESPHOME_USE_SUBPROCESS") is None:
import esptool
return run_external_command(esptool.main, *cmd) # pylint: disable=no-member
return run_external_command(
esptool.main, # pylint: disable=no-member
*cmd,
line_callbacks=line_callbacks,
)
return run_external_process(*cmd)
return run_external_process(*cmd, line_callbacks=line_callbacks)
rc = run_esptool(first_baudrate)
if rc == 0 or first_baudrate == 115200:

View File

@@ -125,7 +125,12 @@ ANSI_ESCAPE = re.compile(r"\033[@-_][0-?]*[ -/]*[@-~]")
class RedirectText:
def __init__(self, out, filter_lines=None):
def __init__(
self,
out,
filter_lines: list[str] | None = None,
line_callbacks: list[Callable[[str], str | None]] | None = None,
) -> None:
self._out = out
if filter_lines is None:
self._filter_pattern = None
@@ -133,6 +138,7 @@ class RedirectText:
pattern = r"|".join(r"(?:" + pattern + r")" for pattern in filter_lines)
self._filter_pattern = re.compile(pattern)
self._line_buffer = ""
self._line_callbacks = line_callbacks or []
def __getattr__(self, item):
return getattr(self._out, item)
@@ -157,7 +163,7 @@ class RedirectText:
if not isinstance(s, str):
s = s.decode()
if self._filter_pattern is not None:
if self._filter_pattern is not None or self._line_callbacks:
self._line_buffer += s
lines = self._line_buffer.splitlines(True)
for line in lines:
@@ -169,7 +175,10 @@ class RedirectText:
line_without_ansi = ANSI_ESCAPE.sub("", line)
line_without_end = line_without_ansi.rstrip()
if self._filter_pattern.match(line_without_end) is not None:
if (
self._filter_pattern is not None
and self._filter_pattern.match(line_without_end) is not None
):
# Filter pattern matched, ignore the line
continue
@@ -181,6 +190,9 @@ class RedirectText:
and (help_msg := get_esp32_arduino_flash_error_help())
):
self._write_color_replace(help_msg)
for callback in self._line_callbacks:
if msg := callback(line_without_end):
self._write_color_replace(msg)
else:
self._write_color_replace(s)
@@ -194,7 +206,11 @@ class RedirectText:
def run_external_command(
func, *cmd, capture_stdout: bool = False, filter_lines: str = None
func,
*cmd,
capture_stdout: bool = False,
filter_lines: list[str] | None = None,
line_callbacks: list[Callable[[str], str | None]] | None = None,
) -> int | str:
"""
Run a function from an external package that acts like a main method.
@@ -204,7 +220,9 @@ def run_external_command(
:param func: Function to execute
:param cmd: Command to run as (eg first element of sys.argv)
:param capture_stdout: Capture text from stdout and return that.
:param filter_lines: Regular expression used to filter captured output.
Note: line_callbacks are not invoked when capture_stdout is True.
:param filter_lines: Regular expressions used to filter captured output.
:param line_callbacks: Callbacks invoked per line; non-None returns are written to output.
:return: str if `capture_stdout` is set else int exit code.
"""
@@ -218,9 +236,13 @@ def run_external_command(
_LOGGER.debug("Running: %s", full_cmd)
orig_stdout = sys.stdout
sys.stdout = RedirectText(sys.stdout, filter_lines=filter_lines)
sys.stdout = RedirectText(
sys.stdout, filter_lines=filter_lines, line_callbacks=line_callbacks
)
orig_stderr = sys.stderr
sys.stderr = RedirectText(sys.stderr, filter_lines=filter_lines)
sys.stderr = RedirectText(
sys.stderr, filter_lines=filter_lines, line_callbacks=line_callbacks
)
if capture_stdout:
cap_stdout = sys.stdout = io.StringIO()
@@ -254,14 +276,19 @@ def run_external_process(*cmd: str, **kwargs: Any) -> int | str:
full_cmd = " ".join(shlex_quote(x) for x in cmd)
_LOGGER.debug("Running: %s", full_cmd)
filter_lines = kwargs.get("filter_lines")
line_callbacks = kwargs.get("line_callbacks")
capture_stdout = kwargs.get("capture_stdout", False)
if capture_stdout:
sub_stdout = subprocess.PIPE
else:
sub_stdout = RedirectText(sys.stdout, filter_lines=filter_lines)
sub_stdout = RedirectText(
sys.stdout, filter_lines=filter_lines, line_callbacks=line_callbacks
)
sub_stderr = RedirectText(sys.stderr, filter_lines=filter_lines)
sub_stderr = RedirectText(
sys.stderr, filter_lines=filter_lines, line_callbacks=line_callbacks
)
try:
proc = subprocess.run(

View File

@@ -6,6 +6,7 @@ from collections.abc import Generator
from dataclasses import dataclass
import json
import logging
import os
from pathlib import Path
import re
import sys
@@ -19,6 +20,8 @@ from pytest import CaptureFixture
from esphome import platformio_api
from esphome.__main__ import (
Purpose,
_get_configured_xtal_freq,
_make_crystal_freq_callback,
choose_upload_log_host,
command_analyze_memory,
command_clean_all,
@@ -3717,3 +3720,124 @@ esp32:
clean_output.split("SUMMARY")[1] if "SUMMARY" in clean_output else ""
)
assert "secrets.yaml" not in summary_section
def test_get_configured_xtal_freq_reads_sdkconfig(tmp_path: Path) -> None:
"""Test reading XTAL_FREQ from sdkconfig."""
CORE.name = "test-device"
CORE.build_path = tmp_path
sdkconfig = tmp_path / "sdkconfig.test-device"
sdkconfig.write_text(
"CONFIG_SOC_XTAL_SUPPORT_26M=y\nCONFIG_XTAL_FREQ=26\nCONFIG_XTAL_FREQ_26=y\n"
)
assert _get_configured_xtal_freq() == 26
def test_get_configured_xtal_freq_default_40(tmp_path: Path) -> None:
"""Test reading default 40MHz XTAL_FREQ from sdkconfig."""
CORE.name = "test-device"
CORE.build_path = tmp_path
sdkconfig = tmp_path / "sdkconfig.test-device"
sdkconfig.write_text("CONFIG_XTAL_FREQ=40\nCONFIG_XTAL_FREQ_40=y\n")
assert _get_configured_xtal_freq() == 40
def test_get_configured_xtal_freq_missing_file(tmp_path: Path) -> None:
"""Test that missing sdkconfig returns None."""
CORE.name = "test-device"
CORE.build_path = tmp_path
assert _get_configured_xtal_freq() is None
def test_get_configured_xtal_freq_no_xtal_line(tmp_path: Path) -> None:
"""Test that sdkconfig without XTAL_FREQ returns None."""
CORE.name = "test-device"
CORE.build_path = tmp_path
sdkconfig = tmp_path / "sdkconfig.test-device"
sdkconfig.write_text("CONFIG_OTHER=123\n")
assert _get_configured_xtal_freq() is None
def test_crystal_freq_callback_mismatch() -> None:
"""Test callback returns warning on crystal frequency mismatch."""
callback = _make_crystal_freq_callback(40)
result = callback("Crystal frequency: 26MHz")
assert result is not None
assert "26MHz" in result
assert "40MHz" in result
assert "CONFIG_XTAL_FREQ_26" in result
def test_crystal_freq_callback_match() -> None:
"""Test callback returns None when frequencies match."""
callback = _make_crystal_freq_callback(40)
result = callback("Crystal frequency: 40MHz")
assert result is None
def test_crystal_freq_callback_no_crystal_line() -> None:
"""Test callback returns None for unrelated lines."""
callback = _make_crystal_freq_callback(40)
assert callback("Chip type: ESP8684H") is None
assert callback("MAC: a0:b7:65:8b:16:d4") is None
assert callback("") is None
def test_upload_using_esptool_passes_crystal_callback(
tmp_path: Path,
mock_run_external_command_main: Mock,
mock_get_idedata: Mock,
) -> None:
"""Test that upload_using_esptool passes crystal freq callback for ESP32."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test")
CORE.data[KEY_ESP32] = {KEY_VARIANT: VARIANT_ESP32}
# Create sdkconfig with XTAL_FREQ
build_dir = Path(CORE.build_path)
build_dir.mkdir(parents=True, exist_ok=True)
sdkconfig = build_dir / "sdkconfig.test"
sdkconfig.write_text("CONFIG_XTAL_FREQ=40\n")
mock_idedata = MagicMock(spec=platformio_api.IDEData)
mock_idedata.firmware_bin_path = tmp_path / "firmware.bin"
mock_idedata.extra_flash_images = []
mock_get_idedata.return_value = mock_idedata
(tmp_path / "firmware.bin").touch()
config = {CONF_ESPHOME: {"platformio_options": {}}}
upload_using_esptool(config, "/dev/ttyUSB0", None, None)
# Verify line_callbacks was passed with the crystal callback
call_kwargs = mock_run_external_command_main.call_args[1]
assert "line_callbacks" in call_kwargs
assert len(call_kwargs["line_callbacks"]) == 1
def test_upload_using_esptool_subprocess_passes_crystal_callback(
mock_run_external_process: Mock,
mock_get_idedata: Mock,
tmp_path: Path,
) -> None:
"""Test that crystal freq callback is passed via run_external_process."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test")
CORE.data[KEY_ESP32] = {KEY_VARIANT: VARIANT_ESP32}
# Create sdkconfig with XTAL_FREQ
build_dir = Path(CORE.build_path)
build_dir.mkdir(parents=True, exist_ok=True)
sdkconfig = build_dir / "sdkconfig.test"
sdkconfig.write_text("CONFIG_XTAL_FREQ=40\n")
mock_idedata = MagicMock(spec=platformio_api.IDEData)
mock_idedata.firmware_bin_path = tmp_path / "firmware.bin"
mock_idedata.extra_flash_images = []
mock_get_idedata.return_value = mock_idedata
(tmp_path / "firmware.bin").touch()
config = {CONF_ESPHOME: {"platformio_options": {}}}
with patch.dict(os.environ, {"ESPHOME_USE_SUBPROCESS": "1"}):
upload_using_esptool(config, "/dev/ttyUSB0", None, None)
call_kwargs = mock_run_external_process.call_args[1]
assert "line_callbacks" in call_kwargs
assert len(call_kwargs["line_callbacks"]) == 1

View File

@@ -2,9 +2,12 @@
from __future__ import annotations
from collections.abc import Callable
import io
from pathlib import Path
import subprocess
import sys
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
@@ -407,6 +410,178 @@ def test_shlex_quote_edge_cases() -> None:
assert util.shlex_quote(" ") == "' '"
def _make_redirect(
line_callbacks: list[Callable[[str], str | None]] | None = None,
filter_lines: list[str] | None = None,
) -> tuple[util.RedirectText, io.StringIO]:
"""Create a RedirectText that writes to a StringIO buffer."""
buf = io.StringIO()
redirect = util.RedirectText(
buf, filter_lines=filter_lines, line_callbacks=line_callbacks
)
return redirect, buf
def test_redirect_text_callback_called_on_matching_line() -> None:
"""Test that a line callback is called and its output is written."""
results: list[str] = []
def callback(line: str) -> str | None:
results.append(line)
if "target" in line:
return "CALLBACK OUTPUT\n"
return None
redirect, buf = _make_redirect(line_callbacks=[callback])
redirect.write("some target line\n")
assert "some target line" in buf.getvalue()
assert "CALLBACK OUTPUT" in buf.getvalue()
assert len(results) == 1
def test_redirect_text_callback_not_triggered_on_non_matching_line() -> None:
"""Test that callback returns None for non-matching lines."""
def callback(line: str) -> str | None:
if "target" in line:
return "FOUND\n"
return None
redirect, buf = _make_redirect(line_callbacks=[callback])
redirect.write("no match here\n")
assert "no match here" in buf.getvalue()
assert "FOUND" not in buf.getvalue()
def test_redirect_text_callback_works_without_filter_pattern() -> None:
"""Test that callbacks fire even when no filter_lines is set."""
def callback(line: str) -> str | None:
if "Crystal" in line:
return "WARNING: mismatch\n"
return None
redirect, buf = _make_redirect(line_callbacks=[callback])
redirect.write("Crystal frequency: 26MHz\n")
assert "Crystal frequency: 26MHz" in buf.getvalue()
assert "WARNING: mismatch" in buf.getvalue()
def test_redirect_text_callback_works_with_filter_pattern() -> None:
"""Test that callbacks fire alongside filter patterns."""
def callback(line: str) -> str | None:
if "important" in line:
return "NOTED\n"
return None
redirect, buf = _make_redirect(
line_callbacks=[callback],
filter_lines=[r"^skip this.*"],
)
redirect.write("skip this line\n")
redirect.write("important line\n")
assert "skip this" not in buf.getvalue()
assert "important line" in buf.getvalue()
assert "NOTED" in buf.getvalue()
def test_redirect_text_multiple_callbacks() -> None:
"""Test that multiple callbacks are all invoked."""
def callback_a(line: str) -> str | None:
if "test" in line:
return "FROM A\n"
return None
def callback_b(line: str) -> str | None:
if "test" in line:
return "FROM B\n"
return None
redirect, buf = _make_redirect(line_callbacks=[callback_a, callback_b])
redirect.write("test line\n")
output = buf.getvalue()
assert "FROM A" in output
assert "FROM B" in output
def test_redirect_text_incomplete_line_buffered() -> None:
"""Test that incomplete lines are buffered until newline."""
results: list[str] = []
def callback(line: str) -> str | None:
results.append(line)
return None
redirect, buf = _make_redirect(line_callbacks=[callback])
redirect.write("partial")
assert len(results) == 0
redirect.write(" line\n")
assert len(results) == 1
assert results[0] == "partial line"
def test_run_external_command_line_callbacks(capsys: pytest.CaptureFixture) -> None:
"""Test that run_external_command passes line_callbacks to RedirectText."""
results: list[str] = []
def callback(line: str) -> str | None:
results.append(line)
if "hello" in line:
return "CALLBACK FIRED\n"
return None
def fake_main() -> int:
print("hello world")
return 0
rc = util.run_external_command(fake_main, "fake", line_callbacks=[callback])
assert rc == 0
assert len(results) == 1
assert "hello world" in results[0]
captured = capsys.readouterr()
assert "CALLBACK FIRED" in captured.out
def test_run_external_process_line_callbacks() -> None:
"""Test that run_external_process passes line_callbacks to RedirectText."""
results: list[str] = []
def callback(line: str) -> str | None:
results.append(line)
if "from subprocess" in line:
return "PROCESS CALLBACK\n"
return None
with patch("esphome.util.subprocess.run") as mock_run:
def run_side_effect(*args: Any, **kwargs: Any) -> MagicMock:
# Simulate subprocess writing to the stdout RedirectText
stdout = kwargs.get("stdout")
if stdout is not None and isinstance(stdout, util.RedirectText):
stdout.write("from subprocess\n")
return MagicMock(returncode=0)
mock_run.side_effect = run_side_effect
rc = util.run_external_process(
"echo",
"test",
line_callbacks=[callback],
)
assert rc == 0
assert any("from subprocess" in r for r in results)
def test_get_picotool_path_found(tmp_path: Path) -> None:
"""Test picotool path derivation from cc_path."""
# Create the expected directory structure