diff --git a/esphome/__main__.py b/esphome/__main__.py index f33e7f4b42..3f0da85a69 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -1,6 +1,7 @@ # PYTHON_ARGCOMPLETE_OK import argparse from collections.abc import Callable +from contextlib import suppress from datetime import datetime import functools import getpass @@ -687,6 +688,47 @@ def _check_and_emit_build_info() -> None: ) +def _get_configured_xtal_freq() -> int | None: + """Read the configured crystal frequency from the sdkconfig file.""" + sdkconfig_path = CORE.relative_build_path(f"sdkconfig.{CORE.name}") + if not sdkconfig_path.is_file(): + return None + with suppress(OSError, ValueError): + content = sdkconfig_path.read_text() + for line in content.splitlines(): + if line.startswith("CONFIG_XTAL_FREQ="): + return int(line.split("=", 1)[1]) + return None + + +def _make_crystal_freq_callback( + configured_freq: int, +) -> Callable[[str], str | None]: + """Create a callback that checks esptool crystal frequency output.""" + crystal_re = re.compile(r"Crystal frequency:\s+(\d+)\s*MHz") + + def check_crystal_line(line: str) -> str | None: + if not (match := crystal_re.search(line)): + return None + detected = int(match.group(1)) + if detected == configured_freq: + return None + return ( + f"\n\033[33mWARNING: Crystal frequency mismatch! " + f"Device reports {detected}MHz but firmware is configured " + f"for {configured_freq}MHz.\n" + f"UART logging and other clock-dependent features will not " + f"work correctly.\n" + f"Set the correct crystal frequency with sdkconfig_options:\n" + f" esp32:\n" + f" framework:\n" + f" sdkconfig_options:\n" + f" CONFIG_XTAL_FREQ_{detected}: 'y'\033[0m\n\n" + ) + + return check_crystal_line + + def upload_using_esptool( config: ConfigType, port: str, file: str, speed: int ) -> str | int: @@ -715,6 +757,14 @@ def upload_using_esptool( mcu = get_esp32_variant().lower() + line_callbacks: list[Callable[[str], str | None]] = [] + if ( + CORE.is_esp32 + and file is None + and (configured_freq := _get_configured_xtal_freq()) is not None + ): + line_callbacks.append(_make_crystal_freq_callback(configured_freq)) + def run_esptool(baud_rate): cmd = [ "esptool", @@ -739,9 +789,13 @@ def upload_using_esptool( if os.environ.get("ESPHOME_USE_SUBPROCESS") is None: import esptool - return run_external_command(esptool.main, *cmd) # pylint: disable=no-member + return run_external_command( + esptool.main, # pylint: disable=no-member + *cmd, + line_callbacks=line_callbacks, + ) - return run_external_process(*cmd) + return run_external_process(*cmd, line_callbacks=line_callbacks) rc = run_esptool(first_baudrate) if rc == 0 or first_baudrate == 115200: diff --git a/esphome/util.py b/esphome/util.py index 6a21b4f627..73cc3aa5ab 100644 --- a/esphome/util.py +++ b/esphome/util.py @@ -125,7 +125,12 @@ ANSI_ESCAPE = re.compile(r"\033[@-_][0-?]*[ -/]*[@-~]") class RedirectText: - def __init__(self, out, filter_lines=None): + def __init__( + self, + out, + filter_lines: list[str] | None = None, + line_callbacks: list[Callable[[str], str | None]] | None = None, + ) -> None: self._out = out if filter_lines is None: self._filter_pattern = None @@ -133,6 +138,7 @@ class RedirectText: pattern = r"|".join(r"(?:" + pattern + r")" for pattern in filter_lines) self._filter_pattern = re.compile(pattern) self._line_buffer = "" + self._line_callbacks = line_callbacks or [] def __getattr__(self, item): return getattr(self._out, item) @@ -157,7 +163,7 @@ class RedirectText: if not isinstance(s, str): s = s.decode() - if self._filter_pattern is not None: + if self._filter_pattern is not None or self._line_callbacks: self._line_buffer += s lines = self._line_buffer.splitlines(True) for line in lines: @@ -169,7 +175,10 @@ class RedirectText: line_without_ansi = ANSI_ESCAPE.sub("", line) line_without_end = line_without_ansi.rstrip() - if self._filter_pattern.match(line_without_end) is not None: + if ( + self._filter_pattern is not None + and self._filter_pattern.match(line_without_end) is not None + ): # Filter pattern matched, ignore the line continue @@ -181,6 +190,9 @@ class RedirectText: and (help_msg := get_esp32_arduino_flash_error_help()) ): self._write_color_replace(help_msg) + for callback in self._line_callbacks: + if msg := callback(line_without_end): + self._write_color_replace(msg) else: self._write_color_replace(s) @@ -194,7 +206,11 @@ class RedirectText: def run_external_command( - func, *cmd, capture_stdout: bool = False, filter_lines: str = None + func, + *cmd, + capture_stdout: bool = False, + filter_lines: list[str] | None = None, + line_callbacks: list[Callable[[str], str | None]] | None = None, ) -> int | str: """ Run a function from an external package that acts like a main method. @@ -204,7 +220,9 @@ def run_external_command( :param func: Function to execute :param cmd: Command to run as (eg first element of sys.argv) :param capture_stdout: Capture text from stdout and return that. - :param filter_lines: Regular expression used to filter captured output. + Note: line_callbacks are not invoked when capture_stdout is True. + :param filter_lines: Regular expressions used to filter captured output. + :param line_callbacks: Callbacks invoked per line; non-None returns are written to output. :return: str if `capture_stdout` is set else int exit code. """ @@ -218,9 +236,13 @@ def run_external_command( _LOGGER.debug("Running: %s", full_cmd) orig_stdout = sys.stdout - sys.stdout = RedirectText(sys.stdout, filter_lines=filter_lines) + sys.stdout = RedirectText( + sys.stdout, filter_lines=filter_lines, line_callbacks=line_callbacks + ) orig_stderr = sys.stderr - sys.stderr = RedirectText(sys.stderr, filter_lines=filter_lines) + sys.stderr = RedirectText( + sys.stderr, filter_lines=filter_lines, line_callbacks=line_callbacks + ) if capture_stdout: cap_stdout = sys.stdout = io.StringIO() @@ -254,14 +276,19 @@ def run_external_process(*cmd: str, **kwargs: Any) -> int | str: full_cmd = " ".join(shlex_quote(x) for x in cmd) _LOGGER.debug("Running: %s", full_cmd) filter_lines = kwargs.get("filter_lines") + line_callbacks = kwargs.get("line_callbacks") capture_stdout = kwargs.get("capture_stdout", False) if capture_stdout: sub_stdout = subprocess.PIPE else: - sub_stdout = RedirectText(sys.stdout, filter_lines=filter_lines) + sub_stdout = RedirectText( + sys.stdout, filter_lines=filter_lines, line_callbacks=line_callbacks + ) - sub_stderr = RedirectText(sys.stderr, filter_lines=filter_lines) + sub_stderr = RedirectText( + sys.stderr, filter_lines=filter_lines, line_callbacks=line_callbacks + ) try: proc = subprocess.run( diff --git a/tests/unit_tests/test_main.py b/tests/unit_tests/test_main.py index b6f1a28086..b853461151 100644 --- a/tests/unit_tests/test_main.py +++ b/tests/unit_tests/test_main.py @@ -6,6 +6,7 @@ from collections.abc import Generator from dataclasses import dataclass import json import logging +import os from pathlib import Path import re import sys @@ -19,6 +20,8 @@ from pytest import CaptureFixture from esphome import platformio_api from esphome.__main__ import ( Purpose, + _get_configured_xtal_freq, + _make_crystal_freq_callback, choose_upload_log_host, command_analyze_memory, command_clean_all, @@ -3717,3 +3720,124 @@ esp32: clean_output.split("SUMMARY")[1] if "SUMMARY" in clean_output else "" ) assert "secrets.yaml" not in summary_section + + +def test_get_configured_xtal_freq_reads_sdkconfig(tmp_path: Path) -> None: + """Test reading XTAL_FREQ from sdkconfig.""" + CORE.name = "test-device" + CORE.build_path = tmp_path + sdkconfig = tmp_path / "sdkconfig.test-device" + sdkconfig.write_text( + "CONFIG_SOC_XTAL_SUPPORT_26M=y\nCONFIG_XTAL_FREQ=26\nCONFIG_XTAL_FREQ_26=y\n" + ) + assert _get_configured_xtal_freq() == 26 + + +def test_get_configured_xtal_freq_default_40(tmp_path: Path) -> None: + """Test reading default 40MHz XTAL_FREQ from sdkconfig.""" + CORE.name = "test-device" + CORE.build_path = tmp_path + sdkconfig = tmp_path / "sdkconfig.test-device" + sdkconfig.write_text("CONFIG_XTAL_FREQ=40\nCONFIG_XTAL_FREQ_40=y\n") + assert _get_configured_xtal_freq() == 40 + + +def test_get_configured_xtal_freq_missing_file(tmp_path: Path) -> None: + """Test that missing sdkconfig returns None.""" + CORE.name = "test-device" + CORE.build_path = tmp_path + assert _get_configured_xtal_freq() is None + + +def test_get_configured_xtal_freq_no_xtal_line(tmp_path: Path) -> None: + """Test that sdkconfig without XTAL_FREQ returns None.""" + CORE.name = "test-device" + CORE.build_path = tmp_path + sdkconfig = tmp_path / "sdkconfig.test-device" + sdkconfig.write_text("CONFIG_OTHER=123\n") + assert _get_configured_xtal_freq() is None + + +def test_crystal_freq_callback_mismatch() -> None: + """Test callback returns warning on crystal frequency mismatch.""" + callback = _make_crystal_freq_callback(40) + result = callback("Crystal frequency: 26MHz") + assert result is not None + assert "26MHz" in result + assert "40MHz" in result + assert "CONFIG_XTAL_FREQ_26" in result + + +def test_crystal_freq_callback_match() -> None: + """Test callback returns None when frequencies match.""" + callback = _make_crystal_freq_callback(40) + result = callback("Crystal frequency: 40MHz") + assert result is None + + +def test_crystal_freq_callback_no_crystal_line() -> None: + """Test callback returns None for unrelated lines.""" + callback = _make_crystal_freq_callback(40) + assert callback("Chip type: ESP8684H") is None + assert callback("MAC: a0:b7:65:8b:16:d4") is None + assert callback("") is None + + +def test_upload_using_esptool_passes_crystal_callback( + tmp_path: Path, + mock_run_external_command_main: Mock, + mock_get_idedata: Mock, +) -> None: + """Test that upload_using_esptool passes crystal freq callback for ESP32.""" + setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test") + CORE.data[KEY_ESP32] = {KEY_VARIANT: VARIANT_ESP32} + + # Create sdkconfig with XTAL_FREQ + build_dir = Path(CORE.build_path) + build_dir.mkdir(parents=True, exist_ok=True) + sdkconfig = build_dir / "sdkconfig.test" + sdkconfig.write_text("CONFIG_XTAL_FREQ=40\n") + + mock_idedata = MagicMock(spec=platformio_api.IDEData) + mock_idedata.firmware_bin_path = tmp_path / "firmware.bin" + mock_idedata.extra_flash_images = [] + mock_get_idedata.return_value = mock_idedata + (tmp_path / "firmware.bin").touch() + + config = {CONF_ESPHOME: {"platformio_options": {}}} + upload_using_esptool(config, "/dev/ttyUSB0", None, None) + + # Verify line_callbacks was passed with the crystal callback + call_kwargs = mock_run_external_command_main.call_args[1] + assert "line_callbacks" in call_kwargs + assert len(call_kwargs["line_callbacks"]) == 1 + + +def test_upload_using_esptool_subprocess_passes_crystal_callback( + mock_run_external_process: Mock, + mock_get_idedata: Mock, + tmp_path: Path, +) -> None: + """Test that crystal freq callback is passed via run_external_process.""" + setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test") + CORE.data[KEY_ESP32] = {KEY_VARIANT: VARIANT_ESP32} + + # Create sdkconfig with XTAL_FREQ + build_dir = Path(CORE.build_path) + build_dir.mkdir(parents=True, exist_ok=True) + sdkconfig = build_dir / "sdkconfig.test" + sdkconfig.write_text("CONFIG_XTAL_FREQ=40\n") + + mock_idedata = MagicMock(spec=platformio_api.IDEData) + mock_idedata.firmware_bin_path = tmp_path / "firmware.bin" + mock_idedata.extra_flash_images = [] + mock_get_idedata.return_value = mock_idedata + (tmp_path / "firmware.bin").touch() + + config = {CONF_ESPHOME: {"platformio_options": {}}} + with patch.dict(os.environ, {"ESPHOME_USE_SUBPROCESS": "1"}): + upload_using_esptool(config, "/dev/ttyUSB0", None, None) + + call_kwargs = mock_run_external_process.call_args[1] + assert "line_callbacks" in call_kwargs + assert len(call_kwargs["line_callbacks"]) == 1 diff --git a/tests/unit_tests/test_util.py b/tests/unit_tests/test_util.py index ca3fd9b78a..ff58fb1394 100644 --- a/tests/unit_tests/test_util.py +++ b/tests/unit_tests/test_util.py @@ -2,9 +2,12 @@ from __future__ import annotations +from collections.abc import Callable +import io from pathlib import Path import subprocess import sys +from typing import Any from unittest.mock import MagicMock, patch import pytest @@ -407,6 +410,178 @@ def test_shlex_quote_edge_cases() -> None: assert util.shlex_quote(" ") == "' '" +def _make_redirect( + line_callbacks: list[Callable[[str], str | None]] | None = None, + filter_lines: list[str] | None = None, +) -> tuple[util.RedirectText, io.StringIO]: + """Create a RedirectText that writes to a StringIO buffer.""" + buf = io.StringIO() + redirect = util.RedirectText( + buf, filter_lines=filter_lines, line_callbacks=line_callbacks + ) + return redirect, buf + + +def test_redirect_text_callback_called_on_matching_line() -> None: + """Test that a line callback is called and its output is written.""" + results: list[str] = [] + + def callback(line: str) -> str | None: + results.append(line) + if "target" in line: + return "CALLBACK OUTPUT\n" + return None + + redirect, buf = _make_redirect(line_callbacks=[callback]) + redirect.write("some target line\n") + + assert "some target line" in buf.getvalue() + assert "CALLBACK OUTPUT" in buf.getvalue() + assert len(results) == 1 + + +def test_redirect_text_callback_not_triggered_on_non_matching_line() -> None: + """Test that callback returns None for non-matching lines.""" + + def callback(line: str) -> str | None: + if "target" in line: + return "FOUND\n" + return None + + redirect, buf = _make_redirect(line_callbacks=[callback]) + redirect.write("no match here\n") + + assert "no match here" in buf.getvalue() + assert "FOUND" not in buf.getvalue() + + +def test_redirect_text_callback_works_without_filter_pattern() -> None: + """Test that callbacks fire even when no filter_lines is set.""" + + def callback(line: str) -> str | None: + if "Crystal" in line: + return "WARNING: mismatch\n" + return None + + redirect, buf = _make_redirect(line_callbacks=[callback]) + redirect.write("Crystal frequency: 26MHz\n") + + assert "Crystal frequency: 26MHz" in buf.getvalue() + assert "WARNING: mismatch" in buf.getvalue() + + +def test_redirect_text_callback_works_with_filter_pattern() -> None: + """Test that callbacks fire alongside filter patterns.""" + + def callback(line: str) -> str | None: + if "important" in line: + return "NOTED\n" + return None + + redirect, buf = _make_redirect( + line_callbacks=[callback], + filter_lines=[r"^skip this.*"], + ) + redirect.write("skip this line\n") + redirect.write("important line\n") + + assert "skip this" not in buf.getvalue() + assert "important line" in buf.getvalue() + assert "NOTED" in buf.getvalue() + + +def test_redirect_text_multiple_callbacks() -> None: + """Test that multiple callbacks are all invoked.""" + + def callback_a(line: str) -> str | None: + if "test" in line: + return "FROM A\n" + return None + + def callback_b(line: str) -> str | None: + if "test" in line: + return "FROM B\n" + return None + + redirect, buf = _make_redirect(line_callbacks=[callback_a, callback_b]) + redirect.write("test line\n") + + output = buf.getvalue() + assert "FROM A" in output + assert "FROM B" in output + + +def test_redirect_text_incomplete_line_buffered() -> None: + """Test that incomplete lines are buffered until newline.""" + results: list[str] = [] + + def callback(line: str) -> str | None: + results.append(line) + return None + + redirect, buf = _make_redirect(line_callbacks=[callback]) + redirect.write("partial") + assert len(results) == 0 + + redirect.write(" line\n") + assert len(results) == 1 + assert results[0] == "partial line" + + +def test_run_external_command_line_callbacks(capsys: pytest.CaptureFixture) -> None: + """Test that run_external_command passes line_callbacks to RedirectText.""" + results: list[str] = [] + + def callback(line: str) -> str | None: + results.append(line) + if "hello" in line: + return "CALLBACK FIRED\n" + return None + + def fake_main() -> int: + print("hello world") + return 0 + + rc = util.run_external_command(fake_main, "fake", line_callbacks=[callback]) + + assert rc == 0 + assert len(results) == 1 + assert "hello world" in results[0] + captured = capsys.readouterr() + assert "CALLBACK FIRED" in captured.out + + +def test_run_external_process_line_callbacks() -> None: + """Test that run_external_process passes line_callbacks to RedirectText.""" + results: list[str] = [] + + def callback(line: str) -> str | None: + results.append(line) + if "from subprocess" in line: + return "PROCESS CALLBACK\n" + return None + + with patch("esphome.util.subprocess.run") as mock_run: + + def run_side_effect(*args: Any, **kwargs: Any) -> MagicMock: + # Simulate subprocess writing to the stdout RedirectText + stdout = kwargs.get("stdout") + if stdout is not None and isinstance(stdout, util.RedirectText): + stdout.write("from subprocess\n") + return MagicMock(returncode=0) + + mock_run.side_effect = run_side_effect + + rc = util.run_external_process( + "echo", + "test", + line_callbacks=[callback], + ) + + assert rc == 0 + assert any("from subprocess" in r for r in results) + + def test_get_picotool_path_found(tmp_path: Path) -> None: """Test picotool path derivation from cc_path.""" # Create the expected directory structure