[main] Move stacktrace handling out of platformio_api and FlashImage into platform components/util (#16186)

This commit is contained in:
Diorcet Yann
2026-05-04 21:07:31 +02:00
committed by GitHub
parent 24d4da1021
commit 690a197346
11 changed files with 359 additions and 292 deletions
+11 -15
View File
@@ -63,6 +63,7 @@ from esphome.log import AnsiFore, color, setup_log
from esphome.types import ConfigType
from esphome.util import (
PICOTOOL_PACKAGE,
FlashImage,
detect_rp2040_bootsel,
get_picotool_path,
get_serial_ports,
@@ -586,8 +587,6 @@ def run_miniterm(config: ConfigType, port: str, args) -> int:
from aioesphomeapi import LogParser
import serial
from esphome import platformio_api
if CONF_LOGGER not in config:
_LOGGER.info("Logger is not enabled. Not starting UART logs.")
return 1
@@ -602,8 +601,11 @@ def run_miniterm(config: ConfigType, port: str, args) -> int:
try:
module = importlib.import_module("esphome.components." + CORE.target_platform)
process_stacktrace = getattr(module, "process_stacktrace")
except AttributeError:
pass
except (AttributeError, ImportError):
_LOGGER.info(
'Stacktrace analysis is unavailable: no compatible analyzer found for target platform "%s".',
CORE.target_platform,
)
backtrace_state = False
ser = serial.Serial()
@@ -646,14 +648,10 @@ def run_miniterm(config: ConfigType, port: str, args) -> int:
)
safe_print(parser.parse_line(line, time_str))
if process_stacktrace:
if process_stacktrace is not None:
backtrace_state = process_stacktrace(
config, line, backtrace_state
)
else:
backtrace_state = platformio_api.process_stacktrace(
config, line, backtrace_state=backtrace_state
)
except serial.SerialException:
_LOGGER.error("Serial port closed!")
return 0
@@ -843,22 +841,20 @@ def _make_crystal_freq_callback(
def upload_using_esptool(
config: ConfigType, port: str, file: str, speed: int
) -> str | int:
from esphome import platformio_api
first_baudrate = speed or config[CONF_ESPHOME][CONF_PLATFORMIO_OPTIONS].get(
"upload_speed", os.getenv("ESPHOME_UPLOAD_SPEED", "460800")
)
if file is not None:
flash_images = [platformio_api.FlashImage(path=file, offset="0x0")]
flash_images = [FlashImage(path=file, offset="0x0")]
else:
from esphome import platformio_api
idedata = platformio_api.get_idedata(config)
firmware_offset = "0x10000" if CORE.is_esp32 else "0x0"
flash_images = [
platformio_api.FlashImage(
path=idedata.firmware_bin_path, offset=firmware_offset
),
FlashImage(path=idedata.firmware_bin_path, offset=firmware_offset),
]
for image in idedata.extra_flash_images:
if not image.path.is_file():
+4 -6
View File
@@ -19,7 +19,6 @@ import contextlib
from esphome.const import CONF_KEY, CONF_PORT, __version__
from esphome.core import CORE, EsphomeError
from esphome.platformio_api import process_stacktrace
from . import CONF_ENCRYPTION
@@ -61,10 +60,6 @@ class _LogLineProcessor:
self.backtrace_state = self._platform_handler(
self._config, raw_line, self.backtrace_state
)
else:
self.backtrace_state = process_stacktrace(
self._config, raw_line, backtrace_state=self.backtrace_state
)
except EsphomeError as exc:
self._decode_enabled = False
self.backtrace_state = False
@@ -114,7 +109,10 @@ async def async_run_logs(
module = importlib.import_module("esphome.components." + CORE.target_platform)
platform_process_stacktrace = getattr(module, "process_stacktrace")
except (AttributeError, ImportError):
pass
_LOGGER.info(
'Stacktrace analysis is unavailable: no compatible analyzer found for target platform "%s".',
CORE.target_platform,
)
processor = _LogLineProcessor(config, platform_process_stacktrace)
+76
View File
@@ -5,6 +5,7 @@ import logging
import os
from pathlib import Path
import re
import subprocess
from esphome import yaml_util
import esphome.codegen as cg
@@ -2515,3 +2516,78 @@ def copy_files():
CORE.relative_build_path(name).write_bytes(content)
else:
copy_file_if_changed(path, CORE.relative_build_path(name))
def _decode_pc(config, addr):
from esphome import platformio_api
idedata = platformio_api.get_idedata(config)
if not idedata.addr2line_path or not idedata.firmware_elf_path:
_LOGGER.debug("decode_pc no addr2line")
return
command = [idedata.addr2line_path, "-pfiaC", "-e", idedata.firmware_elf_path, addr]
try:
translation = subprocess.check_output(command, close_fds=False).decode().strip()
except Exception: # pylint: disable=broad-except
_LOGGER.debug("Caught exception for command %s", command, exc_info=1)
return
if "?? ??:0" in translation:
# Nothing useful
return
translation = translation.replace(" at ??:?", "").replace(":?", "")
_LOGGER.warning("Decoded %s", translation)
def _parse_register(config, regex, line):
match = regex.match(line)
if match is not None:
_decode_pc(config, match.group(1))
STACKTRACE_ESP32_PC_RE = re.compile(r".*PC\s*:\s*(?:0x)?(4[0-9a-fA-F]{7}).*")
STACKTRACE_ESP32_EXCVADDR_RE = re.compile(r"EXCVADDR\s*:\s*(?:0x)?(4[0-9a-fA-F]{7})")
STACKTRACE_ESP32_C3_PC_RE = re.compile(r"MEPC\s*:\s*(?:0x)?(4[0-9a-fA-F]{7})")
STACKTRACE_ESP32_C3_RA_RE = re.compile(r"RA\s*:\s*(?:0x)?(4[0-9a-fA-F]{7})")
STACKTRACE_BAD_ALLOC_RE = re.compile(
r"^last failed alloc call: (4[0-9a-fA-F]{7})\((\d+)\)$"
)
STACKTRACE_ESP32_BACKTRACE_RE = re.compile(
r"Backtrace:(?:\s*0x[0-9a-fA-F]{8}:0x[0-9a-fA-F]{8})+"
)
STACKTRACE_ESP32_BACKTRACE_PC_RE = re.compile(r"4[0-9a-f]{7}")
# ESP32 crash handler (stored backtrace from previous boot)
STACKTRACE_ESP32_CRASH_BT_RE = re.compile(r"BT\d+:\s*0x([0-9a-fA-F]{8})")
def process_stacktrace(config, line, backtrace_state):
line = line.strip()
# ESP32 PC/EXCVADDR
_parse_register(config, STACKTRACE_ESP32_PC_RE, line)
_parse_register(config, STACKTRACE_ESP32_EXCVADDR_RE, line)
# ESP32-C3 PC/RA
_parse_register(config, STACKTRACE_ESP32_C3_PC_RE, line)
_parse_register(config, STACKTRACE_ESP32_C3_RA_RE, line)
# bad alloc
match = re.match(STACKTRACE_BAD_ALLOC_RE, line)
if match is not None:
_LOGGER.warning(
"Memory allocation of %s bytes failed at %s", match.group(2), match.group(1)
)
_decode_pc(config, match.group(1))
# ESP32 crash handler backtrace (from previous boot)
match = re.search(STACKTRACE_ESP32_CRASH_BT_RE, line)
if match is not None:
_decode_pc(config, match.group(1))
# ESP32 single-line backtrace
match = re.match(STACKTRACE_ESP32_BACKTRACE_RE, line)
if match is not None:
_LOGGER.warning("Found stack trace! Trying to decode it")
for addr in re.finditer(STACKTRACE_ESP32_BACKTRACE_PC_RE, line):
_decode_pc(config, addr.group())
return backtrace_state
+115
View File
@@ -1,6 +1,7 @@
import logging
from pathlib import Path
import re
import subprocess
import esphome.codegen as cg
import esphome.config_validation as cv
@@ -419,3 +420,117 @@ def copy_files() -> None:
remove_float_scanf_file,
CORE.relative_build_path("remove_float_scanf.py"),
)
# ESP logs stack trace decoder, based on https://github.com/me-no-dev/EspExceptionDecoder
ESP8266_EXCEPTION_CODES = {
0: "Illegal instruction (Is the flash damaged?)",
1: "SYSCALL instruction",
2: "InstructionFetchError: Processor internal physical address or data error during "
"instruction fetch",
3: "LoadStoreError: Processor internal physical address or data error during load or store",
4: "Level1Interrupt: Level-1 interrupt as indicated by set level-1 bits in the INTERRUPT "
"register",
5: "Alloca: MOVSP instruction, if caller's registers are not in the register file",
6: "Integer Divide By Zero",
7: "reserved",
8: "Privileged: Attempt to execute a privileged operation when CRING ? 0",
9: "LoadStoreAlignmentCause: Load or store to an unaligned address",
10: "reserved",
11: "reserved",
12: "InstrPIFDataError: PIF data error during instruction fetch",
13: "LoadStorePIFDataError: Synchronous PIF data error during LoadStore access",
14: "InstrPIFAddrError: PIF address error during instruction fetch",
15: "LoadStorePIFAddrError: Synchronous PIF address error during LoadStore access",
16: "InstTLBMiss: Error during Instruction TLB refill",
17: "InstTLBMultiHit: Multiple instruction TLB entries matched",
18: "InstFetchPrivilege: An instruction fetch referenced a virtual address at a ring level "
"less than CRING",
19: "reserved",
20: "InstFetchProhibited: An instruction fetch referenced a page mapped with an attribute "
"that does not permit instruction fetch",
21: "reserved",
22: "reserved",
23: "reserved",
24: "LoadStoreTLBMiss: Error during TLB refill for a load or store",
25: "LoadStoreTLBMultiHit: Multiple TLB entries matched for a load or store",
26: "LoadStorePrivilege: A load or store referenced a virtual address at a ring level less "
"than ",
27: "reserved",
28: "Access to invalid address: LOAD (wild pointer?)",
29: "Access to invalid address: STORE (wild pointer?)",
}
def _decode_pc(config, addr):
from esphome import platformio_api
idedata = platformio_api.get_idedata(config)
if not idedata.addr2line_path or not idedata.firmware_elf_path:
_LOGGER.debug("decode_pc no addr2line")
return
command = [idedata.addr2line_path, "-pfiaC", "-e", idedata.firmware_elf_path, addr]
try:
translation = subprocess.check_output(command, close_fds=False).decode().strip()
except Exception: # pylint: disable=broad-except
_LOGGER.debug("Caught exception for command %s", command, exc_info=1)
return
if "?? ??:0" in translation:
# Nothing useful
return
translation = translation.replace(" at ??:?", "").replace(":?", "")
_LOGGER.warning("Decoded %s", translation)
def _parse_register(config, regex, line):
match = regex.match(line)
if match is not None:
_decode_pc(config, match.group(1))
STACKTRACE_ESP8266_EXCEPTION_TYPE_RE = re.compile(r"[eE]xception \((\d+)\):")
STACKTRACE_ESP8266_PC_RE = re.compile(r"epc1=0x(4[0-9a-fA-F]{7})")
STACKTRACE_ESP8266_EXCVADDR_RE = re.compile(r"excvaddr=0x(4[0-9a-fA-F]{7})")
STACKTRACE_BAD_ALLOC_RE = re.compile(
r"^last failed alloc call: (4[0-9a-fA-F]{7})\((\d+)\)$"
)
STACKTRACE_ESP8266_BACKTRACE_PC_RE = re.compile(r"4[0-9a-f]{7}")
def process_stacktrace(config, line, backtrace_state):
line = line.strip()
# ESP8266 Exception type
match = re.match(STACKTRACE_ESP8266_EXCEPTION_TYPE_RE, line)
if match is not None:
code = int(match.group(1))
_LOGGER.warning(
"Exception type: %s", ESP8266_EXCEPTION_CODES.get(code, "unknown")
)
# ESP8266 PC/EXCVADDR
_parse_register(config, STACKTRACE_ESP8266_PC_RE, line)
_parse_register(config, STACKTRACE_ESP8266_EXCVADDR_RE, line)
# bad alloc
match = re.match(STACKTRACE_BAD_ALLOC_RE, line)
if match is not None:
_LOGGER.warning(
"Memory allocation of %s bytes failed at %s", match.group(2), match.group(1)
)
_decode_pc(config, match.group(1))
# ESP8266 multi-line backtrace
if ">>>stack>>>" in line:
# Start of backtrace
backtrace_state = True
_LOGGER.warning("Found stack trace! Trying to decode it")
elif "<<<stack<<<" in line:
# End of backtrace
backtrace_state = False
if backtrace_state:
for addr in re.finditer(STACKTRACE_ESP8266_BACKTRACE_PC_RE, line):
_decode_pc(config, addr.group())
return backtrace_state
+1 -149
View File
@@ -1,15 +1,13 @@
from dataclasses import dataclass
import json
import logging
import os
from pathlib import Path
import re
import subprocess
import sys
from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME, KEY_CORE
from esphome.core import CORE, EsphomeError
from esphome.util import run_external_process
from esphome.util import FlashImage, run_external_process
_LOGGER = logging.getLogger(__name__)
@@ -140,152 +138,6 @@ def get_idedata(config) -> "IDEData":
return idedata
# ESP logs stack trace decoder, based on https://github.com/me-no-dev/EspExceptionDecoder
ESP8266_EXCEPTION_CODES = {
0: "Illegal instruction (Is the flash damaged?)",
1: "SYSCALL instruction",
2: "InstructionFetchError: Processor internal physical address or data error during "
"instruction fetch",
3: "LoadStoreError: Processor internal physical address or data error during load or store",
4: "Level1Interrupt: Level-1 interrupt as indicated by set level-1 bits in the INTERRUPT "
"register",
5: "Alloca: MOVSP instruction, if caller's registers are not in the register file",
6: "Integer Divide By Zero",
7: "reserved",
8: "Privileged: Attempt to execute a privileged operation when CRING ? 0",
9: "LoadStoreAlignmentCause: Load or store to an unaligned address",
10: "reserved",
11: "reserved",
12: "InstrPIFDataError: PIF data error during instruction fetch",
13: "LoadStorePIFDataError: Synchronous PIF data error during LoadStore access",
14: "InstrPIFAddrError: PIF address error during instruction fetch",
15: "LoadStorePIFAddrError: Synchronous PIF address error during LoadStore access",
16: "InstTLBMiss: Error during Instruction TLB refill",
17: "InstTLBMultiHit: Multiple instruction TLB entries matched",
18: "InstFetchPrivilege: An instruction fetch referenced a virtual address at a ring level "
"less than CRING",
19: "reserved",
20: "InstFetchProhibited: An instruction fetch referenced a page mapped with an attribute "
"that does not permit instruction fetch",
21: "reserved",
22: "reserved",
23: "reserved",
24: "LoadStoreTLBMiss: Error during TLB refill for a load or store",
25: "LoadStoreTLBMultiHit: Multiple TLB entries matched for a load or store",
26: "LoadStorePrivilege: A load or store referenced a virtual address at a ring level less "
"than ",
27: "reserved",
28: "Access to invalid address: LOAD (wild pointer?)",
29: "Access to invalid address: STORE (wild pointer?)",
}
def _decode_pc(config, addr):
idedata = get_idedata(config)
if not idedata.addr2line_path or not idedata.firmware_elf_path:
_LOGGER.debug("decode_pc no addr2line")
return
command = [idedata.addr2line_path, "-pfiaC", "-e", idedata.firmware_elf_path, addr]
try:
translation = subprocess.check_output(command, close_fds=False).decode().strip()
except Exception: # pylint: disable=broad-except
_LOGGER.debug("Caught exception for command %s", command, exc_info=1)
return
if "?? ??:0" in translation:
# Nothing useful
return
translation = translation.replace(" at ??:?", "").replace(":?", "")
_LOGGER.warning("Decoded %s", translation)
def _parse_register(config, regex, line):
match = regex.match(line)
if match is not None:
_decode_pc(config, match.group(1))
STACKTRACE_ESP8266_EXCEPTION_TYPE_RE = re.compile(r"[eE]xception \((\d+)\):")
STACKTRACE_ESP8266_PC_RE = re.compile(r"epc1=0x(4[0-9a-fA-F]{7})")
STACKTRACE_ESP8266_EXCVADDR_RE = re.compile(r"excvaddr=0x(4[0-9a-fA-F]{7})")
STACKTRACE_ESP32_PC_RE = re.compile(r".*PC\s*:\s*(?:0x)?(4[0-9a-fA-F]{7}).*")
STACKTRACE_ESP32_EXCVADDR_RE = re.compile(r"EXCVADDR\s*:\s*(?:0x)?(4[0-9a-fA-F]{7})")
STACKTRACE_ESP32_C3_PC_RE = re.compile(r"MEPC\s*:\s*(?:0x)?(4[0-9a-fA-F]{7})")
STACKTRACE_ESP32_C3_RA_RE = re.compile(r"RA\s*:\s*(?:0x)?(4[0-9a-fA-F]{7})")
STACKTRACE_BAD_ALLOC_RE = re.compile(
r"^last failed alloc call: (4[0-9a-fA-F]{7})\((\d+)\)$"
)
STACKTRACE_ESP32_BACKTRACE_RE = re.compile(
r"Backtrace:(?:\s*0x[0-9a-fA-F]{8}:0x[0-9a-fA-F]{8})+"
)
STACKTRACE_ESP32_BACKTRACE_PC_RE = re.compile(r"4[0-9a-f]{7}")
# ESP32 crash handler (stored backtrace from previous boot)
STACKTRACE_ESP32_CRASH_BT_RE = re.compile(r"BT\d+:\s*0x([0-9a-fA-F]{8})")
STACKTRACE_ESP8266_BACKTRACE_PC_RE = re.compile(r"4[0-9a-f]{7}")
def process_stacktrace(config, line, backtrace_state):
line = line.strip()
# ESP8266 Exception type
match = re.match(STACKTRACE_ESP8266_EXCEPTION_TYPE_RE, line)
if match is not None:
code = int(match.group(1))
_LOGGER.warning(
"Exception type: %s", ESP8266_EXCEPTION_CODES.get(code, "unknown")
)
# ESP8266 PC/EXCVADDR
_parse_register(config, STACKTRACE_ESP8266_PC_RE, line)
_parse_register(config, STACKTRACE_ESP8266_EXCVADDR_RE, line)
# ESP32 PC/EXCVADDR
_parse_register(config, STACKTRACE_ESP32_PC_RE, line)
_parse_register(config, STACKTRACE_ESP32_EXCVADDR_RE, line)
# ESP32-C3 PC/RA
_parse_register(config, STACKTRACE_ESP32_C3_PC_RE, line)
_parse_register(config, STACKTRACE_ESP32_C3_RA_RE, line)
# bad alloc
match = re.match(STACKTRACE_BAD_ALLOC_RE, line)
if match is not None:
_LOGGER.warning(
"Memory allocation of %s bytes failed at %s", match.group(2), match.group(1)
)
_decode_pc(config, match.group(1))
# ESP32 crash handler backtrace (from previous boot)
match = re.search(STACKTRACE_ESP32_CRASH_BT_RE, line)
if match is not None:
_decode_pc(config, match.group(1))
# ESP32 single-line backtrace
match = re.match(STACKTRACE_ESP32_BACKTRACE_RE, line)
if match is not None:
_LOGGER.warning("Found stack trace! Trying to decode it")
for addr in re.finditer(STACKTRACE_ESP32_BACKTRACE_PC_RE, line):
_decode_pc(config, addr.group())
# ESP8266 multi-line backtrace
if ">>>stack>>>" in line:
# Start of backtrace
backtrace_state = True
_LOGGER.warning("Found stack trace! Trying to decode it")
elif "<<<stack<<<" in line:
# End of backtrace
backtrace_state = False
if backtrace_state:
for addr in re.finditer(STACKTRACE_ESP8266_BACKTRACE_PC_RE, line):
_decode_pc(config, addr.group())
return backtrace_state
@dataclass
class FlashImage:
path: Path
offset: str
class IDEData:
def __init__(self, raw):
self.raw = raw
+6
View File
@@ -487,3 +487,9 @@ def get_esp32_arduino_flash_error_help() -> str | None:
"https://esphome.io/guides/esp32_arduino_to_idf/\n\n",
)
)
@dataclass
class FlashImage:
path: Path
offset: str
+12 -11
View File
@@ -4,6 +4,7 @@ from __future__ import annotations
from unittest.mock import patch
from esphome.components import esp32
from esphome.components.api import client as api_client
from esphome.core import EsphomeError
@@ -18,11 +19,11 @@ def test_decoder_swallows_esphome_error() -> None:
reconnect.
"""
config = {"esphome": {"name": "test"}}
processor = api_client._LogLineProcessor(config, None)
with patch.object(
api_client, "process_stacktrace", side_effect=EsphomeError("no idedata")
esp32, "process_stacktrace", side_effect=EsphomeError("no idedata")
) as mock_process:
processor = api_client._LogLineProcessor(config, esp32.process_stacktrace)
processor.process_line("PC: 0x4010496e")
assert mock_process.called
@@ -47,9 +48,9 @@ def test_decoder_warning_uses_fallback_for_empty_error(caplog) -> None:
must show a useful explanation rather than empty parens.
"""
config = {"esphome": {"name": "test"}}
processor = api_client._LogLineProcessor(config, None)
with patch.object(api_client, "process_stacktrace", side_effect=EsphomeError()):
with patch.object(esp32, "process_stacktrace", side_effect=EsphomeError()):
processor = api_client._LogLineProcessor(config, esp32.process_stacktrace)
processor.process_line("PC: 0x4010496e")
warnings = [r.message for r in caplog.records if r.levelname == "WARNING"]
@@ -65,11 +66,11 @@ def test_decoder_short_circuits_after_failure() -> None:
stall log streaming.
"""
config = {"esphome": {"name": "test"}}
processor = api_client._LogLineProcessor(config, None)
with patch.object(
api_client, "process_stacktrace", side_effect=EsphomeError("no idedata")
esp32, "process_stacktrace", side_effect=EsphomeError("no idedata")
) as mock_process:
processor = api_client._LogLineProcessor(config, esp32.process_stacktrace)
processor.process_line("PC: 0x4010496e")
processor.process_line("BT0: 0x4010496e")
processor.process_line("BT1: 0x401049aa")
@@ -80,18 +81,18 @@ def test_decoder_short_circuits_after_failure() -> None:
def test_decoder_threads_backtrace_state() -> None:
"""When decoding succeeds, backtrace_state is threaded across calls."""
config = {"esphome": {"name": "test"}}
processor = api_client._LogLineProcessor(config, None)
with patch.object(
api_client, "process_stacktrace", side_effect=[True, False]
esp32, "process_stacktrace", side_effect=[True, False]
) as mock_process:
processor = api_client._LogLineProcessor(config, esp32.process_stacktrace)
processor.process_line(">>>stack>>>")
assert processor.backtrace_state is True
processor.process_line("<<<stack<<<")
assert processor.backtrace_state is False
assert mock_process.call_args_list[0].kwargs == {"backtrace_state": False}
assert mock_process.call_args_list[1].kwargs == {"backtrace_state": True}
assert not mock_process.call_args_list[0].args[-1]
assert mock_process.call_args_list[1].args[-1]
def test_decoder_uses_platform_handler_when_provided() -> None:
@@ -105,7 +106,7 @@ def test_decoder_uses_platform_handler_when_provided() -> None:
processor = api_client._LogLineProcessor(config, platform_handler)
with patch.object(api_client, "process_stacktrace") as mock_generic:
with patch.object(esp32, "process_stacktrace") as mock_generic:
processor.process_line("BT0: 0x4010496e")
assert calls == [(config, "BT0: 0x4010496e", False)]
@@ -0,0 +1,109 @@
"""Tests for ESP32 component."""
from pathlib import Path
from unittest.mock import Mock
def test_process_stacktrace_esp8266_exception(setup_core: Path, caplog) -> None:
"""Test process_stacktrace handles ESP8266 exceptions."""
from esphome.components.esp8266 import process_stacktrace
config = {"name": "test"}
# Test exception type parsing
line = "Exception (28):"
backtrace_state = False
result = process_stacktrace(config, line, backtrace_state)
assert "Access to invalid address: LOAD (wild pointer?)" in caplog.text
assert result is False
def test_process_stacktrace_esp8266_backtrace(
setup_core: Path, mock_esp8266_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP8266 multi-line backtrace."""
from esphome.components.esp8266 import process_stacktrace
config = {"name": "test"}
# Start of backtrace
line1 = ">>>stack>>>"
state = process_stacktrace(config, line1, False)
assert state is True
# Backtrace content with addresses
line2 = "40201234 40205678"
state = process_stacktrace(config, line2, state)
assert state is True
assert mock_esp8266_decode_pc.call_count == 2
# End of backtrace
line3 = "<<<stack<<<"
state = process_stacktrace(config, line3, state)
assert state is False
def test_process_stacktrace_esp32_backtrace(
setup_core: Path, mock_esp32_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP32 single-line backtrace."""
from esphome.components.esp32 import process_stacktrace
config = {"name": "test"}
line = "Backtrace: 0x40081234:0x3ffb1234 0x40085678:0x3ffb5678"
state = process_stacktrace(config, line, False)
# Should decode both addresses
assert mock_esp32_decode_pc.call_count == 2
mock_esp32_decode_pc.assert_any_call(config, "40081234")
mock_esp32_decode_pc.assert_any_call(config, "40085678")
assert state is False
def test_process_stacktrace_bad_alloc(
setup_core: Path, mock_esp32_decode_pc: Mock, caplog
) -> None:
"""Test process_stacktrace handles bad alloc messages."""
from esphome.components.esp32 import process_stacktrace
config = {"name": "test"}
line = "last failed alloc call: 40201234(512)"
state = process_stacktrace(config, line, False)
assert "Memory allocation of 512 bytes failed at 40201234" in caplog.text
mock_esp32_decode_pc.assert_called_once_with(config, "40201234")
assert state is False
def test_process_stacktrace_esp32_crash_handler(
setup_core: Path, mock_esp32_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP32 crash handler backtrace lines."""
from esphome.components.esp32 import process_stacktrace
config = {"name": "test"}
# Simulate crash handler log lines as they appear from the API/serial
line_pc = "[E][esp32.crash:078]: PC: 0x400D1234 (fault location)"
state = process_stacktrace(config, line_pc, False)
# PC line is matched by existing STACKTRACE_ESP32_PC_RE
mock_esp32_decode_pc.assert_called_with(config, "400D1234")
assert state is False
mock_esp32_decode_pc.reset_mock()
line_bt0 = "[E][esp32.crash:080]: BT0: 0x400D5678 (backtrace)"
state = process_stacktrace(config, line_bt0, False)
mock_esp32_decode_pc.assert_called_once_with(config, "400D5678")
assert state is False
mock_esp32_decode_pc.reset_mock()
line_bt1 = "[E][esp32.crash:080]: BT1: 0x42005ABC (backtrace)"
state = process_stacktrace(config, line_bt1, False)
mock_esp32_decode_pc.assert_called_once_with(config, "42005ABC")
assert state is False
+10 -3
View File
@@ -77,9 +77,16 @@ def mock_run_platformio_cli_run() -> Generator[Mock, None, None]:
@pytest.fixture
def mock_decode_pc() -> Generator[Mock, None, None]:
"""Mock _decode_pc for platformio_api."""
with patch("esphome.platformio_api._decode_pc") as mock:
def mock_esp32_decode_pc() -> Generator[Mock, None, None]:
"""Mock _decode_pc for esp32."""
with patch("esphome.components.esp32._decode_pc") as mock:
yield mock
@pytest.fixture
def mock_esp8266_decode_pc() -> Generator[Mock, None, None]:
"""Mock _decode_pc for esp8266."""
with patch("esphome.components.esp8266._decode_pc") as mock:
yield mock
+12 -11
View File
@@ -54,6 +54,7 @@ from esphome.__main__ import (
)
from esphome.address_cache import AddressCache
from esphome.bundle import BUNDLE_EXTENSION, BundleFile, BundleResult
from esphome.components import esp32
from esphome.components.esp32 import KEY_ESP32, KEY_VARIANT, VARIANT_ESP32
from esphome.const import (
CONF_API,
@@ -85,7 +86,7 @@ from esphome.const import (
)
from esphome.core import CORE, EsphomeError
from esphome.espota2 import OTA_TYPE_UPDATE_APP, OTA_TYPE_UPDATE_PARTITION_TABLE
from esphome.util import BootselResult
from esphome.util import BootselResult, FlashImage
from esphome.zeroconf import _await_discovery, discover_mdns_devices
@@ -1181,8 +1182,8 @@ def test_upload_using_esptool_path_conversion(
mock_idedata = MagicMock(spec=platformio_api.IDEData)
mock_idedata.firmware_bin_path = tmp_path / "firmware.bin"
mock_idedata.extra_flash_images = [
platformio_api.FlashImage(path=tmp_path / "bootloader.bin", offset="0x1000"),
platformio_api.FlashImage(path=tmp_path / "partitions.bin", offset="0x8000"),
FlashImage(path=tmp_path / "bootloader.bin", offset="0x1000"),
FlashImage(path=tmp_path / "partitions.bin", offset="0x8000"),
]
mock_get_idedata.return_value = mock_idedata
@@ -1259,8 +1260,8 @@ def test_upload_using_esptool_skips_missing_extra_flash_images(
mock_idedata = MagicMock(spec=platformio_api.IDEData)
mock_idedata.firmware_bin_path = tmp_path / "firmware.bin"
mock_idedata.extra_flash_images = [
platformio_api.FlashImage(path=tmp_path / "bootloader.bin", offset="0x1000"),
platformio_api.FlashImage(path=missing_path, offset="0x2d0000"),
FlashImage(path=tmp_path / "bootloader.bin", offset="0x1000"),
FlashImage(path=missing_path, offset="0x2d0000"),
]
mock_get_idedata.return_value = mock_idedata
@@ -4225,7 +4226,7 @@ def test_run_miniterm_batches_lines_with_same_timestamp(
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
patch.object(esp32, "process_stacktrace") as mock_bt,
):
mock_bt.return_value = False
result = run_miniterm(config, "/dev/ttyUSB0", args)
@@ -4264,7 +4265,7 @@ def test_run_miniterm_different_chunks_different_timestamps(
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
patch.object(esp32, "process_stacktrace") as mock_bt,
):
mock_bt.return_value = False
result = run_miniterm(config, "/dev/ttyUSB0", args)
@@ -4295,7 +4296,7 @@ def test_run_miniterm_handles_split_lines() -> None:
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
patch.object(esp32, "process_stacktrace") as mock_bt,
patch("esphome.__main__.safe_print") as mock_print,
):
mock_bt.return_value = False
@@ -4349,7 +4350,7 @@ def test_run_miniterm_backtrace_state_maintained() -> None:
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(
platformio_api,
esp32,
"process_stacktrace",
side_effect=track_backtrace_state,
),
@@ -4400,7 +4401,7 @@ def test_run_miniterm_handles_empty_reads(
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
patch.object(esp32, "process_stacktrace") as mock_bt,
):
mock_bt.return_value = False
result = run_miniterm(config, "/dev/ttyUSB0", args)
@@ -4473,7 +4474,7 @@ def test_run_miniterm_buffer_limit_prevents_unbounded_growth() -> None:
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
patch.object(esp32, "process_stacktrace") as mock_bt,
patch("esphome.__main__.safe_print") as mock_print,
patch("esphome.__main__.SERIAL_BUFFER_MAX_SIZE", test_buffer_limit),
):
+3 -97
View File
@@ -13,6 +13,7 @@ import pytest
from esphome import platformio_api, platformio_runner
from esphome.core import CORE, EsphomeError
from esphome.util import FlashImage
def test_idedata_firmware_elf_path(setup_core: Path) -> None:
@@ -70,7 +71,7 @@ def test_idedata_extra_flash_images(setup_core: Path) -> None:
images = idedata.extra_flash_images
assert len(images) == 2
assert all(isinstance(img, platformio_api.FlashImage) for img in images)
assert all(isinstance(img, FlashImage) for img in images)
assert images[0].path == Path("/path/to/bootloader.bin")
assert images[0].offset == "0x1000"
assert images[1].path == Path("/path/to/partition.bin")
@@ -106,7 +107,7 @@ def test_idedata_cc_path(setup_core: Path) -> None:
def test_flash_image_dataclass() -> None:
"""Test FlashImage dataclass stores path and offset correctly."""
image = platformio_api.FlashImage(path=Path("/path/to/image.bin"), offset="0x10000")
image = FlashImage(path=Path("/path/to/image.bin"), offset="0x10000")
assert image.path == Path("/path/to/image.bin")
assert image.offset == "0x10000"
@@ -708,101 +709,6 @@ def test_patched_clean_build_dir_creates_missing(setup_core: Path) -> None:
assert build_dir.exists()
def test_process_stacktrace_esp8266_exception(setup_core: Path, caplog) -> None:
"""Test process_stacktrace handles ESP8266 exceptions."""
config = {"name": "test"}
# Test exception type parsing
line = "Exception (28):"
backtrace_state = False
result = platformio_api.process_stacktrace(config, line, backtrace_state)
assert "Access to invalid address: LOAD (wild pointer?)" in caplog.text
assert result is False
def test_process_stacktrace_esp8266_backtrace(
setup_core: Path, mock_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP8266 multi-line backtrace."""
config = {"name": "test"}
# Start of backtrace
line1 = ">>>stack>>>"
state = platformio_api.process_stacktrace(config, line1, False)
assert state is True
# Backtrace content with addresses
line2 = "40201234 40205678"
state = platformio_api.process_stacktrace(config, line2, state)
assert state is True
assert mock_decode_pc.call_count == 2
# End of backtrace
line3 = "<<<stack<<<"
state = platformio_api.process_stacktrace(config, line3, state)
assert state is False
def test_process_stacktrace_esp32_backtrace(
setup_core: Path, mock_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP32 single-line backtrace."""
config = {"name": "test"}
line = "Backtrace: 0x40081234:0x3ffb1234 0x40085678:0x3ffb5678"
state = platformio_api.process_stacktrace(config, line, False)
# Should decode both addresses
assert mock_decode_pc.call_count == 2
mock_decode_pc.assert_any_call(config, "40081234")
mock_decode_pc.assert_any_call(config, "40085678")
assert state is False
def test_process_stacktrace_bad_alloc(
setup_core: Path, mock_decode_pc: Mock, caplog
) -> None:
"""Test process_stacktrace handles bad alloc messages."""
config = {"name": "test"}
line = "last failed alloc call: 40201234(512)"
state = platformio_api.process_stacktrace(config, line, False)
assert "Memory allocation of 512 bytes failed at 40201234" in caplog.text
mock_decode_pc.assert_called_once_with(config, "40201234")
assert state is False
def test_process_stacktrace_esp32_crash_handler(
setup_core: Path, mock_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP32 crash handler backtrace lines."""
config = {"name": "test"}
# Simulate crash handler log lines as they appear from the API/serial
line_pc = "[E][esp32.crash:078]: PC: 0x400D1234 (fault location)"
state = platformio_api.process_stacktrace(config, line_pc, False)
# PC line is matched by existing STACKTRACE_ESP32_PC_RE
mock_decode_pc.assert_called_with(config, "400D1234")
assert state is False
mock_decode_pc.reset_mock()
line_bt0 = "[E][esp32.crash:080]: BT0: 0x400D5678 (backtrace)"
state = platformio_api.process_stacktrace(config, line_bt0, False)
mock_decode_pc.assert_called_once_with(config, "400D5678")
assert state is False
mock_decode_pc.reset_mock()
line_bt1 = "[E][esp32.crash:080]: BT1: 0x42005ABC (backtrace)"
state = platformio_api.process_stacktrace(config, line_bt1, False)
mock_decode_pc.assert_called_once_with(config, "42005ABC")
assert state is False
def test_patch_file_downloader_succeeds_first_try() -> None:
"""Test patch_file_downloader succeeds on first attempt."""
mock_exception_cls = type("PackageException", (Exception,), {})