mirror of
https://github.com/esphome/esphome.git
synced 2026-03-28 05:39:20 +08:00
Some checks failed
CI / Create common environment (push) Has been cancelled
CI / Check pylint (push) Has been cancelled
CI / Run script/ci-custom (push) Has been cancelled
CI / Run pytest (macOS-latest, 3.11) (push) Has been cancelled
CI / Run pytest (macOS-latest, 3.14) (push) Has been cancelled
CI / Run pytest (ubuntu-latest, 3.11) (push) Has been cancelled
CI / Run pytest (ubuntu-latest, 3.13) (push) Has been cancelled
CI / Run pytest (ubuntu-latest, 3.14) (push) Has been cancelled
CI / Run pytest (windows-latest, 3.11) (push) Has been cancelled
CI / Run pytest (windows-latest, 3.14) (push) Has been cancelled
CI / Determine which jobs to run (push) Has been cancelled
CI / Run integration tests (push) Has been cancelled
CI / Run C++ unit tests (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 IDF (push) Has been cancelled
CI / Run script/clang-tidy for ESP8266 (push) Has been cancelled
CI / Run script/clang-tidy for ZEPHYR (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 1/4 (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 2/4 (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 3/4 (push) Has been cancelled
CI / Run script/clang-tidy for ESP32 Arduino 4/4 (push) Has been cancelled
CI / Test components batch (${{ matrix.components }}) (push) Has been cancelled
CI / pre-commit.ci lite (push) Has been cancelled
CI / Build target branch for memory impact (push) Has been cancelled
CI / Build PR branch for memory impact (push) Has been cancelled
CI / Comment memory impact (push) Has been cancelled
CI / CI Status (push) Has been cancelled
Stale / stale (push) Has been cancelled
Lock closed issues and PRs / lock (push) Has been cancelled
Publish Release / Initialize build (push) Has been cancelled
Publish Release / Build and publish to PyPi (push) Has been cancelled
Publish Release / Build ESPHome amd64 (push) Has been cancelled
Publish Release / Build ESPHome arm64 (push) Has been cancelled
Publish Release / Publish ESPHome docker to dockerhub (push) Has been cancelled
Publish Release / Publish ESPHome docker to ghcr (push) Has been cancelled
Publish Release / Publish ESPHome ha-addon to dockerhub (push) Has been cancelled
Publish Release / Publish ESPHome ha-addon to ghcr (push) Has been cancelled
Publish Release / deploy-ha-addon-repo (push) Has been cancelled
Publish Release / deploy-esphome-schema (push) Has been cancelled
Publish Release / version-notifier (push) Has been cancelled
Synchronise Device Classes from Home Assistant / Sync Device Classes (push) Has been cancelled
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
712 lines
23 KiB
Python
712 lines
23 KiB
Python
"""Tests for esphome.util module."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable
|
|
import io
|
|
from pathlib import Path
|
|
import subprocess
|
|
import sys
|
|
from typing import Any
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from esphome import util
|
|
|
|
|
|
def test_list_yaml_files_with_files_and_directories(tmp_path: Path) -> None:
|
|
"""Test that list_yaml_files handles both files and directories."""
|
|
# Create directory structure
|
|
dir1 = tmp_path / "configs"
|
|
dir1.mkdir()
|
|
dir2 = tmp_path / "more_configs"
|
|
dir2.mkdir()
|
|
|
|
# Create YAML files in directories
|
|
(dir1 / "config1.yaml").write_text("test: 1")
|
|
(dir1 / "config2.yml").write_text("test: 2")
|
|
(dir1 / "not_yaml.txt").write_text("not yaml")
|
|
|
|
(dir2 / "config3.yaml").write_text("test: 3")
|
|
|
|
# Create standalone YAML files
|
|
standalone1 = tmp_path / "standalone.yaml"
|
|
standalone1.write_text("test: 4")
|
|
standalone2 = tmp_path / "another.yml"
|
|
standalone2.write_text("test: 5")
|
|
|
|
# Test with mixed input (directories and files)
|
|
configs = [
|
|
dir1,
|
|
standalone1,
|
|
dir2,
|
|
standalone2,
|
|
]
|
|
|
|
result = util.list_yaml_files(configs)
|
|
|
|
# Should include all YAML files but not the .txt file
|
|
assert set(result) == {
|
|
dir1 / "config1.yaml",
|
|
dir1 / "config2.yml",
|
|
dir2 / "config3.yaml",
|
|
standalone1,
|
|
standalone2,
|
|
}
|
|
# Check that results are sorted
|
|
assert result == sorted(result)
|
|
|
|
|
|
def test_list_yaml_files_only_directories(tmp_path: Path) -> None:
|
|
"""Test list_yaml_files with only directories."""
|
|
dir1 = tmp_path / "dir1"
|
|
dir1.mkdir()
|
|
dir2 = tmp_path / "dir2"
|
|
dir2.mkdir()
|
|
|
|
(dir1 / "a.yaml").write_text("test: a")
|
|
(dir1 / "b.yml").write_text("test: b")
|
|
(dir2 / "c.yaml").write_text("test: c")
|
|
|
|
result = util.list_yaml_files([dir1, dir2])
|
|
|
|
assert set(result) == {
|
|
dir1 / "a.yaml",
|
|
dir1 / "b.yml",
|
|
dir2 / "c.yaml",
|
|
}
|
|
assert result == sorted(result)
|
|
|
|
|
|
def test_list_yaml_files_only_files(tmp_path: Path) -> None:
|
|
"""Test list_yaml_files with only files."""
|
|
file1 = tmp_path / "file1.yaml"
|
|
file2 = tmp_path / "file2.yml"
|
|
file3 = tmp_path / "file3.yaml"
|
|
non_yaml = tmp_path / "not_yaml.json"
|
|
|
|
file1.write_text("test: 1")
|
|
file2.write_text("test: 2")
|
|
file3.write_text("test: 3")
|
|
non_yaml.write_text("{}")
|
|
|
|
# Include a non-YAML file to test filtering
|
|
result = util.list_yaml_files(
|
|
[
|
|
file1,
|
|
file2,
|
|
file3,
|
|
non_yaml,
|
|
]
|
|
)
|
|
|
|
assert set(result) == {
|
|
file1,
|
|
file2,
|
|
file3,
|
|
}
|
|
assert result == sorted(result)
|
|
|
|
|
|
def test_list_yaml_files_empty_directory(tmp_path: Path) -> None:
|
|
"""Test list_yaml_files with an empty directory."""
|
|
empty_dir = tmp_path / "empty"
|
|
empty_dir.mkdir()
|
|
|
|
result = util.list_yaml_files([empty_dir])
|
|
|
|
assert result == []
|
|
|
|
|
|
def test_list_yaml_files_nonexistent_path(tmp_path: Path) -> None:
|
|
"""Test list_yaml_files with a nonexistent path raises an error."""
|
|
nonexistent = tmp_path / "nonexistent"
|
|
existing = tmp_path / "existing.yaml"
|
|
existing.write_text("test: 1")
|
|
|
|
# Should raise an error for non-existent directory
|
|
with pytest.raises(FileNotFoundError):
|
|
util.list_yaml_files([nonexistent, existing])
|
|
|
|
|
|
def test_list_yaml_files_mixed_extensions(tmp_path: Path) -> None:
|
|
"""Test that both .yaml and .yml extensions are recognized."""
|
|
dir1 = tmp_path / "configs"
|
|
dir1.mkdir()
|
|
|
|
yaml_file = dir1 / "config.yaml"
|
|
yml_file = dir1 / "config.yml"
|
|
other_file = dir1 / "config.txt"
|
|
|
|
yaml_file.write_text("test: yaml")
|
|
yml_file.write_text("test: yml")
|
|
other_file.write_text("test: txt")
|
|
|
|
result = util.list_yaml_files([dir1])
|
|
|
|
assert set(result) == {
|
|
yaml_file,
|
|
yml_file,
|
|
}
|
|
|
|
|
|
def test_list_yaml_files_does_not_recurse_into_subdirectories(tmp_path: Path) -> None:
|
|
"""Test that list_yaml_files only finds files in specified directory, not subdirectories."""
|
|
# Create directory structure with YAML files at different depths
|
|
root = tmp_path / "configs"
|
|
root.mkdir()
|
|
|
|
# Create YAML files in the root directory
|
|
(root / "config1.yaml").write_text("test: 1")
|
|
(root / "config2.yml").write_text("test: 2")
|
|
(root / "device.yaml").write_text("test: device")
|
|
|
|
# Create subdirectory with YAML files (should NOT be found)
|
|
subdir = root / "subdir"
|
|
subdir.mkdir()
|
|
(subdir / "nested1.yaml").write_text("test: nested1")
|
|
(subdir / "nested2.yml").write_text("test: nested2")
|
|
|
|
# Create deeper subdirectory (should NOT be found)
|
|
deep_subdir = subdir / "deeper"
|
|
deep_subdir.mkdir()
|
|
(deep_subdir / "very_nested.yaml").write_text("test: very_nested")
|
|
|
|
# Test listing files from the root directory
|
|
result = util.list_yaml_files([str(root)])
|
|
|
|
# Should only find the 3 files in root, not the 3 in subdirectories
|
|
assert len(result) == 3
|
|
|
|
# Check that only root-level files are found
|
|
assert root / "config1.yaml" in result
|
|
assert root / "config2.yml" in result
|
|
assert root / "device.yaml" in result
|
|
|
|
# Ensure nested files are NOT found
|
|
for r in result:
|
|
r_str = str(r)
|
|
assert "subdir" not in r_str
|
|
assert "deeper" not in r_str
|
|
assert "nested1.yaml" not in r_str
|
|
assert "nested2.yml" not in r_str
|
|
assert "very_nested.yaml" not in r_str
|
|
|
|
|
|
def test_list_yaml_files_excludes_secrets(tmp_path: Path) -> None:
|
|
"""Test that secrets.yaml and secrets.yml are excluded."""
|
|
root = tmp_path / "configs"
|
|
root.mkdir()
|
|
|
|
# Create various YAML files including secrets
|
|
(root / "config.yaml").write_text("test: config")
|
|
(root / "secrets.yaml").write_text("wifi_password: secret123")
|
|
(root / "secrets.yml").write_text("api_key: secret456")
|
|
(root / "device.yaml").write_text("test: device")
|
|
|
|
result = util.list_yaml_files([str(root)])
|
|
|
|
# Should find 2 files (config.yaml and device.yaml), not secrets
|
|
assert len(result) == 2
|
|
assert root / "config.yaml" in result
|
|
assert root / "device.yaml" in result
|
|
assert root / "secrets.yaml" not in result
|
|
assert root / "secrets.yml" not in result
|
|
|
|
|
|
def test_list_yaml_files_excludes_hidden_files(tmp_path: Path) -> None:
|
|
"""Test that hidden files (starting with .) are excluded."""
|
|
root = tmp_path / "configs"
|
|
root.mkdir()
|
|
|
|
# Create regular and hidden YAML files
|
|
(root / "config.yaml").write_text("test: config")
|
|
(root / ".hidden.yaml").write_text("test: hidden")
|
|
(root / ".backup.yml").write_text("test: backup")
|
|
(root / "device.yaml").write_text("test: device")
|
|
|
|
result = util.list_yaml_files([str(root)])
|
|
|
|
# Should find only non-hidden files
|
|
assert len(result) == 2
|
|
assert root / "config.yaml" in result
|
|
assert root / "device.yaml" in result
|
|
assert root / ".hidden.yaml" not in result
|
|
assert root / ".backup.yml" not in result
|
|
|
|
|
|
def test_filter_yaml_files_basic() -> None:
|
|
"""Test filter_yaml_files function."""
|
|
files = [
|
|
Path("/path/to/config.yaml"),
|
|
Path("/path/to/device.yml"),
|
|
Path("/path/to/readme.txt"),
|
|
Path("/path/to/script.py"),
|
|
Path("/path/to/data.json"),
|
|
Path("/path/to/another.yaml"),
|
|
]
|
|
|
|
result = util.filter_yaml_files(files)
|
|
|
|
assert len(result) == 3
|
|
assert Path("/path/to/config.yaml") in result
|
|
assert Path("/path/to/device.yml") in result
|
|
assert Path("/path/to/another.yaml") in result
|
|
assert Path("/path/to/readme.txt") not in result
|
|
assert Path("/path/to/script.py") not in result
|
|
assert Path("/path/to/data.json") not in result
|
|
|
|
|
|
def test_filter_yaml_files_excludes_secrets() -> None:
|
|
"""Test that filter_yaml_files excludes secrets files."""
|
|
files = [
|
|
Path("/path/to/config.yaml"),
|
|
Path("/path/to/secrets.yaml"),
|
|
Path("/path/to/secrets.yml"),
|
|
Path("/path/to/device.yaml"),
|
|
Path("/some/dir/secrets.yaml"),
|
|
]
|
|
|
|
result = util.filter_yaml_files(files)
|
|
|
|
assert len(result) == 2
|
|
assert Path("/path/to/config.yaml") in result
|
|
assert Path("/path/to/device.yaml") in result
|
|
assert Path("/path/to/secrets.yaml") not in result
|
|
assert Path("/path/to/secrets.yml") not in result
|
|
assert Path("/some/dir/secrets.yaml") not in result
|
|
|
|
|
|
def test_filter_yaml_files_excludes_hidden() -> None:
|
|
"""Test that filter_yaml_files excludes hidden files."""
|
|
files = [
|
|
Path("/path/to/config.yaml"),
|
|
Path("/path/to/.hidden.yaml"),
|
|
Path("/path/to/.backup.yml"),
|
|
Path("/path/to/device.yaml"),
|
|
Path("/some/dir/.config.yaml"),
|
|
]
|
|
|
|
result = util.filter_yaml_files(files)
|
|
|
|
assert len(result) == 2
|
|
assert Path("/path/to/config.yaml") in result
|
|
assert Path("/path/to/device.yaml") in result
|
|
assert Path("/path/to/.hidden.yaml") not in result
|
|
assert Path("/path/to/.backup.yml") not in result
|
|
assert Path("/some/dir/.config.yaml") not in result
|
|
|
|
|
|
def test_filter_yaml_files_case_sensitive() -> None:
|
|
"""Test that filter_yaml_files is case-sensitive for extensions."""
|
|
files = [
|
|
Path("/path/to/config.yaml"),
|
|
Path("/path/to/config.YAML"),
|
|
Path("/path/to/config.YML"),
|
|
Path("/path/to/config.Yaml"),
|
|
Path("/path/to/config.yml"),
|
|
]
|
|
|
|
result = util.filter_yaml_files(files)
|
|
|
|
# Should only match lowercase .yaml and .yml
|
|
assert len(result) == 2
|
|
|
|
# Check the actual suffixes to ensure case-sensitive filtering
|
|
result_suffixes = [p.suffix for p in result]
|
|
assert ".yaml" in result_suffixes
|
|
assert ".yml" in result_suffixes
|
|
|
|
# Verify the filtered files have the expected names
|
|
result_names = [p.name for p in result]
|
|
assert "config.yaml" in result_names
|
|
assert "config.yml" in result_names
|
|
# Ensure uppercase extensions are NOT included
|
|
assert "config.YAML" not in result_names
|
|
assert "config.YML" not in result_names
|
|
assert "config.Yaml" not in result_names
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("input_str", "expected"),
|
|
[
|
|
# Empty string
|
|
("", "''"),
|
|
# Simple strings that don't need quoting
|
|
("hello", "hello"),
|
|
("test123", "test123"),
|
|
("file.txt", "file.txt"),
|
|
("/path/to/file", "/path/to/file"),
|
|
("user@host", "user@host"),
|
|
("value:123", "value:123"),
|
|
("item,list", "item,list"),
|
|
("path-with-dash", "path-with-dash"),
|
|
# Strings that need quoting
|
|
("hello world", "'hello world'"),
|
|
("test\ttab", "'test\ttab'"),
|
|
("line\nbreak", "'line\nbreak'"),
|
|
("semicolon;here", "'semicolon;here'"),
|
|
("pipe|symbol", "'pipe|symbol'"),
|
|
("redirect>file", "'redirect>file'"),
|
|
("redirect<file", "'redirect<file'"),
|
|
("background&", "'background&'"),
|
|
("dollar$sign", "'dollar$sign'"),
|
|
("backtick`cmd", "'backtick`cmd'"),
|
|
('double"quote', "'double\"quote'"),
|
|
("backslash\\path", "'backslash\\path'"),
|
|
("question?mark", "'question?mark'"),
|
|
("asterisk*wild", "'asterisk*wild'"),
|
|
("bracket[test]", "'bracket[test]'"),
|
|
("paren(test)", "'paren(test)'"),
|
|
("curly{brace}", "'curly{brace}'"),
|
|
# Single quotes in string (special escaping)
|
|
("it's", "'it'\"'\"'s'"),
|
|
("don't", "'don'\"'\"'t'"),
|
|
("'quoted'", "''\"'\"'quoted'\"'\"''"),
|
|
# Complex combinations
|
|
("test 'with' quotes", "'test '\"'\"'with'\"'\"' quotes'"),
|
|
("path/to/file's.txt", "'path/to/file'\"'\"'s.txt'"),
|
|
],
|
|
)
|
|
def test_shlex_quote(input_str: str, expected: str) -> None:
|
|
"""Test shlex_quote properly escapes shell arguments."""
|
|
assert util.shlex_quote(input_str) == expected
|
|
|
|
|
|
def test_shlex_quote_safe_characters() -> None:
|
|
"""Test that safe characters are not quoted."""
|
|
# These characters are considered safe and shouldn't be quoted
|
|
safe_chars = (
|
|
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789@%+=:,./-_"
|
|
)
|
|
for char in safe_chars:
|
|
assert util.shlex_quote(char) == char
|
|
assert util.shlex_quote(f"test{char}test") == f"test{char}test"
|
|
|
|
|
|
def test_shlex_quote_unsafe_characters() -> None:
|
|
"""Test that unsafe characters trigger quoting."""
|
|
# These characters should trigger quoting
|
|
unsafe_chars = ' \t\n;|>&<$`"\\?*[](){}!#~^'
|
|
for char in unsafe_chars:
|
|
result = util.shlex_quote(f"test{char}test")
|
|
assert result.startswith("'")
|
|
assert result.endswith("'")
|
|
|
|
|
|
def test_shlex_quote_edge_cases() -> None:
|
|
"""Test edge cases for shlex_quote."""
|
|
# Multiple single quotes
|
|
assert util.shlex_quote("'''") == "''\"'\"''\"'\"''\"'\"''"
|
|
|
|
# Mixed quotes
|
|
assert util.shlex_quote('"\'"') == "'\"'\"'\"'\"'"
|
|
|
|
# Only whitespace
|
|
assert util.shlex_quote(" ") == "' '"
|
|
assert util.shlex_quote("\t") == "'\t'"
|
|
assert util.shlex_quote("\n") == "'\n'"
|
|
assert util.shlex_quote(" ") == "' '"
|
|
|
|
|
|
def _make_redirect(
|
|
line_callbacks: list[Callable[[str], str | None]] | None = None,
|
|
filter_lines: list[str] | None = None,
|
|
) -> tuple[util.RedirectText, io.StringIO]:
|
|
"""Create a RedirectText that writes to a StringIO buffer."""
|
|
buf = io.StringIO()
|
|
redirect = util.RedirectText(
|
|
buf, filter_lines=filter_lines, line_callbacks=line_callbacks
|
|
)
|
|
return redirect, buf
|
|
|
|
|
|
def test_redirect_text_callback_called_on_matching_line() -> None:
|
|
"""Test that a line callback is called and its output is written."""
|
|
results: list[str] = []
|
|
|
|
def callback(line: str) -> str | None:
|
|
results.append(line)
|
|
if "target" in line:
|
|
return "CALLBACK OUTPUT\n"
|
|
return None
|
|
|
|
redirect, buf = _make_redirect(line_callbacks=[callback])
|
|
redirect.write("some target line\n")
|
|
|
|
assert "some target line" in buf.getvalue()
|
|
assert "CALLBACK OUTPUT" in buf.getvalue()
|
|
assert len(results) == 1
|
|
|
|
|
|
def test_redirect_text_callback_not_triggered_on_non_matching_line() -> None:
|
|
"""Test that callback returns None for non-matching lines."""
|
|
|
|
def callback(line: str) -> str | None:
|
|
if "target" in line:
|
|
return "FOUND\n"
|
|
return None
|
|
|
|
redirect, buf = _make_redirect(line_callbacks=[callback])
|
|
redirect.write("no match here\n")
|
|
|
|
assert "no match here" in buf.getvalue()
|
|
assert "FOUND" not in buf.getvalue()
|
|
|
|
|
|
def test_redirect_text_callback_works_without_filter_pattern() -> None:
|
|
"""Test that callbacks fire even when no filter_lines is set."""
|
|
|
|
def callback(line: str) -> str | None:
|
|
if "Crystal" in line:
|
|
return "WARNING: mismatch\n"
|
|
return None
|
|
|
|
redirect, buf = _make_redirect(line_callbacks=[callback])
|
|
redirect.write("Crystal frequency: 26MHz\n")
|
|
|
|
assert "Crystal frequency: 26MHz" in buf.getvalue()
|
|
assert "WARNING: mismatch" in buf.getvalue()
|
|
|
|
|
|
def test_redirect_text_callback_works_with_filter_pattern() -> None:
|
|
"""Test that callbacks fire alongside filter patterns."""
|
|
|
|
def callback(line: str) -> str | None:
|
|
if "important" in line:
|
|
return "NOTED\n"
|
|
return None
|
|
|
|
redirect, buf = _make_redirect(
|
|
line_callbacks=[callback],
|
|
filter_lines=[r"^skip this.*"],
|
|
)
|
|
redirect.write("skip this line\n")
|
|
redirect.write("important line\n")
|
|
|
|
assert "skip this" not in buf.getvalue()
|
|
assert "important line" in buf.getvalue()
|
|
assert "NOTED" in buf.getvalue()
|
|
|
|
|
|
def test_redirect_text_multiple_callbacks() -> None:
|
|
"""Test that multiple callbacks are all invoked."""
|
|
|
|
def callback_a(line: str) -> str | None:
|
|
if "test" in line:
|
|
return "FROM A\n"
|
|
return None
|
|
|
|
def callback_b(line: str) -> str | None:
|
|
if "test" in line:
|
|
return "FROM B\n"
|
|
return None
|
|
|
|
redirect, buf = _make_redirect(line_callbacks=[callback_a, callback_b])
|
|
redirect.write("test line\n")
|
|
|
|
output = buf.getvalue()
|
|
assert "FROM A" in output
|
|
assert "FROM B" in output
|
|
|
|
|
|
def test_redirect_text_incomplete_line_buffered() -> None:
|
|
"""Test that incomplete lines are buffered until newline."""
|
|
results: list[str] = []
|
|
|
|
def callback(line: str) -> str | None:
|
|
results.append(line)
|
|
return None
|
|
|
|
redirect, buf = _make_redirect(line_callbacks=[callback])
|
|
redirect.write("partial")
|
|
assert len(results) == 0
|
|
|
|
redirect.write(" line\n")
|
|
assert len(results) == 1
|
|
assert results[0] == "partial line"
|
|
|
|
|
|
def test_run_external_command_line_callbacks(capsys: pytest.CaptureFixture) -> None:
|
|
"""Test that run_external_command passes line_callbacks to RedirectText."""
|
|
results: list[str] = []
|
|
|
|
def callback(line: str) -> str | None:
|
|
results.append(line)
|
|
if "hello" in line:
|
|
return "CALLBACK FIRED\n"
|
|
return None
|
|
|
|
def fake_main() -> int:
|
|
print("hello world")
|
|
return 0
|
|
|
|
rc = util.run_external_command(fake_main, "fake", line_callbacks=[callback])
|
|
|
|
assert rc == 0
|
|
assert len(results) == 1
|
|
assert "hello world" in results[0]
|
|
captured = capsys.readouterr()
|
|
assert "CALLBACK FIRED" in captured.out
|
|
|
|
|
|
def test_run_external_process_line_callbacks() -> None:
|
|
"""Test that run_external_process passes line_callbacks to RedirectText."""
|
|
results: list[str] = []
|
|
|
|
def callback(line: str) -> str | None:
|
|
results.append(line)
|
|
if "from subprocess" in line:
|
|
return "PROCESS CALLBACK\n"
|
|
return None
|
|
|
|
with patch("esphome.util.subprocess.run") as mock_run:
|
|
|
|
def run_side_effect(*args: Any, **kwargs: Any) -> MagicMock:
|
|
# Simulate subprocess writing to the stdout RedirectText
|
|
stdout = kwargs.get("stdout")
|
|
if stdout is not None and isinstance(stdout, util.RedirectText):
|
|
stdout.write("from subprocess\n")
|
|
return MagicMock(returncode=0)
|
|
|
|
mock_run.side_effect = run_side_effect
|
|
|
|
rc = util.run_external_process(
|
|
"echo",
|
|
"test",
|
|
line_callbacks=[callback],
|
|
)
|
|
|
|
assert rc == 0
|
|
assert any("from subprocess" in r for r in results)
|
|
|
|
|
|
def test_get_picotool_path_found(tmp_path: Path) -> None:
|
|
"""Test picotool path derivation from cc_path."""
|
|
# Create the expected directory structure
|
|
packages_dir = tmp_path / "packages"
|
|
toolchain_dir = packages_dir / "toolchain-rp2040-earlephilhower" / "bin"
|
|
toolchain_dir.mkdir(parents=True)
|
|
gcc = toolchain_dir / "arm-none-eabi-gcc"
|
|
gcc.touch()
|
|
|
|
binary_name = "picotool.exe" if sys.platform == "win32" else "picotool"
|
|
picotool_dir = packages_dir / "tool-picotool-rp2040-earlephilhower"
|
|
picotool_dir.mkdir(parents=True)
|
|
picotool = picotool_dir / binary_name
|
|
picotool.touch()
|
|
|
|
result = util.get_picotool_path(str(gcc))
|
|
assert result == picotool
|
|
|
|
|
|
def test_get_picotool_path_not_found(tmp_path: Path) -> None:
|
|
"""Test picotool path returns None when not installed."""
|
|
packages_dir = tmp_path / "packages"
|
|
toolchain_dir = packages_dir / "toolchain-rp2040-earlephilhower" / "bin"
|
|
toolchain_dir.mkdir(parents=True)
|
|
gcc = toolchain_dir / "arm-none-eabi-gcc"
|
|
gcc.touch()
|
|
|
|
result = util.get_picotool_path(str(gcc))
|
|
assert result is None
|
|
|
|
|
|
def test_get_picotool_path_windows(tmp_path: Path) -> None:
|
|
"""Test picotool path uses .exe on Windows."""
|
|
packages_dir = tmp_path / "packages"
|
|
toolchain_dir = packages_dir / "toolchain-rp2040-earlephilhower" / "bin"
|
|
toolchain_dir.mkdir(parents=True)
|
|
gcc = toolchain_dir / "arm-none-eabi-gcc.exe"
|
|
gcc.touch()
|
|
|
|
picotool_dir = packages_dir / "tool-picotool-rp2040-earlephilhower"
|
|
picotool_dir.mkdir(parents=True)
|
|
picotool = picotool_dir / "picotool.exe"
|
|
picotool.touch()
|
|
|
|
with patch("esphome.util.sys.platform", "win32"):
|
|
result = util.get_picotool_path(str(gcc))
|
|
assert result == picotool
|
|
|
|
|
|
def test_detect_rp2040_bootsel_found() -> None:
|
|
"""Test BOOTSEL device detection when device is present."""
|
|
mock_result = MagicMock()
|
|
mock_result.stdout = b"Device Information\n type: RP2040\n"
|
|
with patch("esphome.util.subprocess.run", return_value=mock_result):
|
|
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
|
|
assert result.device_count == 1
|
|
assert result.permission_error is False
|
|
|
|
|
|
def test_detect_rp2040_bootsel_multiple() -> None:
|
|
"""Test BOOTSEL detection with multiple devices."""
|
|
mock_result = MagicMock()
|
|
mock_result.stdout = b"type: RP2040\ntype: RP2350\n"
|
|
with patch("esphome.util.subprocess.run", return_value=mock_result):
|
|
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
|
|
assert result.device_count == 2
|
|
assert result.permission_error is False
|
|
|
|
|
|
def test_detect_rp2040_bootsel_none() -> None:
|
|
"""Test BOOTSEL detection when no device found."""
|
|
mock_result = MagicMock()
|
|
mock_result.stdout = (
|
|
b"No accessible RP2040/RP2350 devices in BOOTSEL mode were found.\n"
|
|
)
|
|
mock_result.stderr = b""
|
|
with patch("esphome.util.subprocess.run", return_value=mock_result):
|
|
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
|
|
assert result.device_count == 0
|
|
assert result.permission_error is False
|
|
|
|
|
|
def test_detect_rp2040_bootsel_permission_error() -> None:
|
|
"""Test BOOTSEL detection with device found but not accessible."""
|
|
mock_result = MagicMock()
|
|
mock_result.stdout = (
|
|
b"No accessible RP-series devices in BOOTSEL mode were found.\n"
|
|
)
|
|
mock_result.stderr = (
|
|
b"RP2040 device at bus 5, address 24 appears to be in BOOTSEL mode, "
|
|
b"but picotool was unable to connect. "
|
|
b"Maybe try 'sudo' or check your permissions.\n"
|
|
)
|
|
with patch("esphome.util.subprocess.run", return_value=mock_result):
|
|
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
|
|
assert result.device_count == 0
|
|
assert result.permission_error is True
|
|
|
|
|
|
def test_detect_rp2040_bootsel_libusb_access_error() -> None:
|
|
"""Test BOOTSEL detection with LIBUSB_ERROR_ACCESS."""
|
|
mock_result = MagicMock()
|
|
mock_result.stdout = b""
|
|
mock_result.stderr = b"LIBUSB_ERROR_ACCESS\n"
|
|
with patch("esphome.util.subprocess.run", return_value=mock_result):
|
|
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
|
|
assert result.device_count == 0
|
|
assert result.permission_error is True
|
|
|
|
|
|
def test_detect_rp2040_bootsel_oserror() -> None:
|
|
"""Test BOOTSEL detection handles OSError."""
|
|
with patch("esphome.util.subprocess.run", side_effect=OSError("not found")):
|
|
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
|
|
assert result.device_count == 0
|
|
assert result.permission_error is False
|
|
|
|
|
|
def test_detect_rp2040_bootsel_timeout() -> None:
|
|
"""Test BOOTSEL detection handles timeout."""
|
|
with patch(
|
|
"esphome.util.subprocess.run",
|
|
side_effect=subprocess.TimeoutExpired("picotool", 10),
|
|
):
|
|
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
|
|
assert result.device_count == 0
|
|
assert result.permission_error is False
|