diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f43c46dd006..9909d7a5ddf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -783,6 +783,91 @@ jobs: # Run compilation with grouping and isolation python3 script/test_build_components.py -e compile -c "$components_csv" -f --isolate "$directly_changed_csv" + test-native-idf: + name: Test components with native ESP-IDF + runs-on: ubuntu-24.04 + needs: + - common + - determine-jobs + if: github.event_name == 'pull_request' + env: + ESPHOME_ESP_IDF_PREFIX: ~/.esphome-idf + TEST_COMPONENTS: esp32,api,heatpumpir,bme280_i2c,bh1750,aht10,esp32_ble,esp32_ble_beacon,esp32_ble_client,esp32_ble_server,esp32_ble_tracker,ble_client,ble_presence,ble_rssi,ble_scanner + steps: + - name: Check out code from GitHub + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - name: Restore Python + uses: ./.github/actions/restore-python + with: + python-version: ${{ env.DEFAULT_PYTHON }} + cache-key: ${{ needs.common.outputs.cache-key }} + + - name: Cache ESPHome + uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3 + with: + path: ~/.esphome-idf + key: ${{ runner.os }}-esphome-${{ needs.common.outputs.cache-key }} + + - name: Run native ESP-IDF compile test + run: | + . venv/bin/activate + + # Check if /mnt has more free space than / before bind mounting + # Extract available space in KB for comparison + root_avail=$(df -k / | awk 'NR==2 {print $4}') + mnt_avail=$(df -k /mnt 2>/dev/null | awk 'NR==2 {print $4}') + + echo "Available space: / has ${root_avail}KB, /mnt has ${mnt_avail}KB" + + # Only use /mnt if it has more space than / + if [ -n "$mnt_avail" ] && [ "$mnt_avail" -gt "$root_avail" ]; then + echo "Using /mnt for build files (more space available)" + # Bind mount PlatformIO directory to /mnt (tools, packages, build cache all go there) + sudo mkdir -p /mnt/esphome-idf + sudo chown $USER:$USER /mnt/esphome-idf + mkdir -p ~/.esphome-idf + sudo mount --bind /mnt/esphome-idf ~/.esphome-idf + + # Bind mount test build directory to /mnt + sudo mkdir -p /mnt/test_build_components_build + sudo chown $USER:$USER /mnt/test_build_components_build + mkdir -p tests/test_build_components/build + sudo mount --bind /mnt/test_build_components_build tests/test_build_components/build + else + echo "Using / for build files (more space available than /mnt or /mnt unavailable)" + fi + + echo "Testing components: $TEST_COMPONENTS" + echo "" + + # Show disk space before validation (after bind mounts setup) + echo "Disk space before config validation:" + df -h + echo "" + + # Run config validation (auto-grouped by test_build_components.py) + python3 script/test_build_components.py -e config -t esp32-idf -c "$TEST_COMPONENTS" -f --toolchain esp-idf + + echo "" + echo "Config validation passed! Starting compilation..." + echo "" + + # Show disk space before compilation + echo "Disk space before compilation:" + df -h + echo "" + + # Run compilation (auto-grouped by test_build_components.py) + python3 script/test_build_components.py -e compile -t esp32-idf -c "$TEST_COMPONENTS" -f --toolchain esp-idf + + - name: Save ESPHome cache + if: github.ref == 'refs/heads/dev' + uses: actions/cache/save@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3 + with: + path: ~/.esphome-idf + key: ${{ runner.os }}-esphome-${{ needs.common.outputs.cache-key }} + pre-commit-ci-lite: name: pre-commit.ci lite runs-on: ubuntu-latest @@ -1114,6 +1199,7 @@ jobs: - determine-jobs - device-builder - test-build-components-split + - test-native-idf - pre-commit-ci-lite - memory-impact-target-branch - memory-impact-pr-branch diff --git a/esphome/__main__.py b/esphome/__main__.py index a0ee5a359f4..01b33eb8acf 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -52,12 +52,12 @@ from esphome.const import ( CONF_WEB_SERVER, ENV_NOGITIGNORE, KEY_CORE, - KEY_NATIVE_IDF, KEY_TARGET_PLATFORM, PLATFORM_ESP32, PLATFORM_ESP8266, PLATFORM_RP2040, SECRETS_FILES, + Toolchain, ) from esphome.core import CORE, EsphomeError, coroutine from esphome.enum import StrEnum @@ -155,7 +155,6 @@ class ArgsProtocol(Protocol): configuration: str name: str upload_speed: str | None - native_idf: bool def choose_prompt(options, purpose: str = None): @@ -720,17 +719,14 @@ def _wrap_to_code(name, comp, yaml_util): return wrapped -def write_cpp(config: ConfigType, native_idf: bool = False) -> int: +def write_cpp(config: ConfigType) -> int: from esphome import writer if not get_bool_env(ENV_NOGITIGNORE): writer.write_gitignore() - # Store native_idf flag so esp32 component can check it - CORE.data[KEY_NATIVE_IDF] = native_idf - generate_cpp_contents(config) - return write_cpp_file(native_idf=native_idf) + return write_cpp_file() def generate_cpp_contents(config: ConfigType) -> None: @@ -746,13 +742,13 @@ def generate_cpp_contents(config: ConfigType) -> None: CORE.flush_tasks() -def write_cpp_file(native_idf: bool = False) -> int: +def write_cpp_file() -> int: from esphome import writer code_s = indent(CORE.cpp_main_section) writer.write_cpp(code_s) - if native_idf and CORE.is_esp32 and CORE.target_framework == "esp-idf": + if CORE.using_toolchain_esp_idf: from esphome.build_gen import espidf espidf.write_project() @@ -765,22 +761,21 @@ def write_cpp_file(native_idf: bool = False) -> int: def compile_program(args: ArgsProtocol, config: ConfigType) -> int: - native_idf = getattr(args, "native_idf", False) - # NOTE: "Build path:" format is parsed by script/ci_memory_impact_extract.py # If you change this format, update the regex in that script as well _LOGGER.info("Compiling app... Build path: %s", CORE.build_path) - if native_idf and CORE.is_esp32 and CORE.target_framework == "esp-idf": - from esphome import espidf_api + if CORE.using_toolchain_esp_idf: + from esphome.espidf import api - rc = espidf_api.run_compile(config, CORE.verbose) + rc = api.run_compile(config, CORE.verbose) if rc != 0: return rc - # Create factory.bin and ota.bin - espidf_api.create_factory_bin() - espidf_api.create_ota_bin() + # Create factory.bin, ota.bin, and firmware.elf copy + api.create_factory_bin() + api.create_ota_bin() + api.create_elf_copy() else: from esphome import platformio_api @@ -883,6 +878,10 @@ def upload_using_esptool( if file is not None: flash_images = [FlashImage(path=file, offset="0x0")] + elif CORE.using_toolchain_esp_idf: + from esphome.espidf import api + + flash_images = [FlashImage(path=api.get_factory_firmware_path(), offset="0x0")] else: from esphome import platformio_api @@ -1447,8 +1446,7 @@ def command_vscode(args: ArgsProtocol) -> int | None: def command_compile(args: ArgsProtocol, config: ConfigType) -> int | None: - native_idf = getattr(args, "native_idf", False) - exit_code = write_cpp(config, native_idf=native_idf) + exit_code = write_cpp(config) if exit_code != 0: return exit_code if args.only_generate: @@ -1458,9 +1456,14 @@ def command_compile(args: ArgsProtocol, config: ConfigType) -> int | None: if exit_code != 0: return exit_code if CORE.is_host: - from esphome.platformio_api import get_idedata + if CORE.using_toolchain_esp_idf: + from esphome.espidf import api - program_path = str(get_idedata(config).firmware_elf_path) + program_path = str(api.get_elf_path()) + else: + from esphome.platformio_api import get_idedata + + program_path = str(get_idedata(config).firmware_elf_path) _LOGGER.info("Successfully compiled program to path '%s'", program_path) else: _LOGGER.info("Successfully compiled program.") @@ -1503,8 +1506,7 @@ def command_logs(args: ArgsProtocol, config: ConfigType) -> int | None: def command_run(args: ArgsProtocol, config: ConfigType) -> int | None: - native_idf = getattr(args, "native_idf", False) - exit_code = write_cpp(config, native_idf=native_idf) + exit_code = write_cpp(config) if exit_code != 0: return exit_code exit_code = compile_program(args, config) @@ -1512,9 +1514,14 @@ def command_run(args: ArgsProtocol, config: ConfigType) -> int | None: return exit_code _LOGGER.info("Successfully compiled program.") if CORE.is_host: - from esphome.platformio_api import get_idedata + if CORE.using_toolchain_esp_idf: + from esphome.espidf import api - program_path = str(get_idedata(config).firmware_elf_path) + program_path = str(api.get_elf_path()) + else: + from esphome.platformio_api import get_idedata + + program_path = str(get_idedata(config).firmware_elf_path) _LOGGER.info("Running program from path '%s'", program_path) return run_external_process(program_path) @@ -1705,6 +1712,13 @@ def command_update_all(args: ArgsProtocol) -> int | None: def command_idedata(args: ArgsProtocol, config: ConfigType) -> int: import json + if not CORE.using_toolchain_platformio: + _LOGGER.error( + "The idedata command is not compatible with %s toolchain", + CORE.toolchain.value, + ) + return 1 + from esphome import platformio_api logging.disable(logging.INFO) @@ -1724,7 +1738,6 @@ def command_analyze_memory(args: ArgsProtocol, config: ConfigType) -> int: This command compiles the configuration and performs memory analysis. Compilation is fast if sources haven't changed (just relinking). """ - from esphome import platformio_api from esphome.analyze_memory.cli import MemoryAnalyzerCLI from esphome.analyze_memory.ram_strings import RamStringsAnalyzer @@ -1738,12 +1751,25 @@ def command_analyze_memory(args: ArgsProtocol, config: ConfigType) -> int: _LOGGER.info("Successfully compiled program.") # Get idedata for analysis - idedata = platformio_api.get_idedata(config) - if idedata is None: - _LOGGER.error("Failed to get IDE data for memory analysis") - return 1 + idedata = None + if CORE.using_toolchain_esp_idf: + from esphome.espidf import api - firmware_elf = Path(idedata.firmware_elf_path) + objdump_path = str(api.get_objdump_path()) + readelf_path = str(api.get_readelf_path()) + + firmware_elf = api.get_elf_path() + else: + from esphome import platformio_api + + idedata = platformio_api.get_idedata(config) + if idedata is None: + _LOGGER.error("Failed to get IDE data for memory analysis") + return 1 + objdump_path = idedata.objdump_path + readelf_path = idedata.readelf_path + + firmware_elf = Path(idedata.firmware_elf_path) # Extract external components from config external_components = detect_external_components(config) @@ -1753,8 +1779,8 @@ def command_analyze_memory(args: ArgsProtocol, config: ConfigType) -> int: _LOGGER.info("Analyzing memory usage...") analyzer = MemoryAnalyzerCLI( str(firmware_elf), - idedata.objdump_path, - idedata.readelf_path, + objdump_path, + readelf_path, external_components, idedata=idedata, ) @@ -1770,7 +1796,7 @@ def command_analyze_memory(args: ArgsProtocol, config: ConfigType) -> int: try: ram_analyzer = RamStringsAnalyzer( str(firmware_elf), - objdump_path=idedata.objdump_path, + objdump_path=objdump_path, platform=CORE.target_platform, ) ram_analyzer.analyze() @@ -2015,6 +2041,17 @@ def parse_args(argv): action="store_true", default=False, ) + options_parser.add_argument( + "--toolchain", + type=Toolchain, + default=None, + choices=list(Toolchain), + metavar="{" + ",".join(t.value for t in Toolchain) + "}", + help=( + "Select toolchain for compiling. Overrides '.toolchain' in YAML. " + f"Default: {Toolchain.PLATFORMIO.value}." + ), + ) parser = argparse.ArgumentParser( description=f"ESPHome {const.__version__}", parents=[options_parser] @@ -2059,11 +2096,6 @@ def parse_args(argv): help="Only generate source code, do not compile.", action="store_true", ) - parser_compile.add_argument( - "--native-idf", - help="Build with native ESP-IDF instead of PlatformIO (ESP32 esp-idf framework only).", - action="store_true", - ) parser_upload = subparsers.add_parser( "upload", @@ -2171,11 +2203,6 @@ def parse_args(argv): help="Reset the device before starting serial logs.", default=os.getenv("ESPHOME_SERIAL_LOGGING_RESET"), ) - parser_run.add_argument( - "--native-idf", - help="Build with native ESP-IDF instead of PlatformIO (ESP32 esp-idf framework only).", - action="store_true", - ) parser_run.add_argument( "--ota-platform", choices=[CONF_ESPHOME, CONF_WEB_SERVER], @@ -2398,6 +2425,9 @@ def run_esphome(argv): CORE.config_path = conf_path CORE.dashboard = args.dashboard + if args.toolchain is not None: + # CLI toolchain wins over esp32.toolchain in YAML. + CORE.toolchain = args.toolchain # Commands that don't need fresh external components: logs just connects # to the device, and clean is about to delete the build directory. @@ -2410,6 +2440,12 @@ def run_esphome(argv): return 2 CORE.config = config + # Fallback for platforms whose validators didn't set the toolchain + # (only the esp32 component reads esp32.framework.toolchain). All + # other platforms only support PlatformIO today. + if CORE.toolchain is None: + CORE.toolchain = Toolchain.PLATFORMIO + if args.command not in POST_CONFIG_ACTIONS: safe_print(f"Unknown command {args.command}") return 1 diff --git a/esphome/build_gen/espidf.py b/esphome/build_gen/espidf.py index 01923baaac2..b1443edac31 100644 --- a/esphome/build_gen/espidf.py +++ b/esphome/build_gen/espidf.py @@ -6,6 +6,7 @@ from pathlib import Path from esphome.components.esp32 import get_esp32_variant from esphome.core import CORE from esphome.helpers import mkdir_p, write_file_if_changed +from esphome.writer import update_storage_json def get_available_components() -> list[str] | None: @@ -54,7 +55,7 @@ def get_project_cmakelists() -> str: idf_target = variant.lower().replace("-", "") # Extract compile definitions from build flags (-DXXX -> XXX) - compile_defs = [flag for flag in CORE.build_flags if flag.startswith("-D")] + compile_defs = [flag for flag in sorted(CORE.build_flags) if flag.startswith("-D")] extra_compile_options = "\n".join( f'idf_build_set_property(COMPILE_OPTIONS "{compile_def}" APPEND)' for compile_def in compile_defs @@ -64,6 +65,22 @@ def get_project_cmakelists() -> str: # Auto-generated by ESPHome cmake_minimum_required(VERSION 3.16) +# On Windows, Ninja can fail with: +# "CreateProcess: The parameter is incorrect (is the command line too long?)" +# when compiler/linker command lines exceed the OS length limit. +# +# The following settings force CMake/Ninja to use *response files* (@file.rsp) +# to pass long lists of includes, objects, and other arguments indirectly, +# avoiding command-line length limits and fixing the build failure. +# +# This is especially useful for large ESP-IDF / ESPHome projects with many +# source files or include directories. +set(CMAKE_C_USE_RESPONSE_FILE_FOR_INCLUDES 1) +set(CMAKE_CXX_USE_RESPONSE_FILE_FOR_INCLUDES 1) +set(CMAKE_C_USE_RESPONSE_FILE_FOR_OBJECTS 1) +set(CMAKE_CXX_USE_RESPONSE_FILE_FOR_OBJECTS 1) +set(CMAKE_NINJA_FORCE_RESPONSE_FILE 1) + set(IDF_TARGET {idf_target}) set(EXTRA_COMPONENT_DIRS ${{CMAKE_SOURCE_DIR}}/src) @@ -124,6 +141,11 @@ target_link_options(${{COMPONENT_LIB}} PUBLIC def write_project(minimal: bool = False) -> None: """Write ESP-IDF project files.""" + # Refresh /storage/.yaml.json so the dashboard's + # /info and /downloads endpoints can locate the build (they 404 + # otherwise). This mirrors the PlatformIO build-gen path's call + # in build_gen/platformio.py:write_ini(). + update_storage_json() mkdir_p(CORE.build_path) mkdir_p(CORE.relative_src_path()) diff --git a/esphome/components/esp32/__init__.py b/esphome/components/esp32/__init__.py index 582721ef736..ba32d13ab34 100644 --- a/esphome/components/esp32/__init__.py +++ b/esphome/components/esp32/__init__.py @@ -31,6 +31,7 @@ from esphome.const import ( CONF_SAFE_MODE, CONF_SIZE, CONF_SOURCE, + CONF_TOOLCHAIN, CONF_TYPE, CONF_VARIANT, CONF_VERSION, @@ -38,16 +39,17 @@ from esphome.const import ( KEY_CORE, KEY_FRAMEWORK_VERSION, KEY_NAME, - KEY_NATIVE_IDF, KEY_TARGET_FRAMEWORK, KEY_TARGET_PLATFORM, PLATFORM_ESP32, ThreadModel, + Toolchain, __version__, ) -from esphome.core import CORE, HexInt +from esphome.core import CORE, HexInt, Library from esphome.core.config import BOARD_MAX_LENGTH from esphome.coroutine import CoroPriority, coroutine_with_priority +from esphome.espidf.component import generate_idf_component import esphome.final_validate as fv from esphome.helpers import copy_file_if_changed, rmtree, write_file_if_changed from esphome.types import ConfigType @@ -465,6 +467,9 @@ def set_core_data(config): if conf[CONF_TYPE] == FRAMEWORK_ESP_IDF: CORE.data[KEY_ESP32][KEY_IDF_VERSION] = framework_ver elif (idf_ver := ARDUINO_IDF_VERSION_LOOKUP.get(framework_ver)) is not None: + if CORE.using_toolchain_esp_idf: + # Official ESP-IDF frameworks don't use extra + idf_ver = cv.Version(idf_ver.major, idf_ver.minor, idf_ver.patch) CORE.data[KEY_ESP32][KEY_IDF_VERSION] = idf_ver else: raise cv.Invalid( @@ -652,7 +657,7 @@ def _format_framework_arduino_version(ver: cv.Version) -> str: return f"{ARDUINO_FRAMEWORK_PKG}@https://github.com/espressif/arduino-esp32/releases/download/{ver}/{filename}" -def _format_framework_espidf_version( +def _format_framework_pio_espidf_version( ver: cv.Version, release: str | None = None ) -> str: # format the given espidf (https://github.com/pioarduino/esp-idf/releases) version to @@ -741,6 +746,7 @@ ESP_IDF_FRAMEWORK_VERSION_LOOKUP = { "latest": cv.Version(5, 5, 4), "dev": cv.Version(5, 5, 4), } + ESP_IDF_PLATFORM_VERSION_LOOKUP = { cv.Version( 6, 0, 1 @@ -774,7 +780,7 @@ PLATFORM_VERSION_LOOKUP = { } -def _check_versions(config): +def _check_pio_versions(config): config = config.copy() value = config[CONF_FRAMEWORK] @@ -785,7 +791,7 @@ def _check_versions(config): ) platform_lookup = PLATFORM_VERSION_LOOKUP[value[CONF_VERSION]] - value[CONF_PLATFORM_VERSION] = _parse_platform_version(str(platform_lookup)) + value[CONF_PLATFORM_VERSION] = _parse_pio_platform_version(str(platform_lookup)) if value[CONF_TYPE] == FRAMEWORK_ARDUINO: version = ARDUINO_FRAMEWORK_VERSION_LOOKUP[value[CONF_VERSION]] @@ -813,7 +819,7 @@ def _check_versions(config): platform_lookup = ESP_IDF_PLATFORM_VERSION_LOOKUP.get(version) value[CONF_SOURCE] = value.get( CONF_SOURCE, - _format_framework_espidf_version(version, value.get(CONF_RELEASE)), + _format_framework_pio_espidf_version(version, value.get(CONF_RELEASE)), ) if _is_framework_url(value[CONF_SOURCE]): value[CONF_SOURCE] = f"pioarduino/framework-espidf@{value[CONF_SOURCE]}" @@ -823,7 +829,7 @@ def _check_versions(config): raise cv.Invalid( "Framework version not recognized; please specify platform_version" ) - value[CONF_PLATFORM_VERSION] = _parse_platform_version(str(platform_lookup)) + value[CONF_PLATFORM_VERSION] = _parse_pio_platform_version(str(platform_lookup)) if version != recommended_version: _LOGGER.warning( @@ -831,7 +837,7 @@ def _check_versions(config): "If there are connectivity or build issues please remove the manual version." ) - if value[CONF_PLATFORM_VERSION] != _parse_platform_version( + if value[CONF_PLATFORM_VERSION] != _parse_pio_platform_version( str(PLATFORM_VERSION_LOOKUP["recommended"]) ): _LOGGER.warning( @@ -842,7 +848,38 @@ def _check_versions(config): return config -def _parse_platform_version(value): +def _check_esp_idf_versions(config): + config = _check_pio_versions(config) + value = config[CONF_FRAMEWORK] + + # Remove unwanted keys if present + for key in (CONF_SOURCE, CONF_PLATFORM_VERSION): + value.pop(key, None) + + # Official ESP-IDF frameworks don't use extra + version = cv.Version.parse(value[CONF_VERSION]) + version = cv.Version(version.major, version.minor, version.patch) + + value[CONF_VERSION] = str(version) + + return config + + +def _validate_toolchain(value) -> Toolchain: + return Toolchain(cv.one_of(*(t.value for t in Toolchain), lower=True)(value)) + + +def _check_versions(config): + # Resolve toolchain: CLI (already on CORE.toolchain) > YAML > default. + if CORE.toolchain is None: + CORE.toolchain = config.get(CONF_TOOLCHAIN, Toolchain.PLATFORMIO) + + if CORE.using_toolchain_esp_idf: + return _check_esp_idf_versions(config) + return _check_pio_versions(config) + + +def _parse_pio_platform_version(value): try: ver = cv.Version.parse(cv.version_number(value)) release = f"{ver.major}.{ver.minor:02d}.{ver.patch:02d}" @@ -1272,7 +1309,7 @@ FRAMEWORK_SCHEMA = cv.Schema( cv.Optional(CONF_VERSION, default="recommended"): cv.string_strict, cv.Optional(CONF_RELEASE): cv.string_strict, cv.Optional(CONF_SOURCE): cv.string_strict, - cv.Optional(CONF_PLATFORM_VERSION): _parse_platform_version, + cv.Optional(CONF_PLATFORM_VERSION): _parse_pio_platform_version, cv.Optional(CONF_SDKCONFIG_OPTIONS, default={}): { cv.string_strict: cv.string_strict }, @@ -1524,6 +1561,7 @@ CONFIG_SCHEMA = cv.All( ), cv.Optional(CONF_VARIANT): cv.one_of(*VARIANTS, upper=True), cv.Optional(CONF_FRAMEWORK): FRAMEWORK_SCHEMA, + cv.Optional(CONF_TOOLCHAIN): _validate_toolchain, cv.Optional(CONF_WATCHDOG_TIMEOUT, default="5s"): cv.All( cv.positive_time_period_seconds, cv.Range(min=cv.TimePeriod(seconds=5), max=cv.TimePeriod(seconds=60)), @@ -1672,11 +1710,11 @@ async def to_code(config): framework_ver: cv.Version = CORE.data[KEY_CORE][KEY_FRAMEWORK_VERSION] conf = config[CONF_FRAMEWORK] - # Check if using native ESP-IDF build (--native-idf) - use_platformio = not CORE.data.get(KEY_NATIVE_IDF, False) + # Check if using ESP-IDF toolchain + use_platformio = not CORE.using_toolchain_esp_idf if use_platformio: # Clear IDF environment variables to avoid conflicts with PlatformIO's ESP-IDF - # but keep them when using --native-idf for native ESP-IDF builds + # but keep them when using ESP-IDF toolchain for clean_var in ("IDF_PATH", "IDF_TOOLS_PATH"): os.environ.pop(clean_var, None) @@ -1716,6 +1754,8 @@ async def to_code(config): ) else: cg.add_build_flag("-Wno-error=format") + cg.add_build_flag("-Wno-error=missing-field-initializers") + cg.add_build_flag("-Wno-error=volatile") cg.set_cpp_standard("gnu++20") cg.add_build_flag("-DUSE_ESP32") @@ -1792,7 +1832,7 @@ async def to_code(config): if (idf_ver := ARDUINO_IDF_VERSION_LOOKUP.get(framework_ver)) is not None: cg.add_platformio_option( "platform_packages", - [_format_framework_espidf_version(idf_ver)], + [_format_framework_pio_espidf_version(idf_ver)], ) # Use stub package to skip downloading precompiled libs stubs_dir = CORE.relative_build_path("arduino_libs_stub") @@ -2424,6 +2464,14 @@ def _write_sdkconfig(): clean_build(clear_pio_cache=False) +def _platformio_library_to_dependency(library: Library) -> tuple[str, dict[str, str]]: + dependency: dict[str, str] = {} + name, version, path = generate_idf_component(library) + dependency["override_path"] = str(path) + dependency["version"] = version + return name, dependency + + def _write_idf_component_yml(): yml_path = CORE.relative_build_path("src/idf_component.yml") dependencies: dict[str, dict] = {} @@ -2465,6 +2513,21 @@ def _write_idf_component_yml(): if stub_path.exists(): rmtree(stub_path) + if CORE.using_toolchain_esp_idf: + add_idf_component( + name="espressif/arduino-esp32", + ref=str(CORE.data[KEY_CORE][KEY_FRAMEWORK_VERSION]), + ) + + if CORE.using_toolchain_esp_idf: + # Try to convert PlatformIO library to ESP-IDF components + for name, library in CORE.platformio_libraries.items(): + # Don't process arduino libraries + if name in ARDUINO_DISABLED_LIBRARIES: + continue + dependency_name, dependency = _platformio_library_to_dependency(library) + dependencies[dependency_name] = dependency + if CORE.data[KEY_ESP32][KEY_COMPONENTS]: components: dict = CORE.data[KEY_ESP32][KEY_COMPONENTS] for name, component in components.items(): diff --git a/esphome/components/ethernet/__init__.py b/esphome/components/ethernet/__init__.py index 10f9a73863d..3f88f8ef9a4 100644 --- a/esphome/components/ethernet/__init__.py +++ b/esphome/components/ethernet/__init__.py @@ -36,7 +36,6 @@ from esphome.const import ( CONF_VALUE, KEY_CORE, KEY_FRAMEWORK_VERSION, - KEY_NATIVE_IDF, Platform, PlatformFramework, ) @@ -705,7 +704,7 @@ def _filter_source_files() -> list[str]: # and pioarduino doesn't have it builtin (IDF 5.4.2 to 5.x) if eth_type != "JL1101": excluded.append("esp_eth_phy_jl1101.c") - elif CORE.is_esp32 and not CORE.data.get(KEY_NATIVE_IDF, False): + elif CORE.is_esp32 and not CORE.using_toolchain_esp_idf: from esphome.components.esp32 import idf_version # pioarduino has JL1101 builtin on IDF 5.4.2-5.x; exclude custom driver diff --git a/esphome/components/neopixelbus/light.py b/esphome/components/neopixelbus/light.py index 104762c69e0..943fd141f69 100644 --- a/esphome/components/neopixelbus/light.py +++ b/esphome/components/neopixelbus/light.py @@ -244,6 +244,7 @@ async def to_code(config): # disable built in rgb support as it uses the new RMT drivers and will # conflict with NeoPixelBus which uses the legacy drivers cg.add_build_flag("-DESP32_ARDUINO_NO_RGB_BUILTIN") + cg.add_library("SPI", None) cg.add_library("makuna/NeoPixelBus", "2.8.0") else: cg.add_library("makuna/NeoPixelBus", "2.7.3") diff --git a/esphome/components/nrf52/ota.py b/esphome/components/nrf52/ota.py index e4b26b45eb6..eb1caa55958 100644 --- a/esphome/components/nrf52/ota.py +++ b/esphome/components/nrf52/ota.py @@ -142,7 +142,7 @@ async def _smpmgr_upload_connected( with open(firmware, "rb") as file: image = file.read() upload_size = len(image) - progress = ProgressBar() + progress = ProgressBar("Uploading") progress.update(0) try: async for offset in smp_client.upload(image): diff --git a/esphome/const.py b/esphome/const.py index c2bf86d5321..c39225fdec0 100644 --- a/esphome/const.py +++ b/esphome/const.py @@ -15,6 +15,13 @@ VALID_SUBSTITUTIONS_CHARACTERS = ( ARGUMENT_HELP_DEVICE = "Manually specify the serial port/address to use, for example /dev/ttyUSB0. Can be specified multiple times for fallback addresses. Use 'OTA' for resolving from MQTT, DNS or mDNS and avoiding the interactive prompt." +class Toolchain(StrEnum): + """Toolchain identifiers for ESPHome.""" + + PLATFORMIO = "platformio" + ESP_IDF = "esp-idf" + + class Platform(StrEnum): """Platform identifiers for ESPHome.""" @@ -1036,6 +1043,7 @@ CONF_TO = "to" CONF_TO_NTC_RESISTANCE = "to_ntc_resistance" CONF_TO_NTC_TEMPERATURE = "to_ntc_temperature" CONF_TOLERANCE = "tolerance" +CONF_TOOLCHAIN = "toolchain" CONF_TOPIC = "topic" CONF_TOPIC_PREFIX = "topic_prefix" CONF_TOTAL = "total" @@ -1393,7 +1401,6 @@ KEY_FRAMEWORK_VERSION = "framework_version" KEY_NAME = "name" KEY_VARIANT = "variant" KEY_PAST_SAFE_MODE = "past_safe_mode" -KEY_NATIVE_IDF = "native_idf" # Entity categories ENTITY_CATEGORY_NONE = "" diff --git a/esphome/core/__init__.py b/esphome/core/__init__.py index 0cc207aa543..ef0eddc603a 100644 --- a/esphome/core/__init__.py +++ b/esphome/core/__init__.py @@ -17,7 +17,6 @@ from esphome.const import ( CONF_WEB_SERVER, CONF_WIFI, KEY_CORE, - KEY_NATIVE_IDF, KEY_TARGET_FRAMEWORK, KEY_TARGET_PLATFORM, PLATFORM_BK72XX, @@ -28,6 +27,7 @@ from esphome.const import ( PLATFORM_NRF52, PLATFORM_RP2040, PLATFORM_RTL87XX, + Toolchain, ) # pylint: disable=unused-import @@ -618,6 +618,10 @@ class EsphomeCore: # When True, skip network freshness checks for cached external files # (e.g. for `esphome logs`, where remote downloads aren't needed) self.skip_external_update: bool = False + # Toolchain used for building the configuration. None until resolved + # by CLI (--toolchain) or by `esphome.toolchain:` in YAML during + # preload_core_config; defaults to PLATFORMIO if neither sets it. + self.toolchain: Toolchain | None = None def reset(self): from esphome.pins import PIN_SCHEMA_REGISTRY @@ -648,6 +652,7 @@ class EsphomeCore: self.address_cache = None self._config_hash = None self.skip_external_update = False + self.toolchain = None PIN_SCHEMA_REGISTRY.reset() @contextmanager @@ -772,8 +777,8 @@ class EsphomeCore: @property def firmware_bin(self) -> Path: - # Check if using native ESP-IDF build (--native-idf) - if self.data.get(KEY_NATIVE_IDF, False): + # Check if using ESP-IDF toolchain + if self.using_toolchain_esp_idf: return self.relative_build_path("build", f"{self.name}.bin") if self.is_libretiny: return self.relative_pioenvs_path(self.name, "firmware.uf2") @@ -781,10 +786,10 @@ class EsphomeCore: @property def partition_table_bin(self) -> Path: - # Native ESP-IDF (--native-idf): the partition table image is emitted under + # Native ESP-IDF (--toolchain esp-idf): the partition table image is emitted under # build/partition_table/partition-table.bin alongside firmware.bin. PlatformIO writes the # equivalent file as partitions.bin in the env-specific .pioenvs directory. - if self.data.get(KEY_NATIVE_IDF): + if self.using_toolchain_esp_idf: return self.relative_build_path( "build", "partition_table", "partition-table.bin" ) @@ -792,7 +797,7 @@ class EsphomeCore: @property def bootloader_bin(self) -> Path: - if self.data.get(KEY_NATIVE_IDF): + if self.using_toolchain_esp_idf: return self.relative_build_path("build", "bootloader", "bootloader.bin") return self.relative_pioenvs_path(self.name, "bootloader.bin") @@ -853,6 +858,14 @@ class EsphomeCore: ) return self.target_framework == "esp-idf" + @property + def using_toolchain_esp_idf(self): + return self.toolchain == Toolchain.ESP_IDF + + @property + def using_toolchain_platformio(self): + return self.toolchain == Toolchain.PLATFORMIO + @property def using_zephyr(self): return self.target_framework == "zephyr" diff --git a/esphome/espidf/__init__.py b/esphome/espidf/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/esphome/espidf/api.py b/esphome/espidf/api.py new file mode 100644 index 00000000000..847de249a7f --- /dev/null +++ b/esphome/espidf/api.py @@ -0,0 +1,499 @@ +"""ESP-IDF direct build API for ESPHome.""" + +from dataclasses import dataclass, field +import json +import logging +import os +from pathlib import Path +import re +import shutil +import subprocess + +from esphome.components.esp32.const import KEY_ESP32, KEY_FLASH_SIZE, KEY_IDF_VERSION +from esphome.core import CORE, EsphomeError +from esphome.espidf.framework import check_esp_idf_install, get_framework_env + +_LOGGER = logging.getLogger(__name__) + +DOMAIN = "espidf_api" + + +@dataclass +class _CacheData: + paths: dict[str, tuple] = field(default_factory=dict) + env: dict[str, dict[str, str]] = field(default_factory=dict) + cmake_output: dict[Path, str] = field(default_factory=dict) + cmake_tools: dict[Path, dict[str, Path]] = field(default_factory=dict) + + +def _cache() -> _CacheData: + if DOMAIN not in CORE.data: + CORE.data[DOMAIN] = _CacheData() + return CORE.data[DOMAIN] + + +def _get_core_framework_version(): + return str(CORE.data[KEY_ESP32][KEY_IDF_VERSION]) + + +def _get_esphome_esp_idf_paths( + version: str | None = None, +) -> tuple[os.PathLike, os.PathLike]: + version = version or _get_core_framework_version() + paths = _cache().paths + if version not in paths: + paths[version] = check_esp_idf_install(version) + return paths[version] + + +def _get_idf_path(version: str | None = None) -> Path | None: + """Get IDF_PATH from environment or common locations.""" + # Use provided IDF framework if available + if "IDF_PATH" in os.environ: + return Path(os.environ["IDF_PATH"]) + return Path(_get_esphome_esp_idf_paths(version)[0]) + + +def _get_idf_env(version: str | None = None) -> dict[str, str]: + """Get environment variables needed for ESP-IDF build.""" + version = version or _get_core_framework_version() + env_cache = _cache().env + if version not in env_cache: + env_cache[version] = os.environ.copy() + + # Use provided IDF framework if available + if "IDF_PATH" not in os.environ: + env_cache[version] |= get_framework_env( + *_get_esphome_esp_idf_paths(version) + ) + return env_cache[version] + + +def _get_cmake_output(build_dir) -> str: + cmake_output_cache = _cache().cmake_output + if build_dir not in cmake_output_cache: + cmd = ["cmake", "-LA", "-N", "."] + + env = _get_idf_env() + result = subprocess.run( + cmd, + cwd=build_dir, + env=env, + capture_output=True, + text=True, + check=False, + ) + + if result.returncode != 0: + raise RuntimeError(f"CMake failed: {result.stderr}") + + cmake_output_cache[build_dir] = result.stdout + return cmake_output_cache[build_dir] + + +def _get_cmake_tool_path(var_name: str) -> Path: + build_dir = CORE.relative_build_path("build") + cmake_output = _get_cmake_output(build_dir) + + cmake_tools_cache = _cache().cmake_tools + if build_dir not in cmake_tools_cache: + cmake_tools_cache[build_dir] = {} + + if var_name not in cmake_tools_cache[build_dir]: + pattern = rf"^{var_name}:FILEPATH=(.+)$" + match = re.search(pattern, cmake_output, re.MULTILINE) + + if not match: + raise RuntimeError(f"{var_name} not found in CMake output") + + path = match.group(1).strip() + cmake_tools_cache[build_dir][var_name] = Path(path) + + return cmake_tools_cache[build_dir][var_name] + + +def _get_idf_tool(name: str) -> str: + """Return the path to an executable from the ESP-IDF environment PATH or raise if not found.""" + env = _get_idf_env() + executable = shutil.which(name, path=env.get("PATH", None)) + if executable is None: + raise EsphomeError( + f"{name} executable not found in ESP-IDF environment. " + "Check that the IDF environment is correctly set up." + ) + return executable + + +def run_idf_py( + *args, cwd: Path | None = None, capture_output: bool = False +) -> int | str: + """Run idf.py with the given arguments.""" + idf_path = _get_idf_path() + if idf_path is None: + raise EsphomeError("ESP-IDF not found") + + env = _get_idf_env() + python_executable = _get_idf_tool("python") + idf_py = idf_path / "tools" / "idf.py" + # Dispatch idf.py through esphome.espidf.runner, which wraps + # sys.stdout/sys.stderr so ``isatty()`` reports True. This keeps CMake, + # Ninja, and idf.py's own progress-bar code emitting TTY-format output + # (``\r`` cursor moves, ANSI colors, fancy progress bars) even when our + # real stdout is a pipe — e.g. when esphome is running under the Home + # Assistant dashboard add-on. The runner is a plain script (not a + # ``python -m`` module) because IDF's Python venv does not have the + # esphome package installed. + runner_py = Path(__file__).parent / "runner.py" + + cmd = [python_executable, str(runner_py), str(idf_py)] + list(args) + + if cwd is None: + cwd = CORE.build_path + + _LOGGER.debug("Running: %s", " ".join(cmd)) + _LOGGER.debug(" in directory: %s", cwd) + + if capture_output: + result = subprocess.run( + cmd, + cwd=cwd, + env=env, + capture_output=True, + text=True, + check=False, + ) + if result.returncode != 0: + _LOGGER.error("idf.py failed:\n%s", result.stderr) + return result.stdout + result = subprocess.run( + cmd, + cwd=cwd, + env=env, + check=False, + ) + return result.returncode + + +def _get_sdkconfig_args() -> list[str]: + """Get cmake -D flags for the sdkconfig file, if it exists.""" + sdkconfig_path = CORE.relative_build_path(f"sdkconfig.{CORE.name}") + if sdkconfig_path.is_file(): + return ["-D", f"SDKCONFIG={sdkconfig_path}"] + return [] + + +def run_reconfigure() -> int: + """Run cmake reconfigure only (no build).""" + return run_idf_py(*_get_sdkconfig_args(), "reconfigure") + + +def has_outdated_files(): + """Check if the build configuration is stale. + + Returns True if required build files are missing or if configuration inputs + are newer than the generated CMake/Ninja build artifacts. + """ + cmakecache_txt_path = CORE.relative_build_path("build/CMakeCache.txt") + + cmakelists_txt_build_path = CORE.relative_build_path("CMakeLists.txt") + cmakelists_txt_src_path = CORE.relative_src_path("CMakeLists.txt") + build_config_path = CORE.relative_build_path("build/config") + sdkconfig_internal_path = CORE.relative_build_path( + f"sdkconfig.{CORE.name}.esphomeinternal" + ) + dependency_lock_path = CORE.relative_build_path("dependencies.lock") + build_ninja_path = CORE.relative_build_path("build/build.ninja") + + if not os.path.isdir(build_config_path) or not os.listdir(build_config_path): + return True + if not os.path.isfile(cmakecache_txt_path): + return True + if not os.path.isfile(build_ninja_path): + return True + if os.path.isfile(dependency_lock_path) and os.path.getmtime( + dependency_lock_path + ) > os.path.getmtime(build_ninja_path): + return True + + cmakecache_txt_mtime = os.path.getmtime(cmakecache_txt_path) + return any( + os.path.getmtime(f) > cmakecache_txt_mtime + for f in [ + _get_idf_path(), + cmakelists_txt_build_path, + cmakelists_txt_src_path, + sdkconfig_internal_path, + build_config_path, + ] + if f and os.path.exists(f) + ) + + +def need_reconfigure() -> bool: + from esphome.build_gen.espidf import has_discovered_components + + # We need to reconfigure either if the files are outdated or if there is no component discovered + return has_outdated_files() or not has_discovered_components() + + +def _patch_memory_segments(): + """Patch memory.ld to expand IRAM/DRAM for testing mode. + + Mirrors the PlatformIO iram_fix.py.script logic for native IDF builds. + Must be called after cmake configure (which generates memory.ld) and + before the build/link step. + """ + # Same sizes as iram_fix.py.script + testing_iram_size = 0x200000 # 2MB + testing_dram_size = 0x200000 # 2MB + + memory_ld = CORE.relative_build_path( + "build", "esp-idf", "esp_system", "ld", "memory.ld" + ) + if not memory_ld.is_file(): + _LOGGER.warning("Could not find linker script at %s", memory_ld) + return + + content = memory_ld.read_text() + patches = [] + + def _patch_segment(text, segment_name, new_size): + pattern = rf"({re.escape(segment_name)}\s*\([^)]*\)\s*:\s*org\s*=\s*.+?,\s*len\s*=\s*)(\S+[^\n]*)" + if match := re.search(pattern, text, re.DOTALL): + replacement = f"{match.group(1)}{new_size:#x}" + new_text = text[: match.start()] + replacement + text[match.end() :] + if new_text != text: + return new_text, True + return text, False + + content, patched = _patch_segment(content, "iram0_0_seg", testing_iram_size) + if patched: + patches.append(f"IRAM={testing_iram_size:#x}") + + content, patched = _patch_segment(content, "dram0_0_seg", testing_dram_size) + if patched: + patches.append(f"DRAM={testing_dram_size:#x}") + + if patches: + memory_ld.write_text(content) + _LOGGER.info("Patched %s in %s for testing mode", ", ".join(patches), memory_ld) + else: + _LOGGER.warning("Could not patch memory segments in %s", memory_ld) + + +def run_compile(config, verbose: bool) -> int: + """Compile the ESP-IDF project. + + Uses two-phase configure to auto-discover available components: + 1. If no previous build, configure with minimal REQUIRES to discover components + 2. Regenerate CMakeLists.txt with discovered components + 3. Run full build + """ + from esphome.build_gen.espidf import write_project + + # Check if we need to do discovery phase + if need_reconfigure(): + _LOGGER.info("Discovering available ESP-IDF components...") + write_project(minimal=True) + rc = run_reconfigure() + if rc != 0: + _LOGGER.error("Component discovery failed") + return rc + _LOGGER.info("Regenerating CMakeLists.txt with discovered components...") + write_project(minimal=False) + if CORE.testing_mode: + # Reconfigure again so cmake is up to date with the full component + # list. This ensures idf.py build won't re-run cmake, which would + # regenerate memory.ld and wipe the DRAM/IRAM patches applied below. + rc = run_reconfigure() + if rc != 0: + _LOGGER.error("Reconfigure with discovered components failed") + return rc + + # In testing mode, generate the linker script first, patch DRAM/IRAM sizes, + # then build. memory.ld is regenerated by ninja during the build phase, + # so we must patch after it's generated but before linking (same timing + # as iram_fix.py.script's AddPreAction hook in the PlatformIO path). + if CORE.testing_mode: + memory_ld = CORE.relative_build_path( + "build", "esp-idf", "esp_system", "ld", "memory.ld" + ) + build_dir = CORE.relative_build_path("build") + # Build just the memory.ld target - ninja needs the path relative to build dir + memory_ld_target = os.path.relpath(str(memory_ld), str(build_dir)) + env = _get_idf_env() + ninja_executable = _get_idf_tool("ninja") + result = subprocess.run( + [ninja_executable, "-C", str(build_dir), memory_ld_target], + env=env, + check=False, + ) + if result.returncode != 0: + _LOGGER.error("Failed to generate linker script") + return result.returncode + _patch_memory_segments() + + # Build + args = [] + + if verbose: + args.append("-v") + + args.extend(_get_sdkconfig_args()) + args.append("build") + + return run_idf_py(*args) + + +def get_firmware_path() -> Path: + """Get the path to the compiled firmware binary. + + This is the file idf.py writes directly (named after the project), + not the copy used for OTA/factory downloads below. + """ + build_dir = CORE.relative_build_path("build") + return build_dir / f"{CORE.name}.bin" + + +def get_factory_firmware_path() -> Path: + """Get the path to the factory firmware (with bootloader). + + Uses the PlatformIO ``firmware.factory.bin`` naming convention so + the dashboard's download handler — which requests files by name + relative to ``firmware_bin_path.parent`` — finds it. Without this, + the native IDF path produced ``.factory.bin`` and the + dashboard returned 500 trying to locate ``firmware.factory.bin``. + """ + build_dir = CORE.relative_build_path("build") + return build_dir / "firmware.factory.bin" + + +def get_ota_firmware_path() -> Path: + """Get the path to the OTA firmware binary. + + Uses the PlatformIO ``firmware.ota.bin`` naming convention for the + same dashboard-compatibility reason as ``get_factory_firmware_path``. + """ + build_dir = CORE.relative_build_path("build") + return build_dir / "firmware.ota.bin" + + +def get_elf_path() -> Path: + """Get the path to the firmware ELF file. + + idf.py writes ``/.elf`` directly; this returns the + ``/firmware.elf`` copy created by ``create_elf_copy`` so + the dashboard's "download ELF" link can find it under the + PlatformIO-convention name. + """ + build_dir = CORE.relative_build_path("build") + return build_dir / "firmware.elf" + + +def get_objdump_path() -> Path: + return _get_cmake_tool_path("CMAKE_OBJDUMP") + + +def get_readelf_path() -> Path: + return _get_cmake_tool_path("CMAKE_READELF") + + +def get_addr2line_path() -> Path: + return _get_cmake_tool_path("CMAKE_ADDR2LINE") + + +def create_factory_bin() -> bool: + """Create factory.bin by merging bootloader, partition table, and app.""" + build_dir = CORE.relative_build_path("build") + flasher_args_path = build_dir / "flasher_args.json" + + if not flasher_args_path.is_file(): + _LOGGER.warning("flasher_args.json not found, cannot create factory.bin") + return False + + try: + with open(flasher_args_path, encoding="utf-8") as f: + flash_data = json.load(f) + except (json.JSONDecodeError, OSError) as e: + _LOGGER.error("Failed to read flasher_args.json: %s", e) + return False + + # Get flash size from config + flash_size = CORE.data[KEY_ESP32][KEY_FLASH_SIZE] + + # Build esptool merge command + sections = [] + for addr, fname in sorted( + flash_data.get("flash_files", {}).items(), key=lambda kv: int(kv[0], 16) + ): + file_path = build_dir / fname + if file_path.is_file(): + sections.extend([addr, str(file_path)]) + else: + _LOGGER.warning("Flash file not found: %s", file_path) + + if not sections: + _LOGGER.warning("No flash sections found") + return False + + output_path = get_factory_firmware_path() + chip = flash_data.get("extra_esptool_args", {}).get("chip", "esp32") + + env = _get_idf_env() + python_executable = _get_idf_tool("python") + cmd = [ + python_executable, + "-m", + "esptool", + "--chip", + chip, + "merge_bin", + "--flash_size", + flash_size, + "--output", + str(output_path), + ] + sections + + _LOGGER.info("Creating factory.bin...") + result = subprocess.run(cmd, env=env, capture_output=True, text=True, check=False) + + if result.returncode != 0: + _LOGGER.error("Failed to create factory.bin: %s", result.stderr) + return False + + _LOGGER.info("Created: %s", output_path) + return True + + +def create_ota_bin() -> bool: + """Copy the firmware to firmware.ota.bin for ESPHome OTA compatibility.""" + firmware_path = get_firmware_path() + ota_path = get_ota_firmware_path() + + if not firmware_path.is_file(): + _LOGGER.warning("Firmware not found: %s", firmware_path) + return False + + shutil.copy(firmware_path, ota_path) + _LOGGER.info("Created: %s", ota_path) + return True + + +def create_elf_copy() -> bool: + """Copy the ELF binary to firmware.elf for dashboard compatibility. + + idf.py writes the ELF at ``/.elf``; the dashboard's + "download ELF" link requests the literal filename ``firmware.elf`` + (PlatformIO convention), so copy it to that name. + """ + build_dir = CORE.relative_build_path("build") + src_elf = build_dir / f"{CORE.name}.elf" + dst_elf = get_elf_path() + + if not src_elf.is_file(): + _LOGGER.warning("ELF not found: %s", src_elf) + return False + + shutil.copy(src_elf, dst_elf) + _LOGGER.info("Created: %s", dst_elf) + return True diff --git a/esphome/espidf/component.py b/esphome/espidf/component.py new file mode 100644 index 00000000000..bb675d2c77c --- /dev/null +++ b/esphome/espidf/component.py @@ -0,0 +1,937 @@ +from collections.abc import Callable +import glob +import hashlib +import itertools +import json +import logging +import os +from pathlib import Path +import re +import tempfile +from typing import TypeVar +from urllib.parse import urlparse, urlsplit, urlunsplit + +from esphome import git, yaml_util +from esphome.const import KEY_CORE, KEY_FRAMEWORK_VERSION +from esphome.core import CORE, Library +from esphome.espidf.framework import archive_extract_all, download_from_mirrors, rmdir +from esphome.helpers import write_file_if_changed + +_LOGGER = logging.getLogger(__name__) + +PathType = str | os.PathLike + +# +# Constants from platformio +# + +FILTER_REGEX = re.compile(r"([+-])<([^>]+)>") +DEFAULT_BUILD_SRC_FILTER = ( + "+<*> -<.git/> -<.svn/> - - - -" +) +DEFAULT_BUILD_SRC_DIRS = "src" +DEFAULT_BUILD_INCLUDE_DIR = "include" +DEFAULT_BUILD_FLAGS = [] +SRC_FILE_EXTENSIONS = [ + ".c", + ".cpp", + ".cc", + ".cxx", + ".c++", + ".S", + ".spp", + ".SPP", + ".sx", + ".s", + ".asm", + ".ASM", +] + +ESP32_PLATFORM = "espressif32" +DOMAIN = "pio_components" + +# +# Constants for workarounds +# + +REQUIRES_DETECT_PATTERNS = { + "mbedtls": [re.compile(r'^\s*#\s*include\s*[<"]mbedtls[^">]*[">]', re.MULTILINE)], + "esp_netif": [ + re.compile(r'^\s*#\s*include\s*[<"]esp_netif[^">]*[">]', re.MULTILINE) + ], + "esp_driver_gpio": [ + re.compile(r'^\s*#\s*include\s*[<"]driver/gpio\.h[^">]*[">]', re.MULTILINE) + ], + "esp_timer": [ + re.compile(r'^\s*#\s*include\s*[<"]esp_timer\.h[^">]*[">]', re.MULTILINE) + ], + "esp_wifi": [ + re.compile( + r'^\s*#\s*include\s*[<"]WiFi\.h[^">]*[">]', re.MULTILINE + ) # Arduino WiFi + ], +} + +ESPHOME_DATA_KEY = "ESPHOME" +ESPHOME_DATA_EXTRA_CMAKE_KEY = "EXTRA_CMAKE" + + +class Source: + def download(self, dir_suffix: str, force: bool = False) -> Path: + raise NotImplementedError() + + +class URLSource(Source): + def __init__(self, url: str): + self.url = url + + def download(self, dir_suffix: str, force: bool = False) -> Path: + base_dir = Path(CORE.data_dir) / DOMAIN + h = hashlib.new("sha256") + h.update(self.url.encode()) + path = base_dir / h.hexdigest()[:8] / dir_suffix + # Marker file written last to signal a complete extraction. Using a + # marker (instead of just `path.is_dir()`) means an interrupted + # extraction is correctly detected and re-run on the next invocation, + # and lets us extract directly into ``path`` — avoiding a + # post-extraction rename that races with antivirus on Windows. + extracted_marker = path / ".esphome_extracted" + if not extracted_marker.is_file() or force: + rmdir(path, msg=f"Clean up library directory {path}") + + # Download in temporary file + with tempfile.NamedTemporaryFile() as tmp: + _LOGGER.info("Downloading %s ...", self.url) + _LOGGER.debug("Location: %s", path) + + download_from_mirrors([self.url], {}, tmp.file) + + _LOGGER.debug("Extracting archive to %s ...", path) + archive_extract_all(tmp.file, path) + extracted_marker.touch() + return path + + def __str__(self): + return self.url + + +class GitSource(Source): + def __init__(self, url: str, ref: str): + self.url = url + self.ref = ref + + def download(self, dir_suffix: str, force: bool = False) -> Path: + path, _ = git.clone_or_update( + url=self.url, + ref=self.ref, + refresh=git.NEVER_REFRESH if not force else None, + domain=DOMAIN, + submodules=[], + subpath=Path(dir_suffix), + ) + return path + + def __str__(self): + return f"{self.url}#{self.ref}" + + +class InvalidIDFComponent(Exception): + pass + + +class IDFComponent: + def __init__(self, name: str, version: str, source: Source | None): + self.name = name + self.version = version + self.source = source + self.data = {} + self.dependencies: list[IDFComponent] = [] + self._path: Path | None = None + + def __str__(self): + return f"{self.name}@{self.version}={self.source}" + + @property + def path(self) -> Path: + if self._path is None: + raise RuntimeError(f"path not set for component {self}") + return self._path + + @path.setter + def path(self, value: Path) -> None: + self._path = value + + def get_sanitized_name(self): + return re.sub(r"[^a-zA-Z0-9_.\-/]", "_", self.name) + + def get_require_name(self): + return self.get_sanitized_name().replace("/", "__") + + def download(self, force: bool = False): + """ + The dependency name should match the directory name at the end of the override path. + The ESP-IDF build system uses the directory name as the component name, so the directory of the override_path should match the component name. + If you want to specify the full name of the component with the namespace, replace / in the component name with __. + @see https://docs.espressif.com/projects/idf-component-manager/en/latest/reference/manifest_file.html + """ + self.path = self.source.download(self.get_sanitized_name(), force=force) + + +def _sanitize_version(version: str) -> str: + """ + Sanitize a version string by removing common requirement prefixes or a leading v. + + Args: + version: Version string to clean. + + Returns: + Cleaned version string without common requirement symbols. + """ + version = version.strip() + + prefixes = ( + "^", + "~=", + "~", + ">=", + "<=", + "==", + "!=", + ">", + "<", + "=", + "v", + "V", + ) + + for p in prefixes: + if version.startswith(p): + version = version[len(p) :] + break + + return version.strip() + + +def _get_package_from_pio_registry( + username: str | None, pkgname: str, requirements: str +) -> tuple[str, str, str | None, str | None]: + """ + Fetch package information from PlatformIO registry. + + This function queries the PlatformIO registry to find a library package + that matches the given criteria and returns its metadata including version + and download URL. + + Args: + username: The owner/username of the package (can be None) + pkgname: The name of the package + requirements: Version requirements (e.g., "^1.0.0") + + Returns: + tuple[str, str, str | None, str | None]: + A tuple containing (owner, name, version, download_url) + where version and download_url can be None if not found + """ + + from platformio.package.manager._registry import PackageManagerRegistryMixin + from platformio.package.meta import PackageSpec + + # Create a minimal PackageManagerRegistry class + class PackageManagerRegistry(PackageManagerRegistryMixin): + def __init__(self): + self._registry_client = None + self.pkg_type = "library" + + @staticmethod + def is_system_compatible(value, custom_system=None): + return True + + pio_registry = PackageManagerRegistry() + + # Fetch package metadata from registry + package = pio_registry.fetch_registry_package( + PackageSpec( + owner=username, + name=pkgname, + ) + ) + owner = package["owner"]["username"] + name = package["name"] + + # Find the best matching version based on requirements + version = pio_registry.pick_best_registry_version( + package.get("versions"), + PackageSpec(owner=username, name=pkgname, requirements=requirements), + ) + + # If no version found, return with None for version and URL + if not version: + return owner, name, None, None + + # Find the compatible package file for this version + pkgfile = pio_registry.pick_compatible_pkg_file(version["files"]) + + # If no package file found, return with None for URL but valid version + if not pkgfile: + return owner, name, version["name"], None + + return owner, name, version["name"], pkgfile["download_url"] + + +def _patch_component(component: IDFComponent, first_pass: bool): + """ + Apply patches/workarounds to specific components that have known issues. + + This function modifies component data to fix compatibility issues or missing + dependencies for certain libraries. It applies different patches based on + whether it's the first or second pass of processing. + + Args: + component: The IDFComponent object to potentially patch + first_pass: Boolean indicating if this is the first pass of processing + """ + + # Patch only on the second step + if not first_pass and CORE.using_arduino: + # Add the missing dependency to Arduino framework. Source is None so + # the IDF component manager resolves it from the registry instead of + # cloning the 2 GB arduino-esp32 git history. + component.dependencies.append( + IDFComponent( + "espressif/arduino-esp32", + str(CORE.data[KEY_CORE][KEY_FRAMEWORK_VERSION]), + None, + ) + ) + + # + # fastled/FastLED + # + + # Patch only on the first step + if ( + first_pass + and component.name == _owner_pkgname_to_name("fastled", "FastLED") + and not (component.path / "idf_component.yml").is_file() + ): + # Force fake idf_component: This project already support ESP-IDF + (component.path / "idf_component.yml").write_text("") + + +T = TypeVar("T") + + +def _ensure_list(obj: T | list[T]) -> list[T]: + """ + Convert an object to a list if it isn't already a list. + + Args: + obj: Object that may or may not already be a list. + + Returns: + list[T]: The original list if ``obj`` is a list, otherwise a single-item + list containing ``obj``. + """ + return [obj] if not isinstance(obj, list) else obj + + +def _owner_pkgname_to_name(owner: str | None, pkgname: str) -> str: + """ + Convert owner and package name to a standardized component name. + + This function combines owner and package name with a forward slash when + both are provided, otherwise returns just the package name. + + Args: + owner: The owner/username of the package (can be None) + pkgname: The name of the package + + Returns: + str: The standardized component name in "owner/pkgname" format or just "pkgname" + """ + return f"{owner}/{pkgname}" if owner else pkgname + + +def _collect_filtered_files(src_dir: PathType, src_filters: list[str]) -> list[str]: + """ + Recursively match files in a directory according to include/exclude patterns. + + This function processes a list of filter strings that indicate which files + to include or exclude. Each filter is parsed into patterns with a sign: + '+' for inclusion and '-' for exclusion. Directory patterns ending with '/' + are normalized to include all their contents recursively. + + Args: + src_dir (PathType): Root directory to search within. + src_filters (list[str]): List of filter strings, which may contain multiple + patterns. Each pattern can start with '+' or '-' to indicate inclusion + or exclusion. + + Returns: + list[str]: List of matched file paths as strings. Only files (not directories) + are returned, even if a directory matches a pattern. + """ + matches = list( + itertools.chain.from_iterable( + FILTER_REGEX.findall(src_filter) for src_filter in src_filters + ) + ) + + selected = set() + + for sign, pattern in matches: + pattern = pattern.strip() + + if pattern.endswith("/"): + pattern = pattern.rstrip("/") + "/**" + + full_pattern = os.path.join(glob.escape(str(src_dir)), pattern) + + matched = [] + for item in glob.glob(full_pattern, recursive=True): + if not os.path.isdir(item): + matched.append(item) + else: + # PlatformIO quirk: a directory matched with "*" should include all its + # nested files and subdirectories, not just the directory itself. + for root, _, files in os.walk(item): + matched.extend([os.path.join(root, f) for f in files]) + + if sign == "+": + selected.update(matched) + elif sign == "-": + selected.difference_update(matched) + + return [r for r in selected if os.path.isfile(r)] + + +def _convert_library_to_component(library: Library) -> IDFComponent: + """ + Convert a Library object to an IDFComponent object by resolving its metadata. + + This function handles the conversion of library specifications to component + objects, resolving versions through PlatformIO registry when needed or + parsing direct repository URLs. + + Args: + library: The Library object containing name, version, and/or repository information + + Returns: + IDFComponent: The resolved component with name, version, and URL + + Raises: + ValueError: If a repository URL is missing a reference (#) + RuntimeError: If no artifact can be found for the library + """ + name = None + version = None + source = None + + # Repository is provided directly + if library.repository: + # Parse repository URL to extract name and version + split_result = urlsplit(library.repository) + if not split_result.fragment.strip(): + raise ValueError(f"Missing ref in URL {library.repository}") + + # Sanitize name + name = str(split_result.path).strip("/") + name = name.removesuffix(".git") + + # Sanitize version + version = _sanitize_version(split_result.fragment) + repository = urlunsplit(split_result._replace(fragment="")) + + source = GitSource(str(repository), split_result.fragment) + + # Version is provided - resolve using PlatformIO registry + elif library.version: + name = library.name + if "/" not in name: + owner, pkgname = None, name + else: + owner, pkgname = name.split("/", 1) + + owner, pkgname, version, url = _get_package_from_pio_registry( + owner, pkgname, library.version + ) + if url is None: + raise RuntimeError( + f"Can't find an pkg file from PlatformIO registry for library {library}" + ) + + name = _owner_pkgname_to_name(owner, pkgname) + source = URLSource(url) + + if source is None: + raise RuntimeError(f"Can't find an artifact associated to library {library}") + + assert name, "Missing library name" + assert version, "Missing library version" + + return IDFComponent(name, version, source) + + +def _detect_requires(build_src_files: list[str]) -> set[str]: + """ + Detect required components from source files. + + Args: + build_src_files: List of source file paths to analyze + + Returns: + Set of detected required components + """ + detected = set() + + # 1. Process each source file + for file in build_src_files: + path = Path(file) + + if not path.is_file(): + continue + + try: + content = path.read_text(encoding="utf-8", errors="ignore") + except Exception: # pylint: disable=broad-exception-caught + continue + + # 2. Add required component if one of these patterns matches + for require_name, patterns in REQUIRES_DETECT_PATTERNS.items(): + if require_name in detected: + continue # already found + + for pattern in patterns: + if pattern.search(content): + detected.add(require_name) + break + + return detected + + +def _split_list_by_condition( + items: list[str], match_fn: Callable[[str], str | None] +) -> tuple[list[str], list[str]]: + """ + Splits a list into two lists based on a matching function. + + Args: + items: List of items to split. + match_fn: Function that returns a value for items that should go into the "matched" list. + + Returns: + A tuple (matched, non_matched) + """ + matched = [] + non_matched = [] + for item in items: + result = match_fn(item) + if result: + matched.append(result) + else: + non_matched.append(item) + return matched, non_matched + + +def generate_cmakelists_txt(component: IDFComponent) -> str: + """ + Generate a CMakeLists.txt file for an ESP-IDF component. + + This function creates the necessary CMake configuration to build a library + with ESP-IDF, including source files, include directories, dependencies, + and build flags. + + Args: + component: The IDFComponent object containing library metadata and path + + Returns: + str: The complete CMakeLists.txt content as a string + """ + + def escape_entry(p: PathType) -> str: + # In CMakeLists.txt, backslashes need to be escaped + return f'"{str(p)}"'.replace("\\", "\\\\") + + # Extract the values + build_src_dir = component.data.get("build", {}).get("srcDir", None) + if not build_src_dir: + for d in ["src", "Src", "."]: + if (component.path / Path(d)).is_dir(): + build_src_dir = d + break + + build_include_dir = component.data.get("build", {}).get( + "includeDir", DEFAULT_BUILD_INCLUDE_DIR + ) + build_src_filter = _ensure_list( + component.data.get("build", {}).get("srcFilter", DEFAULT_BUILD_SRC_FILTER) + ) + build_flags = _ensure_list( + component.data.get("build", {}).get("flags", DEFAULT_BUILD_FLAGS) + ) + + # List all sources files + build_src_files = _collect_filtered_files( + component.path / Path(build_src_dir), build_src_filter + ) + + # Detect in the files which requirements to add + # By default in platformio, all the components are added: we need to detect them when using ESP-IDF + requires = _detect_requires(build_src_files) + + # Dependencies are required + for dependency in component.dependencies: + requires.add(dependency.get_require_name()) + + # Only keep sources + build_src_files = [os.path.relpath(p, component.path) for p in build_src_files] + build_src_files = [ + f for f in build_src_files if os.path.splitext(f)[1] in SRC_FILE_EXTENSIONS + ] + + # Handle build flags + include_dir_flags, build_flags = _split_list_by_condition( + build_flags, lambda a: a[2:].strip() if a.startswith("-I") else None + ) + link_directories, build_flags = _split_list_by_condition( + build_flags, lambda a: a[2:].strip() if a.startswith("-L") else None + ) + link_libraries, build_flags = _split_list_by_condition( + build_flags, lambda a: a[2:].strip() if a.startswith("-l") else None + ) + + # Split include directories from build_flags + # Only keep an include directory if it exists + build_include_dirs = [build_include_dir, build_src_dir] + include_dir_flags + build_include_dirs = [ + d for d in build_include_dirs if (component.path / Path(d)).is_dir() + ] + + # Split build_flags list into private and public lists + private_build_flags, public_build_flags = _split_list_by_condition( + build_flags, lambda a: a if a.startswith("-W") else None + ) + + # Generate the component + content = "idf_component_register(\n" + if build_src_files: + str_srcs = " ".join([escape_entry(p) for p in sorted(build_src_files)]) + content += f" SRCS {str_srcs}\n" + if build_include_dirs: + str_include_dirs = " ".join([escape_entry(p) for p in build_include_dirs]) + content += f" INCLUDE_DIRS {str_include_dirs}\n" + if requires: + str_requires = " ".join(sorted(requires)) + content += f" REQUIRES {str_requires}\n" + content += ")\n" + + # Add public and private build flags + if public_build_flags: + content += "target_compile_options(${COMPONENT_LIB} PUBLIC\n" + for build_flag in public_build_flags: + str_build_flag = escape_entry(build_flag) + content += f" {str_build_flag}\n" + content += ")\n" + if private_build_flags: + content += "target_compile_options(${COMPONENT_LIB} PRIVATE\n" + for build_flag in private_build_flags: + str_build_flag = escape_entry(build_flag) + content += f" {str_build_flag}\n" + content += ")\n" + + # Add library paths and files + if link_directories: + content += "target_link_directories(${COMPONENT_LIB} INTERFACE\n" + for link_directory in link_directories: + str_build_flag = escape_entry(link_directory) + content += f" {str_build_flag}\n" + content += ")\n" + + if link_libraries: + content += "target_link_libraries(${COMPONENT_LIB} INTERFACE\n" + for link_library in link_libraries: + str_build_flag = escape_entry(link_library) + content += f" {str_build_flag}\n" + content += ")\n" + + # Add custom CMake scripts + content += "\n".join( + component.data.get(ESPHOME_DATA_KEY, {}).get(ESPHOME_DATA_EXTRA_CMAKE_KEY, []) + ) + + return content + + +def generate_idf_component_yml(component: IDFComponent) -> str: + """ + Generate ESP-IDF component YAML configuration for a library. + + Args: + component: IDFComponent object to generate YAML for + + Returns: + YAML string representation of ESP-IDF component configuration + """ + data = {} + + description = component.data.get("description") + if description: + data["description"] = description + + # Do not use the version from library.json/library.properties; it may be incorrect. + data["version"] = component.version + + repository = component.data.get("repository", {}).get("url", None) + if repository: + data["repository"] = repository + + for dependency in component.dependencies: + # Initialize dependencies section if needed + if "dependencies" not in data: + data["dependencies"] = {} + + # Add this dependency to dependencies + dep = {} + dep["version"] = dependency.version + + # Should use dependency.path as override path + try: + dep["override_path"] = str(dependency.path) + except RuntimeError as e: + # No local path; let the IDF component manager resolve. + # GitSource gives an explicit URL; arduino-esp32 is resolved by + # version from the registry. Anything else is a bug. + if isinstance(dependency.source, GitSource): + dep["git"] = dependency.source.url + elif dependency.name != "espressif/arduino-esp32": + raise e + + data["dependencies"][dependency.get_sanitized_name()] = dep + + return yaml_util.dump(data) + + +def _check_library_data(data: dict): + """ + Check if a library data is compatible with the ESP-IDF framework. + + Args: + component: IDFComponent object being processed + + Raises: + ValueError: If library has unsupported platforms or frameworks + """ + platforms = data.get("platforms", "*") + if isinstance(platforms, str): + platforms = [a.strip() for a in platforms.split(",")] + platforms = _ensure_list(platforms) + + # Check if library supports ESP-IDF platform + valid_platforms = "*" in platforms or ESP32_PLATFORM in platforms + + if not valid_platforms: + raise InvalidIDFComponent(f"Unsupported library platforms: {platforms}") + + frameworks = data.get("frameworks", "*") + if isinstance(frameworks, str): + frameworks = [a.strip() for a in frameworks.split(",")] + frameworks = _ensure_list(frameworks) + + # Check if library supports ESP-IDF framework + framework = "arduino" if CORE.using_arduino else "espidf" + valid_framework = "*" in frameworks or framework in frameworks + + if not valid_framework: + raise InvalidIDFComponent(f"Unsupported library frameworks: {frameworks}") + + extra_script = data.get("build", {}).get("extraScript", None) + if extra_script: + _LOGGER.warning( + 'Extra scripts are not supported. The script "%s" will not be executed.', + extra_script, + ) + + +def _process_dependencies(component: IDFComponent): + """ + Process library dependencies and generate ESP-IDF components. + + Args: + component: IDFComponent object being processed + + Returns: + None + """ + + name, version = component.name, component.version + dependencies = component.data.get("dependencies") + if not dependencies: + return + + _LOGGER.info("Processing %s@%s component dependencies...", name, version) + for dependency in dependencies: + # Validate dependency structure + if not all(k in dependency for k in ("name", "version")): + _LOGGER.debug("Ignore invalid library: %s", dependency) + continue + + try: + _check_library_data(dependency) + except InvalidIDFComponent as e: + _LOGGER.debug( + "Skip %s@%s: %s", dependency["name"], dependency["version"], str(e) + ) + continue + + # The version field may actually contain a URL + version = dependency["version"] + url = None + try: + result = urlparse(version) + if all([result.scheme, result.netloc]): + url, version = version, None + except (TypeError, ValueError): + pass + + # Generate ESP-IDF component from PlatformIO library + component.dependencies.append( + _generate_idf_component( + Library( + _owner_pkgname_to_name( + dependency.get("owner", None), dependency.get("name") + ), + version, + url, + ) + ) + ) + + +def _parse_library_json(library_json_path: PathType): + """ + Load and parse a JSON file describing a library. + + Args: + library_json_path (PathType): Path to the JSON file. + + Returns: + dict: Parsed JSON content as a Python dictionary. + """ + with open(library_json_path, encoding="utf8") as fp: + return json.load(fp) + + +def _parse_library_properties(library_properties_path: PathType): + """ + Parse a key-value platformio .properties style file into a dictionary. + + Args: + library_properties_path (PathType): Path to the properties file. + + Returns: + dict[str, str]: Mapping of parsed property keys to values. + """ + with open(library_properties_path, encoding="utf8") as fp: + data = {} + for line in fp.read().splitlines(): + line = line.strip() + if not line or "=" not in line: + continue + # skip comments + if line.startswith("#"): + continue + key, value = line.split("=", 1) + if not value.strip(): + continue + data[key.strip()] = value.strip() + return data + + +def _generate_idf_component(library: Library, force: bool = False) -> IDFComponent: + """ + Generate an ESP-IDF component from a library specification. + + This function resolves the library, downloads it, processes metadata files, + and generates necessary ESP-IDF build files (CMakeLists.txt, idf_component.yml). + + Args: + library: The library specification containing name, version, and repository URL + force: If True, forces re-download of the library even if it exists locally + + Returns: + IDFComponent: The generated component object with resolved metadata + """ + _LOGGER.info("Generate IDF component for %s library ...", library) + + # Resolve component name, version and url + component = _convert_library_to_component(library) + name, version = component.name, component.version + + # Download the library + component.download(force) + + # Paths to component metadata and build files + library_json_path = component.path / "library.json" + library_properties_path = component.path / "library.properties" + cmakelists_txt_path = component.path / "CMakeLists.txt" + idf_component_yml_path = component.path / "idf_component.yml" + + # Apply patches to the library metadata + _patch_component(component, True) + + if cmakelists_txt_path.is_file() and idf_component_yml_path.is_file(): + # Already an ESP-IDF component + return component + + if library_json_path.is_file(): + component.data = _parse_library_json(library_json_path) + elif library_properties_path.is_file(): + component.data = _parse_library_properties(library_properties_path) + else: + raise RuntimeError( + "Invalid PIO library: missing library.json and/or library.properties" + ) + + # Apply additional patches to the library metadata + _patch_component(component, False) + + # Check if the component is usable with ESP-IDF + _check_library_data(component.data) + + # Handle the dependencies (convert PlatformIO library to ESP-IDF component if needed) + _process_dependencies(component) + + # Generate files + _LOGGER.debug("Generating CMakeLists.txt for %s@%s ...", name, version) + write_file_if_changed( + cmakelists_txt_path, + generate_cmakelists_txt(component), + ) + + _LOGGER.debug("Generating idf_component.yml for %s@%s ...", name, version) + write_file_if_changed( + idf_component_yml_path, + generate_idf_component_yml(component), + ) + + return component + + +def generate_idf_component( + library: Library, force: bool = False +) -> tuple[str, str, Path]: + """ + Generate an ESP-IDF component and return its name, version, and path. + + This is a wrapper function that calls _generate_idf_component and returns + the standardized tuple format (name, version, path). + + Args: + library: The library specification containing name, version, and repository URL + force: If True, forces re-download of the library even if it exists locally + + Returns: + tuple[str, str, Path]: A tuple containing (component_name, component_version, component_path) + """ + component = _generate_idf_component(library, force) + return component.get_sanitized_name(), component.version, component.path diff --git a/esphome/espidf/framework.py b/esphome/espidf/framework.py new file mode 100644 index 00000000000..7ff373aba83 --- /dev/null +++ b/esphome/espidf/framework.py @@ -0,0 +1,1098 @@ +"""ESP-IDF framework tools for ESPHome.""" + +from collections.abc import Iterable +from contextlib import ExitStack +import io +import json +import logging +import os +from pathlib import Path +import shutil +import subprocess +import sys +import tempfile +from typing import IO + +import requests + +from esphome.config_validation import Version +from esphome.core import CORE +from esphome.helpers import ProgressBar, get_str_env, rmtree + +PathType = str | os.PathLike + +_LOGGER = logging.getLogger(__name__) + +_SCRIPTS_DIR = Path(__file__).parent + + +def _str_to_lst_of_str(a: str) -> list[str]: + """ + Convert a string to a list of string + + Args: + a: A string containing semicolon-separated values + + Returns: + list of strings + """ + return list(f.strip() for f in a.split(";") if f.strip()) + + +ESPHOME_STAMP_FILE = ".esphome.stamp.json" + +# Cache-buster baked into the stamp file. Bump this whenever a change would +# make pre-existing stamped installs invalid, e.g.: +# - the inlined Python helpers (_get_idf_version, _get_idf_tool_paths) are +# rewritten in a way that's incompatible with prior installs +# - the stamp_info schema changes (keys added/renamed/removed) +# - the tool selection or env-construction logic changes meaning +# Bumping triggers a full reinstall on every user's next run. +STAMP_SCHEMA_VERSION = "0" + +ESPHOME_IDF_DEFAULT_TARGETS = _str_to_lst_of_str( + os.environ.get("ESPHOME_IDF_DEFAULT_TARGETS", "all") +) + +ESPHOME_IDF_DEFAULT_TOOLS = _str_to_lst_of_str( + os.environ.get("ESPHOME_IDF_DEFAULT_TOOLS", "cmake;ninja") +) + +ESPHOME_IDF_DEFAULT_TOOLS_FORCE = _str_to_lst_of_str( + os.environ.get("ESPHOME_IDF_DEFAULT_TOOLS_FORCE", "required") +) + +ESPHOME_IDF_DEFAULT_FEATURES = _str_to_lst_of_str( + os.environ.get("ESPHOME_IDF_DEFAULT_FEATURES", "core") +) + +ESPHOME_IDF_FRAMEWORK_MIRRORS = _str_to_lst_of_str( + os.environ.get( + "ESPHOME_IDF_FRAMEWORK_MIRRORS", + "https://github.com/espressif/esp-idf/releases/download/v{VERSION}/esp-idf-v{VERSION}.zip;https://github.com/espressif/esp-idf/releases/download/v{MAJOR}.{MINOR}/esp-idf-v{MAJOR}.{MINOR}.zip", + ) +) + +ESP_IDF_CONSTRAINTS_MIRRORS = _str_to_lst_of_str( + os.environ.get( + "ESP_IDF_CONSTRAINTS_MIRRORS", + "https://dl.espressif.com/dl/esp-idf/espidf.constraints.v{VERSION}.txt", + ) +) + + +def _get_idf_tools_path() -> Path: + """ + Get the path to the ESP-IDF tools directory. + + Returns: + Path object pointing to the ESP-IDF tools directory + """ + if "ESPHOME_ESP_IDF_PREFIX" in os.environ: + return Path(get_str_env("ESPHOME_ESP_IDF_PREFIX", None)).expanduser() + return CORE.data_dir / "idf" + + +def _get_framework_path(version: str) -> Path: + """ + Get the path to the ESPHome ESP-IDF framework directory for a specific version. + + Args: + version: ESP-IDF version string + + Returns: + Path object pointing to the framework directory + """ + return _get_idf_tools_path() / "frameworks" / f"{version}" + + +def _get_python_env_path(version: str) -> Path: + """ + Get the path to the ESPHome ESP-IDF Python environment directory for a specific version. + + Args: + version: ESP-IDF version string + + Returns: + Path object pointing to the Python environment directory + """ + return _get_idf_tools_path() / "penvs" / f"{version}" + + +def rmdir(directory: PathType, msg: str | None = None): + """ + Remove a directory and its contents recursively if it exists. + + Args: + directory: Path to the directory to be removed + msg: Optional debug message to log before removal or it an error occurs + + Returns: + None + + Raises: + RuntimeError: If directory removal fails + """ + if os.path.isdir(directory): + try: + if msg: + _LOGGER.debug(msg) + rmtree(directory) + except OSError as e: + raise RuntimeError( + f"Error during {msg}: can't remove `{directory}`. Please remove it manually!" + ) from e + + +def _get_pythonexe_path() -> str: + """ + Get the path to the Python executable. + + Returns: + Path to Python executable as string + """ + # Try to get PYTHONEXEPATH environment variable + # Fallback to sys.executable if not set + return os.environ.get("PYTHONEXEPATH", os.path.normpath(sys.executable)) + + +def _get_python_env_executable_path(root: PathType, binary: str) -> Path: + """ + Get the path to a Python environment executable file. + + Args: + root: Root directory of the Python environment + binary: Name of the executable binary + + Returns: + Path object pointing to the executable file + """ + if os.name == "nt": + return Path(root) / "Scripts" / f"{binary}.exe" + return Path(root) / "bin" / binary + + +def _check_stamp(file: PathType, data: dict[str, str]) -> bool: + """ + Check if a stamp file contains the expected data. + + Args: + file: Path to the stamp file + data: Dictionary containing expected data + + Returns: + True if file exists and contains expected data, False otherwise + """ + if not Path(file).is_file(): + return False + + try: + with open(file, encoding="utf-8") as f: + return json.load(f) == data + except (json.JSONDecodeError, OSError): + return False + + +def _write_stamp(file: PathType, data: dict[str, str]): + """ + Write data to a stamp file in JSON format. + + Args: + file: Path to the stamp file to write + data: Dictionary containing data to write + """ + with open(file, "w", encoding="utf8") as fp: + json.dump(data, fp) + + +def _exec( + cmd: list[str], + msg: str | None = None, + env: dict[str, str] | None = None, + stream_output: bool = False, +) -> tuple[bool, str | None, str | None]: + """ + Execute a command and return results. + + Args: + cmd: list of command arguments + msg: Optional custom message for logging + env: Optional dictionary of environment variables to set + stream_output: If True, inherit parent stdio so the subprocess prints + directly to the terminal (useful for commands that produce their + own progress output). stdout/stderr are not captured in this mode. + + Returns: + tuple of (success: bool, stdout: str or None, stderr: str or None). + When stream_output is True, stdout and stderr are always None. + """ + cmd_str = msg or " ".join(cmd) + try: + _LOGGER.debug("%s - running ...", cmd_str) + + run_env = os.environ.copy() + if env: + run_env.update(env) + + if stream_output: + result = subprocess.run(cmd, check=False, env=run_env) + stdout = stderr = None + else: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=False, + env=run_env, + ) + stdout = result.stdout + stderr = result.stderr + + if result.returncode != 0: + if stream_output: + _LOGGER.error("%s - failed (returncode=%s)", cmd_str, result.returncode) + else: + tail = (stderr or stdout or "").strip()[-1000:] + _LOGGER.error( + "%s - failed (returncode=%s). Tail:\n%s", + cmd_str, + result.returncode, + tail, + ) + return False, stdout, stderr + + _LOGGER.debug("%s - executed successfully", cmd_str) + return True, stdout, stderr + + except (subprocess.SubprocessError, OSError) as e: + _LOGGER.error("%s - error: %s", cmd_str, str(e)) + return False, None, None + + +def _exec_ok(*args, **kwargs) -> bool: + """ + Execute a command and return only the success status. + + Args: + *args: Positional arguments to pass to _exec function + **kwargs: Keyword arguments to pass to _exec function + + Returns: + True if command executed successfully, False otherwise + """ + return _exec(*args, **kwargs)[0] + + +def _get_idf_version( + idf_framework_root: PathType, env: dict[str, str] | None = None +) -> str: + """ + Get the ESP-IDF version from the specified framework root. + + Args: + idf_framework_root: Path to the ESP-IDF framework root directory + env: Optional dictionary of environment variables to set + + Returns: + String containing ESP-IDF version + + Raises: + RuntimeError: If ESP-IDF version cannot be determined + """ + + cmd = [ + _get_pythonexe_path(), + str(_SCRIPTS_DIR / "get_idf_version.py"), + str(idf_framework_root), + ] + + success, stdout, stderr = _exec( + cmd, + msg="ESP-IDF version", + env=(env or os.environ) + | {"PYTHONPATH": str(Path(idf_framework_root) / "tools")}, + ) + if stdout: + stdout = stdout.strip() + if not success or not stdout: + detail = (stderr or "").strip() + raise RuntimeError( + f"Can't get ESP-IDF version of {idf_framework_root}" + + (f": {detail}" if detail else "") + ) + return stdout + + +def _get_idf_tool_paths( + idf_framework_root: PathType, env: dict[str, str] | None = None +) -> tuple[list[str], dict[str, str]]: + """ + Get ESP-IDF tool paths and environment variables needed for building. + + Args: + idf_framework_root: Path to the ESP-IDF framework root directory + env: Optional dictionary of environment variables to set + + Returns: + tuple containing (list of tool paths, dictionary of environment variables) + + Raises: + RuntimeError: If ESP-IDF tool paths cannot be determined + """ + + cmd = [ + _get_pythonexe_path(), + str(_SCRIPTS_DIR / "get_idf_tool_paths.py"), + str(idf_framework_root), + ] + + success, stdout, stderr = _exec( + cmd, + msg="ESP-IDF tool paths", + env=(env or os.environ) + | {"PYTHONPATH": str(Path(idf_framework_root) / "tools")}, + ) + if not success or not stdout: + detail = (stderr or "").strip() + raise RuntimeError( + f"Can't get ESP-IDF tool paths of {idf_framework_root}" + + (f": {detail}" if detail else "") + ) + + # Extract json values + try: + data = json.loads(stdout) + return data["paths_to_export"], data["export_vars"] + except Exception as e: + raise RuntimeError( + f"Can't extract ESP-IDF tool paths of {idf_framework_root}" + ) from e + + +def _get_python_version( + python_executable: PathType, + env: dict[str, str] | None = None, + throw_exception=False, +) -> str | None: + """ + Get the Python version from the specified executable. + + Args: + python_executable: Path to the Python executable to check + env: Optional dictionary of environment variables to set + throw_exception: If True, raise RuntimeError when version can't be determined + + Returns: + String containing Python version in "major.minor.patch" format, or None if failed + """ + + script = """ +import sys +print(".".join([str(x) for x in sys.version_info])) +""" + cmd = [python_executable, "-c", script] + + success, stdout, _ = _exec(cmd, msg="Python version", env=env) + + if stdout: + stdout = stdout.strip() + if throw_exception and (not success or not stdout): + raise RuntimeError(f"Can't get Python version of {python_executable}") + return stdout + + +def _create_venv(root: PathType, msg: str | None = None): + """ + Create a Python virtual environment. + + Args: + root: Path to the virtual environment directory + msg: Optional message for logging + + Returns: + None + + Raises: + Exception: If virtual environment creation fails + """ + cmd = [_get_pythonexe_path(), "-m", "venv", "--clear", root] + if not _exec_ok(cmd, msg=f"Create Python virtual environment for {msg}"): + raise RuntimeError(f"Can't create Python virtual environment for {msg}") + + +def _detect_archive_root(names: Iterable[str]) -> str | None: + """Detect a single top-level directory shared by all archive entries. + + Returns the directory name if every non-empty entry sits under the same + top-level directory, else ``None``. Extraction helpers use this to strip + the wrapper directory commonly found in source archives during extraction + rather than renaming it afterwards — post-extraction renames are + unreliable on Windows because antivirus and the search indexer briefly + hold handles on freshly written files. + """ + root: str | None = None + has_descendant = False + for raw in names: + name = raw.replace("\\", "/").strip("/") + if not name: + continue + first, sep, _ = name.partition("/") + if root is None: + root = first + elif root != first: + return None + if sep: + has_descendant = True + return root if has_descendant else None + + +def _tar_extract_all( + data: io.BufferedIOBase, + extract_dir: PathType = ".", + progress_header: str | None = None, +): + """ + Extract a TAR archive to the specified directory. + + Implementation is inspired by Python 3.12's tarfile data filtering logic. + This can be replaced with the standard library implementation once + support for Python 3.11 is no longer required. + + Args: + data: File-like object containing the TAR archive + extract_dir: Directory to extract contents to + progress_header: If set, show a progress bar with this header + """ + import stat + import tarfile + + extract_dir = os.fspath(extract_dir) + abs_dest = os.path.abspath(extract_dir) + + with tarfile.open(fileobj=data, mode="r") as tar_ref: + all_members = tar_ref.getmembers() + + # Detect a single common top-level directory and strip it during + # extraction so we don't have to flatten it via a rename afterwards. + strip_root = _detect_archive_root(m.name for m in all_members) + strip_prefix = f"{strip_root}/" if strip_root is not None else None + + safe_members = [] + + for member in all_members: + name = member.name + + # 1. Strip leading slashes + name = name.lstrip("/" + os.sep) + + # 2. Reject absolute paths (incl. Windows drive) + if os.path.isabs(name) or ( + os.name == "nt" and ":" in name.split(os.sep)[0] + ): + continue + + # 3. Strip wrapper directory if one was detected + if strip_prefix is not None: + norm = name.replace("\\", "/") + if norm in (strip_root, strip_prefix): + continue + if not norm.startswith(strip_prefix): + continue + name = norm[len(strip_prefix) :] + + # 4. Compute final path + target_path = os.path.realpath(os.path.join(abs_dest, name)) + if os.path.commonpath([abs_dest, target_path]) != abs_dest: + continue + + # 5. Validate links properly + if member.issym() or member.islnk(): + linkname = member.linkname + + # Reject absolute link targets + if os.path.isabs(linkname): + continue + + # Strip leading slashes + linkname = os.path.normpath(linkname) + + if member.issym(): + link_target = os.path.join( + abs_dest, os.path.dirname(name), linkname + ) + else: + link_target = os.path.join(abs_dest, linkname) + link_target = os.path.realpath(link_target) + + if os.path.commonpath([abs_dest, link_target]) != abs_dest: + continue + + # write back normalized linkname + member.linkname = linkname + + # 6. Sanitize permissions + mode = member.mode + if mode is not None: + # Strip high bits & group/other write bits + mode &= ( + stat.S_IRWXU + | stat.S_IRGRP + | stat.S_IXGRP + | stat.S_IROTH + | stat.S_IXOTH + ) + if member.isfile() or member.islnk(): + # remove exec bits unless explicitly user-executable + if not (mode & stat.S_IXUSR): + mode &= ~(stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + mode |= stat.S_IRUSR | stat.S_IWUSR + elif member.isdir() or member.issym(): + # Ignore mode for directories & symlinks + mode = None + else: + # Block special files + continue + + member.mode = mode + + # 7. Strip ownership + member.uid = None + member.gid = None + member.uname = None + member.gname = None + + # 8. Assign sanitized name back + member.name = name + + safe_members.append(member) + + total = len(safe_members) + progress = ( + ProgressBar(progress_header) if progress_header and total > 0 else None + ) + for i, member in enumerate(safe_members, 1): + tar_ref.extract(member, abs_dest) + if progress is not None: + progress.update(i / total) + if progress is not None: + progress.update(1) + + +def _zip_extract_all( + data: io.BufferedIOBase, + extract_dir: PathType = ".", + progress_header: str | None = None, +): + """ + Extract a ZIP archive to the specified directory. + + Args: + data: File-like object containing the ZIP archive + extract_dir: Directory to extract contents to + progress_header: If set, show a progress bar with this header + """ + import zipfile + + extract_dir = os.path.abspath(extract_dir) + + with zipfile.ZipFile(data, "r") as zip_ref: + all_members = zip_ref.infolist() + + # Detect a single common top-level directory and strip it during + # extraction so we don't have to flatten it via a rename afterwards. + strip_root = _detect_archive_root(m.filename for m in all_members) + strip_prefix = f"{strip_root}/" if strip_root is not None else None + + total = len(all_members) + progress = ( + ProgressBar(progress_header) if progress_header and total > 0 else None + ) + + for i, member in enumerate(all_members, 1): + # 1. Normalize name + name = member.filename.lstrip("/\\") + + # 2. Reject absolute paths / Windows drives + if os.path.isabs(name) or ( + os.name == "nt" and ":" in name.split(os.sep)[0] + ): + continue + + # 3. Strip wrapper directory if one was detected + if strip_prefix is not None: + norm = name.replace("\\", "/") + if norm in (strip_root, strip_prefix): + continue + if not norm.startswith(strip_prefix): + continue + name = norm[len(strip_prefix) :] + + # 4. Compute safe target path + target_path = os.path.abspath(os.path.join(extract_dir, name)) + + if os.path.commonpath([extract_dir, target_path]) != extract_dir: + raise ValueError(f"Unsafe path detected: {member.filename}") + + # 5. Assign sanitized name back + member.filename = name + + # 6. Extract + zip_ref.extract(member, extract_dir) + + if progress is not None: + progress.update(i / total) + if progress is not None: + progress.update(1) + + +_ARCHIVE_MAGIC_MAP = { + b"\x1f\x8b\x08": _tar_extract_all, + b"\x42\x5a\x68": _tar_extract_all, + b"\xfd\x37\x7a\x58\x5a\x00": _tar_extract_all, + b"\x50\x4b\x03\x04": _zip_extract_all, +} + + +def archive_extract_all( + archive: PathType | io.RawIOBase | IO[bytes], + extract_dir: PathType = ".", + progress_header: str | None = None, +): + """ + Extract an archive file to the specified directory. + + Args: + archive: Path to archive file or file-like object + extract_dir: Directory to extract contents to + progress_header: If set, show a progress bar with this header + + Raises: + TypeError: If archive is not a valid type + ValueError: If archive format is unsupported + """ + + # 1. Handle different archive input types + with ExitStack() as stack: + archive_ref: io.BufferedIOBase + if isinstance(archive, (str, os.PathLike)): + archive_ref = stack.enter_context(open(archive, "rb")) + elif isinstance(archive, (io.BufferedReader, io.BufferedRandom)): + archive_ref = archive + elif isinstance(archive, io.RawIOBase): + archive_ref = io.BufferedReader(archive) + else: + raise TypeError( + f"archive must be str, Path, or file-like object: {type(archive)}" + ) + + # 2. Detect archive format and select appropriate extraction function + matched_fct = None + magic_len = max(len(k) for k in _ARCHIVE_MAGIC_MAP) + header = archive_ref.peek(magic_len) + for magic, fct in _ARCHIVE_MAGIC_MAP.items(): + if header.startswith(magic): + matched_fct = fct + break + if matched_fct is None: + raise ValueError("Unsupported archive format") + matched_fct(archive_ref, extract_dir, progress_header=progress_header) + + +def download_from_mirrors( + mirrors: list[str], + substitutions: dict[str, str], + target: io.RawIOBase | IO[bytes] | PathType, + timeout: int = 30, +) -> str | None: + """ + Download file from multiple mirrors with substitution support. + + Args: + mirrors: list of mirror URLs + substitutions: Dictionary of substitutions to apply to URLs + target: Target file path or file-like object + timeout: Download timeout in seconds + + Returns: + The source URL. + + Raises: + Exception: If all download attempts fail + """ + # 1. Open target file for writing if path given + with ExitStack() as stack: + if isinstance(target, (str, os.PathLike)): + f = stack.enter_context(open(target, "wb")) + elif isinstance(target, (io.RawIOBase, io.IOBase)): + f = target + else: + raise TypeError( + f"target must be str, Path, or file-like object: {type(target)}" + ) + + # 2. Try each mirror in order + last_exception = None + + for mirror in mirrors: + # 3. Apply substitutions to URL + url = mirror.format(**substitutions) + + _LOGGER.debug("Trying downloading from %s", url) + + try: + # 4. Reset file pointer and download + f.seek(0) + f.truncate(0) + + with requests.get(url, stream=True, timeout=timeout) as r: + r.raise_for_status() + + total_size = int(r.headers.get("content-length", 0)) + downloaded = 0 + + progress = ProgressBar("Downloading") if total_size > 0 else None + + for chunk in r.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + downloaded += len(chunk) + + if progress is not None: + progress.update(downloaded / total_size) + + if progress is not None: + progress.update(1) + + _LOGGER.debug("Downloaded successfully from: %s", url) + + # 6. Reset file pointer and return + f.seek(0) + return url + + except Exception as e: # pylint: disable=broad-exception-caught + _LOGGER.debug("Failed to download %s: %s", url, str(e)) + last_exception = e + + # 7. Raise last exception if all mirrors failed + if last_exception: + raise last_exception + return None + + +def _check_esphome_idf_framework_install( + version: str, + targets: list[str], + tools: list[str], + force: bool = False, + env: dict[str, str] | None = None, +) -> tuple[Path, bool]: + """ + Check and install ESP-IDF framework. + + Args: + version: ESP-IDF version to check/install + targets: Target platforms to install + tools: list of tools to install + force: If True, force reinstallation + env: Optional dictionary of environment variables to set + + Returns: + tuple of (framework_path, install_flag) + """ + + # Sanitize inputs + targets = sorted(set(targets)) + tools = sorted(set(tools)) + + stamp_info = {} + stamp_info["schema_version"] = STAMP_SCHEMA_VERSION + stamp_info["targets"] = targets + stamp_info["tools"] = tools + # TODO: Add stamp with this module version + + # 1. Get framework path and stamp file path + framework_path = _get_framework_path(version) + extracted_marker = framework_path / ".esphome_extracted" + env_stamp_file = framework_path / ESPHOME_STAMP_FILE + idf_tools_path = framework_path / "tools" / "idf_tools.py" + _LOGGER.info("Checking ESP-IDF %s framework ...", version) + + # 2. Download and extract the framework if not already extracted. + # The marker is written last after extraction succeeds, so its presence + # is the authoritative "extraction complete" signal — no half-extracted + # tree can pass for installed. Extracting directly into framework_path + # avoids post-extraction renames that race with antivirus on Windows. + # Tool install state is tracked separately by the stamp file in step 3, + # so we only re-extract when extraction itself is missing or incomplete. + install = force or not extracted_marker.is_file() + if install: + rmdir(framework_path, msg=f"Clean up ESP-IDF {version} framework") + + # Download in temporary file + with tempfile.NamedTemporaryFile() as tmp: + _LOGGER.info("Downloading ESP-IDF %s framework ...", version) + + # Create substitutions for the URLs + substitutions = {"VERSION": version} + try: + ver = Version.parse(version) + substitutions["MAJOR"] = str(ver.major) + substitutions["MINOR"] = str(ver.minor) + substitutions["PATCH"] = str(ver.patch) + substitutions["EXTRA"] = ver.extra + except ValueError: + pass + + download_from_mirrors( + ESPHOME_IDF_FRAMEWORK_MIRRORS, substitutions, tmp.file + ) + + _LOGGER.info("Extracting ESP-IDF %s framework ...", version) + archive_extract_all(tmp.file, framework_path, progress_header="Extracting") + extracted_marker.touch() + + # 3. Check if the framework tools are the same and correctly installed + if not install: + install = True + if _check_stamp(env_stamp_file, stamp_info): + _LOGGER.info("Checking ESP-IDF %s framework installation ...", version) + cmd = [ + _get_pythonexe_path(), + str(idf_tools_path), + "--non-interactive", + "check", + ] + if _exec_ok(cmd, msg=f"ESP-IDF {version} check", env=env): + install = False + + # 4. Install framework tools if not installed or needs update + if install: + _LOGGER.info("Installing ESP-IDF %s framework ...", version) + targets_str = ",".join(targets) + cmd = [ + _get_pythonexe_path(), + str(idf_tools_path), + "--non-interactive", + "install", + f"--targets={targets_str}", + ] + tools + if not _exec_ok( + cmd, + msg=f"ESP-IDF {version} framework installation", + env=env, + stream_output=True, + ): + raise RuntimeError(f"ESP-IDF {version} framework installation failure") + + _write_stamp(env_stamp_file, stamp_info) + + return framework_path, install + + +def _check_esp_idf_python_env_install( + version: str, + features: list[str], + force: bool = False, + env: dict[str, str] | None = None, +) -> tuple[Path, bool]: + """ + Check and install ESP-IDF Python environment. + + Args: + version: ESP-IDF version to check/install + features: Features to install + force: If True, force reinstallation + env: Environment variables to use + + Returns: + tuple of (python_env_path, install_flag) + """ + + # Sanitize inputs + features = sorted(set(features)) + + stamp_info = {} + stamp_info["schema_version"] = STAMP_SCHEMA_VERSION + stamp_info["features"] = features + + framework_path = _get_framework_path(version) + python_env_path = _get_python_env_path(version) + env_stamp_file = python_env_path / ESPHOME_STAMP_FILE + env_python_path = _get_python_env_executable_path(python_env_path, "python") + + _LOGGER.info("Checking ESP-IDF %s Python environment ...", version) + install = force or not python_env_path.is_dir() or not env_python_path.is_file() + if not install: + # Check it against the stamp file + install = True + python_version = _get_python_version(env_python_path, env=env) + if python_version: + stamp_info["python_version"] = python_version + if _check_stamp(env_stamp_file, stamp_info): + install = False + + if install: + rmdir(python_env_path, msg=f"Clean up ESP-IDF {version} Python environment") + + _create_venv(python_env_path, msg=f"ESP-IDF {version}") + + esp_idf_version = _get_idf_version(framework_path, env=env) + constraint_file_path = ( + _get_idf_tools_path() / f"espidf.constraints.v{esp_idf_version}.txt" + ) + _LOGGER.debug("ESP-IDF version %s", esp_idf_version) + + _LOGGER.info("Downloading constraints file for ESP-IDF %s ...", esp_idf_version) + download_from_mirrors( + ESP_IDF_CONSTRAINTS_MIRRORS, + {"VERSION": esp_idf_version}, + constraint_file_path, + ) + + cmd_pip_install = [ + str(env_python_path), + "-m", + "pip", + "install", + "--upgrade", + "--constraint", + constraint_file_path, + ] + + _LOGGER.info("Installing ESP-IDF %s Python dependencies ...", version) + cmd = cmd_pip_install + [ + "pip", + "setuptools", + ] + if not _exec_ok( + cmd, + msg=f"Upgrade ESP-IDF {version} Python environment packages", + env=env, + ): + raise RuntimeError( + f"Upgrade ESP-IDF {version} Python environment packages failure" + ) + + for feature in features: + requirements_file = ( + framework_path + / "tools" + / "requirements" + / f"requirements.{feature}.txt" + ) + cmd = cmd_pip_install + [ + "-r", + str(requirements_file), + ] + if not _exec_ok( + cmd, + msg=f"Install ESP-IDF {version} Python dependencies for {feature}", + env=env, + ): + raise RuntimeError( + f"Install ESP-IDF {version} Python dependencies for {feature} failure" + ) + + stamp_info["python_version"] = _get_python_version( + env_python_path, env=env, throw_exception=True + ) + _write_stamp(env_stamp_file, stamp_info) + + return python_env_path, install + + +def check_esp_idf_install( + version: str, + targets: list[str] | None = None, + tools: list[str] | None = None, + features: list[str] | None = None, + force: bool = False, +) -> tuple[Path, Path]: + """ + Check and install ESP-IDF framework and Python environment. + + Args: + version: ESP-IDF version to check/install + targets: Target platforms to install + tools: list of tools to install + features: Features to install + force: If True, force reinstallation + + Returns: + tuple of (framework_path, python_env_path) + """ + env = {} + env["IDF_TOOLS_PATH"] = str(_get_idf_tools_path()) + env["IDF_PATH"] = "" + + targets = targets or ESPHOME_IDF_DEFAULT_TARGETS + + # Determine which tools need to be installed if not provided + if tools is None: + tools = [] + for tool in set(ESPHOME_IDF_DEFAULT_TOOLS) | set( + ESPHOME_IDF_DEFAULT_TOOLS_FORCE + ): + # Check if the tool exist + if tool in ESPHOME_IDF_DEFAULT_TOOLS_FORCE or not shutil.which(tool): + tools.append(tool) + + # 1) Framework + framework_path, installed = _check_esphome_idf_framework_install( + version, targets, tools, force=force, env=env + ) + + features = features or ESPHOME_IDF_DEFAULT_FEATURES + + # 2) Python env + python_env_path, installed = _check_esp_idf_python_env_install( + version, features, force=force or installed, env=env + ) + + return framework_path, python_env_path + + +def get_framework_env( + framework_path: PathType, + python_env_path: PathType | None = None, + env: dict[str, str] | None = None, +): + """ + Get environment variables for ESP-IDF framework. + + Args: + framework_path: Path to the ESP-IDF framework + python_env_path: Optional path to Python environment + env: Optional dictionary of environment variables to set + + Returns: + Dictionary containing updated environment variables + """ + # 1. Initialize base environment with extra ESP-IDF environment variables + env = env.copy() if env else {} + env["IDF_TOOLS_PATH"] = str(_get_idf_tools_path()) + env["IDF_PATH"] = "" + + # 2. Get existing PATH from env or os.environ + if "PATH" in env: + path_list = env["PATH"].split(os.pathsep) + else: + path_list = os.environ["PATH"].split(os.pathsep) + + # 3. If Python environment path is provided, add it to PATH and set IDF_PYTHON_ENV_PATH + if python_env_path: + python_path = _get_python_env_executable_path(python_env_path, "python") + path_list.insert(0, str(python_path.parent)) + env["IDF_PYTHON_ENV_PATH"] = str(python_env_path) + + # 4. Set framework-specific environment variables + env["IDF_PATH"] = str(framework_path) + env["ESP_IDF_VERSION"] = _get_idf_version(framework_path, env) + + # 5. Get and add tool paths and environment variables + paths_to_export, export_vars = _get_idf_tool_paths(framework_path, env) + env.update(export_vars) + env["PATH"] = os.pathsep.join(paths_to_export + path_list) + + return env diff --git a/esphome/espidf/get_idf_tool_paths.py b/esphome/espidf/get_idf_tool_paths.py new file mode 100644 index 00000000000..2e8859631d0 --- /dev/null +++ b/esphome/espidf/get_idf_tool_paths.py @@ -0,0 +1,51 @@ +"""Print JSON ``{paths_to_export, export_vars}`` for ESP-IDF tools. + +Run via ``python ``. PYTHONPATH must include +``/tools`` so ``idf_tools`` is importable. Exits with +status 1 and prints ``Missing ESP-IDF tools: ...`` on stderr if any tool is +not installed. +""" + +# pylint: disable=import-error # idf_tools is on PYTHONPATH at runtime only + +import json +import os +import sys +from types import SimpleNamespace + +from idf_tools import ( + TOOLS_FILE, + IDFEnv, + IDFTool, + filter_tools_info, + g, + load_tools_info, + process_tool, +) + +g.idf_path = sys.argv[1] +g.idf_tools_path = os.environ.get("IDF_TOOLS_PATH") +g.tools_json = os.path.join(g.idf_path, TOOLS_FILE) + +tools_info = filter_tools_info(IDFEnv.get_idf_env(), load_tools_info()) +args = SimpleNamespace(prefer_system=False) +paths_to_export: list[str] = [] +export_vars: dict[str, str] = {} +missing_tools: list[str] = [] + +for name, tool in tools_info.items(): + if tool.get_install_type() == IDFTool.INSTALL_NEVER: + continue + tool_paths, tool_vars, found = process_tool( + tool, name, args, "install_cmd", "prefer_system_hint" + ) + if not found: + missing_tools.append(name) + paths_to_export += tool_paths + export_vars |= tool_vars + +if missing_tools: + print("Missing ESP-IDF tools: " + ", ".join(missing_tools), file=sys.stderr) + raise SystemExit(1) + +print(json.dumps({"paths_to_export": paths_to_export, "export_vars": export_vars})) diff --git a/esphome/espidf/get_idf_version.py b/esphome/espidf/get_idf_version.py new file mode 100644 index 00000000000..5be51275ec3 --- /dev/null +++ b/esphome/espidf/get_idf_version.py @@ -0,0 +1,14 @@ +"""Print the ESP-IDF version of a given framework root. + +Run via ``python ``. PYTHONPATH must include +``/tools`` so ``idf_tools`` is importable. +""" + +# pylint: disable=import-error # idf_tools is on PYTHONPATH at runtime only + +import sys + +from idf_tools import g, get_idf_version + +g.idf_path = sys.argv[1] +print(get_idf_version()) diff --git a/esphome/espidf/runner.py b/esphome/espidf/runner.py new file mode 100644 index 00000000000..e740ab72854 --- /dev/null +++ b/esphome/espidf/runner.py @@ -0,0 +1,223 @@ +r"""Subprocess entry point for running ``idf.py`` with stdio wrapping. + +Invoked as ``python runner.py [script args...]``. + +Wraps ``sys.stdout`` and ``sys.stderr`` with a ``_FilteringTTYStream`` +shim so that: + +1. ``isatty()`` unconditionally returns True. CMake, Ninja, and idf.py's + own progress-bar code all check ``stream.isatty()`` to decide between + TTY-format output (``\\r`` cursor moves, ANSI colors, fancy progress + bars) and a plain fallback. With the wrapper in place they always + emit TTY format, even when our real stdout is a pipe to the parent + process (e.g. running under the Home Assistant dashboard add-on). + Downstream consumers — local terminals and the HA dashboard log + viewer — render the TTY control sequences correctly. + +2. ``FILTER_IDF_LINES`` is applied inside the shim's ``write()`` so + noisy idf.py output is dropped before it leaves this subprocess. + Filtering is skipped when ``-v`` / ``--verbose`` appears in argv so + verbose mode still shows everything. + +ESP-IDF runs under its own Python virtual environment which does not +have the ``esphome`` package installed, so the runner is intentionally +self-contained: no imports from ``esphome`` at all. The line-filtering +wrapper is inlined below rather than imported from +``esphome.util.RedirectText`` for that reason. +""" + +import sys + +# Regex patterns matched against each line of idf.py / CMake / Ninja +# output. Lines that match are dropped before reaching the parent +# process. Patterns are anchored at the start of the line (the shim +# uses ``re.match``). Disabled when the user passes ``-v`` / +# ``--verbose`` to ``esphome compile``. +FILTER_IDF_LINES: list[str] = [ + # idf.py's "how to flash" block at the end of a successful build. + # ESPHome handles flashing itself, so these instructions just clutter + # the output. + r"Project build complete\.", + r" idf\.py ", + r" python -m esptool ", + r"or$", + r"or from the ", + # CMake dumps the full list of IDF component paths on one giant line. + # It's purely informational and bloats the log. + r"-- Component paths:", + # CMake lists every linker script it adds (dozens of lines) and the + # complete flat list of IDF components on one giant line. Neither + # has diagnostic value for end users. + r"-- Adding linker script ", + r"-- Components:", + # IDF component manager notices: emitted on first build (no lock), + # once per stubbed dependency, plus the final "Processing N + # dependencies" enumeration. Patterns allow a leading run of dots + # because the component manager prints progress dots on the same + # line, so a NOTICE often arrives prefixed with ".NOTICE:" or + # "...........NOTICE:". + r"\.*NOTICE: ", +] + + +def main() -> int: + # ---- sys.path fix-up --------------------------------------------------- + # + # When Python runs this file as ``python runner.py``, it prepends the + # script's directory — ``/esphome/espidf/`` — to + # ``sys.path[0]``. That directory is part of the esphome package whose + # sibling ``types.py`` (in ``esphome/``) collides with stdlib ``types``. + # Any subsequent import that transitively touches ``types`` (``runpy``, + # ``pathlib``, ``functools``, ``typing``, ...) could resolve the wrong + # module. Drop the entry pre-emptively. ``sys`` is a built-in so + # importing it at module level earlier did not trigger the shadow. + if sys.path and sys.path[0]: + sys.path.pop(0) + # ---- end sys.path fix-up ----------------------------------------------- + + import os + import re + import runpy + + # Patch ``os.get_terminal_size`` to return a fallback size instead + # of raising ``OSError`` when the underlying fd isn't a real + # terminal. + # + # idf.py's ``fit_text_in_terminal`` (in ``idf_py_actions/tools.py``) + # unconditionally calls ``os.get_terminal_size()`` to format ninja + # progress lines. When that raises ``[Errno 25] Inappropriate + # ioctl for device`` on our pipe-backed stdout, idf.py catches the + # exception as ``EnvironmentError`` and silently exits its stdout + # reader coroutine — dropping all ninja build output from that + # point on. Returning a valid value keeps the coroutine alive so + # progress and error lines continue to flow through to the parent + # process. + # + # Honour the ``COLUMNS`` / ``LINES`` env vars if the caller set + # them explicitly. Otherwise fall back to ``(0, 0)``, which + # ``fit_text_in_terminal`` treats as "unknown width, don't + # truncate" (see the ``if not terminal_width: return out`` guard). + # Downstream log viewers (local terminals, the HA dashboard) wrap + # or scroll long lines themselves, so we'd rather emit the full + # file path than have idf.py elide its middle. + _orig_get_terminal_size = os.get_terminal_size + + def _get_terminal_size_fallback(fd: int = 1) -> os.terminal_size: + try: + return _orig_get_terminal_size(fd) + except OSError: + try: + columns = int(os.environ.get("COLUMNS", "0")) + except ValueError: + columns = 0 + try: + lines = int(os.environ.get("LINES", "0")) + except ValueError: + lines = 0 + return os.terminal_size((columns, lines)) + + os.get_terminal_size = _get_terminal_size_fallback # type: ignore[assignment] + + # Strip ANSI escape sequences before comparing a line against the filter + # patterns, so colorized lines still match plain-text patterns. + ansi_escape = re.compile(r"\033[@-_][0-?]*[ -/]*[@-~]") + + class _FilteringTTYStream: + r"""Minimal stdout/stderr wrapper. + + * ``isatty()`` unconditionally returns True, tricking downstream + code into emitting TTY-format output. + * Input is split on ``\\n`` / ``\\r`` via + ``str.splitlines(keepends=True)`` and any complete line whose + ANSI-stripped, right-stripped form matches one of + ``filter_lines`` is dropped. + * Incomplete trailing chunks are held in a buffer until a + terminator arrives. + + Mirrors the matching semantics of ``esphome.util.RedirectText`` + so filter patterns behave identically in both the PlatformIO + and IDF runner paths. + """ + + def __init__(self, stream, filter_lines: list[str] | None) -> None: + self._stream = stream + if filter_lines: + combined = r"|".join(r"(?:" + p + r")" for p in filter_lines) + self._filter_pattern: re.Pattern[str] | None = re.compile(combined) + else: + self._filter_pattern = None + self._line_buffer = "" + + def __getattr__(self, name: str): + return getattr(self._stream, name) + + def isatty(self) -> bool: + return True + + def flush(self) -> None: + self._stream.flush() + + def write(self, data) -> int: + # Text streams normally hand us ``str``; decode in case + # somebody writes bytes directly. + if not isinstance(data, str): + data = data.decode(errors="replace") + + if self._filter_pattern is None: + self._stream.write(data) + return len(data) + + self._line_buffer += data + for line in self._line_buffer.splitlines(keepends=True): + if "\n" not in line and "\r" not in line: + # Incomplete — hold until we see a terminator. + self._line_buffer = line + break + self._line_buffer = "" + + stripped = ansi_escape.sub("", line).rstrip() + if self._filter_pattern.match(stripped) is not None: + continue + self._stream.write(line) + return len(data) + + if len(sys.argv) < 2: + print( + "usage: runner.py [args...]", + file=sys.stderr, + ) + return 2 + + script_path = sys.argv[1] + + # Mirror the platformio_runner behaviour: verbose mode disables the + # line filter so all output reaches the user. + is_verbose = any(arg in ("-v", "--verbose") for arg in sys.argv[2:]) + filter_lines = None if is_verbose else FILTER_IDF_LINES or None + + sys.stdout = _FilteringTTYStream(sys.stdout, filter_lines) # type: ignore[assignment] + sys.stderr = _FilteringTTYStream(sys.stderr, filter_lines) # type: ignore[assignment] + + # Shift argv so the target script sees its own path as argv[0] and + # its own arguments starting at argv[1]. runpy.run_path does not + # modify sys.argv itself. + sys.argv = [script_path] + sys.argv[2:] + + # Emulate Python's default behaviour of prepending the script's + # directory to sys.path[0] when running ``python script.py``. + # runpy.run_path does not do this automatically, but idf.py relies + # on it to import its sibling modules (python_version_checker, + # idf_py_actions, ...). + script_dir = os.path.dirname(os.path.abspath(script_path)) + if script_dir not in sys.path: + sys.path.insert(0, script_dir) + + # If idf.py calls sys.exit(), SystemExit propagates out of run_path + # and carries the exit code back to our caller. For normal returns, + # fall through and exit with 0. + runpy.run_path(script_path, run_name="__main__") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/esphome/espidf_api.py b/esphome/espidf_api.py deleted file mode 100644 index 9ebcc48513c..00000000000 --- a/esphome/espidf_api.py +++ /dev/null @@ -1,274 +0,0 @@ -"""ESP-IDF direct build API for ESPHome.""" - -import json -import logging -import os -from pathlib import Path -import shutil -import subprocess - -from esphome.components.esp32.const import KEY_ESP32, KEY_FLASH_SIZE -from esphome.core import CORE, EsphomeError - -_LOGGER = logging.getLogger(__name__) - - -def _get_idf_path() -> Path | None: - """Get IDF_PATH from environment or common locations.""" - # Check environment variable first - if "IDF_PATH" in os.environ: - path = Path(os.environ["IDF_PATH"]) - if path.is_dir(): - return path - - # Check common installation locations - common_paths = [ - Path.home() / "esp" / "esp-idf", - Path.home() / ".espressif" / "esp-idf", - Path("/opt/esp-idf"), - ] - - for path in common_paths: - if path.is_dir() and (path / "tools" / "idf.py").is_file(): - return path - - return None - - -def _get_idf_env() -> dict[str, str]: - """Get environment variables needed for ESP-IDF build. - - Requires the user to have sourced export.sh before running esphome. - """ - env = os.environ.copy() - - idf_path = _get_idf_path() - if idf_path is None: - raise EsphomeError( - "ESP-IDF not found. Please install ESP-IDF and source export.sh:\n" - " git clone -b v5.3.2 --recursive https://github.com/espressif/esp-idf.git ~/esp-idf\n" - " cd ~/esp-idf && ./install.sh\n" - " source ~/esp-idf/export.sh\n" - "See: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/get-started/" - ) - - env["IDF_PATH"] = str(idf_path) - return env - - -def run_idf_py( - *args, cwd: Path | None = None, capture_output: bool = False -) -> int | str: - """Run idf.py with the given arguments.""" - idf_path = _get_idf_path() - if idf_path is None: - raise EsphomeError("ESP-IDF not found") - - env = _get_idf_env() - idf_py = idf_path / "tools" / "idf.py" - - cmd = ["python", str(idf_py)] + list(args) - - if cwd is None: - cwd = CORE.build_path - - _LOGGER.debug("Running: %s", " ".join(cmd)) - _LOGGER.debug(" in directory: %s", cwd) - - if capture_output: - result = subprocess.run( - cmd, - cwd=cwd, - env=env, - capture_output=True, - text=True, - check=False, - ) - if result.returncode != 0: - _LOGGER.error("idf.py failed:\n%s", result.stderr) - return result.stdout - result = subprocess.run( - cmd, - cwd=cwd, - env=env, - check=False, - ) - return result.returncode - - -def run_reconfigure() -> int: - """Run cmake reconfigure only (no build).""" - return run_idf_py("reconfigure") - - -def has_outdated_files(): - """Check if the build configuration is stale. - - Returns True if required build files are missing or if configuration inputs - are newer than the generated CMake/Ninja build artifacts. - """ - cmakecache_txt_path = CORE.relative_build_path("build/CMakeCache.txt") - - cmakelists_txt_build_path = CORE.relative_build_path("CMakeLists.txt") - cmakelists_txt_src_path = CORE.relative_src_path("CMakeLists.txt") - build_config_path = CORE.relative_build_path("build/config") - sdkconfig_internal_path = CORE.relative_build_path( - f"sdkconfig.{CORE.name}.esphomeinternal" - ) - dependency_lock_path = CORE.relative_build_path("dependencies.lock") - build_ninja_path = CORE.relative_build_path("build/build.ninja") - - if not os.path.isdir(build_config_path) or not os.listdir(build_config_path): - return True - if not os.path.isfile(cmakecache_txt_path): - return True - if not os.path.isfile(build_ninja_path): - return True - if os.path.isfile(dependency_lock_path) and os.path.getmtime( - dependency_lock_path - ) > os.path.getmtime(build_ninja_path): - return True - - cmakecache_txt_mtime = os.path.getmtime(cmakecache_txt_path) - return any( - os.path.getmtime(f) > cmakecache_txt_mtime - for f in [ - _get_idf_path(), - cmakelists_txt_build_path, - cmakelists_txt_src_path, - sdkconfig_internal_path, - build_config_path, - ] - if f and os.path.exists(f) - ) - - -def need_reconfigure() -> bool: - from esphome.build_gen.espidf import has_discovered_components - - # We need to reconfigure either if the files are outdated or if there is no component discovered - return has_outdated_files() or not has_discovered_components() - - -def run_compile(config, verbose: bool) -> int: - """Compile the ESP-IDF project. - - Uses two-phase configure to auto-discover available components: - 1. If no previous build, configure with minimal REQUIRES to discover components - 2. Regenerate CMakeLists.txt with discovered components - 3. Run full build - """ - from esphome.build_gen.espidf import write_project - - # Check if we need to do discovery phase - if need_reconfigure(): - _LOGGER.info("Discovering available ESP-IDF components...") - write_project(minimal=True) - rc = run_reconfigure() - if rc != 0: - _LOGGER.error("Component discovery failed") - return rc - _LOGGER.info("Regenerating CMakeLists.txt with discovered components...") - write_project(minimal=False) - - # Build - args = [] - - if verbose: - args.append("-v") - - args.append("build") - - # Set the sdkconfig file - sdkconfig_path = CORE.relative_build_path(f"sdkconfig.{CORE.name}") - if sdkconfig_path.is_file(): - args.extend(["-D", f"SDKCONFIG={sdkconfig_path}"]) - - return run_idf_py(*args) - - -def get_firmware_path() -> Path: - """Get the path to the compiled firmware binary.""" - build_dir = CORE.relative_build_path("build") - return build_dir / f"{CORE.name}.bin" - - -def get_factory_firmware_path() -> Path: - """Get the path to the factory firmware (with bootloader).""" - build_dir = CORE.relative_build_path("build") - return build_dir / f"{CORE.name}.factory.bin" - - -def create_factory_bin() -> bool: - """Create factory.bin by merging bootloader, partition table, and app.""" - build_dir = CORE.relative_build_path("build") - flasher_args_path = build_dir / "flasher_args.json" - - if not flasher_args_path.is_file(): - _LOGGER.warning("flasher_args.json not found, cannot create factory.bin") - return False - - try: - with open(flasher_args_path, encoding="utf-8") as f: - flash_data = json.load(f) - except (json.JSONDecodeError, OSError) as e: - _LOGGER.error("Failed to read flasher_args.json: %s", e) - return False - - # Get flash size from config - flash_size = CORE.data[KEY_ESP32][KEY_FLASH_SIZE] - - # Build esptool merge command - sections = [] - for addr, fname in sorted( - flash_data.get("flash_files", {}).items(), key=lambda kv: int(kv[0], 16) - ): - file_path = build_dir / fname - if file_path.is_file(): - sections.extend([addr, str(file_path)]) - else: - _LOGGER.warning("Flash file not found: %s", file_path) - - if not sections: - _LOGGER.warning("No flash sections found") - return False - - output_path = get_factory_firmware_path() - chip = flash_data.get("extra_esptool_args", {}).get("chip", "esp32") - - cmd = [ - "python", - "-m", - "esptool", - "--chip", - chip, - "merge_bin", - "--flash_size", - flash_size, - "--output", - str(output_path), - ] + sections - - _LOGGER.info("Creating factory.bin...") - result = subprocess.run(cmd, capture_output=True, text=True, check=False) - - if result.returncode != 0: - _LOGGER.error("Failed to create factory.bin: %s", result.stderr) - return False - - _LOGGER.info("Created: %s", output_path) - return True - - -def create_ota_bin() -> bool: - """Copy the firmware to .ota.bin for ESPHome OTA compatibility.""" - firmware_path = get_firmware_path() - ota_path = firmware_path.with_suffix(".ota.bin") - - if not firmware_path.is_file(): - _LOGGER.warning("Firmware not found: %s", firmware_path) - return False - - shutil.copy(firmware_path, ota_path) - _LOGGER.info("Created: %s", ota_path) - return True diff --git a/esphome/espota2.py b/esphome/espota2.py index 576b1c6b2da..c13c3ea207f 100644 --- a/esphome/espota2.py +++ b/esphome/espota2.py @@ -441,7 +441,7 @@ def perform_ota( start_time = time.perf_counter() offset = 0 - progress = ProgressBar() + progress = ProgressBar("Uploading") while True: chunk = upload_contents[offset : offset + UPLOAD_BLOCK_SIZE] if not chunk: diff --git a/esphome/helpers.py b/esphome/helpers.py index 9d341af1461..62ddc489ba2 100644 --- a/esphome/helpers.py +++ b/esphome/helpers.py @@ -11,7 +11,7 @@ import shutil import stat import sys import tempfile -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, TextIO from urllib.parse import urlparse from esphome.const import __version__ as ESPHOME_VERSION @@ -617,10 +617,15 @@ def sanitize(value): class ProgressBar: """A simple terminal progress bar for upload operations.""" - def __init__(self) -> None: + def __init__(self, header: str, stream: TextIO | None = None) -> None: + self.header = header + self.stream = stream or sys.stderr self.last_progress: int | None = None + self.enabled = hasattr(self.stream, "isatty") and self.stream.isatty() def update(self, progress: float) -> None: + if not self.enabled: + return bar_length = 60 status = "" if progress >= 1: @@ -631,11 +636,13 @@ class ProgressBar: return self.last_progress = new_progress block = int(round(bar_length * progress)) - text = f"\rUploading: [{'=' * block + ' ' * (bar_length - block)}] {new_progress}% {status}" + text = f"\r{self.header}: [{'=' * block + ' ' * (bar_length - block)}] {new_progress}% {status}" sys.stderr.write(text) sys.stderr.flush() def done(self) -> None: + if not self.enabled: + return sys.stderr.write("\n") sys.stderr.flush() diff --git a/esphome/web_server_ota.py b/esphome/web_server_ota.py index a49f46b270d..7c31c1b1232 100644 --- a/esphome/web_server_ota.py +++ b/esphome/web_server_ota.py @@ -60,7 +60,7 @@ class _MultipartStreamer: self._idx = 0 self._total = len(prefix) + file_size + len(suffix) self._sent = 0 - self.progress = ProgressBar() + self.progress = ProgressBar("Uploading") def __len__(self) -> int: return self._total diff --git a/script/ci-custom.py b/script/ci-custom.py index 8cd8fd75444..25db32105c6 100755 --- a/script/ci-custom.py +++ b/script/ci-custom.py @@ -562,7 +562,7 @@ def lint_constants_usage(): # Maximum allowed CONF_ constants in esphome/const.py. # This file is frozen — new constants go in esphome/components/const/__init__.py. # Decrease this number when constants are moved out of const.py. -CONST_PY_MAX_CONF = 1011 +CONST_PY_MAX_CONF = 1012 @lint_content_check(include=["esphome/const.py"]) diff --git a/script/test_build_components.py b/script/test_build_components.py index 82d05f78b23..10c5e5463fe 100755 --- a/script/test_build_components.py +++ b/script/test_build_components.py @@ -175,7 +175,7 @@ def group_components_by_platform( } -def format_github_summary(test_results: list[TestResult]) -> str: +def format_github_summary(test_results: list[TestResult], toolchain=None) -> str: """Format test results as GitHub Actions job summary markdown. Args: @@ -225,11 +225,12 @@ def format_github_summary(test_results: list[TestResult]) -> str: lines.append("```bash\n") # Generate one command per platform and test type + extra_arguments = f" --toolchain {toolchain}" if toolchain else "" platform_components = group_components_by_platform(failed_results) for platform, test_type in sorted(platform_components.keys()): components_csv = ",".join(platform_components[(platform, test_type)]) lines.append( - f"script/test_build_components.py -c {components_csv} -t {platform} -e {test_type}\n" + f"script/test_build_components.py -c {components_csv} -t {platform} -e {test_type}{extra_arguments}\n" ) lines.append("```\n") @@ -274,13 +275,15 @@ def format_github_summary(test_results: list[TestResult]) -> str: return "".join(lines) -def write_github_summary(test_results: list[TestResult]) -> None: +def write_github_summary( + test_results: list[TestResult], toolchain: str | None = None +) -> None: """Write GitHub Actions job summary with test results and timing. Args: test_results: List of all test results """ - summary_content = format_github_summary(test_results) + summary_content = format_github_summary(test_results, toolchain) with open(os.environ["GITHUB_STEP_SUMMARY"], "a", encoding="utf-8") as f: f.write(summary_content) @@ -308,6 +311,7 @@ def run_esphome_test( esphome_command: str, continue_on_fail: bool, use_testing_mode: bool = False, + toolchain: str | None = None, ) -> TestResult: """Run esphome test for a single component. @@ -367,8 +371,14 @@ def run_esphome_test( ] ) - # Add command and config file - cmd.extend([esphome_command, str(output_file)]) + if toolchain: + cmd.extend(["--toolchain", toolchain]) + + # Add command + cmd.append(esphome_command) + + # Add config file + cmd.append(str(output_file)) # Build command string for display/logging cmd_str = " ".join(cmd) @@ -432,6 +442,7 @@ def run_grouped_test( tests_dir: Path, esphome_command: str, continue_on_fail: bool, + toolchain: str | None = None, ) -> TestResult: """Run esphome test for a group of components with shared bus configs. @@ -510,10 +521,16 @@ def run_grouped_test( "-s", "target_platform", platform, - esphome_command, - str(output_file), ] + if toolchain: + cmd.extend(["--toolchain", toolchain]) + + # Add command + cmd.append(esphome_command) + + cmd.append(str(output_file)) + # Build command string for display/logging cmd_str = " ".join(cmd) @@ -576,6 +593,7 @@ def run_grouped_component_tests( esphome_command: str, continue_on_fail: bool, additional_isolated: set[str] | None = None, + toolchain: str | None = None, ) -> tuple[set[tuple[str, str]], list[TestResult]]: """Run grouped component tests. @@ -879,6 +897,7 @@ def run_grouped_component_tests( tests_dir=tests_dir, esphome_command=esphome_command, continue_on_fail=continue_on_fail, + toolchain=toolchain, ) # Mark all components as tested @@ -902,6 +921,7 @@ def run_individual_component_test( continue_on_fail: bool, tested_components: set[tuple[str, str]], test_results: list[TestResult], + toolchain: str | None = None, ) -> None: """Run an individual component test if not already tested in a group. @@ -930,6 +950,7 @@ def run_individual_component_test( build_dir=build_dir, esphome_command=esphome_command, continue_on_fail=continue_on_fail, + toolchain=toolchain, ) test_results.append(test_result) @@ -942,6 +963,7 @@ def test_components( enable_grouping: bool = True, isolated_components: set[str] | None = None, base_only: bool = False, + toolchain: str | None = None, ) -> int: """Test components with optional intelligent grouping. @@ -1018,6 +1040,7 @@ def test_components( esphome_command=esphome_command, continue_on_fail=continue_on_fail, additional_isolated=isolated_components, + toolchain=toolchain, ) test_results.extend(grouped_results) @@ -1046,6 +1069,7 @@ def test_components( continue_on_fail=continue_on_fail, tested_components=tested_components, test_results=test_results, + toolchain=toolchain, ) else: # Platform-specific test @@ -1078,6 +1102,7 @@ def test_components( continue_on_fail=continue_on_fail, tested_components=tested_components, test_results=test_results, + toolchain=toolchain, ) # Separate results into passed and failed @@ -1098,17 +1123,18 @@ def test_components( print("\n" + "=" * 80) print("Commands to reproduce failures (copy-paste to reproduce locally):") print("=" * 80) + extra_arguments = f" --toolchain {toolchain}" if toolchain else "" platform_components = group_components_by_platform(failed_results) for platform, test_type in sorted(platform_components.keys()): components_csv = ",".join(platform_components[(platform, test_type)]) print( - f"script/test_build_components.py -c {components_csv} -t {platform} -e {test_type}" + f"script/test_build_components.py -c {components_csv} -t {platform} -e {test_type}{extra_arguments}" ) print() # Write GitHub Actions job summary if in CI if os.environ.get("GITHUB_STEP_SUMMARY"): - write_github_summary(test_results) + write_github_summary(test_results, toolchain=toolchain) if failed_results: return 1 @@ -1161,6 +1187,10 @@ def main() -> int: action="store_true", help="Only test base test files (test.*.yaml), not variant files (test-*.yaml)", ) + parser.add_argument( + "--toolchain", + help="Select toolchain for compiling.", + ) args = parser.parse_args() @@ -1180,6 +1210,7 @@ def main() -> int: enable_grouping=not args.no_grouping, isolated_components=isolated_components, base_only=args.base_only, + toolchain=args.toolchain, ) diff --git a/tests/component_tests/esp32/test_esp32.py b/tests/component_tests/esp32/test_esp32.py index 203f4841072..f0f96e9adcc 100644 --- a/tests/component_tests/esp32/test_esp32.py +++ b/tests/component_tests/esp32/test_esp32.py @@ -16,8 +16,8 @@ from esphome.const import ( CONF_ESPHOME, CONF_IGNORE_PIN_VALIDATION_ERROR, CONF_NUMBER, - KEY_NATIVE_IDF, PlatformFramework, + Toolchain, ) from esphome.core import CORE from tests.component_tests.types import SetCoreConfigCallable @@ -266,7 +266,7 @@ def test_native_idf_enables_reproducible_build( CORE.config_path = component_config_path("reproducible_build.yaml") CORE.config = read_config({}) - CORE.data[KEY_NATIVE_IDF] = True + CORE.toolchain = Toolchain.ESP_IDF generate_cpp_contents(CORE.config) sdkconfig = CORE.data[KEY_ESP32][KEY_SDKCONFIG_OPTIONS] diff --git a/tests/unit_tests/test_core.py b/tests/unit_tests/test_core.py index 9dc37918ae9..1a52e6b29ed 100644 --- a/tests/unit_tests/test_core.py +++ b/tests/unit_tests/test_core.py @@ -855,7 +855,7 @@ class TestEsphomeCore: def test_bootloader_bin__native_idf(self, target): """Native ESP-IDF builds emit the bootloader under build/bootloader/bootloader.bin.""" - target.data[const.KEY_NATIVE_IDF] = True + target.toolchain = const.Toolchain.ESP_IDF assert target.bootloader_bin == Path( "foo/build/build/bootloader/bootloader.bin" @@ -864,7 +864,7 @@ class TestEsphomeCore: def test_bootloader_bin__platformio(self, target): """For PlatformIO builds bootloader.bin lives in the env-specific .pioenvs directory.""" target.name = "test-device" - target.data[const.KEY_NATIVE_IDF] = False + target.toolchain = const.Toolchain.PLATFORMIO assert target.bootloader_bin == Path( "foo/build/.pioenvs/test-device/bootloader.bin" diff --git a/tests/unit_tests/test_espidf_component.py b/tests/unit_tests/test_espidf_component.py new file mode 100644 index 00000000000..caef10eea32 --- /dev/null +++ b/tests/unit_tests/test_espidf_component.py @@ -0,0 +1,357 @@ +import json +import os +from unittest.mock import MagicMock + +import pytest + +from esphome.const import ( + KEY_CORE, + KEY_TARGET_FRAMEWORK, + KEY_TARGET_PLATFORM, + Framework, + Platform, +) +from esphome.core import CORE, Library +import esphome.espidf.component +from esphome.espidf.component import ( + GitSource, + IDFComponent, + InvalidIDFComponent, + URLSource, + _check_library_data, + _collect_filtered_files, + _convert_library_to_component, + _detect_requires, + _parse_library_json, + _parse_library_properties, + _process_dependencies, + _split_list_by_condition, + generate_cmakelists_txt, + generate_idf_component_yml, +) + + +@pytest.fixture(name="tmp_component") +def fixture_tmp_component(tmp_path): + c = IDFComponent("owner/name", "1.0.0", source=MagicMock()) + c.path = tmp_path + return c + + +@pytest.fixture(name="esp32_idf_core") +def fixture_esp32_idf_core(): + CORE.data[KEY_CORE] = {} + CORE.data[KEY_CORE][KEY_TARGET_PLATFORM] = str(Platform.ESP32) + CORE.data[KEY_CORE][KEY_TARGET_FRAMEWORK] = str(Framework.ESP_IDF) + + +def test_idf_component_str(): + c = IDFComponent("foo/bar", "1.0", source=URLSource("http://dummy.com")) + assert str(c) == "foo/bar@1.0=http://dummy.com" + + +def test_idf_component_sanitized_name(): + c = IDFComponent("foo/bar bar-bar", "1.0", source=URLSource("http://dummy.com")) + assert c.get_sanitized_name() == "foo/bar_bar-bar" + + +def test_idf_component_require_name(): + c = IDFComponent("foo/bar", "1.0", source=URLSource("http://dummy.com")) + assert c.get_require_name() == "foo__bar" + + +def test_collect_filtered_files_basic(tmp_path): + f1 = tmp_path / "a.c" + f2 = tmp_path / "b" / "b.cpp" + f1.write_text("int a;") + f2.parent.mkdir(parents=True) + f2.write_text("int b;") + + result = _collect_filtered_files(tmp_path, ["+<*>"]) + assert str(f1) in result + assert str(f2) in result + + +def test_collect_filtered_files_exclude(tmp_path): + f1 = tmp_path / "a.c" + f2 = tmp_path / "b.cpp" + f1.write_text("int a;") + f2.write_text("int b;") + + result = _collect_filtered_files(tmp_path, ["+<*> -<*.cpp>"]) + assert str(f1) in result + assert str(f2) not in result + + +def test_detect_requires(tmp_path): + f = tmp_path / "main.c" + f.write_text('#include "mbedtls/foo.h"') + + result = _detect_requires([str(f)]) + assert "mbedtls" in result + + +def test_detect_requires_ignores_invalid_file(tmp_path): + result = _detect_requires([str(tmp_path / "missing.c")]) + assert result == set() + + +def test_split_list_by_condition(): + items = ["-Iinclude", "-Llib", "-Wall"] + + matched, rest = _split_list_by_condition( + items, lambda x: x[2:] if x.startswith("-I") else None + ) + + assert matched == ["include"] + assert "-Llib" in rest + assert "-Wall" in rest + + +def test_generate_cmakelists_txt_basic(tmp_component): + src_dir = tmp_component.path / "src" + src_dir.mkdir() + f = src_dir / "main.c" + f.write_text("int main() {}") + + tmp_component.data = {} + + content = generate_cmakelists_txt(tmp_component) + + assert "idf_component_register" in content + assert "main.c" in content + + +def test_generate_cmakelists_txt_with_flags(tmp_component, tmp_path): + src_dir = tmp_component.path / "src" + src_dir.mkdir() + (src_dir / "main.c").write_text("int main() {}") + + dep = IDFComponent("dep", "1.0", source=URLSource("http://dummy.com")) + dep.path = tmp_path / "dep" + tmp_component.dependencies = [dep] + + tmp_component.data = { + "build": {"flags": ["-Iinclude", "-Llib", "-lmylib", "-Wall", "-DTEST"]} + } + + content = generate_cmakelists_txt(tmp_component) + sep = "\\\\" if os.name == "nt" else "/" + assert ( + content + == f"""idf_component_register( + SRCS "src{sep}main.c" + INCLUDE_DIRS "src" + REQUIRES dep +) +target_compile_options(${{COMPONENT_LIB}} PUBLIC + "-DTEST" +) +target_compile_options(${{COMPONENT_LIB}} PRIVATE + "-Wall" +) +target_link_directories(${{COMPONENT_LIB}} INTERFACE + "lib" +) +target_link_libraries(${{COMPONENT_LIB}} INTERFACE + "mylib" +) +""" + ) + + +def test_generate_idf_component_yml_basic(tmp_component): + tmp_component.data = {"description": "test", "repository": {"url": "http://aaa"}} + result = generate_idf_component_yml(tmp_component) + + assert result == "description: test\nversion: 1.0.0\nrepository: http://aaa\n" + + +def test_generate_idf_component_yml_with_dependencies(tmp_component, tmp_path): + dep = IDFComponent("dep", "1.0", source=URLSource("http://dummy.com")) + dep.path = tmp_path / "dep" + + tmp_component.dependencies = [dep] + tmp_component.data = {} + + result = generate_idf_component_yml(tmp_component) + + assert ( + result + == f"""version: 1.0.0 +dependencies: + dep: + version: '1.0' + override_path: {dep.path} +""" + ) + + +def test_generate_idf_component_yml_arduino_registry_dep(tmp_component): + # Synthetic arduino-esp32 dep with no source / no path: should emit a + # version-only entry so the IDF component manager resolves it from the + # registry instead of via git. + dep = IDFComponent("espressif/arduino-esp32", "3.3.8", source=None) + + tmp_component.dependencies = [dep] + tmp_component.data = {} + + result = generate_idf_component_yml(tmp_component) + + assert ( + result + == """version: 1.0.0 +dependencies: + espressif/arduino-esp32: + version: 3.3.8 +""" + ) + + +def test_generate_idf_component_yml_missing_path_reraises(tmp_component): + # A dep without a path and without a recognised source should re-raise + # the underlying RuntimeError instead of silently producing a bad manifest. + dep = IDFComponent("foo/bar", "1.0", source=None) + + tmp_component.dependencies = [dep] + tmp_component.data = {} + + with pytest.raises(RuntimeError): + generate_idf_component_yml(tmp_component) + + +def test_check_library_data_valid(esp32_idf_core): + _check_library_data({"platforms": "*", "frameworks": "*"}) + + +def test_check_library_data_valid2(esp32_idf_core): + _check_library_data({"platforms": "*"}) + + +def test_check_library_data_valid3(esp32_idf_core): + _check_library_data({}) + + +def test_check_library_data_valid4(esp32_idf_core): + _check_library_data({"platforms": "espressif32", "frameworks": "*"}) + + +def test_check_library_data_valid5(esp32_idf_core): + _check_library_data({"platforms": "*", "frameworks": "espidf"}) + + +def test_check_library_data_invalid_platform(esp32_idf_core): + with pytest.raises(InvalidIDFComponent): + _check_library_data({"platforms": ["other"], "frameworks": "*"}) + + +def test_check_library_data_invalid_framework(esp32_idf_core): + with pytest.raises(InvalidIDFComponent): + _check_library_data({"platforms": "*", "frameworks": ["other"]}) + + +def test_extra_script_logs_warning(caplog, esp32_idf_core): + extra_script = "myscript.sh" + + with caplog.at_level("WARNING"): + _check_library_data({"build": {"extraScript": extra_script}}) + + assert "not supported" in caplog.text + assert "myscript.sh" in caplog.text + + +def test_parse_library_json(tmp_path): + f = tmp_path / "library.json" + f.write_text(json.dumps({"name": "test"})) + + result = _parse_library_json(f) + assert result["name"] == "test" + + +def test_parse_library_properties(tmp_path): + f = tmp_path / "library.properties" + f.write_text( + """ +name=Test +version=1.0 +# description=ABCD +empty= +""" + ) + + result = _parse_library_properties(f) + + assert result["name"] == "Test" + assert result["version"] == "1.0" + assert "empty" not in result + + +def test_convert_library_with_repository(): + lib = Library("name", None, "https://github.com/foo/bar.git#v1.2.3") + + result = _convert_library_to_component(lib) + + assert result.name == "foo/bar" + assert result.version == "1.2.3" + assert isinstance(result.source, GitSource) + + +def test_convert_library_missing_ref(): + lib = Library("name", None, "https://github.com/foo/bar.git") + + with pytest.raises(ValueError): + _convert_library_to_component(lib) + + +def test_convert_library_registry(monkeypatch): + lib = Library("foo/bar", "^1.0.0", None) + + monkeypatch.setattr( + esphome.espidf.component, + "_get_package_from_pio_registry", + lambda o, n, r: ("foo", "bar", "1.2.3", "http://example.com/pkg.zip"), + ) + + result = _convert_library_to_component(lib) + + assert result.name == "foo/bar" + assert result.version == "1.2.3" + assert isinstance(result.source, URLSource) + + +def test_process_dependencies_adds_valid_dependency(tmp_component, monkeypatch): + tmp_component.data = { + "dependencies": [ + { + "name": "foo", + "version": "1.0", + } + ] + } + + monkeypatch.setattr( + esphome.espidf.component, + "_generate_idf_component", + lambda lib: esphome.espidf.component.IDFComponent( + lib.name, lib.version, source=URLSource("http://dummy.com") + ), + ) + + monkeypatch.setattr(esphome.espidf.component, "_check_library_data", lambda x: None) + + _process_dependencies(tmp_component) + + assert len(tmp_component.dependencies) == 1 + + +def test_process_dependencies_skips_invalid(tmp_component): + tmp_component.data = { + "dependencies": [ + {"name": "foo", "version": "1.0", "platforms": ["arduino"]}, + {"invalid": "entry"}, + ] + } + + _process_dependencies(tmp_component) + + assert tmp_component.dependencies == [] diff --git a/tests/unit_tests/test_espota2.py b/tests/unit_tests/test_espota2.py index b22ad461132..d7fcedfd66d 100644 --- a/tests/unit_tests/test_espota2.py +++ b/tests/unit_tests/test_espota2.py @@ -604,7 +604,8 @@ def test_run_ota_wrapper(mock_run_ota_impl: Mock) -> None: def test_progress_bar(capsys: CaptureFixture[str]) -> None: """Test ProgressBar functionality.""" - progress = espota2.ProgressBar() + progress = espota2.ProgressBar("Uploading") + progress.enabled = True # Fake TTY # Test initial update progress.update(0.0) diff --git a/tests/unit_tests/test_main.py b/tests/unit_tests/test_main.py index 2823310f0e4..3eb50de76b4 100644 --- a/tests/unit_tests/test_main.py +++ b/tests/unit_tests/test_main.py @@ -88,6 +88,7 @@ from esphome.const import ( PLATFORM_ESP32, PLATFORM_ESP8266, PLATFORM_RP2040, + Toolchain, ) from esphome.core import CORE, EsphomeError from esphome.espota2 import ( @@ -148,6 +149,7 @@ def setup_core( config[CONF_WIFI] = {CONF_USE_ADDRESS: address} CORE.config = config + CORE.toolchain = Toolchain.PLATFORMIO if platform is not None: CORE.data[KEY_CORE] = {}