diff --git a/esphome/__main__.py b/esphome/__main__.py index 825a502dbf3..c1451c5fafd 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -1125,11 +1125,18 @@ def upload_program( # MQTT and MQTTIP are also OTA paths; MQTTIP gets resolved to a real IP later by # _resolve_network_devices(). Only SERIAL and BOOTSEL are non-OTA upload paths. - if port_type in (PortType.SERIAL, PortType.BOOTSEL) and getattr( - args, "partition_table", False + is_partition_table = getattr(args, "partition_table", False) + is_bootloader = getattr(args, "bootloader", False) + if is_partition_table and is_bootloader: + raise EsphomeError( + "The options --partition-table and --bootloader can't be used together." + ) + option_string = "--partition-table" if is_partition_table else "--bootloader" + if port_type in (PortType.SERIAL, PortType.BOOTSEL) and ( + is_partition_table or is_bootloader ): raise EsphomeError( - "The option --partition-table can only be used for Over The Air updates." + f"The option {option_string} can only be used for Over The Air updates." ) if port_type == PortType.BOOTSEL: @@ -1158,9 +1165,9 @@ def upload_program( network_devices = _resolve_network_devices(devices, config, args) if chosen_platform == CONF_WEB_SERVER: - if getattr(args, "partition_table", False): + if is_partition_table or is_bootloader: raise EsphomeError( - "--partition-table is only supported with the esphome OTA platform; " + f"{option_string} is only supported with the esphome OTA platform; " "the web_server OTA path can only update the firmware image." ) binary = CORE.firmware_bin @@ -1228,25 +1235,34 @@ def _upload_via_native_api( remote_port = int(ota_conf[CONF_PORT]) password = ota_conf.get(CONF_PASSWORD) + def check_partition_access(option_string: str) -> None: + if not ota_conf.get("allow_partition_access"): + raise EsphomeError( + f"The option {option_string} requires 'allow_partition_access: true' on the " + "esphome OTA platform in the device's YAML configuration. Add it, recompile, " + f"flash a build with the option enabled, and then retry {option_string}." + ) + binary = CORE.firmware_bin ota_type = espota2.OTA_TYPE_UPDATE_APP if getattr(args, "partition_table", False): # Fail fast if the resolved ESPHome OTA config does not enable allow_partition_access. # The device-side handshake also rejects this with "Device only supports app updates", # but checking here surfaces the misconfiguration before opening a network connection. - if not ota_conf.get("allow_partition_access"): - raise EsphomeError( - "The option --partition-table requires 'allow_partition_access: true' on the " - "esphome OTA platform in the device's YAML configuration. Add it, recompile, " - "flash a build with the option enabled, and then retry --partition-table." - ) + check_partition_access("--partition-table") binary = CORE.partition_table_bin ota_type = espota2.OTA_TYPE_UPDATE_PARTITION_TABLE + elif getattr(args, "bootloader", False): + check_partition_access("--bootloader") + binary = CORE.bootloader_bin + ota_type = espota2.OTA_TYPE_UPDATE_BOOTLOADER if getattr(args, "file", None) is not None: binary = Path(args.file) if ota_type == espota2.OTA_TYPE_UPDATE_PARTITION_TABLE: _validate_partition_table_binary(binary) + if ota_type == espota2.OTA_TYPE_UPDATE_BOOTLOADER: + _validate_bootloader_binary(binary) return espota2.run_ota(network_devices, remote_port, password, binary, ota_type) @@ -1281,6 +1297,7 @@ def _upload_via_web_server( _PARTITION_TABLE_MAX_LEN = 0xC00 _ESP_PARTITION_MAGIC = 0x50AA _ESP_PARTITION_MAGIC_MD5 = 0xEBEB +_ESP_IMAGE_HEADER_MAGIC = 0xE9 def _validate_partition_table_binary(binary: Path) -> None: @@ -1326,6 +1343,28 @@ def _validate_partition_table_binary(binary: Path) -> None: ) +def _validate_bootloader_binary(binary: Path) -> None: + """Validate that ``binary`` looks like an ESP32 bootloader image.""" + try: + data = binary.read_bytes() + except OSError as err: + raise EsphomeError(f"Cannot read bootloader file '{binary}': {err}") from err + + if not data: + raise EsphomeError( + f"Bootloader file '{binary}' is empty. " + "This file does not look like an ESP32 bootloader." + ) + + first_magic = data[0] + if first_magic != _ESP_IMAGE_HEADER_MAGIC: + raise EsphomeError( + f"Bootloader file '{binary}' does not start with the expected " + f"image header magic 0x{_ESP_IMAGE_HEADER_MAGIC:02X} (got 0x{first_magic:02X}). " + "This file does not look like an ESP32 bootloader." + ) + + def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int | None: try: module = importlib.import_module("esphome.components." + CORE.target_platform) @@ -2009,6 +2048,11 @@ def parse_args(argv): help="Upload as partition table (OTA).", action="store_true", ) + parser_upload.add_argument( + "--bootloader", + help="Upload as bootloader (OTA).", + action="store_true", + ) parser_logs = subparsers.add_parser( "logs", diff --git a/esphome/components/esphome/ota/ota_esphome.cpp b/esphome/components/esphome/ota/ota_esphome.cpp index 5d3deca4896..843028fc974 100644 --- a/esphome/components/esphome/ota/ota_esphome.cpp +++ b/esphome/components/esphome/ota/ota_esphome.cpp @@ -125,8 +125,11 @@ void ESPHomeOTAComponent::dump_config() { it = esp_partition_next(it); } esp_partition_iterator_release(it); -#endif -#endif + esp_bootloader_desc_t bootloader_desc; + esp_err_t err = esp_ota_get_bootloader_description(nullptr, &bootloader_desc); + ESP_LOGCONFIG(TAG, " Bootloader: ESP-IDF %s", (err == ESP_OK) ? bootloader_desc.idf_ver : "version unknown"); +#endif // USE_ESP32 +#endif // USE_OTA_PARTITIONS } void ESPHomeOTAComponent::loop() { @@ -336,7 +339,6 @@ void ESPHomeOTAComponent::handle_data_() { /// wakeable_delay() in read(); /// write() always returns immediately ota::OTAResponseTypes error_code = ota::OTA_RESPONSE_ERROR_UNKNOWN; - bool update_started = false; size_t total = 0; uint32_t last_progress = 0; uint32_t last_data_ms = 0; @@ -399,7 +401,6 @@ void ESPHomeOTAComponent::handle_data_() { error_code = this->backend_->begin(ota_size, ota_type); if (error_code != ota::OTA_RESPONSE_OK) goto error; // NOLINT(cppcoreguidelines-avoid-goto) - update_started = true; // Acknowledge prepare OK - 1 byte this->write_byte_(ota::OTA_RESPONSE_UPDATE_PREPARE_OK); @@ -510,8 +511,12 @@ void ESPHomeOTAComponent::handle_data_() { error: this->write_byte_(static_cast(error_code)); - // Abort backend before cleanup - cleanup_connection_() destroys the backend - if (this->backend_ != nullptr && update_started) { + // Abort backend before cleanup - cleanup_connection_() destroys the backend. + // Always call abort() unconditionally: backends register external partitions before + // esp_ota_begin (partition table / bootloader paths), and abort() is responsible for + // releasing those even if begin() failed before an OTA handle was opened. The IDF + // backend's esp_ota_abort(0) is documented as harmless. + if (this->backend_ != nullptr) { this->backend_->abort(); } diff --git a/esphome/components/ota/ota_backend.h b/esphome/components/ota/ota_backend.h index 5888a8e12d4..de236c19513 100644 --- a/esphome/components/ota/ota_backend.h +++ b/esphome/components/ota/ota_backend.h @@ -44,6 +44,8 @@ enum OTAResponseTypes { OTA_RESPONSE_ERROR_UNSUPPORTED_OTA_TYPE = 0x8E, OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY = 0x8F, OTA_RESPONSE_ERROR_PARTITION_TABLE_UPDATE = 0x90, + OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY = 0x91, + OTA_RESPONSE_ERROR_BOOTLOADER_UPDATE = 0x92, OTA_RESPONSE_ERROR_UNKNOWN = 0xFF, }; @@ -58,6 +60,7 @@ enum OTAState { enum OTAType : uint8_t { OTA_TYPE_UPDATE_APP = 0x00, OTA_TYPE_UPDATE_PARTITION_TABLE = 0x01, + OTA_TYPE_UPDATE_BOOTLOADER = 0x02, }; /** Listener interface for OTA state changes. diff --git a/esphome/components/ota/ota_backend_esp_idf.cpp b/esphome/components/ota/ota_backend_esp_idf.cpp index 50a0988ba21..f391c1791aa 100644 --- a/esphome/components/ota/ota_backend_esp_idf.cpp +++ b/esphome/components/ota/ota_backend_esp_idf.cpp @@ -31,7 +31,13 @@ OTAResponseTypes IDFOTABackend::begin(size_t image_size, ota::OTAType ota_type) this->md5_.init(); return OTA_RESPONSE_OK; } - if (this->ota_type_ != ota::OTA_TYPE_UPDATE_APP) { + if (this->ota_type_ == ota::OTA_TYPE_UPDATE_BOOTLOADER) { + OTAResponseTypes result = this->prepare_bootloader_update_(image_size); + if (result != OTA_RESPONSE_OK) { + return result; + } + } + if (!this->is_app_or_bootloader_update_()) { return OTA_RESPONSE_ERROR_UNSUPPORTED_OTA_TYPE; } #else @@ -55,6 +61,7 @@ OTAResponseTypes IDFOTABackend::begin(size_t image_size, ota::OTAType ota_type) esp_err_t err = esp_ota_begin(this->partition_, image_size, &this->update_handle_); if (err != ESP_OK) { + ESP_LOGE(TAG, "esp_ota_begin failed (err=0x%X)", err); esp_ota_abort(this->update_handle_); this->update_handle_ = 0; if (err == ESP_ERR_INVALID_SIZE) { @@ -64,6 +71,14 @@ OTAResponseTypes IDFOTABackend::begin(size_t image_size, ota::OTAType ota_type) } return OTA_RESPONSE_ERROR_UNKNOWN; } +#ifdef USE_OTA_PARTITIONS + if (this->ota_type_ == ota::OTA_TYPE_UPDATE_BOOTLOADER) { + OTAResponseTypes result = this->setup_bootloader_staging_(); + if (result != OTA_RESPONSE_OK) { + return result; + } + } +#endif this->md5_.init(); return OTA_RESPONSE_OK; } @@ -85,13 +100,14 @@ OTAResponseTypes IDFOTABackend::write(uint8_t *data, size_t len) { this->md5_.add(data, len); return OTA_RESPONSE_OK; } - if (this->ota_type_ != ota::OTA_TYPE_UPDATE_APP) { + if (!this->is_app_or_bootloader_update_()) { return OTA_RESPONSE_ERROR_UNSUPPORTED_OTA_TYPE; } #endif esp_err_t err = esp_ota_write(this->update_handle_, data, len); this->md5_.add(data, len); if (err != ESP_OK) { + ESP_LOGE(TAG, "esp_ota_write failed (err=0x%X)", err); if (err == ESP_ERR_OTA_VALIDATE_FAILED) { return OTA_RESPONSE_ERROR_MAGIC; } else if (err == ESP_ERR_FLASH_OP_TIMEOUT || err == ESP_ERR_FLASH_OP_FAIL) { @@ -114,12 +130,20 @@ OTAResponseTypes IDFOTABackend::end() { if (this->ota_type_ == ota::OTA_TYPE_UPDATE_PARTITION_TABLE) { return this->update_partition_table(); } - if (this->ota_type_ != ota::OTA_TYPE_UPDATE_APP) { + if (!this->is_app_or_bootloader_update_()) { return OTA_RESPONSE_ERROR_UNSUPPORTED_OTA_TYPE; } #endif esp_err_t err = esp_ota_end(this->update_handle_); this->update_handle_ = 0; + if (err != ESP_OK) { + ESP_LOGE(TAG, "esp_ota_end failed (err=0x%X)", err); + } +#ifdef USE_OTA_PARTITIONS + if (this->ota_type_ == ota::OTA_TYPE_UPDATE_BOOTLOADER) { + return this->finalize_bootloader_update_(err); + } +#endif if (err == ESP_OK) { err = esp_ota_set_boot_partition(this->partition_); if (err == ESP_OK) { @@ -146,6 +170,10 @@ void IDFOTABackend::abort() { esp_partition_deregister_external(this->partition_table_part_); this->partition_table_part_ = nullptr; } + if (this->bootloader_part_ != nullptr) { + esp_partition_deregister_external(this->bootloader_part_); + this->bootloader_part_ = nullptr; + } #endif // esp_ota_abort with handle 0 returns ESP_ERR_INVALID_ARG harmlessly, so this is safe whether // or not an update is in flight. diff --git a/esphome/components/ota/ota_backend_esp_idf.h b/esphome/components/ota/ota_backend_esp_idf.h index 54fdd24f934..73dd685df69 100644 --- a/esphome/components/ota/ota_backend_esp_idf.h +++ b/esphome/components/ota/ota_backend_esp_idf.h @@ -39,6 +39,18 @@ class IDFOTABackend final { OTAResponseTypes validate_new_partition_table_(uint32_t running_app_offset, size_t running_app_size, PartitionTablePlan &plan); OTAResponseTypes update_partition_table(); + OTAResponseTypes register_and_validate_partition_table_part_(); + // Defined in ota_bootloader_esp_idf.cpp: + OTAResponseTypes register_and_validate_bootloader_part_(); + OTAResponseTypes prepare_bootloader_update_(size_t image_size); + OTAResponseTypes setup_bootloader_staging_(); + OTAResponseTypes finalize_bootloader_update_(esp_err_t ota_end_err); + + // The OTA types that flow through esp_ota_begin/write/end. Partition-table updates take a + // separate code path that buffers the table in RAM and never touches the OTA handle. + bool is_app_or_bootloader_update_() const { + return this->ota_type_ == ota::OTA_TYPE_UPDATE_APP || this->ota_type_ == ota::OTA_TYPE_UPDATE_BOOTLOADER; + } #endif private: @@ -55,6 +67,7 @@ class IDFOTABackend final { size_t buf_written_{0}; size_t image_size_{0}; const esp_partition_t *partition_table_part_{nullptr}; + const esp_partition_t *bootloader_part_{nullptr}; ota::OTAType ota_type_{ota::OTA_TYPE_UPDATE_APP}; #endif }; diff --git a/esphome/components/ota/ota_bootloader_esp_idf.cpp b/esphome/components/ota/ota_bootloader_esp_idf.cpp new file mode 100644 index 00000000000..062e4d0811d --- /dev/null +++ b/esphome/components/ota/ota_bootloader_esp_idf.cpp @@ -0,0 +1,133 @@ +#ifdef USE_ESP32 +#include "ota_backend_esp_idf.h" + +#include "esphome/core/defines.h" + +#ifdef USE_OTA_PARTITIONS +#include "esphome/core/log.h" + +#include +#include + +namespace esphome::ota { + +static const char *const TAG = "ota.idf"; + +OTAResponseTypes IDFOTABackend::register_and_validate_bootloader_part_() { + // Register the bootloader partition + esp_err_t err = esp_partition_register_external(nullptr, ESP_PRIMARY_BOOTLOADER_OFFSET, ESP_BOOTLOADER_SIZE, + "PrimaryBTLDR", ESP_PARTITION_TYPE_BOOTLOADER, + ESP_PARTITION_SUBTYPE_BOOTLOADER_PRIMARY, &this->bootloader_part_); + if (err != ESP_OK) { + ESP_LOGE(TAG, "esp_partition_register_external failed (bootloader) (err=0x%X)", err); + return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY; + } + + // Verify existing bootloader to make sure ESP_PRIMARY_BOOTLOADER_OFFSET is correct + esp_image_metadata_t data = {}; + const esp_partition_pos_t part_pos = { + .offset = this->bootloader_part_->address, + .size = this->bootloader_part_->size, + }; + err = esp_image_verify(ESP_IMAGE_VERIFY, &part_pos, &data); + if (err != ESP_OK) { + ESP_LOGE(TAG, "esp_image_verify failed (existing bootloader) (err=0x%X)", err); + return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY; + } + return OTA_RESPONSE_OK; +} + +// Pre-esp_ota_begin: enforce size limit, register/verify the existing bootloader, and validate the +// partition table to confirm the bootloader region is at the expected offset (and therefore the +// expected size). The partition table registration is released here; abort() cleans up the +// bootloader registration if any later step fails. +OTAResponseTypes IDFOTABackend::prepare_bootloader_update_(size_t image_size) { + if (image_size > ESP_BOOTLOADER_SIZE) { + ESP_LOGE(TAG, "Length of received data exceeds the available bootloader size: expected <=%zu bytes, got %zu", + ESP_BOOTLOADER_SIZE, image_size); + return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY; + } + OTAResponseTypes result = this->register_and_validate_bootloader_part_(); + if (result != OTA_RESPONSE_OK) { + return result; + } + result = this->register_and_validate_partition_table_part_(); + if (result != OTA_RESPONSE_OK) { + return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY; + } + esp_partition_deregister_external(this->partition_table_part_); + this->partition_table_part_ = nullptr; + return OTA_RESPONSE_OK; +} + +// Post-esp_ota_begin: verify the staging app partition is large enough, erase it, and redirect the +// final write target to the bootloader partition. esp_ota_set_final_partition is called with +// `restore_old_data=false` because we erased the staging region in advance. +OTAResponseTypes IDFOTABackend::setup_bootloader_staging_() { + if (this->partition_->size < this->bootloader_part_->size) { + ESP_LOGE(TAG, "Staging partition too small"); + return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY; + } + // Erase full size of the bootloader partition in the staging partition + // to avoid copying old data to the bootloader partition later + esp_err_t err = esp_partition_erase_range(this->partition_, 0, this->bootloader_part_->size); + if (err != ESP_OK) { + ESP_LOGW(TAG, "esp_partition_erase_range failed (err=0x%X)", err); + // No critical error, don't return + } + err = esp_ota_set_final_partition(this->update_handle_, this->bootloader_part_, false); + if (err != ESP_OK) { + esp_ota_abort(this->update_handle_); + this->update_handle_ = 0; + ESP_LOGE(TAG, "esp_ota_set_final_partition failed (err=0x%X)", err); + return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY; + } + return OTA_RESPONSE_OK; +} + +// After esp_ota_end: copy the staged image into the bootloader partition. esp_partition_copy is +// the only window in which a power loss can render the device unbootable; everything before this +// point either preserves the existing bootloader or fails harmlessly. After a successful copy the +// first sector of staging is wiped so the device can't accidentally boot from it, and the +// bootloader partition is deregistered. +OTAResponseTypes IDFOTABackend::finalize_bootloader_update_(esp_err_t ota_end_err) { + if (ota_end_err != ESP_OK) { + return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY; + } + esp_bootloader_desc_t bootloader_desc; + esp_err_t desc_err = esp_ota_get_bootloader_description(this->partition_, &bootloader_desc); +#ifdef USE_ESP32_SRAM1_AS_IRAM + if (desc_err != ESP_OK) { + ESP_LOGE(TAG, "New bootloader does not support SRAM1 as IRAM"); + return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY; + } +#endif + ESP_LOGE(TAG, "Starting bootloader update.\n" + " DO NOT REMOVE POWER until the update completes successfully.\n" + " Loss of power during this operation may render the device\n" + " unable to boot until it is recovered via a serial flash."); + esp_err_t err = esp_partition_copy(this->bootloader_part_, 0, this->partition_, 0, this->bootloader_part_->size); + if (err != ESP_OK) { + ESP_LOGE(TAG, "esp_partition_copy failed (err=0x%X)", err); + // Only if esp_partition_copy failed there's a chance of the device being unbootable + return OTA_RESPONSE_ERROR_BOOTLOADER_UPDATE; + } + ESP_LOGI(TAG, + "Successfully installed the new bootloader\n" + " ESP-IDF %s", + (desc_err == ESP_OK) ? bootloader_desc.idf_ver : "version unknown"); + // Wipe first sector of staging partition to make sure the device can't boot from it + err = esp_partition_erase_range(this->partition_, 0, this->partition_->erase_size); + if (err != ESP_OK) { + ESP_LOGW(TAG, "esp_partition_erase_range failed (err=0x%X)", err); + // No critical error, don't return + } + esp_partition_deregister_external(this->bootloader_part_); + this->bootloader_part_ = nullptr; + return OTA_RESPONSE_OK; +} + +} // namespace esphome::ota + +#endif // USE_OTA_PARTITIONS +#endif // USE_ESP32 diff --git a/esphome/components/ota/ota_partitions_esp_idf.cpp b/esphome/components/ota/ota_partitions_esp_idf.cpp index f7fd529986d..f91e88bde0a 100644 --- a/esphome/components/ota/ota_partitions_esp_idf.cpp +++ b/esphome/components/ota/ota_partitions_esp_idf.cpp @@ -45,32 +45,14 @@ static const esp_partition_t *find_app_partition_at(uint32_t address, size_t min // can write to it; abort() releases it on error. OTAResponseTypes IDFOTABackend::validate_new_partition_table_(uint32_t running_app_offset, size_t running_app_size, PartitionTablePlan &plan) { - esp_err_t err = esp_partition_register_external( - nullptr, ESP_PRIMARY_PARTITION_TABLE_OFFSET, ESP_PARTITION_TABLE_SIZE, "PrimaryPrtTable", - ESP_PARTITION_TYPE_PARTITION_TABLE, ESP_PARTITION_SUBTYPE_PARTITION_TABLE_PRIMARY, &this->partition_table_part_); - if (err != ESP_OK) { - ESP_LOGE(TAG, "esp_partition_register_external failed (err=0x%X)", err); - return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY; + OTAResponseTypes validate_result = this->register_and_validate_partition_table_part_(); + if (validate_result != OTA_RESPONSE_OK) { + return validate_result; } int num_partitions = 0; - const esp_partition_info_t *existing_partition_table = nullptr; - esp_partition_mmap_handle_t partition_table_map; - err = esp_partition_mmap(this->partition_table_part_, 0, ESP_PARTITION_TABLE_MAX_LEN, ESP_PARTITION_MMAP_DATA, - reinterpret_cast(&existing_partition_table), &partition_table_map); - if (err != ESP_OK) { - ESP_LOGE(TAG, "esp_partition_mmap failed (err=0x%X)", err); - return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY; - } - err = esp_partition_table_verify(existing_partition_table, true, &num_partitions); - esp_partition_munmap(partition_table_map); - if (err != ESP_OK) { - ESP_LOGE(TAG, "esp_partition_table_verify failed (existing partition table) (err=0x%X)", err); - return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY; - } - const esp_partition_info_t *new_partition_table = reinterpret_cast(this->buf_); - err = esp_partition_table_verify(new_partition_table, true, &num_partitions); + esp_err_t err = esp_partition_table_verify(new_partition_table, true, &num_partitions); if (err != ESP_OK) { ESP_LOGE(TAG, "esp_partition_table_verify failed (new partition table) (err=0x%X)", err); return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY; @@ -288,6 +270,33 @@ OTAResponseTypes IDFOTABackend::update_partition_table() { return OTA_RESPONSE_OK; } +OTAResponseTypes IDFOTABackend::register_and_validate_partition_table_part_() { + esp_err_t err = esp_partition_register_external( + nullptr, ESP_PRIMARY_PARTITION_TABLE_OFFSET, ESP_PARTITION_TABLE_SIZE, "PrimaryPrtTable", + ESP_PARTITION_TYPE_PARTITION_TABLE, ESP_PARTITION_SUBTYPE_PARTITION_TABLE_PRIMARY, &this->partition_table_part_); + if (err != ESP_OK) { + ESP_LOGE(TAG, "esp_partition_register_external failed (partition table) (err=0x%X)", err); + return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY; + } + + int num_partitions = 0; + const esp_partition_info_t *existing_partition_table = nullptr; + esp_partition_mmap_handle_t partition_table_map; + err = esp_partition_mmap(this->partition_table_part_, 0, ESP_PARTITION_TABLE_MAX_LEN, ESP_PARTITION_MMAP_DATA, + reinterpret_cast(&existing_partition_table), &partition_table_map); + if (err != ESP_OK) { + ESP_LOGE(TAG, "esp_partition_mmap failed (partition table) (err=0x%X)", err); + return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY; + } + err = esp_partition_table_verify(existing_partition_table, true, &num_partitions); + esp_partition_munmap(partition_table_map); + if (err != ESP_OK) { + ESP_LOGE(TAG, "esp_partition_table_verify failed (existing partition table) (err=0x%X)", err); + return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY; + } + return OTA_RESPONSE_OK; +} + // Process-scoped cache. Cannot be a backend member: backends are per-connection but the cache // must outlive a connection that called esp_partition_unload_all(), after which // esp_ota_get_running_partition() no longer returns valid data. diff --git a/esphome/core/__init__.py b/esphome/core/__init__.py index 94a48dd31b0..0cc207aa543 100644 --- a/esphome/core/__init__.py +++ b/esphome/core/__init__.py @@ -790,6 +790,12 @@ class EsphomeCore: ) return self.relative_pioenvs_path(self.name, "partitions.bin") + @property + def bootloader_bin(self) -> Path: + if self.data.get(KEY_NATIVE_IDF): + return self.relative_build_path("build", "bootloader", "bootloader.bin") + return self.relative_pioenvs_path(self.name, "bootloader.bin") + @property def target_platform(self): return self.data[KEY_CORE][KEY_TARGET_PLATFORM] diff --git a/esphome/espota2.py b/esphome/espota2.py index b2a1fd2a40a..576b1c6b2da 100644 --- a/esphome/espota2.py +++ b/esphome/espota2.py @@ -17,6 +17,7 @@ from esphome.helpers import ProgressBar, resolve_ip_address OTA_TYPE_UPDATE_APP = 0x00 OTA_TYPE_UPDATE_PARTITION_TABLE = 0x01 +OTA_TYPE_UPDATE_BOOTLOADER = 0x02 RESPONSE_OK = 0x00 RESPONSE_REQUEST_AUTH = 0x01 @@ -49,6 +50,8 @@ RESPONSE_ERROR_SIGNATURE_INVALID = 0x8D RESPONSE_ERROR_UNSUPPORTED_OTA_TYPE = 0x8E RESPONSE_ERROR_PARTITION_TABLE_VERIFY = 0x8F RESPONSE_ERROR_PARTITION_TABLE_UPDATE = 0x90 +RESPONSE_ERROR_BOOTLOADER_VERIFY = 0x91 +RESPONSE_ERROR_BOOTLOADER_UPDATE = 0x92 RESPONSE_ERROR_UNKNOWN = 0xFF OTA_VERSION_1_0 = 1 @@ -66,7 +69,7 @@ SERVER_FEATURE_SUPPORTS_PARTITION_ACCESS = 0x02 # updates extend this set. Anything outside the set is rejected up front so callers # of perform_ota/run_ota get a clear error instead of a post-auth 0x8E from the device. _SUPPORTED_OTA_TYPES: frozenset[int] = frozenset( - {OTA_TYPE_UPDATE_APP, OTA_TYPE_UPDATE_PARTITION_TABLE} + {OTA_TYPE_UPDATE_APP, OTA_TYPE_UPDATE_PARTITION_TABLE, OTA_TYPE_UPDATE_BOOTLOADER} ) UPLOAD_BLOCK_SIZE = 8192 @@ -143,6 +146,16 @@ _ERROR_MESSAGES: dict[int, str] = { "the partition table update without rebooting the device. If the device " "fails to boot, recover it via a serial flash." ), + RESPONSE_ERROR_BOOTLOADER_VERIFY: ( + "The bootloader update could not be verified. No changes were " + "made to the bootloader. Check the logs for more information and retry." + ), + RESPONSE_ERROR_BOOTLOADER_UPDATE: ( + "An error occurred while updating the bootloader. The device is now " + "in a degraded state and may not be able to boot. Open the logs and retry " + "the bootloader update without rebooting the device. If the device " + "fails to boot, recover it via a serial flash." + ), RESPONSE_ERROR_UNKNOWN: "Unknown error from ESP", } @@ -325,15 +338,24 @@ def perform_ota( # Any non-app OTA type requires the extended protocol and the # partition-access server feature. Reject up front so the user gets # a clear capability error instead of a post-auth 0x8E from the device. + flag_name = { + OTA_TYPE_UPDATE_PARTITION_TABLE: "--partition-table", + OTA_TYPE_UPDATE_BOOTLOADER: "--bootloader", + }.get(ota_type, f"OTA type 0x{ota_type:02X}") if not extended_proto: raise OTAError( - f"Device does not support extended OTA protocol; " - f"OTA type 0x{ota_type:02X} requires it" + f"Device does not support the extended OTA protocol that " + f"{flag_name} requires. The running firmware is too old; " + f"recompile and upload a current ESPHome firmware via a " + f"regular OTA (without {flag_name}), then retry." ) if not (features & SERVER_FEATURE_SUPPORTS_PARTITION_ACCESS): raise OTAError( - f"Device does not support partition access; " - f"OTA type 0x{ota_type:02X} cannot be used" + f"The running firmware was built without " + f"'allow_partition_access: true', so {flag_name} cannot be " + f"used. Add the option to the esphome OTA platform in your " + f"YAML, recompile and upload (without {flag_name}), then " + f"retry {flag_name}." ) if features & SERVER_FEATURE_SUPPORTS_COMPRESSION: diff --git a/tests/unit_tests/test_core.py b/tests/unit_tests/test_core.py index 22be59653aa..9dc37918ae9 100644 --- a/tests/unit_tests/test_core.py +++ b/tests/unit_tests/test_core.py @@ -853,6 +853,23 @@ class TestEsphomeCore: target.testing_ensure_platform_registered("sensor") assert target.platform_counts["sensor"] == 3 + def test_bootloader_bin__native_idf(self, target): + """Native ESP-IDF builds emit the bootloader under build/bootloader/bootloader.bin.""" + target.data[const.KEY_NATIVE_IDF] = True + + assert target.bootloader_bin == Path( + "foo/build/build/bootloader/bootloader.bin" + ) + + def test_bootloader_bin__platformio(self, target): + """For PlatformIO builds bootloader.bin lives in the env-specific .pioenvs directory.""" + target.name = "test-device" + target.data[const.KEY_NATIVE_IDF] = False + + assert target.bootloader_bin == Path( + "foo/build/.pioenvs/test-device/bootloader.bin" + ) + def test_add_library__extracts_short_name_from_path(self, target): """Test add_library extracts short name from library paths like owner/lib.""" target.data[const.KEY_CORE] = { diff --git a/tests/unit_tests/test_espota2.py b/tests/unit_tests/test_espota2.py index 2cad1d2ec81..b22ad461132 100644 --- a/tests/unit_tests/test_espota2.py +++ b/tests/unit_tests/test_espota2.py @@ -201,6 +201,14 @@ def test_receive_exactly_socket_error(mock_socket: Mock) -> None: espota2.RESPONSE_ERROR_PARTITION_TABLE_UPDATE, "Error: An error occurred while updating the partition table", ), + ( + espota2.RESPONSE_ERROR_BOOTLOADER_VERIFY, + "Error: The bootloader update could not be verified", + ), + ( + espota2.RESPONSE_ERROR_BOOTLOADER_UPDATE, + "Error: An error occurred while updating the bootloader", + ), (espota2.RESPONSE_ERROR_UNKNOWN, "Unknown error from ESP"), ], ) @@ -992,7 +1000,8 @@ def test_perform_ota_non_app_type_requires_extended_protocol( mock_socket.recv.side_effect = recv_responses with pytest.raises( - espota2.OTAError, match="Device does not support extended OTA protocol" + espota2.OTAError, + match="Device does not support the extended OTA protocol", ): espota2.perform_ota( mock_socket, @@ -1026,7 +1035,8 @@ def test_perform_ota_non_app_type_requires_partition_access( mock_socket.recv.side_effect = recv_responses with pytest.raises( - espota2.OTAError, match="Device does not support partition access" + espota2.OTAError, + match=(r"running firmware was built without 'allow_partition_access: true'"), ): espota2.perform_ota( mock_socket, @@ -1037,6 +1047,60 @@ def test_perform_ota_non_app_type_requires_partition_access( ) +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_partition_access_error_names_bootloader_flag( + mock_socket: Mock, mock_file: io.BytesIO +) -> None: + """Bootloader OTA against a stale device must point at the --bootloader flag.""" + recv_responses = [ + bytes([espota2.RESPONSE_OK]), + bytes([espota2.OTA_VERSION_2_0]), + bytes([espota2.RESPONSE_FEATURE_FLAGS]), + bytes([0]), # No partition access + ] + + mock_socket.recv.side_effect = recv_responses + + with pytest.raises( + espota2.OTAError, + match=r"--bootloader.*recompile and upload.*--bootloader.*retry --bootloader", + ): + espota2.perform_ota( + mock_socket, + "testpass", + mock_file, + "test.bin", + espota2.OTA_TYPE_UPDATE_BOOTLOADER, + ) + + +@pytest.mark.usefixtures("mock_time") +def test_perform_ota_partition_access_error_names_partition_table_flag( + mock_socket: Mock, mock_file: io.BytesIO +) -> None: + """Partition-table OTA against a stale device must point at the --partition-table flag.""" + recv_responses = [ + bytes([espota2.RESPONSE_OK]), + bytes([espota2.OTA_VERSION_2_0]), + bytes([espota2.RESPONSE_FEATURE_FLAGS]), + bytes([0]), # No partition access + ] + + mock_socket.recv.side_effect = recv_responses + + with pytest.raises( + espota2.OTAError, + match=r"--partition-table.*retry --partition-table", + ): + espota2.perform_ota( + mock_socket, + "testpass", + mock_file, + "test.bin", + espota2.OTA_TYPE_UPDATE_PARTITION_TABLE, + ) + + def test_check_error_detects_errors_when_expect_is_none() -> None: """check_error must surface device error bytes even when expect is None. diff --git a/tests/unit_tests/test_main.py b/tests/unit_tests/test_main.py index 4ab7bb3344b..4b0590cf76d 100644 --- a/tests/unit_tests/test_main.py +++ b/tests/unit_tests/test_main.py @@ -24,6 +24,7 @@ from esphome.__main__ import ( _get_configured_xtal_freq, _make_crystal_freq_callback, _resolve_network_devices, + _validate_bootloader_binary, _validate_partition_table_binary, choose_upload_log_host, command_analyze_memory, @@ -89,7 +90,11 @@ from esphome.const import ( PLATFORM_RP2040, ) from esphome.core import CORE, EsphomeError -from esphome.espota2 import OTA_TYPE_UPDATE_APP, OTA_TYPE_UPDATE_PARTITION_TABLE +from esphome.espota2 import ( + OTA_TYPE_UPDATE_APP, + OTA_TYPE_UPDATE_BOOTLOADER, + OTA_TYPE_UPDATE_PARTITION_TABLE, +) from esphome.util import BootselResult, FlashImage from esphome.zeroconf import _await_discovery, discover_mdns_devices @@ -1127,6 +1132,7 @@ class MockArgs: output: str | None = None ota_platform: str | None = None partition_table: bool = False + bootloader: bool = False def test_upload_program_serial_esp32( @@ -1816,6 +1822,27 @@ def test_validate_partition_table_binary_missing_file(tmp_path: Path) -> None: _validate_partition_table_binary(tmp_path / "does-not-exist.bin") +def test_validate_bootloader_binary_rejects_wrong_magic(tmp_path: Path) -> None: + data = bytearray(_make_bootloader_bytes()) + data[0] = 0x00 + f = tmp_path / "bootloader.bin" + f.write_bytes(bytes(data)) + with pytest.raises(EsphomeError, match="magic"): + _validate_bootloader_binary(f) + + +def test_validate_bootloader_binary_missing_file(tmp_path: Path) -> None: + with pytest.raises(EsphomeError, match="Cannot read bootloader file"): + _validate_bootloader_binary(tmp_path / "does-not-exist.bin") + + +def test_validate_bootloader_binary_rejects_empty_file(tmp_path: Path) -> None: + f = tmp_path / "bootloader.bin" + f.write_bytes(b"") + with pytest.raises(EsphomeError, match="is empty"): + _validate_bootloader_binary(f) + + def test_upload_program_ota_partition_table_invalid_file( mock_run_ota: Mock, mock_get_port_type: Mock, @@ -1869,7 +1896,155 @@ def test_upload_program_ota_partition_table_without_allow_flag( with pytest.raises( EsphomeError, - match="requires 'allow_partition_access: true'", + match=( + r"The option --partition-table requires 'allow_partition_access: true'.*" + r"retry --partition-table" + ), + ): + upload_program(config, args, devices) + mock_run_ota.assert_not_called() + + +def _make_bootloader_bytes() -> bytes: + """Build a minimal bootloader image accepted by _validate_bootloader_binary.""" + table = bytearray(b"\xff") + # Starts with: ESP_IMAGE_HEADER_MAGIC (0xE9) + table[0] = 0xE9 + return bytes(table) + + +def test_upload_program_ota_bootloader_with_file_arg( + mock_run_ota: Mock, + mock_get_port_type: Mock, + tmp_path: Path, +) -> None: + """Test upload_program with OTA and bootloader.""" + setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path) + + mock_get_port_type.return_value = "NETWORK" + mock_run_ota.return_value = (0, "192.168.1.100") + + bootloader_file = tmp_path / "bootloader.bin" + bootloader_file.write_bytes(_make_bootloader_bytes()) + + config = { + CONF_OTA: [ + { + CONF_PLATFORM: CONF_ESPHOME, + CONF_PORT: 3232, + "allow_partition_access": True, + } + ] + } + args = MockArgs(file=str(bootloader_file), bootloader=True) + devices = ["192.168.1.100"] + + exit_code, host = upload_program(config, args, devices) + + assert exit_code == 0 + assert host == "192.168.1.100" + mock_run_ota.assert_called_once_with( + ["192.168.1.100"], + 3232, + None, + bootloader_file, + OTA_TYPE_UPDATE_BOOTLOADER, + ) + + +def test_upload_program_ota_partition_table_and_bootloader_options( + mock_run_ota: Mock, + mock_get_port_type: Mock, + tmp_path: Path, +) -> None: + """--partition-table and --bootloader can't be used together.""" + setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path) + + mock_get_port_type.return_value = "NETWORK" + + config = { + CONF_OTA: [ + { + CONF_PLATFORM: CONF_ESPHOME, + CONF_PORT: 3232, + "allow_partition_access": True, + } + ] + } + args = MockArgs(file="partitions.bin", partition_table=True, bootloader=True) + devices = ["192.168.1.100"] + + with pytest.raises( + EsphomeError, + match="--partition-table and --bootloader", + ): + upload_program(config, args, devices) + mock_run_ota.assert_not_called() + + +def test_upload_program_ota_bootloader_without_allow_flag( + mock_run_ota: Mock, + mock_get_port_type: Mock, + tmp_path: Path, +) -> None: + """--bootloader must fail fast when allow_partition_access is not enabled in YAML.""" + setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path) + + mock_get_port_type.return_value = "NETWORK" + + config = { + CONF_OTA: [ + { + CONF_PLATFORM: CONF_ESPHOME, + CONF_PORT: 3232, + } + ] + } + args = MockArgs(file="bootloader.bin", bootloader=True) + devices = ["192.168.1.100"] + + with pytest.raises( + EsphomeError, + match=( + r"The option --bootloader requires 'allow_partition_access: true'.*" + r"retry --bootloader" + ), + ): + upload_program(config, args, devices) + mock_run_ota.assert_not_called() + + +def test_upload_program_ota_bootloader_platform_web_server( + mock_run_ota: Mock, + mock_get_port_type: Mock, + tmp_path: Path, +) -> None: + """Test bootloader upload with web_server OTA.""" + setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path) + + mock_get_port_type.return_value = "NETWORK" + + bootloader_file = tmp_path / "bootloader.bin" + bootloader_file.write_bytes(_make_bootloader_bytes()) + + config = { + CONF_OTA: [ + { + CONF_PLATFORM: CONF_WEB_SERVER, + CONF_WEB_SERVER: { + CONF_PORT: 80, + CONF_AUTH: {CONF_USERNAME: "admin", CONF_PASSWORD: "pw"}, + }, + "allow_partition_access": True, + } + ] + } + args = MockArgs(file=str(bootloader_file), bootloader=True) + devices = ["192.168.1.100"] + + with pytest.raises( + EsphomeError, + match="the web_server OTA path can only update the firmware image", ): upload_program(config, args, devices) mock_run_ota.assert_not_called()