"""Tests for esphome.util module.""" from __future__ import annotations from collections.abc import Callable import io from pathlib import Path import subprocess import sys from typing import Any from unittest.mock import MagicMock, patch import pytest from esphome import util def test_list_yaml_files_with_files_and_directories(tmp_path: Path) -> None: """Test that list_yaml_files handles both files and directories.""" # Create directory structure dir1 = tmp_path / "configs" dir1.mkdir() dir2 = tmp_path / "more_configs" dir2.mkdir() # Create YAML files in directories (dir1 / "config1.yaml").write_text("test: 1") (dir1 / "config2.yml").write_text("test: 2") (dir1 / "not_yaml.txt").write_text("not yaml") (dir2 / "config3.yaml").write_text("test: 3") # Create standalone YAML files standalone1 = tmp_path / "standalone.yaml" standalone1.write_text("test: 4") standalone2 = tmp_path / "another.yml" standalone2.write_text("test: 5") # Test with mixed input (directories and files) configs = [ dir1, standalone1, dir2, standalone2, ] result = util.list_yaml_files(configs) # Should include all YAML files but not the .txt file assert set(result) == { dir1 / "config1.yaml", dir1 / "config2.yml", dir2 / "config3.yaml", standalone1, standalone2, } # Check that results are sorted assert result == sorted(result) def test_list_yaml_files_only_directories(tmp_path: Path) -> None: """Test list_yaml_files with only directories.""" dir1 = tmp_path / "dir1" dir1.mkdir() dir2 = tmp_path / "dir2" dir2.mkdir() (dir1 / "a.yaml").write_text("test: a") (dir1 / "b.yml").write_text("test: b") (dir2 / "c.yaml").write_text("test: c") result = util.list_yaml_files([dir1, dir2]) assert set(result) == { dir1 / "a.yaml", dir1 / "b.yml", dir2 / "c.yaml", } assert result == sorted(result) def test_list_yaml_files_only_files(tmp_path: Path) -> None: """Test list_yaml_files with only files.""" file1 = tmp_path / "file1.yaml" file2 = tmp_path / "file2.yml" file3 = tmp_path / "file3.yaml" non_yaml = tmp_path / "not_yaml.json" file1.write_text("test: 1") file2.write_text("test: 2") file3.write_text("test: 3") non_yaml.write_text("{}") # Include a non-YAML file to test filtering result = util.list_yaml_files( [ file1, file2, file3, non_yaml, ] ) assert set(result) == { file1, file2, file3, } assert result == sorted(result) def test_list_yaml_files_empty_directory(tmp_path: Path) -> None: """Test list_yaml_files with an empty directory.""" empty_dir = tmp_path / "empty" empty_dir.mkdir() result = util.list_yaml_files([empty_dir]) assert result == [] def test_list_yaml_files_nonexistent_path(tmp_path: Path) -> None: """Test list_yaml_files with a nonexistent path raises an error.""" nonexistent = tmp_path / "nonexistent" existing = tmp_path / "existing.yaml" existing.write_text("test: 1") # Should raise an error for non-existent directory with pytest.raises(FileNotFoundError): util.list_yaml_files([nonexistent, existing]) def test_list_yaml_files_mixed_extensions(tmp_path: Path) -> None: """Test that both .yaml and .yml extensions are recognized.""" dir1 = tmp_path / "configs" dir1.mkdir() yaml_file = dir1 / "config.yaml" yml_file = dir1 / "config.yml" other_file = dir1 / "config.txt" yaml_file.write_text("test: yaml") yml_file.write_text("test: yml") other_file.write_text("test: txt") result = util.list_yaml_files([dir1]) assert set(result) == { yaml_file, yml_file, } def test_list_yaml_files_does_not_recurse_into_subdirectories(tmp_path: Path) -> None: """Test that list_yaml_files only finds files in specified directory, not subdirectories.""" # Create directory structure with YAML files at different depths root = tmp_path / "configs" root.mkdir() # Create YAML files in the root directory (root / "config1.yaml").write_text("test: 1") (root / "config2.yml").write_text("test: 2") (root / "device.yaml").write_text("test: device") # Create subdirectory with YAML files (should NOT be found) subdir = root / "subdir" subdir.mkdir() (subdir / "nested1.yaml").write_text("test: nested1") (subdir / "nested2.yml").write_text("test: nested2") # Create deeper subdirectory (should NOT be found) deep_subdir = subdir / "deeper" deep_subdir.mkdir() (deep_subdir / "very_nested.yaml").write_text("test: very_nested") # Test listing files from the root directory result = util.list_yaml_files([str(root)]) # Should only find the 3 files in root, not the 3 in subdirectories assert len(result) == 3 # Check that only root-level files are found assert root / "config1.yaml" in result assert root / "config2.yml" in result assert root / "device.yaml" in result # Ensure nested files are NOT found for r in result: r_str = str(r) assert "subdir" not in r_str assert "deeper" not in r_str assert "nested1.yaml" not in r_str assert "nested2.yml" not in r_str assert "very_nested.yaml" not in r_str def test_list_yaml_files_excludes_secrets(tmp_path: Path) -> None: """Test that secrets.yaml and secrets.yml are excluded.""" root = tmp_path / "configs" root.mkdir() # Create various YAML files including secrets (root / "config.yaml").write_text("test: config") (root / "secrets.yaml").write_text("wifi_password: secret123") (root / "secrets.yml").write_text("api_key: secret456") (root / "device.yaml").write_text("test: device") result = util.list_yaml_files([str(root)]) # Should find 2 files (config.yaml and device.yaml), not secrets assert len(result) == 2 assert root / "config.yaml" in result assert root / "device.yaml" in result assert root / "secrets.yaml" not in result assert root / "secrets.yml" not in result def test_list_yaml_files_excludes_hidden_files(tmp_path: Path) -> None: """Test that hidden files (starting with .) are excluded.""" root = tmp_path / "configs" root.mkdir() # Create regular and hidden YAML files (root / "config.yaml").write_text("test: config") (root / ".hidden.yaml").write_text("test: hidden") (root / ".backup.yml").write_text("test: backup") (root / "device.yaml").write_text("test: device") result = util.list_yaml_files([str(root)]) # Should find only non-hidden files assert len(result) == 2 assert root / "config.yaml" in result assert root / "device.yaml" in result assert root / ".hidden.yaml" not in result assert root / ".backup.yml" not in result def test_filter_yaml_files_basic() -> None: """Test filter_yaml_files function.""" files = [ Path("/path/to/config.yaml"), Path("/path/to/device.yml"), Path("/path/to/readme.txt"), Path("/path/to/script.py"), Path("/path/to/data.json"), Path("/path/to/another.yaml"), ] result = util.filter_yaml_files(files) assert len(result) == 3 assert Path("/path/to/config.yaml") in result assert Path("/path/to/device.yml") in result assert Path("/path/to/another.yaml") in result assert Path("/path/to/readme.txt") not in result assert Path("/path/to/script.py") not in result assert Path("/path/to/data.json") not in result def test_filter_yaml_files_excludes_secrets() -> None: """Test that filter_yaml_files excludes secrets files.""" files = [ Path("/path/to/config.yaml"), Path("/path/to/secrets.yaml"), Path("/path/to/secrets.yml"), Path("/path/to/device.yaml"), Path("/some/dir/secrets.yaml"), ] result = util.filter_yaml_files(files) assert len(result) == 2 assert Path("/path/to/config.yaml") in result assert Path("/path/to/device.yaml") in result assert Path("/path/to/secrets.yaml") not in result assert Path("/path/to/secrets.yml") not in result assert Path("/some/dir/secrets.yaml") not in result def test_filter_yaml_files_excludes_hidden() -> None: """Test that filter_yaml_files excludes hidden files.""" files = [ Path("/path/to/config.yaml"), Path("/path/to/.hidden.yaml"), Path("/path/to/.backup.yml"), Path("/path/to/device.yaml"), Path("/some/dir/.config.yaml"), ] result = util.filter_yaml_files(files) assert len(result) == 2 assert Path("/path/to/config.yaml") in result assert Path("/path/to/device.yaml") in result assert Path("/path/to/.hidden.yaml") not in result assert Path("/path/to/.backup.yml") not in result assert Path("/some/dir/.config.yaml") not in result def test_filter_yaml_files_case_sensitive() -> None: """Test that filter_yaml_files is case-sensitive for extensions.""" files = [ Path("/path/to/config.yaml"), Path("/path/to/config.YAML"), Path("/path/to/config.YML"), Path("/path/to/config.Yaml"), Path("/path/to/config.yml"), ] result = util.filter_yaml_files(files) # Should only match lowercase .yaml and .yml assert len(result) == 2 # Check the actual suffixes to ensure case-sensitive filtering result_suffixes = [p.suffix for p in result] assert ".yaml" in result_suffixes assert ".yml" in result_suffixes # Verify the filtered files have the expected names result_names = [p.name for p in result] assert "config.yaml" in result_names assert "config.yml" in result_names # Ensure uppercase extensions are NOT included assert "config.YAML" not in result_names assert "config.YML" not in result_names assert "config.Yaml" not in result_names @pytest.mark.parametrize( ("input_str", "expected"), [ # Empty string ("", "''"), # Simple strings that don't need quoting ("hello", "hello"), ("test123", "test123"), ("file.txt", "file.txt"), ("/path/to/file", "/path/to/file"), ("user@host", "user@host"), ("value:123", "value:123"), ("item,list", "item,list"), ("path-with-dash", "path-with-dash"), # Strings that need quoting ("hello world", "'hello world'"), ("test\ttab", "'test\ttab'"), ("line\nbreak", "'line\nbreak'"), ("semicolon;here", "'semicolon;here'"), ("pipe|symbol", "'pipe|symbol'"), ("redirect>file", "'redirect>file'"), ("redirect None: """Test shlex_quote properly escapes shell arguments.""" assert util.shlex_quote(input_str) == expected def test_shlex_quote_safe_characters() -> None: """Test that safe characters are not quoted.""" # These characters are considered safe and shouldn't be quoted safe_chars = ( "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789@%+=:,./-_" ) for char in safe_chars: assert util.shlex_quote(char) == char assert util.shlex_quote(f"test{char}test") == f"test{char}test" def test_shlex_quote_unsafe_characters() -> None: """Test that unsafe characters trigger quoting.""" # These characters should trigger quoting unsafe_chars = ' \t\n;|>&<$`"\\?*[](){}!#~^' for char in unsafe_chars: result = util.shlex_quote(f"test{char}test") assert result.startswith("'") assert result.endswith("'") def test_shlex_quote_edge_cases() -> None: """Test edge cases for shlex_quote.""" # Multiple single quotes assert util.shlex_quote("'''") == "''\"'\"''\"'\"''\"'\"''" # Mixed quotes assert util.shlex_quote('"\'"') == "'\"'\"'\"'\"'" # Only whitespace assert util.shlex_quote(" ") == "' '" assert util.shlex_quote("\t") == "'\t'" assert util.shlex_quote("\n") == "'\n'" assert util.shlex_quote(" ") == "' '" def _make_redirect( line_callbacks: list[Callable[[str], str | None]] | None = None, filter_lines: list[str] | None = None, ) -> tuple[util.RedirectText, io.StringIO]: """Create a RedirectText that writes to a StringIO buffer.""" buf = io.StringIO() redirect = util.RedirectText( buf, filter_lines=filter_lines, line_callbacks=line_callbacks ) return redirect, buf def test_redirect_text_callback_called_on_matching_line() -> None: """Test that a line callback is called and its output is written.""" results: list[str] = [] def callback(line: str) -> str | None: results.append(line) if "target" in line: return "CALLBACK OUTPUT\n" return None redirect, buf = _make_redirect(line_callbacks=[callback]) redirect.write("some target line\n") assert "some target line" in buf.getvalue() assert "CALLBACK OUTPUT" in buf.getvalue() assert len(results) == 1 def test_redirect_text_callback_not_triggered_on_non_matching_line() -> None: """Test that callback returns None for non-matching lines.""" def callback(line: str) -> str | None: if "target" in line: return "FOUND\n" return None redirect, buf = _make_redirect(line_callbacks=[callback]) redirect.write("no match here\n") assert "no match here" in buf.getvalue() assert "FOUND" not in buf.getvalue() def test_redirect_text_callback_works_without_filter_pattern() -> None: """Test that callbacks fire even when no filter_lines is set.""" def callback(line: str) -> str | None: if "Crystal" in line: return "WARNING: mismatch\n" return None redirect, buf = _make_redirect(line_callbacks=[callback]) redirect.write("Crystal frequency: 26MHz\n") assert "Crystal frequency: 26MHz" in buf.getvalue() assert "WARNING: mismatch" in buf.getvalue() def test_redirect_text_callback_works_with_filter_pattern() -> None: """Test that callbacks fire alongside filter patterns.""" def callback(line: str) -> str | None: if "important" in line: return "NOTED\n" return None redirect, buf = _make_redirect( line_callbacks=[callback], filter_lines=[r"^skip this.*"], ) redirect.write("skip this line\n") redirect.write("important line\n") assert "skip this" not in buf.getvalue() assert "important line" in buf.getvalue() assert "NOTED" in buf.getvalue() def test_redirect_text_multiple_callbacks() -> None: """Test that multiple callbacks are all invoked.""" def callback_a(line: str) -> str | None: if "test" in line: return "FROM A\n" return None def callback_b(line: str) -> str | None: if "test" in line: return "FROM B\n" return None redirect, buf = _make_redirect(line_callbacks=[callback_a, callback_b]) redirect.write("test line\n") output = buf.getvalue() assert "FROM A" in output assert "FROM B" in output def test_redirect_text_incomplete_line_buffered() -> None: """Test that incomplete lines are buffered until newline.""" results: list[str] = [] def callback(line: str) -> str | None: results.append(line) return None redirect, buf = _make_redirect(line_callbacks=[callback]) redirect.write("partial") assert len(results) == 0 redirect.write(" line\n") assert len(results) == 1 assert results[0] == "partial line" def test_run_external_command_line_callbacks(capsys: pytest.CaptureFixture) -> None: """Test that run_external_command passes line_callbacks to RedirectText.""" results: list[str] = [] def callback(line: str) -> str | None: results.append(line) if "hello" in line: return "CALLBACK FIRED\n" return None def fake_main() -> int: print("hello world") return 0 rc = util.run_external_command(fake_main, "fake", line_callbacks=[callback]) assert rc == 0 assert len(results) == 1 assert "hello world" in results[0] captured = capsys.readouterr() assert "CALLBACK FIRED" in captured.out def test_run_external_process_line_callbacks() -> None: """Test that run_external_process passes line_callbacks to RedirectText.""" results: list[str] = [] def callback(line: str) -> str | None: results.append(line) if "from subprocess" in line: return "PROCESS CALLBACK\n" return None with patch("esphome.util.subprocess.run") as mock_run: def run_side_effect(*args: Any, **kwargs: Any) -> MagicMock: # Simulate subprocess writing to the stdout RedirectText stdout = kwargs.get("stdout") if stdout is not None and isinstance(stdout, util.RedirectText): stdout.write("from subprocess\n") return MagicMock(returncode=0) mock_run.side_effect = run_side_effect rc = util.run_external_process( "echo", "test", line_callbacks=[callback], ) assert rc == 0 assert any("from subprocess" in r for r in results) def test_get_picotool_path_found(tmp_path: Path) -> None: """Test picotool path derivation from cc_path.""" # Create the expected directory structure packages_dir = tmp_path / "packages" toolchain_dir = packages_dir / "toolchain-rp2040-earlephilhower" / "bin" toolchain_dir.mkdir(parents=True) gcc = toolchain_dir / "arm-none-eabi-gcc" gcc.touch() binary_name = "picotool.exe" if sys.platform == "win32" else "picotool" picotool_dir = packages_dir / "tool-picotool-rp2040-earlephilhower" picotool_dir.mkdir(parents=True) picotool = picotool_dir / binary_name picotool.touch() result = util.get_picotool_path(str(gcc)) assert result == picotool def test_get_picotool_path_not_found(tmp_path: Path) -> None: """Test picotool path returns None when not installed.""" packages_dir = tmp_path / "packages" toolchain_dir = packages_dir / "toolchain-rp2040-earlephilhower" / "bin" toolchain_dir.mkdir(parents=True) gcc = toolchain_dir / "arm-none-eabi-gcc" gcc.touch() result = util.get_picotool_path(str(gcc)) assert result is None def test_get_picotool_path_windows(tmp_path: Path) -> None: """Test picotool path uses .exe on Windows.""" packages_dir = tmp_path / "packages" toolchain_dir = packages_dir / "toolchain-rp2040-earlephilhower" / "bin" toolchain_dir.mkdir(parents=True) gcc = toolchain_dir / "arm-none-eabi-gcc.exe" gcc.touch() picotool_dir = packages_dir / "tool-picotool-rp2040-earlephilhower" picotool_dir.mkdir(parents=True) picotool = picotool_dir / "picotool.exe" picotool.touch() with patch("esphome.util.sys.platform", "win32"): result = util.get_picotool_path(str(gcc)) assert result == picotool def test_detect_rp2040_bootsel_found() -> None: """Test BOOTSEL device detection when device is present.""" mock_result = MagicMock() mock_result.stdout = b"Device Information\n type: RP2040\n" with patch("esphome.util.subprocess.run", return_value=mock_result): result = util.detect_rp2040_bootsel("/usr/bin/picotool") assert result.device_count == 1 assert result.permission_error is False def test_detect_rp2040_bootsel_multiple() -> None: """Test BOOTSEL detection with multiple devices.""" mock_result = MagicMock() mock_result.stdout = b"type: RP2040\ntype: RP2350\n" with patch("esphome.util.subprocess.run", return_value=mock_result): result = util.detect_rp2040_bootsel("/usr/bin/picotool") assert result.device_count == 2 assert result.permission_error is False def test_detect_rp2040_bootsel_none() -> None: """Test BOOTSEL detection when no device found.""" mock_result = MagicMock() mock_result.stdout = ( b"No accessible RP2040/RP2350 devices in BOOTSEL mode were found.\n" ) mock_result.stderr = b"" with patch("esphome.util.subprocess.run", return_value=mock_result): result = util.detect_rp2040_bootsel("/usr/bin/picotool") assert result.device_count == 0 assert result.permission_error is False def test_detect_rp2040_bootsel_permission_error() -> None: """Test BOOTSEL detection with device found but not accessible.""" mock_result = MagicMock() mock_result.stdout = ( b"No accessible RP-series devices in BOOTSEL mode were found.\n" ) mock_result.stderr = ( b"RP2040 device at bus 5, address 24 appears to be in BOOTSEL mode, " b"but picotool was unable to connect. " b"Maybe try 'sudo' or check your permissions.\n" ) with patch("esphome.util.subprocess.run", return_value=mock_result): result = util.detect_rp2040_bootsel("/usr/bin/picotool") assert result.device_count == 0 assert result.permission_error is True def test_detect_rp2040_bootsel_libusb_access_error() -> None: """Test BOOTSEL detection with LIBUSB_ERROR_ACCESS.""" mock_result = MagicMock() mock_result.stdout = b"" mock_result.stderr = b"LIBUSB_ERROR_ACCESS\n" with patch("esphome.util.subprocess.run", return_value=mock_result): result = util.detect_rp2040_bootsel("/usr/bin/picotool") assert result.device_count == 0 assert result.permission_error is True def test_detect_rp2040_bootsel_oserror() -> None: """Test BOOTSEL detection handles OSError.""" with patch("esphome.util.subprocess.run", side_effect=OSError("not found")): result = util.detect_rp2040_bootsel("/usr/bin/picotool") assert result.device_count == 0 assert result.permission_error is False def test_detect_rp2040_bootsel_timeout() -> None: """Test BOOTSEL detection handles timeout.""" with patch( "esphome.util.subprocess.run", side_effect=subprocess.TimeoutExpired("picotool", 10), ): result = util.detect_rp2040_bootsel("/usr/bin/picotool") assert result.device_count == 0 assert result.permission_error is False