[ota] Add bootloader update functionality to ota component (#16238)
CI / Create common environment (push) Has been cancelled
CI / Check pylint (push) Has been cancelled
CI / Run script/ci-custom (push) Has been cancelled
CI / Check import esphome.__main__ time (push) Has been cancelled
CI / Test downstream esphome/device-builder (push) Has been cancelled
CI / Run pytest (macOS-latest, 3.11) (push) Has been cancelled
CI / Run pytest (macOS-latest, 3.14) (push) Has been cancelled
CI / Run pytest (ubuntu-latest, 3.11) (push) Has been cancelled
CI / Run pytest (ubuntu-latest, 3.13) (push) Has been cancelled
CI / Run pytest (ubuntu-latest, 3.14) (push) Has been cancelled
CI / Run pytest (windows-latest, 3.11) (push) Has been cancelled
CI / Run pytest (windows-latest, 3.14) (push) Has been cancelled
CI / Determine which jobs to run (push) Has been cancelled
CI / Run integration tests (${{ matrix.bucket.name }}) (push) Has been cancelled
CI / Run C++ unit tests (push) Has been cancelled
CI / Run CodSpeed benchmarks (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 IDF (push) Has been cancelled
CI / Run script/clang-tidy for ESP8266 (push) Has been cancelled
CI / Run script/clang-tidy for ZEPHYR (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 1/4 (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 2/4 (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 3/4 (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 4/4 (push) Has been cancelled
CI / Test components batch (${{ matrix.components }}) (push) Has been cancelled
CI / pre-commit.ci lite (push) Has been cancelled
CI / Build target branch for memory impact (push) Has been cancelled
CI / Build PR branch for memory impact (push) Has been cancelled
CI / Comment memory impact (push) Has been cancelled
CI / CI Status (push) Has been cancelled

Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
This commit is contained in:
Mat931
2026-05-08 21:36:06 +00:00
committed by GitHub
parent 3abf2c99a2
commit 1365251365
12 changed files with 570 additions and 51 deletions
+55 -11
View File
@@ -1125,11 +1125,18 @@ def upload_program(
# MQTT and MQTTIP are also OTA paths; MQTTIP gets resolved to a real IP later by
# _resolve_network_devices(). Only SERIAL and BOOTSEL are non-OTA upload paths.
if port_type in (PortType.SERIAL, PortType.BOOTSEL) and getattr(
args, "partition_table", False
is_partition_table = getattr(args, "partition_table", False)
is_bootloader = getattr(args, "bootloader", False)
if is_partition_table and is_bootloader:
raise EsphomeError(
"The options --partition-table and --bootloader can't be used together."
)
option_string = "--partition-table" if is_partition_table else "--bootloader"
if port_type in (PortType.SERIAL, PortType.BOOTSEL) and (
is_partition_table or is_bootloader
):
raise EsphomeError(
"The option --partition-table can only be used for Over The Air updates."
f"The option {option_string} can only be used for Over The Air updates."
)
if port_type == PortType.BOOTSEL:
@@ -1158,9 +1165,9 @@ def upload_program(
network_devices = _resolve_network_devices(devices, config, args)
if chosen_platform == CONF_WEB_SERVER:
if getattr(args, "partition_table", False):
if is_partition_table or is_bootloader:
raise EsphomeError(
"--partition-table is only supported with the esphome OTA platform; "
f"{option_string} is only supported with the esphome OTA platform; "
"the web_server OTA path can only update the firmware image."
)
binary = CORE.firmware_bin
@@ -1228,25 +1235,34 @@ def _upload_via_native_api(
remote_port = int(ota_conf[CONF_PORT])
password = ota_conf.get(CONF_PASSWORD)
def check_partition_access(option_string: str) -> None:
if not ota_conf.get("allow_partition_access"):
raise EsphomeError(
f"The option {option_string} requires 'allow_partition_access: true' on the "
"esphome OTA platform in the device's YAML configuration. Add it, recompile, "
f"flash a build with the option enabled, and then retry {option_string}."
)
binary = CORE.firmware_bin
ota_type = espota2.OTA_TYPE_UPDATE_APP
if getattr(args, "partition_table", False):
# Fail fast if the resolved ESPHome OTA config does not enable allow_partition_access.
# The device-side handshake also rejects this with "Device only supports app updates",
# but checking here surfaces the misconfiguration before opening a network connection.
if not ota_conf.get("allow_partition_access"):
raise EsphomeError(
"The option --partition-table requires 'allow_partition_access: true' on the "
"esphome OTA platform in the device's YAML configuration. Add it, recompile, "
"flash a build with the option enabled, and then retry --partition-table."
)
check_partition_access("--partition-table")
binary = CORE.partition_table_bin
ota_type = espota2.OTA_TYPE_UPDATE_PARTITION_TABLE
elif getattr(args, "bootloader", False):
check_partition_access("--bootloader")
binary = CORE.bootloader_bin
ota_type = espota2.OTA_TYPE_UPDATE_BOOTLOADER
if getattr(args, "file", None) is not None:
binary = Path(args.file)
if ota_type == espota2.OTA_TYPE_UPDATE_PARTITION_TABLE:
_validate_partition_table_binary(binary)
if ota_type == espota2.OTA_TYPE_UPDATE_BOOTLOADER:
_validate_bootloader_binary(binary)
return espota2.run_ota(network_devices, remote_port, password, binary, ota_type)
@@ -1281,6 +1297,7 @@ def _upload_via_web_server(
_PARTITION_TABLE_MAX_LEN = 0xC00
_ESP_PARTITION_MAGIC = 0x50AA
_ESP_PARTITION_MAGIC_MD5 = 0xEBEB
_ESP_IMAGE_HEADER_MAGIC = 0xE9
def _validate_partition_table_binary(binary: Path) -> None:
@@ -1326,6 +1343,28 @@ def _validate_partition_table_binary(binary: Path) -> None:
)
def _validate_bootloader_binary(binary: Path) -> None:
"""Validate that ``binary`` looks like an ESP32 bootloader image."""
try:
data = binary.read_bytes()
except OSError as err:
raise EsphomeError(f"Cannot read bootloader file '{binary}': {err}") from err
if not data:
raise EsphomeError(
f"Bootloader file '{binary}' is empty. "
"This file does not look like an ESP32 bootloader."
)
first_magic = data[0]
if first_magic != _ESP_IMAGE_HEADER_MAGIC:
raise EsphomeError(
f"Bootloader file '{binary}' does not start with the expected "
f"image header magic 0x{_ESP_IMAGE_HEADER_MAGIC:02X} (got 0x{first_magic:02X}). "
"This file does not look like an ESP32 bootloader."
)
def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int | None:
try:
module = importlib.import_module("esphome.components." + CORE.target_platform)
@@ -2009,6 +2048,11 @@ def parse_args(argv):
help="Upload as partition table (OTA).",
action="store_true",
)
parser_upload.add_argument(
"--bootloader",
help="Upload as bootloader (OTA).",
action="store_true",
)
parser_logs = subparsers.add_parser(
"logs",
+11 -6
View File
@@ -125,8 +125,11 @@ void ESPHomeOTAComponent::dump_config() {
it = esp_partition_next(it);
}
esp_partition_iterator_release(it);
#endif
#endif
esp_bootloader_desc_t bootloader_desc;
esp_err_t err = esp_ota_get_bootloader_description(nullptr, &bootloader_desc);
ESP_LOGCONFIG(TAG, " Bootloader: ESP-IDF %s", (err == ESP_OK) ? bootloader_desc.idf_ver : "version unknown");
#endif // USE_ESP32
#endif // USE_OTA_PARTITIONS
}
void ESPHomeOTAComponent::loop() {
@@ -336,7 +339,6 @@ void ESPHomeOTAComponent::handle_data_() {
/// wakeable_delay() in read();
/// write() always returns immediately
ota::OTAResponseTypes error_code = ota::OTA_RESPONSE_ERROR_UNKNOWN;
bool update_started = false;
size_t total = 0;
uint32_t last_progress = 0;
uint32_t last_data_ms = 0;
@@ -399,7 +401,6 @@ void ESPHomeOTAComponent::handle_data_() {
error_code = this->backend_->begin(ota_size, ota_type);
if (error_code != ota::OTA_RESPONSE_OK)
goto error; // NOLINT(cppcoreguidelines-avoid-goto)
update_started = true;
// Acknowledge prepare OK - 1 byte
this->write_byte_(ota::OTA_RESPONSE_UPDATE_PREPARE_OK);
@@ -510,8 +511,12 @@ void ESPHomeOTAComponent::handle_data_() {
error:
this->write_byte_(static_cast<uint8_t>(error_code));
// Abort backend before cleanup - cleanup_connection_() destroys the backend
if (this->backend_ != nullptr && update_started) {
// Abort backend before cleanup - cleanup_connection_() destroys the backend.
// Always call abort() unconditionally: backends register external partitions before
// esp_ota_begin (partition table / bootloader paths), and abort() is responsible for
// releasing those even if begin() failed before an OTA handle was opened. The IDF
// backend's esp_ota_abort(0) is documented as harmless.
if (this->backend_ != nullptr) {
this->backend_->abort();
}
+3
View File
@@ -44,6 +44,8 @@ enum OTAResponseTypes {
OTA_RESPONSE_ERROR_UNSUPPORTED_OTA_TYPE = 0x8E,
OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY = 0x8F,
OTA_RESPONSE_ERROR_PARTITION_TABLE_UPDATE = 0x90,
OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY = 0x91,
OTA_RESPONSE_ERROR_BOOTLOADER_UPDATE = 0x92,
OTA_RESPONSE_ERROR_UNKNOWN = 0xFF,
};
@@ -58,6 +60,7 @@ enum OTAState {
enum OTAType : uint8_t {
OTA_TYPE_UPDATE_APP = 0x00,
OTA_TYPE_UPDATE_PARTITION_TABLE = 0x01,
OTA_TYPE_UPDATE_BOOTLOADER = 0x02,
};
/** Listener interface for OTA state changes.
+31 -3
View File
@@ -31,7 +31,13 @@ OTAResponseTypes IDFOTABackend::begin(size_t image_size, ota::OTAType ota_type)
this->md5_.init();
return OTA_RESPONSE_OK;
}
if (this->ota_type_ != ota::OTA_TYPE_UPDATE_APP) {
if (this->ota_type_ == ota::OTA_TYPE_UPDATE_BOOTLOADER) {
OTAResponseTypes result = this->prepare_bootloader_update_(image_size);
if (result != OTA_RESPONSE_OK) {
return result;
}
}
if (!this->is_app_or_bootloader_update_()) {
return OTA_RESPONSE_ERROR_UNSUPPORTED_OTA_TYPE;
}
#else
@@ -55,6 +61,7 @@ OTAResponseTypes IDFOTABackend::begin(size_t image_size, ota::OTAType ota_type)
esp_err_t err = esp_ota_begin(this->partition_, image_size, &this->update_handle_);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_ota_begin failed (err=0x%X)", err);
esp_ota_abort(this->update_handle_);
this->update_handle_ = 0;
if (err == ESP_ERR_INVALID_SIZE) {
@@ -64,6 +71,14 @@ OTAResponseTypes IDFOTABackend::begin(size_t image_size, ota::OTAType ota_type)
}
return OTA_RESPONSE_ERROR_UNKNOWN;
}
#ifdef USE_OTA_PARTITIONS
if (this->ota_type_ == ota::OTA_TYPE_UPDATE_BOOTLOADER) {
OTAResponseTypes result = this->setup_bootloader_staging_();
if (result != OTA_RESPONSE_OK) {
return result;
}
}
#endif
this->md5_.init();
return OTA_RESPONSE_OK;
}
@@ -85,13 +100,14 @@ OTAResponseTypes IDFOTABackend::write(uint8_t *data, size_t len) {
this->md5_.add(data, len);
return OTA_RESPONSE_OK;
}
if (this->ota_type_ != ota::OTA_TYPE_UPDATE_APP) {
if (!this->is_app_or_bootloader_update_()) {
return OTA_RESPONSE_ERROR_UNSUPPORTED_OTA_TYPE;
}
#endif
esp_err_t err = esp_ota_write(this->update_handle_, data, len);
this->md5_.add(data, len);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_ota_write failed (err=0x%X)", err);
if (err == ESP_ERR_OTA_VALIDATE_FAILED) {
return OTA_RESPONSE_ERROR_MAGIC;
} else if (err == ESP_ERR_FLASH_OP_TIMEOUT || err == ESP_ERR_FLASH_OP_FAIL) {
@@ -114,12 +130,20 @@ OTAResponseTypes IDFOTABackend::end() {
if (this->ota_type_ == ota::OTA_TYPE_UPDATE_PARTITION_TABLE) {
return this->update_partition_table();
}
if (this->ota_type_ != ota::OTA_TYPE_UPDATE_APP) {
if (!this->is_app_or_bootloader_update_()) {
return OTA_RESPONSE_ERROR_UNSUPPORTED_OTA_TYPE;
}
#endif
esp_err_t err = esp_ota_end(this->update_handle_);
this->update_handle_ = 0;
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_ota_end failed (err=0x%X)", err);
}
#ifdef USE_OTA_PARTITIONS
if (this->ota_type_ == ota::OTA_TYPE_UPDATE_BOOTLOADER) {
return this->finalize_bootloader_update_(err);
}
#endif
if (err == ESP_OK) {
err = esp_ota_set_boot_partition(this->partition_);
if (err == ESP_OK) {
@@ -146,6 +170,10 @@ void IDFOTABackend::abort() {
esp_partition_deregister_external(this->partition_table_part_);
this->partition_table_part_ = nullptr;
}
if (this->bootloader_part_ != nullptr) {
esp_partition_deregister_external(this->bootloader_part_);
this->bootloader_part_ = nullptr;
}
#endif
// esp_ota_abort with handle 0 returns ESP_ERR_INVALID_ARG harmlessly, so this is safe whether
// or not an update is in flight.
@@ -39,6 +39,18 @@ class IDFOTABackend final {
OTAResponseTypes validate_new_partition_table_(uint32_t running_app_offset, size_t running_app_size,
PartitionTablePlan &plan);
OTAResponseTypes update_partition_table();
OTAResponseTypes register_and_validate_partition_table_part_();
// Defined in ota_bootloader_esp_idf.cpp:
OTAResponseTypes register_and_validate_bootloader_part_();
OTAResponseTypes prepare_bootloader_update_(size_t image_size);
OTAResponseTypes setup_bootloader_staging_();
OTAResponseTypes finalize_bootloader_update_(esp_err_t ota_end_err);
// The OTA types that flow through esp_ota_begin/write/end. Partition-table updates take a
// separate code path that buffers the table in RAM and never touches the OTA handle.
bool is_app_or_bootloader_update_() const {
return this->ota_type_ == ota::OTA_TYPE_UPDATE_APP || this->ota_type_ == ota::OTA_TYPE_UPDATE_BOOTLOADER;
}
#endif
private:
@@ -55,6 +67,7 @@ class IDFOTABackend final {
size_t buf_written_{0};
size_t image_size_{0};
const esp_partition_t *partition_table_part_{nullptr};
const esp_partition_t *bootloader_part_{nullptr};
ota::OTAType ota_type_{ota::OTA_TYPE_UPDATE_APP};
#endif
};
@@ -0,0 +1,133 @@
#ifdef USE_ESP32
#include "ota_backend_esp_idf.h"
#include "esphome/core/defines.h"
#ifdef USE_OTA_PARTITIONS
#include "esphome/core/log.h"
#include <esp_image_format.h>
#include <esp_ota_ops.h>
namespace esphome::ota {
static const char *const TAG = "ota.idf";
OTAResponseTypes IDFOTABackend::register_and_validate_bootloader_part_() {
// Register the bootloader partition
esp_err_t err = esp_partition_register_external(nullptr, ESP_PRIMARY_BOOTLOADER_OFFSET, ESP_BOOTLOADER_SIZE,
"PrimaryBTLDR", ESP_PARTITION_TYPE_BOOTLOADER,
ESP_PARTITION_SUBTYPE_BOOTLOADER_PRIMARY, &this->bootloader_part_);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_partition_register_external failed (bootloader) (err=0x%X)", err);
return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY;
}
// Verify existing bootloader to make sure ESP_PRIMARY_BOOTLOADER_OFFSET is correct
esp_image_metadata_t data = {};
const esp_partition_pos_t part_pos = {
.offset = this->bootloader_part_->address,
.size = this->bootloader_part_->size,
};
err = esp_image_verify(ESP_IMAGE_VERIFY, &part_pos, &data);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_image_verify failed (existing bootloader) (err=0x%X)", err);
return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY;
}
return OTA_RESPONSE_OK;
}
// Pre-esp_ota_begin: enforce size limit, register/verify the existing bootloader, and validate the
// partition table to confirm the bootloader region is at the expected offset (and therefore the
// expected size). The partition table registration is released here; abort() cleans up the
// bootloader registration if any later step fails.
OTAResponseTypes IDFOTABackend::prepare_bootloader_update_(size_t image_size) {
if (image_size > ESP_BOOTLOADER_SIZE) {
ESP_LOGE(TAG, "Length of received data exceeds the available bootloader size: expected <=%zu bytes, got %zu",
ESP_BOOTLOADER_SIZE, image_size);
return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY;
}
OTAResponseTypes result = this->register_and_validate_bootloader_part_();
if (result != OTA_RESPONSE_OK) {
return result;
}
result = this->register_and_validate_partition_table_part_();
if (result != OTA_RESPONSE_OK) {
return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY;
}
esp_partition_deregister_external(this->partition_table_part_);
this->partition_table_part_ = nullptr;
return OTA_RESPONSE_OK;
}
// Post-esp_ota_begin: verify the staging app partition is large enough, erase it, and redirect the
// final write target to the bootloader partition. esp_ota_set_final_partition is called with
// `restore_old_data=false` because we erased the staging region in advance.
OTAResponseTypes IDFOTABackend::setup_bootloader_staging_() {
if (this->partition_->size < this->bootloader_part_->size) {
ESP_LOGE(TAG, "Staging partition too small");
return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY;
}
// Erase full size of the bootloader partition in the staging partition
// to avoid copying old data to the bootloader partition later
esp_err_t err = esp_partition_erase_range(this->partition_, 0, this->bootloader_part_->size);
if (err != ESP_OK) {
ESP_LOGW(TAG, "esp_partition_erase_range failed (err=0x%X)", err);
// No critical error, don't return
}
err = esp_ota_set_final_partition(this->update_handle_, this->bootloader_part_, false);
if (err != ESP_OK) {
esp_ota_abort(this->update_handle_);
this->update_handle_ = 0;
ESP_LOGE(TAG, "esp_ota_set_final_partition failed (err=0x%X)", err);
return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY;
}
return OTA_RESPONSE_OK;
}
// After esp_ota_end: copy the staged image into the bootloader partition. esp_partition_copy is
// the only window in which a power loss can render the device unbootable; everything before this
// point either preserves the existing bootloader or fails harmlessly. After a successful copy the
// first sector of staging is wiped so the device can't accidentally boot from it, and the
// bootloader partition is deregistered.
OTAResponseTypes IDFOTABackend::finalize_bootloader_update_(esp_err_t ota_end_err) {
if (ota_end_err != ESP_OK) {
return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY;
}
esp_bootloader_desc_t bootloader_desc;
esp_err_t desc_err = esp_ota_get_bootloader_description(this->partition_, &bootloader_desc);
#ifdef USE_ESP32_SRAM1_AS_IRAM
if (desc_err != ESP_OK) {
ESP_LOGE(TAG, "New bootloader does not support SRAM1 as IRAM");
return OTA_RESPONSE_ERROR_BOOTLOADER_VERIFY;
}
#endif
ESP_LOGE(TAG, "Starting bootloader update.\n"
" DO NOT REMOVE POWER until the update completes successfully.\n"
" Loss of power during this operation may render the device\n"
" unable to boot until it is recovered via a serial flash.");
esp_err_t err = esp_partition_copy(this->bootloader_part_, 0, this->partition_, 0, this->bootloader_part_->size);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_partition_copy failed (err=0x%X)", err);
// Only if esp_partition_copy failed there's a chance of the device being unbootable
return OTA_RESPONSE_ERROR_BOOTLOADER_UPDATE;
}
ESP_LOGI(TAG,
"Successfully installed the new bootloader\n"
" ESP-IDF %s",
(desc_err == ESP_OK) ? bootloader_desc.idf_ver : "version unknown");
// Wipe first sector of staging partition to make sure the device can't boot from it
err = esp_partition_erase_range(this->partition_, 0, this->partition_->erase_size);
if (err != ESP_OK) {
ESP_LOGW(TAG, "esp_partition_erase_range failed (err=0x%X)", err);
// No critical error, don't return
}
esp_partition_deregister_external(this->bootloader_part_);
this->bootloader_part_ = nullptr;
return OTA_RESPONSE_OK;
}
} // namespace esphome::ota
#endif // USE_OTA_PARTITIONS
#endif // USE_ESP32
@@ -45,32 +45,14 @@ static const esp_partition_t *find_app_partition_at(uint32_t address, size_t min
// can write to it; abort() releases it on error.
OTAResponseTypes IDFOTABackend::validate_new_partition_table_(uint32_t running_app_offset, size_t running_app_size,
PartitionTablePlan &plan) {
esp_err_t err = esp_partition_register_external(
nullptr, ESP_PRIMARY_PARTITION_TABLE_OFFSET, ESP_PARTITION_TABLE_SIZE, "PrimaryPrtTable",
ESP_PARTITION_TYPE_PARTITION_TABLE, ESP_PARTITION_SUBTYPE_PARTITION_TABLE_PRIMARY, &this->partition_table_part_);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_partition_register_external failed (err=0x%X)", err);
return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY;
OTAResponseTypes validate_result = this->register_and_validate_partition_table_part_();
if (validate_result != OTA_RESPONSE_OK) {
return validate_result;
}
int num_partitions = 0;
const esp_partition_info_t *existing_partition_table = nullptr;
esp_partition_mmap_handle_t partition_table_map;
err = esp_partition_mmap(this->partition_table_part_, 0, ESP_PARTITION_TABLE_MAX_LEN, ESP_PARTITION_MMAP_DATA,
reinterpret_cast<const void **>(&existing_partition_table), &partition_table_map);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_partition_mmap failed (err=0x%X)", err);
return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY;
}
err = esp_partition_table_verify(existing_partition_table, true, &num_partitions);
esp_partition_munmap(partition_table_map);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_partition_table_verify failed (existing partition table) (err=0x%X)", err);
return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY;
}
const esp_partition_info_t *new_partition_table = reinterpret_cast<const esp_partition_info_t *>(this->buf_);
err = esp_partition_table_verify(new_partition_table, true, &num_partitions);
esp_err_t err = esp_partition_table_verify(new_partition_table, true, &num_partitions);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_partition_table_verify failed (new partition table) (err=0x%X)", err);
return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY;
@@ -288,6 +270,33 @@ OTAResponseTypes IDFOTABackend::update_partition_table() {
return OTA_RESPONSE_OK;
}
OTAResponseTypes IDFOTABackend::register_and_validate_partition_table_part_() {
esp_err_t err = esp_partition_register_external(
nullptr, ESP_PRIMARY_PARTITION_TABLE_OFFSET, ESP_PARTITION_TABLE_SIZE, "PrimaryPrtTable",
ESP_PARTITION_TYPE_PARTITION_TABLE, ESP_PARTITION_SUBTYPE_PARTITION_TABLE_PRIMARY, &this->partition_table_part_);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_partition_register_external failed (partition table) (err=0x%X)", err);
return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY;
}
int num_partitions = 0;
const esp_partition_info_t *existing_partition_table = nullptr;
esp_partition_mmap_handle_t partition_table_map;
err = esp_partition_mmap(this->partition_table_part_, 0, ESP_PARTITION_TABLE_MAX_LEN, ESP_PARTITION_MMAP_DATA,
reinterpret_cast<const void **>(&existing_partition_table), &partition_table_map);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_partition_mmap failed (partition table) (err=0x%X)", err);
return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY;
}
err = esp_partition_table_verify(existing_partition_table, true, &num_partitions);
esp_partition_munmap(partition_table_map);
if (err != ESP_OK) {
ESP_LOGE(TAG, "esp_partition_table_verify failed (existing partition table) (err=0x%X)", err);
return OTA_RESPONSE_ERROR_PARTITION_TABLE_VERIFY;
}
return OTA_RESPONSE_OK;
}
// Process-scoped cache. Cannot be a backend member: backends are per-connection but the cache
// must outlive a connection that called esp_partition_unload_all(), after which
// esp_ota_get_running_partition() no longer returns valid data.
+6
View File
@@ -790,6 +790,12 @@ class EsphomeCore:
)
return self.relative_pioenvs_path(self.name, "partitions.bin")
@property
def bootloader_bin(self) -> Path:
if self.data.get(KEY_NATIVE_IDF):
return self.relative_build_path("build", "bootloader", "bootloader.bin")
return self.relative_pioenvs_path(self.name, "bootloader.bin")
@property
def target_platform(self):
return self.data[KEY_CORE][KEY_TARGET_PLATFORM]
+27 -5
View File
@@ -17,6 +17,7 @@ from esphome.helpers import ProgressBar, resolve_ip_address
OTA_TYPE_UPDATE_APP = 0x00
OTA_TYPE_UPDATE_PARTITION_TABLE = 0x01
OTA_TYPE_UPDATE_BOOTLOADER = 0x02
RESPONSE_OK = 0x00
RESPONSE_REQUEST_AUTH = 0x01
@@ -49,6 +50,8 @@ RESPONSE_ERROR_SIGNATURE_INVALID = 0x8D
RESPONSE_ERROR_UNSUPPORTED_OTA_TYPE = 0x8E
RESPONSE_ERROR_PARTITION_TABLE_VERIFY = 0x8F
RESPONSE_ERROR_PARTITION_TABLE_UPDATE = 0x90
RESPONSE_ERROR_BOOTLOADER_VERIFY = 0x91
RESPONSE_ERROR_BOOTLOADER_UPDATE = 0x92
RESPONSE_ERROR_UNKNOWN = 0xFF
OTA_VERSION_1_0 = 1
@@ -66,7 +69,7 @@ SERVER_FEATURE_SUPPORTS_PARTITION_ACCESS = 0x02
# updates extend this set. Anything outside the set is rejected up front so callers
# of perform_ota/run_ota get a clear error instead of a post-auth 0x8E from the device.
_SUPPORTED_OTA_TYPES: frozenset[int] = frozenset(
{OTA_TYPE_UPDATE_APP, OTA_TYPE_UPDATE_PARTITION_TABLE}
{OTA_TYPE_UPDATE_APP, OTA_TYPE_UPDATE_PARTITION_TABLE, OTA_TYPE_UPDATE_BOOTLOADER}
)
UPLOAD_BLOCK_SIZE = 8192
@@ -143,6 +146,16 @@ _ERROR_MESSAGES: dict[int, str] = {
"the partition table update without rebooting the device. If the device "
"fails to boot, recover it via a serial flash."
),
RESPONSE_ERROR_BOOTLOADER_VERIFY: (
"The bootloader update could not be verified. No changes were "
"made to the bootloader. Check the logs for more information and retry."
),
RESPONSE_ERROR_BOOTLOADER_UPDATE: (
"An error occurred while updating the bootloader. The device is now "
"in a degraded state and may not be able to boot. Open the logs and retry "
"the bootloader update without rebooting the device. If the device "
"fails to boot, recover it via a serial flash."
),
RESPONSE_ERROR_UNKNOWN: "Unknown error from ESP",
}
@@ -325,15 +338,24 @@ def perform_ota(
# Any non-app OTA type requires the extended protocol and the
# partition-access server feature. Reject up front so the user gets
# a clear capability error instead of a post-auth 0x8E from the device.
flag_name = {
OTA_TYPE_UPDATE_PARTITION_TABLE: "--partition-table",
OTA_TYPE_UPDATE_BOOTLOADER: "--bootloader",
}.get(ota_type, f"OTA type 0x{ota_type:02X}")
if not extended_proto:
raise OTAError(
f"Device does not support extended OTA protocol; "
f"OTA type 0x{ota_type:02X} requires it"
f"Device does not support the extended OTA protocol that "
f"{flag_name} requires. The running firmware is too old; "
f"recompile and upload a current ESPHome firmware via a "
f"regular OTA (without {flag_name}), then retry."
)
if not (features & SERVER_FEATURE_SUPPORTS_PARTITION_ACCESS):
raise OTAError(
f"Device does not support partition access; "
f"OTA type 0x{ota_type:02X} cannot be used"
f"The running firmware was built without "
f"'allow_partition_access: true', so {flag_name} cannot be "
f"used. Add the option to the esphome OTA platform in your "
f"YAML, recompile and upload (without {flag_name}), then "
f"retry {flag_name}."
)
if features & SERVER_FEATURE_SUPPORTS_COMPRESSION:
+17
View File
@@ -853,6 +853,23 @@ class TestEsphomeCore:
target.testing_ensure_platform_registered("sensor")
assert target.platform_counts["sensor"] == 3
def test_bootloader_bin__native_idf(self, target):
"""Native ESP-IDF builds emit the bootloader under build/bootloader/bootloader.bin."""
target.data[const.KEY_NATIVE_IDF] = True
assert target.bootloader_bin == Path(
"foo/build/build/bootloader/bootloader.bin"
)
def test_bootloader_bin__platformio(self, target):
"""For PlatformIO builds bootloader.bin lives in the env-specific .pioenvs directory."""
target.name = "test-device"
target.data[const.KEY_NATIVE_IDF] = False
assert target.bootloader_bin == Path(
"foo/build/.pioenvs/test-device/bootloader.bin"
)
def test_add_library__extracts_short_name_from_path(self, target):
"""Test add_library extracts short name from library paths like owner/lib."""
target.data[const.KEY_CORE] = {
+66 -2
View File
@@ -201,6 +201,14 @@ def test_receive_exactly_socket_error(mock_socket: Mock) -> None:
espota2.RESPONSE_ERROR_PARTITION_TABLE_UPDATE,
"Error: An error occurred while updating the partition table",
),
(
espota2.RESPONSE_ERROR_BOOTLOADER_VERIFY,
"Error: The bootloader update could not be verified",
),
(
espota2.RESPONSE_ERROR_BOOTLOADER_UPDATE,
"Error: An error occurred while updating the bootloader",
),
(espota2.RESPONSE_ERROR_UNKNOWN, "Unknown error from ESP"),
],
)
@@ -992,7 +1000,8 @@ def test_perform_ota_non_app_type_requires_extended_protocol(
mock_socket.recv.side_effect = recv_responses
with pytest.raises(
espota2.OTAError, match="Device does not support extended OTA protocol"
espota2.OTAError,
match="Device does not support the extended OTA protocol",
):
espota2.perform_ota(
mock_socket,
@@ -1026,7 +1035,8 @@ def test_perform_ota_non_app_type_requires_partition_access(
mock_socket.recv.side_effect = recv_responses
with pytest.raises(
espota2.OTAError, match="Device does not support partition access"
espota2.OTAError,
match=(r"running firmware was built without 'allow_partition_access: true'"),
):
espota2.perform_ota(
mock_socket,
@@ -1037,6 +1047,60 @@ def test_perform_ota_non_app_type_requires_partition_access(
)
@pytest.mark.usefixtures("mock_time")
def test_perform_ota_partition_access_error_names_bootloader_flag(
mock_socket: Mock, mock_file: io.BytesIO
) -> None:
"""Bootloader OTA against a stale device must point at the --bootloader flag."""
recv_responses = [
bytes([espota2.RESPONSE_OK]),
bytes([espota2.OTA_VERSION_2_0]),
bytes([espota2.RESPONSE_FEATURE_FLAGS]),
bytes([0]), # No partition access
]
mock_socket.recv.side_effect = recv_responses
with pytest.raises(
espota2.OTAError,
match=r"--bootloader.*recompile and upload.*--bootloader.*retry --bootloader",
):
espota2.perform_ota(
mock_socket,
"testpass",
mock_file,
"test.bin",
espota2.OTA_TYPE_UPDATE_BOOTLOADER,
)
@pytest.mark.usefixtures("mock_time")
def test_perform_ota_partition_access_error_names_partition_table_flag(
mock_socket: Mock, mock_file: io.BytesIO
) -> None:
"""Partition-table OTA against a stale device must point at the --partition-table flag."""
recv_responses = [
bytes([espota2.RESPONSE_OK]),
bytes([espota2.OTA_VERSION_2_0]),
bytes([espota2.RESPONSE_FEATURE_FLAGS]),
bytes([0]), # No partition access
]
mock_socket.recv.side_effect = recv_responses
with pytest.raises(
espota2.OTAError,
match=r"--partition-table.*retry --partition-table",
):
espota2.perform_ota(
mock_socket,
"testpass",
mock_file,
"test.bin",
espota2.OTA_TYPE_UPDATE_PARTITION_TABLE,
)
def test_check_error_detects_errors_when_expect_is_none() -> None:
"""check_error must surface device error bytes even when expect is None.
+177 -2
View File
@@ -24,6 +24,7 @@ from esphome.__main__ import (
_get_configured_xtal_freq,
_make_crystal_freq_callback,
_resolve_network_devices,
_validate_bootloader_binary,
_validate_partition_table_binary,
choose_upload_log_host,
command_analyze_memory,
@@ -89,7 +90,11 @@ from esphome.const import (
PLATFORM_RP2040,
)
from esphome.core import CORE, EsphomeError
from esphome.espota2 import OTA_TYPE_UPDATE_APP, OTA_TYPE_UPDATE_PARTITION_TABLE
from esphome.espota2 import (
OTA_TYPE_UPDATE_APP,
OTA_TYPE_UPDATE_BOOTLOADER,
OTA_TYPE_UPDATE_PARTITION_TABLE,
)
from esphome.util import BootselResult, FlashImage
from esphome.zeroconf import _await_discovery, discover_mdns_devices
@@ -1127,6 +1132,7 @@ class MockArgs:
output: str | None = None
ota_platform: str | None = None
partition_table: bool = False
bootloader: bool = False
def test_upload_program_serial_esp32(
@@ -1816,6 +1822,27 @@ def test_validate_partition_table_binary_missing_file(tmp_path: Path) -> None:
_validate_partition_table_binary(tmp_path / "does-not-exist.bin")
def test_validate_bootloader_binary_rejects_wrong_magic(tmp_path: Path) -> None:
data = bytearray(_make_bootloader_bytes())
data[0] = 0x00
f = tmp_path / "bootloader.bin"
f.write_bytes(bytes(data))
with pytest.raises(EsphomeError, match="magic"):
_validate_bootloader_binary(f)
def test_validate_bootloader_binary_missing_file(tmp_path: Path) -> None:
with pytest.raises(EsphomeError, match="Cannot read bootloader file"):
_validate_bootloader_binary(tmp_path / "does-not-exist.bin")
def test_validate_bootloader_binary_rejects_empty_file(tmp_path: Path) -> None:
f = tmp_path / "bootloader.bin"
f.write_bytes(b"")
with pytest.raises(EsphomeError, match="is empty"):
_validate_bootloader_binary(f)
def test_upload_program_ota_partition_table_invalid_file(
mock_run_ota: Mock,
mock_get_port_type: Mock,
@@ -1869,7 +1896,155 @@ def test_upload_program_ota_partition_table_without_allow_flag(
with pytest.raises(
EsphomeError,
match="requires 'allow_partition_access: true'",
match=(
r"The option --partition-table requires 'allow_partition_access: true'.*"
r"retry --partition-table"
),
):
upload_program(config, args, devices)
mock_run_ota.assert_not_called()
def _make_bootloader_bytes() -> bytes:
"""Build a minimal bootloader image accepted by _validate_bootloader_binary."""
table = bytearray(b"\xff")
# Starts with: ESP_IMAGE_HEADER_MAGIC (0xE9)
table[0] = 0xE9
return bytes(table)
def test_upload_program_ota_bootloader_with_file_arg(
mock_run_ota: Mock,
mock_get_port_type: Mock,
tmp_path: Path,
) -> None:
"""Test upload_program with OTA and bootloader."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)
mock_get_port_type.return_value = "NETWORK"
mock_run_ota.return_value = (0, "192.168.1.100")
bootloader_file = tmp_path / "bootloader.bin"
bootloader_file.write_bytes(_make_bootloader_bytes())
config = {
CONF_OTA: [
{
CONF_PLATFORM: CONF_ESPHOME,
CONF_PORT: 3232,
"allow_partition_access": True,
}
]
}
args = MockArgs(file=str(bootloader_file), bootloader=True)
devices = ["192.168.1.100"]
exit_code, host = upload_program(config, args, devices)
assert exit_code == 0
assert host == "192.168.1.100"
mock_run_ota.assert_called_once_with(
["192.168.1.100"],
3232,
None,
bootloader_file,
OTA_TYPE_UPDATE_BOOTLOADER,
)
def test_upload_program_ota_partition_table_and_bootloader_options(
mock_run_ota: Mock,
mock_get_port_type: Mock,
tmp_path: Path,
) -> None:
"""--partition-table and --bootloader can't be used together."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)
mock_get_port_type.return_value = "NETWORK"
config = {
CONF_OTA: [
{
CONF_PLATFORM: CONF_ESPHOME,
CONF_PORT: 3232,
"allow_partition_access": True,
}
]
}
args = MockArgs(file="partitions.bin", partition_table=True, bootloader=True)
devices = ["192.168.1.100"]
with pytest.raises(
EsphomeError,
match="--partition-table and --bootloader",
):
upload_program(config, args, devices)
mock_run_ota.assert_not_called()
def test_upload_program_ota_bootloader_without_allow_flag(
mock_run_ota: Mock,
mock_get_port_type: Mock,
tmp_path: Path,
) -> None:
"""--bootloader must fail fast when allow_partition_access is not enabled in YAML."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)
mock_get_port_type.return_value = "NETWORK"
config = {
CONF_OTA: [
{
CONF_PLATFORM: CONF_ESPHOME,
CONF_PORT: 3232,
}
]
}
args = MockArgs(file="bootloader.bin", bootloader=True)
devices = ["192.168.1.100"]
with pytest.raises(
EsphomeError,
match=(
r"The option --bootloader requires 'allow_partition_access: true'.*"
r"retry --bootloader"
),
):
upload_program(config, args, devices)
mock_run_ota.assert_not_called()
def test_upload_program_ota_bootloader_platform_web_server(
mock_run_ota: Mock,
mock_get_port_type: Mock,
tmp_path: Path,
) -> None:
"""Test bootloader upload with web_server OTA."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)
mock_get_port_type.return_value = "NETWORK"
bootloader_file = tmp_path / "bootloader.bin"
bootloader_file.write_bytes(_make_bootloader_bytes())
config = {
CONF_OTA: [
{
CONF_PLATFORM: CONF_WEB_SERVER,
CONF_WEB_SERVER: {
CONF_PORT: 80,
CONF_AUTH: {CONF_USERNAME: "admin", CONF_PASSWORD: "pw"},
},
"allow_partition_access": True,
}
]
}
args = MockArgs(file=str(bootloader_file), bootloader=True)
devices = ["192.168.1.100"]
with pytest.raises(
EsphomeError,
match="the web_server OTA path can only update the firmware image",
):
upload_program(config, args, devices)
mock_run_ota.assert_not_called()