[rp2040] Use picotool for BOOTSEL upload and improve upload UX (#14483)

This commit is contained in:
J. Nick Koston
2026-03-10 09:10:33 -10:00
committed by GitHub
parent 06a127f64b
commit 2c7ef4f758
6 changed files with 859 additions and 27 deletions
+209 -2
View File
@@ -9,6 +9,8 @@ import logging
import os
from pathlib import Path
import re
import shutil
import subprocess
import sys
import time
from typing import Protocol
@@ -44,7 +46,9 @@ from esphome.const import (
CONF_SUBSTITUTIONS,
CONF_TOPIC,
ENV_NOGITIGNORE,
KEY_CORE,
KEY_NATIVE_IDF,
KEY_TARGET_PLATFORM,
PLATFORM_ESP32,
PLATFORM_ESP8266,
PLATFORM_RP2040,
@@ -56,7 +60,11 @@ from esphome.helpers import get_bool_env, indent, is_ip_address
from esphome.log import AnsiFore, color, setup_log
from esphome.types import ConfigType
from esphome.util import (
PICOTOOL_PACKAGE,
detect_rp2040_bootsel,
get_picotool_path,
get_serial_ports,
is_picotool_usb_permission_error,
list_yaml_files,
run_external_command,
run_external_process,
@@ -68,6 +76,21 @@ _LOGGER = logging.getLogger(__name__)
# Maximum buffer size for serial log reading to prevent unbounded memory growth
SERIAL_BUFFER_MAX_SIZE = 65536
_RP2040_BOOTSEL_INSTRUCTIONS = (
"To enter BOOTSEL mode:\n"
" 1. Unplug the device\n"
" 2. Hold the BOOT/BOOTSEL button\n"
" 3. Plug in the USB cable while holding the button\n"
" 4. Release the button - the device should appear as a USB drive (RPI-RP2)\n"
"Then run the upload command again."
)
_RP2040_UDEV_HINT = (
"You may need to add a udev rule for RP2040 devices. "
"See: https://github.com/raspberrypi/picotool"
"/blob/master/udev/60-picotool.rules"
)
# Special non-component keys that appear in configs
_NON_COMPONENT_KEYS = frozenset(
{
@@ -163,6 +186,7 @@ class PortType(StrEnum):
NETWORK = "NETWORK"
MQTT = "MQTT"
MQTTIP = "MQTTIP"
BOOTSEL = "BOOTSEL"
# Magic MQTT port types that require special handling
@@ -241,6 +265,19 @@ def choose_upload_log_host(
(f"{port.path} ({port.description})", port.path) for port in get_serial_ports()
]
# Add RP2040 BOOTSEL device option when uploading
bootsel_permission_error = False
if (
purpose == Purpose.UPLOADING
and CORE.data.get(KEY_CORE, {}).get(KEY_TARGET_PLATFORM) == PLATFORM_RP2040
and (picotool := _find_picotool()) is not None
):
bootsel = detect_rp2040_bootsel(picotool)
if bootsel.device_count > 0:
options.append(("RP2040 BOOTSEL (via picotool)", "BOOTSEL"))
elif bootsel.permission_error:
bootsel_permission_error = True
if purpose == Purpose.LOGGING:
if has_mqtt_logging():
mqtt_config = CORE.config[CONF_MQTT]
@@ -258,6 +295,25 @@ def choose_upload_log_host(
if has_mqtt_ip_lookup():
options.append(("Over The Air (MQTT IP lookup)", "MQTTIP"))
# Show helpful BOOTSEL instructions for RP2040 when no BOOTSEL device is found
if (
purpose == Purpose.UPLOADING
and CORE.data.get(KEY_CORE, {}).get(KEY_TARGET_PLATFORM) == PLATFORM_RP2040
and not any(get_port_type(opt[1]) == PortType.BOOTSEL for opt in options)
):
if bootsel_permission_error:
_LOGGER.warning(
"An RP2040 device in BOOTSEL mode was detected but could "
"not be accessed due to USB permissions."
)
if sys.platform.startswith("linux"):
_LOGGER.warning(_RP2040_UDEV_HINT)
if not options:
raise EsphomeError(
f"No RP2040 device found. {_RP2040_BOOTSEL_INSTRUCTIONS}"
)
_LOGGER.info("Tip: %s", _RP2040_BOOTSEL_INSTRUCTIONS)
if check_default is not None and check_default in [opt[1] for opt in options]:
return [check_default]
return [choose_prompt(options, purpose=purpose)]
@@ -404,10 +460,13 @@ def get_port_type(port: str) -> PortType:
Returns:
PortType.SERIAL for serial ports (/dev/ttyUSB0, COM1, etc.)
PortType.BOOTSEL for RP2040 BOOTSEL upload via picotool
PortType.MQTT for MQTT logging
PortType.MQTTIP for MQTT IP lookup
PortType.NETWORK for IP addresses, hostnames, or mDNS names
"""
if port == "BOOTSEL":
return PortType.BOOTSEL
if port.startswith("/") or port.startswith("COM"):
return PortType.SERIAL
if port == "MQTT":
@@ -695,15 +754,138 @@ def upload_using_esptool(
return run_esptool(115200)
def upload_using_platformio(config: ConfigType, port: str):
def upload_using_platformio(config: ConfigType, port: str) -> int:
from esphome import platformio_api
# RP2040 platform-raspberrypi build recipe expects firmware.bin.signed for
# the upload target, but 'nobuild' skips the build phase that creates it.
# Create it here so the upload doesn't fail.
if CORE.data.get(KEY_CORE, {}).get(KEY_TARGET_PLATFORM) == PLATFORM_RP2040:
idedata = platformio_api.get_idedata(config)
build_dir = Path(idedata.firmware_elf_path).parent
firmware_bin = build_dir / "firmware.bin"
signed_bin = build_dir / "firmware.bin.signed"
if firmware_bin.is_file() and not signed_bin.is_file():
shutil.copy2(firmware_bin, signed_bin)
upload_args = ["-t", "upload", "-t", "nobuild"]
if port is not None:
upload_args += ["--upload-port", port]
return platformio_api.run_platformio_cli_run(config, CORE.verbose, *upload_args)
def _find_picotool() -> Path | None:
"""Find the picotool binary from PlatformIO packages."""
from esphome import platformio_api
try:
idedata = platformio_api.get_idedata(CORE.config)
except Exception: # noqa: BLE001 # pylint: disable=broad-except
return None
return get_picotool_path(idedata.cc_path)
def upload_using_picotool(config: ConfigType) -> int:
"""Upload firmware to RP2040 in BOOTSEL mode using picotool.
Uses picotool to load the ELF firmware directly via USB, avoiding
the mass storage copy approach that causes "disk not ejected properly"
warnings on macOS.
"""
from esphome import platformio_api
idedata = platformio_api.get_idedata(config)
firmware_elf = Path(idedata.firmware_elf_path)
if not firmware_elf.is_file():
_LOGGER.error(
"Firmware ELF file not found at %s. "
"Make sure the project has been compiled first.",
firmware_elf,
)
return 1
picotool = get_picotool_path(idedata.cc_path)
if picotool is None:
_LOGGER.error(
"picotool not found. Ensure the RP2040 PlatformIO platform "
"is installed (%s).",
PICOTOOL_PACKAGE,
)
return 1
_LOGGER.info("Uploading firmware to RP2040 via picotool...")
try:
# Don't capture stdout — let picotool write directly to the terminal
# so progress bars display in real-time with \r updates.
# Capture stderr only so we can detect permission errors.
result = subprocess.run(
[str(picotool), "load", "-v", "-x", str(firmware_elf)],
stderr=subprocess.PIPE,
timeout=60,
check=False,
)
except subprocess.TimeoutExpired:
_LOGGER.error("picotool upload timed out after 60 seconds.")
return 1
except OSError as err:
_LOGGER.error("Failed to run picotool: %s", err)
return 1
if result.returncode != 0:
stderr = result.stderr.decode("utf-8", errors="replace").strip()
if stderr:
for line in stderr.splitlines():
safe_print(line)
if is_picotool_usb_permission_error(stderr):
msg = "Permission denied accessing USB device."
if sys.platform.startswith("linux"):
msg += f" {_RP2040_UDEV_HINT}"
_LOGGER.error(msg)
else:
_LOGGER.error("picotool upload failed (exit code %d).", result.returncode)
return 1
return 0
def _wait_for_serial_port(
port: str | None = None,
timeout: float = 30.0,
known_ports: set[str] | None = None,
) -> None:
"""Wait for a serial port to appear, e.g. after a device reboot.
USB-CDC devices disappear briefly after flashing while the device
reboots and re-enumerates on the USB bus.
If port is given, wait for that specific path. If known_ports is
given, wait for a new port that wasn't in the set. Otherwise wait
for any serial port to appear.
"""
def _port_found() -> bool:
ports = get_serial_ports()
if port is not None:
return any(p.path == port for p in ports)
if known_ports is not None:
return any(p.path not in known_ports for p in ports)
return bool(ports)
if _port_found():
return
if port is not None:
_LOGGER.info("Waiting for %s to come online...", port)
else:
_LOGGER.info("Waiting for device to reboot...")
start = time.monotonic()
while time.monotonic() - start < timeout:
time.sleep(0.05)
if _port_found():
time.sleep(0.05)
return
def check_permissions(port: str):
if os.name == "posix" and get_port_type(port) == PortType.SERIAL:
# Check if we can open selected serial port
@@ -733,7 +915,15 @@ def upload_program(
except AttributeError:
pass
if get_port_type(host) == PortType.SERIAL:
port_type = get_port_type(host)
if port_type == PortType.BOOTSEL:
exit_code = upload_using_picotool(config)
# Return None for device - BOOTSEL can't be used for logging,
# so command_run will show the interactive chooser for log source
return exit_code, None
if port_type == PortType.SERIAL:
check_permissions(host)
exit_code = 1
@@ -787,6 +977,7 @@ def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int
port_type = get_port_type(port)
if port_type == PortType.SERIAL:
_wait_for_serial_port(port)
check_permissions(port)
return run_miniterm(config, port, args)
@@ -925,6 +1116,9 @@ def command_run(args: ArgsProtocol, config: ConfigType) -> int | None:
purpose=Purpose.UPLOADING,
)
# Snapshot current serial ports before upload so we can detect new ones
pre_upload_ports = {p.path for p in get_serial_ports()}
exit_code, successful_device = upload_program(config, args, devices)
if exit_code == 0:
_LOGGER.info("Successfully uploaded program.")
@@ -935,6 +1129,19 @@ def command_run(args: ArgsProtocol, config: ConfigType) -> int | None:
if args.no_logs:
return 0
# After BOOTSEL upload, wait for a new serial port to appear
# so it shows up in the log chooser
if (
successful_device is None
and CORE.data.get(KEY_CORE, {}).get(KEY_TARGET_PLATFORM) == PLATFORM_RP2040
):
_wait_for_serial_port(known_ports=pre_upload_ports)
# If exactly one new serial port appeared, use it directly
serial_ports = get_serial_ports()
new_ports = [p for p in serial_ports if p.path not in pre_upload_ports]
if len(new_ports) == 1:
successful_device = new_ports[0].path
# For logs, prefer the device we successfully uploaded to
devices = choose_upload_log_host(
default=successful_device,
+1 -25
View File
@@ -13,7 +13,7 @@ import time
from typing import Any
from esphome.core import EsphomeError
from esphome.helpers import resolve_ip_address
from esphome.helpers import ProgressBar, resolve_ip_address
RESPONSE_OK = 0x00
RESPONSE_REQUEST_AUTH = 0x01
@@ -63,30 +63,6 @@ _AUTH_METHODS: dict[int, tuple[Callable[..., Any], int, str]] = {
}
class ProgressBar:
def __init__(self):
self.last_progress = None
def update(self, progress):
bar_length = 60
status = ""
if progress >= 1:
progress = 1
status = "Done...\r\n"
new_progress = int(progress * 100)
if new_progress == self.last_progress:
return
self.last_progress = new_progress
block = int(round(bar_length * progress))
text = f"\rUploading: [{'=' * block + ' ' * (bar_length - block)}] {new_progress}% {status}"
sys.stderr.write(text)
sys.stderr.flush()
def done(self):
sys.stderr.write("\n")
sys.stderr.flush()
class OTAError(EsphomeError):
pass
+27
View File
@@ -9,6 +9,7 @@ import platform
import re
import shutil
import stat
import sys
import tempfile
from typing import TYPE_CHECKING
from urllib.parse import urlparse
@@ -585,6 +586,32 @@ def sanitize(value):
return _DISALLOWED_CHARS.sub("_", value)
class ProgressBar:
"""A simple terminal progress bar for upload operations."""
def __init__(self) -> None:
self.last_progress: int | None = None
def update(self, progress: float) -> None:
bar_length = 60
status = ""
if progress >= 1:
progress = 1
status = "Done...\r\n"
new_progress = int(progress * 100)
if new_progress == self.last_progress:
return
self.last_progress = new_progress
block = int(round(bar_length * progress))
text = f"\rUploading: [{'=' * block + ' ' * (bar_length - block)}] {new_progress}% {status}"
sys.stderr.write(text)
sys.stderr.flush()
def done(self) -> None:
sys.stderr.write("\n")
sys.stderr.flush()
def docs_url(path: str) -> str:
"""Return the URL to the documentation for a given path."""
# Local import to avoid circular import
+70
View File
@@ -1,5 +1,6 @@
import collections
from collections.abc import Callable
from dataclasses import dataclass
import io
import logging
from pathlib import Path
@@ -355,6 +356,75 @@ def get_serial_ports() -> list[SerialPort]:
return result
PICOTOOL_PACKAGE = "tool-picotool-rp2040-earlephilhower"
def get_picotool_path(cc_path: str) -> Path | None:
"""Derive the picotool binary path from the PlatformIO toolchain cc_path.
The cc_path from IDEData points to the toolchain package, e.g.:
~/.platformio/packages/toolchain-rp2040-earlephilhower/bin/arm-none-eabi-gcc
Picotool is in a sibling package:
~/.platformio/packages/tool-picotool-rp2040-earlephilhower/picotool
"""
cc = Path(cc_path)
# Go from .../packages/toolchain-.../bin/gcc up to .../packages/
packages_dir = cc.parent.parent.parent
binary_name = "picotool.exe" if sys.platform == "win32" else "picotool"
picotool = packages_dir / PICOTOOL_PACKAGE / binary_name
if picotool.is_file():
return picotool
return None
def is_picotool_usb_permission_error(output: str | bytes) -> bool:
"""Check if picotool output indicates a USB permission error."""
if isinstance(output, str):
return (
"unable to connect" in output
or "LIBUSB_ERROR_ACCESS" in output
or "Permission denied" in output
)
return (
b"unable to connect" in output
or b"LIBUSB_ERROR_ACCESS" in output
or b"Permission denied" in output
)
@dataclass
class BootselResult:
"""Result of RP2040 BOOTSEL detection."""
device_count: int
permission_error: bool = False
def detect_rp2040_bootsel(picotool_path: str | Path) -> BootselResult:
"""Detect RP2040/RP2350 devices in BOOTSEL mode using picotool.
Returns a BootselResult with the number of devices found (by counting
'type:' lines in output), and whether a permission error was detected.
"""
try:
result = subprocess.run(
[str(picotool_path), "info", "-d"],
capture_output=True,
timeout=10,
check=False,
)
device_count = result.stdout.count(b"type:")
if device_count > 0:
return BootselResult(device_count)
# Check for permission issues — picotool can see the device
# on the USB bus but can't connect without proper permissions
if is_picotool_usb_permission_error(result.stderr + result.stdout):
return BootselResult(0, permission_error=True)
return BootselResult(0)
except (OSError, subprocess.TimeoutExpired):
return BootselResult(0)
def get_esp32_arduino_flash_error_help() -> str | None:
"""Returns helpful message when ESP32 with Arduino runs out of flash space."""
from esphome.core import CORE
+420
View File
@@ -8,6 +8,7 @@ import json
import logging
from pathlib import Path
import re
import sys
import time
from typing import Any
from unittest.mock import MagicMock, Mock, patch
@@ -40,6 +41,8 @@ from esphome.__main__ import (
show_logs,
upload_program,
upload_using_esptool,
upload_using_picotool,
upload_using_platformio,
)
from esphome.components.esp32 import KEY_ESP32, KEY_VARIANT, VARIANT_ESP32
from esphome.const import (
@@ -70,6 +73,7 @@ from esphome.const import (
PLATFORM_RP2040,
)
from esphome.core import CORE, EsphomeError
from esphome.util import BootselResult
def strip_ansi_codes(text: str) -> str:
@@ -174,6 +178,13 @@ def mock_upload_using_platformio() -> Generator[Mock]:
yield mock
@pytest.fixture
def mock_upload_using_picotool() -> Generator[Mock]:
"""Mock upload_using_picotool for testing."""
with patch("esphome.__main__.upload_using_picotool") as mock:
yield mock
@pytest.fixture
def mock_run_ota() -> Generator[Mock]:
"""Mock espota2.run_ota for testing."""
@@ -851,6 +862,221 @@ def test_choose_upload_log_host_no_address_with_ota_config() -> None:
)
@pytest.mark.usefixtures("mock_no_serial_ports")
def test_choose_upload_log_host_no_defaults_with_rp2040_bootsel(
mock_choose_prompt: Mock,
) -> None:
"""Test interactive mode shows RP2040 BOOTSEL option via picotool."""
setup_core(platform=PLATFORM_RP2040)
with (
patch(
"esphome.__main__._find_picotool", return_value=Path("/usr/bin/picotool")
),
patch("esphome.__main__.detect_rp2040_bootsel", return_value=BootselResult(1)),
):
result = choose_upload_log_host(
default=None,
check_default=None,
purpose=Purpose.UPLOADING,
)
assert result == ["/dev/ttyUSB0"] # mock_choose_prompt default
mock_choose_prompt.assert_called_once_with(
[("RP2040 BOOTSEL (via picotool)", "BOOTSEL")],
purpose=Purpose.UPLOADING,
)
@pytest.mark.usefixtures("mock_no_serial_ports")
def test_choose_upload_log_host_rp2040_no_device_shows_bootsel_help() -> None:
"""Test BOOTSEL instructions shown when no RP2040 device found."""
setup_core(platform=PLATFORM_RP2040)
with (
patch(
"esphome.__main__._find_picotool", return_value=Path("/usr/bin/picotool")
),
patch("esphome.__main__.detect_rp2040_bootsel", return_value=BootselResult(0)),
pytest.raises(EsphomeError, match="BOOTSEL"),
):
choose_upload_log_host(
default=None,
check_default=None,
purpose=Purpose.UPLOADING,
)
@pytest.mark.usefixtures("mock_no_serial_ports")
def test_choose_upload_log_host_rp2040_bootsel_tip_with_ota(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test BOOTSEL tip shown when only OTA options exist for RP2040."""
setup_core(
platform=PLATFORM_RP2040,
config={CONF_OTA: [{CONF_PLATFORM: CONF_ESPHOME}]},
address="192.168.1.100",
)
with (
patch(
"esphome.__main__._find_picotool", return_value=Path("/usr/bin/picotool")
),
patch("esphome.__main__.detect_rp2040_bootsel", return_value=BootselResult(0)),
patch(
"esphome.__main__.choose_prompt",
return_value="192.168.1.100",
),
caplog.at_level(logging.INFO, logger="esphome.__main__"),
):
choose_upload_log_host(
default=None,
check_default=None,
purpose=Purpose.UPLOADING,
)
assert "BOOTSEL" in caplog.text
def test_choose_upload_log_host_rp2040_bootsel_tip_with_serial_ports(
caplog: pytest.LogCaptureFixture,
mock_choose_prompt: Mock,
) -> None:
"""Test BOOTSEL tip shown when serial ports exist but no BOOTSEL device."""
setup_core(platform=PLATFORM_RP2040)
mock_ports = [MockSerialPort("/dev/ttyACM0", "RP2040 Serial")]
with (
patch("esphome.__main__.get_serial_ports", return_value=mock_ports),
patch(
"esphome.__main__._find_picotool",
return_value=Path("/usr/bin/picotool"),
),
patch("esphome.__main__.detect_rp2040_bootsel", return_value=BootselResult(0)),
caplog.at_level(logging.INFO, logger="esphome.__main__"),
):
choose_upload_log_host(
default=None,
check_default=None,
purpose=Purpose.UPLOADING,
)
assert "BOOTSEL" in caplog.text
@pytest.mark.usefixtures("mock_no_serial_ports")
def test_choose_upload_log_host_rp2040_permission_error_no_options(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test permission warning shown when BOOTSEL device found but not accessible."""
setup_core(platform=PLATFORM_RP2040)
with (
patch(
"esphome.__main__._find_picotool", return_value=Path("/usr/bin/picotool")
),
patch(
"esphome.__main__.detect_rp2040_bootsel",
return_value=BootselResult(0, permission_error=True),
),
patch("esphome.__main__.sys.platform", "linux"),
pytest.raises(EsphomeError, match="BOOTSEL"),
caplog.at_level(logging.WARNING, logger="esphome.__main__"),
):
choose_upload_log_host(
default=None,
check_default=None,
purpose=Purpose.UPLOADING,
)
assert "USB permissions" in caplog.text
assert "udev" in caplog.text
@pytest.mark.usefixtures("mock_no_serial_ports")
def test_choose_upload_log_host_rp2040_permission_error_with_ota(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test permission warning shown with OTA fallback available."""
setup_core(
platform=PLATFORM_RP2040,
config={CONF_OTA: [{CONF_PLATFORM: CONF_ESPHOME}]},
address="192.168.1.100",
)
with (
patch(
"esphome.__main__._find_picotool", return_value=Path("/usr/bin/picotool")
),
patch(
"esphome.__main__.detect_rp2040_bootsel",
return_value=BootselResult(0, permission_error=True),
),
patch(
"esphome.__main__.choose_prompt",
return_value="192.168.1.100",
),
caplog.at_level(logging.WARNING, logger="esphome.__main__"),
):
choose_upload_log_host(
default=None,
check_default=None,
purpose=Purpose.UPLOADING,
)
assert "USB permissions" in caplog.text
def test_choose_upload_log_host_no_bootsel_for_non_rp2040(
mock_no_serial_ports: Mock,
) -> None:
"""Test that BOOTSEL detection is not run for non-RP2040 platforms."""
setup_core(
platform=PLATFORM_ESP32,
config={CONF_OTA: [{CONF_PLATFORM: CONF_ESPHOME}]},
address="192.168.1.100",
)
with (
patch("esphome.__main__._find_picotool") as mock_find_picotool,
patch(
"esphome.__main__.choose_prompt",
return_value="192.168.1.100",
),
):
choose_upload_log_host(
default=None,
check_default=None,
purpose=Purpose.UPLOADING,
)
mock_find_picotool.assert_not_called()
def test_choose_upload_log_host_rp2040_serial_and_bootsel(
mock_choose_prompt: Mock,
) -> None:
"""Test both serial ports and BOOTSEL option shown for RP2040."""
setup_core(platform=PLATFORM_RP2040)
mock_ports = [MockSerialPort("/dev/ttyACM0", "RP2040 Serial")]
with (
patch("esphome.__main__.get_serial_ports", return_value=mock_ports),
patch(
"esphome.__main__._find_picotool", return_value=Path("/usr/bin/picotool")
),
patch("esphome.__main__.detect_rp2040_bootsel", return_value=BootselResult(1)),
):
choose_upload_log_host(
default=None,
check_default=None,
purpose=Purpose.UPLOADING,
)
mock_choose_prompt.assert_called_once_with(
[
("/dev/ttyACM0 (RP2040 Serial)", "/dev/ttyACM0"),
("RP2040 BOOTSEL (via picotool)", "BOOTSEL"),
],
purpose=Purpose.UPLOADING,
)
@dataclass
class MockArgs:
"""Mock args for testing."""
@@ -1060,6 +1286,46 @@ def test_upload_program_serial_platformio_platforms(
mock_upload_using_platformio.assert_called_once_with(config, device)
def test_upload_using_platformio_creates_signed_bin_for_rp2040(
tmp_path: Path,
) -> None:
"""Test that upload_using_platformio creates firmware.bin.signed for RP2040."""
setup_core(platform=PLATFORM_RP2040)
build_dir = tmp_path / "build"
build_dir.mkdir()
firmware_bin = build_dir / "firmware.bin"
firmware_bin.write_bytes(b"test firmware content")
firmware_elf = build_dir / "firmware.elf"
firmware_elf.write_bytes(b"elf")
mock_idedata = MagicMock()
mock_idedata.firmware_elf_path = str(firmware_elf)
with (
patch("esphome.platformio_api.get_idedata", return_value=mock_idedata),
patch("esphome.platformio_api.run_platformio_cli_run", return_value=0),
):
result = upload_using_platformio({}, "/dev/ttyACM0")
assert result == 0
signed_bin = build_dir / "firmware.bin.signed"
assert signed_bin.is_file()
assert signed_bin.read_bytes() == b"test firmware content"
def test_upload_using_platformio_skips_signed_bin_for_non_rp2040(
tmp_path: Path,
) -> None:
"""Test that upload_using_platformio doesn't create signed bin for non-RP2040."""
setup_core(platform=PLATFORM_ESP32)
with patch("esphome.platformio_api.run_platformio_cli_run", return_value=0):
result = upload_using_platformio({}, "/dev/ttyUSB0")
assert result == 0
def test_upload_program_serial_upload_failed(
mock_upload_using_esptool: Mock,
mock_get_port_type: Mock,
@@ -1082,6 +1348,158 @@ def test_upload_program_serial_upload_failed(
mock_upload_using_esptool.assert_called_once()
def test_upload_program_bootsel(
mock_upload_using_picotool: Mock,
mock_get_port_type: Mock,
) -> None:
"""Test upload_program with BOOTSEL for RP2040."""
setup_core(platform=PLATFORM_RP2040)
mock_get_port_type.return_value = "BOOTSEL"
mock_upload_using_picotool.return_value = 0
config = {}
args = MockArgs()
devices = ["BOOTSEL"]
exit_code, host = upload_program(config, args, devices)
assert exit_code == 0
# BOOTSEL device can't be used for logging, so host should be None
assert host is None
mock_upload_using_picotool.assert_called_once_with(config)
def test_upload_program_bootsel_failed(
mock_upload_using_picotool: Mock,
mock_get_port_type: Mock,
) -> None:
"""Test upload_program when BOOTSEL upload fails."""
setup_core(platform=PLATFORM_RP2040)
mock_get_port_type.return_value = "BOOTSEL"
mock_upload_using_picotool.return_value = 1
config = {}
args = MockArgs()
devices = ["BOOTSEL"]
exit_code, host = upload_program(config, args, devices)
assert exit_code == 1
assert host is None
mock_upload_using_picotool.assert_called_once_with(config)
def test_upload_using_picotool_success(tmp_path: Path) -> None:
"""Test upload_using_picotool succeeds."""
setup_core(platform=PLATFORM_RP2040, tmp_path=tmp_path)
build_dir = tmp_path / "build"
build_dir.mkdir()
firmware_elf = build_dir / "firmware.elf"
firmware_elf.write_bytes(b"\x00" * 1024)
# Create picotool binary
packages_dir = tmp_path / "packages"
toolchain_bin = packages_dir / "toolchain-rp2040-earlephilhower" / "bin"
toolchain_bin.mkdir(parents=True)
picotool_dir = packages_dir / "tool-picotool-rp2040-earlephilhower"
picotool_dir.mkdir(parents=True)
binary_name = "picotool.exe" if sys.platform == "win32" else "picotool"
picotool = picotool_dir / binary_name
picotool.touch()
mock_idedata = MagicMock()
mock_idedata.firmware_elf_path = str(firmware_elf)
mock_idedata.cc_path = str(toolchain_bin / "arm-none-eabi-gcc")
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stderr = b""
config = {}
with (
patch("esphome.platformio_api.get_idedata", return_value=mock_idedata),
patch("subprocess.run", return_value=mock_result),
):
exit_code = upload_using_picotool(config)
assert exit_code == 0
def test_upload_using_picotool_no_elf(tmp_path: Path) -> None:
"""Test upload_using_picotool when ELF file is missing."""
setup_core(platform=PLATFORM_RP2040, tmp_path=tmp_path)
build_dir = tmp_path / "build"
build_dir.mkdir()
mock_idedata = MagicMock()
mock_idedata.firmware_elf_path = str(build_dir / "firmware.elf")
mock_idedata.cc_path = "/fake/path/gcc"
config = {}
with patch("esphome.platformio_api.get_idedata", return_value=mock_idedata):
exit_code = upload_using_picotool(config)
assert exit_code == 1
def test_upload_using_picotool_not_found(tmp_path: Path) -> None:
"""Test upload_using_picotool when picotool binary not found."""
setup_core(platform=PLATFORM_RP2040, tmp_path=tmp_path)
build_dir = tmp_path / "build"
build_dir.mkdir()
firmware_elf = build_dir / "firmware.elf"
firmware_elf.write_bytes(b"\x00" * 512)
mock_idedata = MagicMock()
mock_idedata.firmware_elf_path = str(firmware_elf)
mock_idedata.cc_path = "/fake/path/gcc"
config = {}
with patch("esphome.platformio_api.get_idedata", return_value=mock_idedata):
exit_code = upload_using_picotool(config)
assert exit_code == 1
def test_upload_using_picotool_permission_error(tmp_path: Path) -> None:
"""Test upload_using_picotool shows helpful message on permission error."""
setup_core(platform=PLATFORM_RP2040, tmp_path=tmp_path)
build_dir = tmp_path / "build"
build_dir.mkdir()
firmware_elf = build_dir / "firmware.elf"
firmware_elf.write_bytes(b"\x00" * 512)
packages_dir = tmp_path / "packages"
toolchain_bin = packages_dir / "toolchain-rp2040-earlephilhower" / "bin"
toolchain_bin.mkdir(parents=True)
picotool_dir = packages_dir / "tool-picotool-rp2040-earlephilhower"
picotool_dir.mkdir(parents=True)
binary_name = "picotool.exe" if sys.platform == "win32" else "picotool"
picotool = picotool_dir / binary_name
picotool.touch()
mock_idedata = MagicMock()
mock_idedata.firmware_elf_path = str(firmware_elf)
mock_idedata.cc_path = str(toolchain_bin / "arm-none-eabi-gcc")
mock_result = MagicMock()
mock_result.returncode = 1
mock_result.stderr = b"LIBUSB_ERROR_ACCESS"
config = {}
with (
patch("esphome.platformio_api.get_idedata", return_value=mock_idedata),
patch("subprocess.run", return_value=mock_result),
):
exit_code = upload_using_picotool(config)
assert exit_code == 1
def test_upload_program_ota_success(
mock_run_ota: Mock,
mock_get_port_type: Mock,
@@ -1606,6 +2024,8 @@ def test_get_port_type() -> None:
assert get_port_type("esphome-device.local") == "NETWORK"
assert get_port_type("10.0.0.1") == "NETWORK"
assert get_port_type("BOOTSEL") == "BOOTSEL"
def test_has_mqtt_ip_lookup() -> None:
"""Test has_mqtt_ip_lookup function."""
+132
View File
@@ -3,6 +3,9 @@
from __future__ import annotations
from pathlib import Path
import subprocess
import sys
from unittest.mock import MagicMock, patch
import pytest
@@ -402,3 +405,132 @@ def test_shlex_quote_edge_cases() -> None:
assert util.shlex_quote("\t") == "'\t'"
assert util.shlex_quote("\n") == "'\n'"
assert util.shlex_quote(" ") == "' '"
def test_get_picotool_path_found(tmp_path: Path) -> None:
"""Test picotool path derivation from cc_path."""
# Create the expected directory structure
packages_dir = tmp_path / "packages"
toolchain_dir = packages_dir / "toolchain-rp2040-earlephilhower" / "bin"
toolchain_dir.mkdir(parents=True)
gcc = toolchain_dir / "arm-none-eabi-gcc"
gcc.touch()
binary_name = "picotool.exe" if sys.platform == "win32" else "picotool"
picotool_dir = packages_dir / "tool-picotool-rp2040-earlephilhower"
picotool_dir.mkdir(parents=True)
picotool = picotool_dir / binary_name
picotool.touch()
result = util.get_picotool_path(str(gcc))
assert result == picotool
def test_get_picotool_path_not_found(tmp_path: Path) -> None:
"""Test picotool path returns None when not installed."""
packages_dir = tmp_path / "packages"
toolchain_dir = packages_dir / "toolchain-rp2040-earlephilhower" / "bin"
toolchain_dir.mkdir(parents=True)
gcc = toolchain_dir / "arm-none-eabi-gcc"
gcc.touch()
result = util.get_picotool_path(str(gcc))
assert result is None
def test_get_picotool_path_windows(tmp_path: Path) -> None:
"""Test picotool path uses .exe on Windows."""
packages_dir = tmp_path / "packages"
toolchain_dir = packages_dir / "toolchain-rp2040-earlephilhower" / "bin"
toolchain_dir.mkdir(parents=True)
gcc = toolchain_dir / "arm-none-eabi-gcc.exe"
gcc.touch()
picotool_dir = packages_dir / "tool-picotool-rp2040-earlephilhower"
picotool_dir.mkdir(parents=True)
picotool = picotool_dir / "picotool.exe"
picotool.touch()
with patch("esphome.util.sys.platform", "win32"):
result = util.get_picotool_path(str(gcc))
assert result == picotool
def test_detect_rp2040_bootsel_found() -> None:
"""Test BOOTSEL device detection when device is present."""
mock_result = MagicMock()
mock_result.stdout = b"Device Information\n type: RP2040\n"
with patch("esphome.util.subprocess.run", return_value=mock_result):
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 1
assert result.permission_error is False
def test_detect_rp2040_bootsel_multiple() -> None:
"""Test BOOTSEL detection with multiple devices."""
mock_result = MagicMock()
mock_result.stdout = b"type: RP2040\ntype: RP2350\n"
with patch("esphome.util.subprocess.run", return_value=mock_result):
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 2
assert result.permission_error is False
def test_detect_rp2040_bootsel_none() -> None:
"""Test BOOTSEL detection when no device found."""
mock_result = MagicMock()
mock_result.stdout = (
b"No accessible RP2040/RP2350 devices in BOOTSEL mode were found.\n"
)
mock_result.stderr = b""
with patch("esphome.util.subprocess.run", return_value=mock_result):
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 0
assert result.permission_error is False
def test_detect_rp2040_bootsel_permission_error() -> None:
"""Test BOOTSEL detection with device found but not accessible."""
mock_result = MagicMock()
mock_result.stdout = (
b"No accessible RP-series devices in BOOTSEL mode were found.\n"
)
mock_result.stderr = (
b"RP2040 device at bus 5, address 24 appears to be in BOOTSEL mode, "
b"but picotool was unable to connect. "
b"Maybe try 'sudo' or check your permissions.\n"
)
with patch("esphome.util.subprocess.run", return_value=mock_result):
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 0
assert result.permission_error is True
def test_detect_rp2040_bootsel_libusb_access_error() -> None:
"""Test BOOTSEL detection with LIBUSB_ERROR_ACCESS."""
mock_result = MagicMock()
mock_result.stdout = b""
mock_result.stderr = b"LIBUSB_ERROR_ACCESS\n"
with patch("esphome.util.subprocess.run", return_value=mock_result):
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 0
assert result.permission_error is True
def test_detect_rp2040_bootsel_oserror() -> None:
"""Test BOOTSEL detection handles OSError."""
with patch("esphome.util.subprocess.run", side_effect=OSError("not found")):
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 0
assert result.permission_error is False
def test_detect_rp2040_bootsel_timeout() -> None:
"""Test BOOTSEL detection handles timeout."""
with patch(
"esphome.util.subprocess.run",
side_effect=subprocess.TimeoutExpired("picotool", 10),
):
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 0
assert result.permission_error is False