diff --git a/esphome/__main__.py b/esphome/__main__.py index 4cb01eacb57..7ce2205beb9 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -62,6 +62,9 @@ from esphome.util import ( _LOGGER = logging.getLogger(__name__) +# Maximum buffer size for serial log reading to prevent unbounded memory growth +SERIAL_BUFFER_MAX_SIZE = 65536 + # Special non-component keys that appear in configs _NON_COMPONENT_KEYS = frozenset( { @@ -440,11 +443,15 @@ def run_miniterm(config: ConfigType, port: str, args) -> int: if not chunk: continue time_ = datetime.now() - nanoseconds = time_.microsecond // 1000 - time_str = f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]" + milliseconds = time_.microsecond // 1000 + time_str = f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{milliseconds:03}]" # Add to buffer and process complete lines + # Limit buffer size to prevent unbounded memory growth + # if device sends data without newlines buffer += chunk + if len(buffer) > SERIAL_BUFFER_MAX_SIZE: + buffer = buffer[-SERIAL_BUFFER_MAX_SIZE:] while b"\n" in buffer: raw_line, buffer = buffer.split(b"\n", 1) line = raw_line.replace(b"\r", b"").decode( @@ -1032,6 +1039,7 @@ def command_analyze_memory(args: ArgsProtocol, config: ConfigType) -> int: idedata.objdump_path, idedata.readelf_path, external_components, + idedata=idedata, ) analyzer.analyze() diff --git a/esphome/analyze_memory/__init__.py b/esphome/analyze_memory/__init__.py index a06603a6539..9c935c78fac 100644 --- a/esphome/analyze_memory/__init__.py +++ b/esphome/analyze_memory/__init__.py @@ -22,6 +22,7 @@ from .helpers import ( map_section_name, parse_symbol_line, ) +from .toolchain import find_tool, run_tool if TYPE_CHECKING: from esphome.platformio_api import IDEData @@ -63,7 +64,20 @@ class MemorySection: name: str symbols: list[SymbolInfoType] = field(default_factory=list) - total_size: int = 0 + total_size: int = 0 # Actual section size from ELF headers + symbol_size: int = 0 # Sum of symbol sizes (may be less than total_size) + + +@dataclass +class SDKSymbol: + """Represents a symbol from an SDK library that's not in the ELF symbol table.""" + + name: str + size: int + library: str # Name of the .a file (e.g., "libpp.a") + section: str # ".bss" or ".data" + is_local: bool # True if static/local symbol (lowercase in nm output) + demangled: str = "" # Demangled name (populated after analysis) @dataclass @@ -121,6 +135,10 @@ class MemoryAnalyzer: self.objdump_path = objdump_path or "objdump" self.readelf_path = readelf_path or "readelf" self.external_components = external_components or set() + self._idedata = idedata + + # Derive nm path from objdump path using shared toolchain utility + self.nm_path = find_tool("nm", self.objdump_path) self.sections: dict[str, MemorySection] = {} self.components: dict[str, ComponentMemory] = defaultdict( @@ -139,12 +157,17 @@ class MemoryAnalyzer: self._ram_symbols: dict[str, list[tuple[str, str, int, str]]] = defaultdict( list ) + # Track ELF symbol names for SDK cross-reference + self._elf_symbol_names: set[str] = set() + # SDK symbols not in ELF (static/local symbols from closed-source libs) + self._sdk_symbols: list[SDKSymbol] = [] def analyze(self) -> dict[str, ComponentMemory]: """Analyze the ELF file and return component memory usage.""" self._parse_sections() self._parse_symbols() self._categorize_symbols() + self._analyze_sdk_libraries() return dict(self.components) def _parse_sections(self) -> None: @@ -198,6 +221,8 @@ class MemoryAnalyzer: continue self.sections[section].symbols.append((name, size, "")) + self.sections[section].symbol_size += size + self._elf_symbol_names.add(name) seen_addresses.add(address) def _categorize_symbols(self) -> None: @@ -341,6 +366,247 @@ class MemoryAnalyzer: return "Other Core" + def get_unattributed_ram(self) -> tuple[int, int, int]: + """Get unattributed RAM sizes (SDK/framework overhead). + + Returns: + Tuple of (unattributed_bss, unattributed_data, total_unattributed) + These are bytes in RAM sections that have no corresponding symbols. + """ + bss_section = self.sections.get(".bss") + data_section = self.sections.get(".data") + + unattributed_bss = 0 + unattributed_data = 0 + + if bss_section: + unattributed_bss = max(0, bss_section.total_size - bss_section.symbol_size) + if data_section: + unattributed_data = max( + 0, data_section.total_size - data_section.symbol_size + ) + + return unattributed_bss, unattributed_data, unattributed_bss + unattributed_data + + def _find_sdk_library_dirs(self) -> list[Path]: + """Find SDK library directories based on platform. + + Returns: + List of paths to SDK library directories containing .a files. + """ + sdk_dirs: list[Path] = [] + + if self._idedata is None: + return sdk_dirs + + # Get the CC path to determine the framework location + cc_path = getattr(self._idedata, "cc_path", None) + if not cc_path: + return sdk_dirs + + cc_path = Path(cc_path) + + # For ESP8266 Arduino framework + # CC is like: ~/.platformio/packages/toolchain-xtensa/bin/xtensa-lx106-elf-gcc + # SDK libs are in: ~/.platformio/packages/framework-arduinoespressif8266/tools/sdk/lib/ + if "xtensa-lx106" in str(cc_path): + platformio_dir = cc_path.parent.parent.parent + esp8266_sdk = ( + platformio_dir + / "framework-arduinoespressif8266" + / "tools" + / "sdk" + / "lib" + ) + if esp8266_sdk.exists(): + sdk_dirs.append(esp8266_sdk) + # Also check for NONOSDK subdirectories (closed-source libs) + sdk_dirs.extend( + subdir + for subdir in esp8266_sdk.iterdir() + if subdir.is_dir() and subdir.name.startswith("NONOSDK") + ) + + # For ESP32 IDF framework + # CC is like: ~/.platformio/packages/toolchain-xtensa-esp-elf/bin/xtensa-esp32-elf-gcc + # or: ~/.platformio/packages/toolchain-riscv32-esp/bin/riscv32-esp-elf-gcc + elif "xtensa-esp" in str(cc_path) or "riscv32-esp" in str(cc_path): + # Detect ESP32 variant from CC path or defines + variant = self._detect_esp32_variant() + if variant: + platformio_dir = cc_path.parent.parent.parent + espidf_dir = platformio_dir / "framework-espidf" / "components" + if espidf_dir.exists(): + # Find all directories named after the variant that contain .a files + # This handles various ESP-IDF library layouts: + # - components/*/lib// + # - components/*// + # - components/*/lib/lib// + # - components/*/*/lib_*// + sdk_dirs.extend( + variant_dir + for variant_dir in espidf_dir.rglob(variant) + if variant_dir.is_dir() and any(variant_dir.glob("*.a")) + ) + + return sdk_dirs + + def _detect_esp32_variant(self) -> str | None: + """Detect ESP32 variant from idedata defines. + + Returns: + Variant string like 'esp32', 'esp32s2', 'esp32c3', etc. or None. + """ + if self._idedata is None: + return None + + defines = getattr(self._idedata, "defines", []) + if not defines: + return None + + # ESPHome always adds USE_ESP32_VARIANT_xxx defines + variant_prefix = "USE_ESP32_VARIANT_" + for define in defines: + if define.startswith(variant_prefix): + # Extract variant name and convert to lowercase + # USE_ESP32_VARIANT_ESP32 -> esp32 + # USE_ESP32_VARIANT_ESP32S3 -> esp32s3 + return define[len(variant_prefix) :].lower() + + return None + + def _parse_sdk_library( + self, lib_path: Path + ) -> tuple[list[tuple[str, int, str, bool]], set[str]]: + """Parse a single SDK library for symbols. + + Args: + lib_path: Path to the .a library file + + Returns: + Tuple of: + - List of BSS/DATA symbols: (symbol_name, size, section, is_local) + - Set of global BSS/DATA symbol names (for checking if RAM is linked) + """ + ram_symbols: list[tuple[str, int, str, bool]] = [] + global_ram_symbols: set[str] = set() + + result = run_tool([self.nm_path, "--size-sort", str(lib_path)], timeout=10) + if result is None: + return ram_symbols, global_ram_symbols + + for line in result.stdout.splitlines(): + parts = line.split() + if len(parts) < 3: + continue + + try: + size = int(parts[0], 16) + sym_type = parts[1] + name = parts[2] + + # Only collect BSS (b/B) and DATA (d/D) for RAM analysis + if sym_type in ("b", "B"): + section = ".bss" + is_local = sym_type == "b" + ram_symbols.append((name, size, section, is_local)) + # Track global RAM symbols (B/D) for linking check + if sym_type == "B": + global_ram_symbols.add(name) + elif sym_type in ("d", "D"): + section = ".data" + is_local = sym_type == "d" + ram_symbols.append((name, size, section, is_local)) + if sym_type == "D": + global_ram_symbols.add(name) + except (ValueError, IndexError): + continue + + return ram_symbols, global_ram_symbols + + def _analyze_sdk_libraries(self) -> None: + """Analyze SDK libraries to find symbols not in the ELF. + + This finds static/local symbols from closed-source SDK libraries + that consume RAM but don't appear in the final ELF symbol table. + Only includes symbols from libraries that have RAM actually linked + (at least one global BSS/DATA symbol in the ELF). + """ + sdk_dirs = self._find_sdk_library_dirs() + if not sdk_dirs: + _LOGGER.debug("No SDK library directories found") + return + + _LOGGER.debug("Analyzing SDK libraries in %d directories", len(sdk_dirs)) + + # Track seen symbols to avoid duplicates from multiple SDK versions + seen_symbols: set[str] = set() + + for sdk_dir in sdk_dirs: + for lib_path in sorted(sdk_dir.glob("*.a")): + lib_name = lib_path.name + ram_symbols, global_ram_symbols = self._parse_sdk_library(lib_path) + + # Check if this library's RAM is actually linked by seeing if any + # of its global BSS/DATA symbols appear in the ELF + if not global_ram_symbols & self._elf_symbol_names: + # No RAM from this library is in the ELF - skip it + continue + + for name, size, section, is_local in ram_symbols: + # Skip if already in ELF or already seen from another lib + if name in self._elf_symbol_names or name in seen_symbols: + continue + + # Only track symbols with non-zero size + if size > 0: + self._sdk_symbols.append( + SDKSymbol( + name=name, + size=size, + library=lib_name, + section=section, + is_local=is_local, + ) + ) + seen_symbols.add(name) + + # Demangle SDK symbols for better readability + if self._sdk_symbols: + sdk_names = [sym.name for sym in self._sdk_symbols] + demangled_map = batch_demangle(sdk_names, objdump_path=self.objdump_path) + for sym in self._sdk_symbols: + sym.demangled = demangled_map.get(sym.name, sym.name) + + # Sort by size descending for reporting + self._sdk_symbols.sort(key=lambda s: s.size, reverse=True) + + total_sdk_ram = sum(s.size for s in self._sdk_symbols) + _LOGGER.debug( + "Found %d SDK symbols not in ELF, totaling %d bytes", + len(self._sdk_symbols), + total_sdk_ram, + ) + + def get_sdk_ram_symbols(self) -> list[SDKSymbol]: + """Get SDK symbols that consume RAM but aren't in the ELF symbol table. + + Returns: + List of SDKSymbol objects sorted by size descending. + """ + return self._sdk_symbols + + def get_sdk_ram_by_library(self) -> dict[str, list[SDKSymbol]]: + """Get SDK RAM symbols grouped by library. + + Returns: + Dictionary mapping library name to list of symbols. + """ + by_lib: dict[str, list[SDKSymbol]] = defaultdict(list) + for sym in self._sdk_symbols: + by_lib[sym.library].append(sym) + return dict(by_lib) + if __name__ == "__main__": from .cli import main diff --git a/esphome/analyze_memory/cli.py b/esphome/analyze_memory/cli.py index 1332e8a892d..1e9ca66ed49 100644 --- a/esphome/analyze_memory/cli.py +++ b/esphome/analyze_memory/cli.py @@ -3,6 +3,7 @@ from __future__ import annotations from collections import defaultdict +from collections.abc import Callable import json import sys from typing import TYPE_CHECKING @@ -27,6 +28,8 @@ class MemoryAnalyzerCLI(MemoryAnalyzer): SYMBOL_SIZE_THRESHOLD: int = ( 100 # Show symbols larger than this in detailed analysis ) + # Lower threshold for RAM symbols (RAM is more constrained) + RAM_SYMBOL_SIZE_THRESHOLD: int = 24 # Column width constants COL_COMPONENT: int = 29 @@ -104,12 +107,23 @@ class MemoryAnalyzerCLI(MemoryAnalyzer): lines: list[str], title: str, components: list[tuple[str, ComponentMemory]], - get_size: callable, + get_size: Callable[[ComponentMemory], int], total: int, memory_type: str, limit: int = 25, ) -> None: - """Add a top consumers list for flash or RAM.""" + """Add a formatted list of top memory consumers to the report. + + Args: + lines: List of report lines to append the output to. + title: Section title to print before the list. + components: Sequence of (name, ComponentMemory) tuples to analyze. + get_size: Callable that takes a ComponentMemory and returns the + size in bytes to use for ranking and display. + total: Total size in bytes for computing percentage usage. + memory_type: Label for the memory region (e.g., "flash" or "RAM"). + limit: Maximum number of components to include in the list. + """ lines.append("") lines.append(f"{title}:") for i, (name, mem) in enumerate(components[:limit]): @@ -123,7 +137,12 @@ class MemoryAnalyzerCLI(MemoryAnalyzer): def _format_symbol_with_section( self, demangled: str, size: int, section: str | None = None ) -> str: - """Format a symbol entry, optionally with section label for RAM symbols.""" + """Format a symbol entry, optionally adding a RAM section label. + + If section is one of the RAM sections (.data or .bss), a label like + " [data]" or " [bss]" is appended. For non-RAM sections or when + section is None, no section label is added. + """ section_label = "" if section in RAM_SECTIONS: section_label = f" [{section[1:]}]" # .data -> [data], .bss -> [bss] @@ -169,6 +188,47 @@ class MemoryAnalyzerCLI(MemoryAnalyzer): f"{total_flash:>{self.COL_TOTAL_FLASH - 2},} B | {total_ram:>{self.COL_TOTAL_RAM - 2},} B" ) + # Show unattributed RAM (SDK/framework overhead) + unattributed_bss, unattributed_data, unattributed_total = ( + self.get_unattributed_ram() + ) + if unattributed_total > 0: + lines.append("") + lines.append( + f"Unattributed RAM: {unattributed_total:,} B (SDK/framework overhead)" + ) + if unattributed_bss > 0 and unattributed_data > 0: + lines.append( + f" .bss: {unattributed_bss:,} B | .data: {unattributed_data:,} B" + ) + + # Show SDK symbol breakdown if available + sdk_by_lib = self.get_sdk_ram_by_library() + if sdk_by_lib: + lines.append("") + lines.append("SDK library breakdown (static symbols not in ELF):") + # Sort libraries by total size + lib_totals = [ + (lib, sum(s.size for s in syms), syms) + for lib, syms in sdk_by_lib.items() + ] + lib_totals.sort(key=lambda x: x[1], reverse=True) + + for lib_name, lib_total, syms in lib_totals: + if lib_total == 0: + continue + lines.append(f" {lib_name}: {lib_total:,} B") + # Show top symbols from this library + for sym in sorted(syms, key=lambda s: s.size, reverse=True)[:3]: + section_label = sym.section.lstrip(".") + # Use demangled name (falls back to original if not demangled) + display_name = sym.demangled or sym.name + if len(display_name) > 50: + display_name = f"{display_name[:47]}..." + lines.append( + f" {sym.size:>6,} B [{section_label}] {display_name}" + ) + # Top consumers self._add_top_consumers( lines, @@ -243,6 +303,8 @@ class MemoryAnalyzerCLI(MemoryAnalyzer): f"{_COMPONENT_CORE} Symbols > {self.SYMBOL_SIZE_THRESHOLD} B ({len(large_core_symbols)} symbols):" ) for i, (symbol, demangled, size) in enumerate(large_core_symbols): + # Core symbols only track (symbol, demangled, size) without section info, + # so we don't show section labels here lines.append( f"{i + 1}. {self._format_symbol_with_section(demangled, size)}" ) @@ -340,7 +402,9 @@ class MemoryAnalyzerCLI(MemoryAnalyzer): # Sort by size descending sorted_ram_syms = sorted(ram_syms, key=lambda x: x[2], reverse=True) - large_ram_syms = [s for s in sorted_ram_syms if s[2] > 50] + large_ram_syms = [ + s for s in sorted_ram_syms if s[2] > self.RAM_SYMBOL_SIZE_THRESHOLD + ] lines.append(f"{name} ({mem.ram_total:,} B total RAM):") @@ -351,10 +415,19 @@ class MemoryAnalyzerCLI(MemoryAnalyzer): lines.append(f" .bss (uninitialized): {bss_size:,} B") if large_ram_syms: - lines.append(f" Symbols > 50 B ({len(large_ram_syms)}):") + lines.append( + f" Symbols > {self.RAM_SYMBOL_SIZE_THRESHOLD} B ({len(large_ram_syms)}):" + ) for symbol, demangled, size, section in large_ram_syms[:10]: - section_label = "data" if section == ".data" else "bss" - lines.append(f" {size:>6,} B [{section_label}] {demangled[:70]}") + # Format section label consistently by stripping leading dot + section_label = section.lstrip(".") if section else "" + # Add ellipsis if name is truncated + demangled_display = ( + f"{demangled[:70]}..." if len(demangled) > 70 else demangled + ) + lines.append( + f" {size:>6,} B [{section_label}] {demangled_display}" + ) if len(large_ram_syms) > 10: lines.append(f" ... and {len(large_ram_syms) - 10} more") lines.append("") diff --git a/esphome/analyze_memory/const.py b/esphome/analyze_memory/const.py index 8dd6664bc00..9933bd77fdf 100644 --- a/esphome/analyze_memory/const.py +++ b/esphome/analyze_memory/const.py @@ -7,11 +7,13 @@ ESPHOME_COMPONENT_PATTERN = re.compile(r"esphome::([a-zA-Z0-9_]+)::") # Section mapping for ELF file sections # Maps standard section names to their various platform-specific variants +# Note: Order matters! More specific patterns (.bss) must come before general ones (.dram) +# because ESP-IDF uses names like ".dram0.bss" which would match ".dram" otherwise SECTION_MAPPING = { ".text": frozenset([".text", ".iram"]), ".rodata": frozenset([".rodata"]), + ".bss": frozenset([".bss"]), # Must be before .data to catch ".dram0.bss" ".data": frozenset([".data", ".dram"]), - ".bss": frozenset([".bss"]), } # Section to ComponentMemory attribute mapping diff --git a/esphome/analyze_memory/toolchain.py b/esphome/analyze_memory/toolchain.py index e7662524126..23d85e97001 100644 --- a/esphome/analyze_memory/toolchain.py +++ b/esphome/analyze_memory/toolchain.py @@ -5,6 +5,10 @@ from __future__ import annotations import logging from pathlib import Path import subprocess +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Sequence _LOGGER = logging.getLogger(__name__) @@ -55,3 +59,35 @@ def find_tool( _LOGGER.warning("Could not find %s tool", tool_name) return None + + +def run_tool( + cmd: Sequence[str], + timeout: int = 30, +) -> subprocess.CompletedProcess[str] | None: + """Run a toolchain command and return the result. + + Args: + cmd: Command and arguments to run + timeout: Timeout in seconds + + Returns: + CompletedProcess on success, None on failure + """ + try: + return subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + check=False, + ) + except subprocess.TimeoutExpired: + _LOGGER.warning("Command timed out: %s", " ".join(cmd)) + return None + except FileNotFoundError: + _LOGGER.warning("Command not found: %s", cmd[0]) + return None + except OSError as e: + _LOGGER.warning("Failed to run command %s: %s", cmd[0], e) + return None diff --git a/esphome/components/api/api_connection.cpp b/esphome/components/api/api_connection.cpp index 7e9ddd54d32..fb3548d117a 100644 --- a/esphome/components/api/api_connection.cpp +++ b/esphome/components/api/api_connection.cpp @@ -1828,6 +1828,10 @@ bool APIConnection::send_buffer(ProtoWriteBuffer buffer, uint8_t message_type) { // Toggle Nagle's algorithm based on message type to prevent log messages from // filling the TCP send buffer and crowding out important state updates. // + // This honors the `no_delay` proto option - SubscribeLogsResponse is the only + // message with `option (no_delay) = false;` in api.proto, indicating it should + // allow Nagle coalescing. This option existed since 2019 but was never implemented. + // // - Log messages: Enable Nagle (NODELAY=false) so small log packets coalesce // into fewer, larger packets. They flush naturally via TCP delayed ACK timer // (~200ms), buffer filling, or when a state update triggers a flush. diff --git a/esphome/components/bme68x_bsec2_i2c/bme68x_bsec2_i2c.cpp b/esphome/components/bme68x_bsec2_i2c/bme68x_bsec2_i2c.cpp index c6afc0b7c1e..2d74ba6b122 100644 --- a/esphome/components/bme68x_bsec2_i2c/bme68x_bsec2_i2c.cpp +++ b/esphome/components/bme68x_bsec2_i2c/bme68x_bsec2_i2c.cpp @@ -32,7 +32,9 @@ void BME68xBSEC2I2CComponent::dump_config() { } uint32_t BME68xBSEC2I2CComponent::get_hash() { - return fnv1_hash_extend(fnv1_hash("bme68x_bsec_state_"), static_cast(this->address_)); + char buf[22]; // "bme68x_bsec_state_" (18) + uint8_t max (3) + null + snprintf(buf, sizeof(buf), "bme68x_bsec_state_%u", this->address_); + return fnv1_hash(buf); } int8_t BME68xBSEC2I2CComponent::read_bytes_wrapper(uint8_t a_register, uint8_t *data, uint32_t len, void *intfPtr) { diff --git a/esphome/components/espnow/__init__.py b/esphome/components/espnow/__init__.py index cc2c02d4c09..1f5ca1104ad 100644 --- a/esphome/components/espnow/__init__.py +++ b/esphome/components/espnow/__init__.py @@ -66,11 +66,17 @@ CONF_WAIT_FOR_SENT = "wait_for_sent" MAX_ESPNOW_PACKET_SIZE = 250 # Maximum size of the payload in bytes +def validate_channel(value): + if value is None: + raise cv.Invalid("channel is required if wifi is not configured") + return wifi.validate_channel(value) + + CONFIG_SCHEMA = cv.All( cv.Schema( { cv.GenerateID(): cv.declare_id(ESPNowComponent), - cv.OnlyWithout(CONF_CHANNEL, CONF_WIFI): wifi.validate_channel, + cv.OnlyWithout(CONF_CHANNEL, CONF_WIFI): validate_channel, cv.Optional(CONF_ENABLE_ON_BOOT, default=True): cv.boolean, cv.Optional(CONF_AUTO_ADD_PEER, default=False): cv.boolean, cv.Optional(CONF_PEERS): cv.ensure_list(cv.mac_address), diff --git a/esphome/components/sensor/__init__.py b/esphome/components/sensor/__init__.py index 83b2656661f..2ac45a55ace 100644 --- a/esphome/components/sensor/__init__.py +++ b/esphome/components/sensor/__init__.py @@ -3,7 +3,7 @@ import math from esphome import automation import esphome.codegen as cg -from esphome.components import mqtt, web_server +from esphome.components import mqtt, web_server, zigbee import esphome.config_validation as cv from esphome.const import ( CONF_ABOVE, @@ -295,6 +295,7 @@ validate_device_class = cv.one_of(*DEVICE_CLASSES, lower=True, space="_") _SENSOR_SCHEMA = ( cv.ENTITY_BASE_SCHEMA.extend(web_server.WEBSERVER_SORTING_SCHEMA) .extend(cv.MQTT_COMPONENT_SCHEMA) + .extend(zigbee.SENSOR_SCHEMA) .extend( { cv.OnlyWith(CONF_MQTT_ID, "mqtt"): cv.declare_id(mqtt.MQTTSensorComponent), @@ -335,6 +336,7 @@ _SENSOR_SCHEMA = ( ) _SENSOR_SCHEMA.add_extra(entity_duplicate_validator("sensor")) +_SENSOR_SCHEMA.add_extra(zigbee.validate_sensor) def sensor_schema( @@ -918,6 +920,8 @@ async def setup_sensor_core_(var, config): if web_server_config := config.get(CONF_WEB_SERVER): await web_server.add_entity_config(var, web_server_config) + await zigbee.setup_sensor(var, config) + async def register_sensor(var, config): if not CORE.has_id(config[CONF_ID]): diff --git a/esphome/components/sun_gtil2/sun_gtil2.h b/esphome/components/sun_gtil2/sun_gtil2.h index ebdd2abe5ba..3e28527cf7a 100644 --- a/esphome/components/sun_gtil2/sun_gtil2.h +++ b/esphome/components/sun_gtil2/sun_gtil2.h @@ -36,7 +36,7 @@ class SunGTIL2 : public Component, public uart::UARTDevice { void set_serial_number(text_sensor::TextSensor *text_sensor) { serial_number_ = text_sensor; } #endif - static constexpr size_t STATE_BUFFER_SIZE = 16; + static constexpr size_t STATE_BUFFER_SIZE = 32; protected: const char *state_to_string_(uint8_t state, std::span buffer); diff --git a/esphome/components/tuya/text_sensor/tuya_text_sensor.cpp b/esphome/components/tuya/text_sensor/tuya_text_sensor.cpp index c71bf176a49..3c492d609d6 100644 --- a/esphome/components/tuya/text_sensor/tuya_text_sensor.cpp +++ b/esphome/components/tuya/text_sensor/tuya_text_sensor.cpp @@ -20,9 +20,10 @@ void TuyaTextSensor::setup() { break; } case TuyaDatapointType::ENUM: { - std::string data = to_string(datapoint.value_enum); - ESP_LOGD(TAG, "MCU reported text sensor %u is: %s", datapoint.id, data.c_str()); - this->publish_state(data); + char buf[4]; // uint8_t max is 3 digits + null + snprintf(buf, sizeof(buf), "%u", datapoint.value_enum); + ESP_LOGD(TAG, "MCU reported text sensor %u is: %s", datapoint.id, buf); + this->publish_state(buf); break; } default: diff --git a/esphome/components/zigbee/__init__.py b/esphome/components/zigbee/__init__.py index 2009f92d2ec..1a017f2ab27 100644 --- a/esphome/components/zigbee/__init__.py +++ b/esphome/components/zigbee/__init__.py @@ -13,14 +13,16 @@ from esphome.types import ConfigType from .const_zephyr import ( CONF_MAX_EP_NUMBER, CONF_ON_JOIN, + CONF_POWER_SOURCE, CONF_WIPE_ON_BOOT, CONF_ZIGBEE_ID, KEY_EP_NUMBER, KEY_ZIGBEE, + POWER_SOURCE, ZigbeeComponent, zigbee_ns, ) -from .zigbee_zephyr import zephyr_binary_sensor +from .zigbee_zephyr import zephyr_binary_sensor, zephyr_sensor CODEOWNERS = ["@tomaszduda23"] @@ -35,6 +37,7 @@ def zigbee_set_core_data(config: ConfigType) -> ConfigType: BINARY_SENSOR_SCHEMA = cv.Schema({}).extend(zephyr_binary_sensor) +SENSOR_SCHEMA = cv.Schema({}).extend(zephyr_sensor) CONFIG_SCHEMA = cv.All( cv.Schema( @@ -42,9 +45,15 @@ CONFIG_SCHEMA = cv.All( cv.GenerateID(CONF_ID): cv.declare_id(ZigbeeComponent), cv.Optional(CONF_ON_JOIN): automation.validate_automation(single=True), cv.Optional(CONF_WIPE_ON_BOOT, default=False): cv.All( - cv.boolean, + cv.Any( + cv.boolean, + cv.one_of(*["once"], lower=True), + ), cv.requires_component("nrf52"), ), + cv.Optional(CONF_POWER_SOURCE, default="DC_SOURCE"): cv.enum( + POWER_SOURCE, upper=True + ), } ).extend(cv.COMPONENT_SCHEMA), zigbee_set_core_data, @@ -86,7 +95,16 @@ async def setup_binary_sensor(entity: cg.MockObj, config: ConfigType) -> None: await zephyr_setup_binary_sensor(entity, config) -def validate_binary_sensor(config: ConfigType) -> ConfigType: +async def setup_sensor(entity: cg.MockObj, config: ConfigType) -> None: + if not config.get(CONF_ZIGBEE_ID) or config.get(CONF_INTERNAL): + return + if CORE.using_zephyr: + from .zigbee_zephyr import zephyr_setup_sensor + + await zephyr_setup_sensor(entity, config) + + +def consume_endpoint(config: ConfigType) -> ConfigType: if not config.get(CONF_ZIGBEE_ID) or config.get(CONF_INTERNAL): return config data: dict[str, Any] = CORE.data.setdefault(KEY_ZIGBEE, {}) @@ -95,6 +113,14 @@ def validate_binary_sensor(config: ConfigType) -> ConfigType: return config +def validate_binary_sensor(config: ConfigType) -> ConfigType: + return consume_endpoint(config) + + +def validate_sensor(config: ConfigType) -> ConfigType: + return consume_endpoint(config) + + ZIGBEE_ACTION_SCHEMA = automation.maybe_simple_id( cv.Schema( { diff --git a/esphome/components/zigbee/const_zephyr.py b/esphome/components/zigbee/const_zephyr.py index ecd08f1f0a2..8d1f229b6e0 100644 --- a/esphome/components/zigbee/const_zephyr.py +++ b/esphome/components/zigbee/const_zephyr.py @@ -3,12 +3,24 @@ import esphome.codegen as cg zigbee_ns = cg.esphome_ns.namespace("zigbee") ZigbeeComponent = zigbee_ns.class_("ZigbeeComponent", cg.Component) BinaryAttrs = zigbee_ns.struct("BinaryAttrs") +AnalogAttrs = zigbee_ns.struct("AnalogAttrs") CONF_MAX_EP_NUMBER = 8 CONF_ZIGBEE_ID = "zigbee_id" CONF_ON_JOIN = "on_join" CONF_WIPE_ON_BOOT = "wipe_on_boot" CONF_ZIGBEE_BINARY_SENSOR = "zigbee_binary_sensor" +CONF_ZIGBEE_SENSOR = "zigbee_sensor" +CONF_POWER_SOURCE = "power_source" +POWER_SOURCE = { + "UNKNOWN": "ZB_ZCL_BASIC_POWER_SOURCE_UNKNOWN", + "MAINS_SINGLE_PHASE": "ZB_ZCL_BASIC_POWER_SOURCE_MAINS_SINGLE_PHASE", + "MAINS_THREE_PHASE": "ZB_ZCL_BASIC_POWER_SOURCE_MAINS_THREE_PHASE", + "BATTERY": "ZB_ZCL_BASIC_POWER_SOURCE_BATTERY", + "DC_SOURCE": "ZB_ZCL_BASIC_POWER_SOURCE_DC_SOURCE", + "EMERGENCY_MAINS_CONST": "ZB_ZCL_BASIC_POWER_SOURCE_EMERGENCY_MAINS_CONST", + "EMERGENCY_MAINS_TRANSF": "ZB_ZCL_BASIC_POWER_SOURCE_EMERGENCY_MAINS_TRANSF", +} # Keys for CORE.data storage KEY_ZIGBEE = "zigbee" @@ -22,3 +34,4 @@ ZB_ZCL_IDENTIFY_ATTRS_T = "zb_zcl_identify_attrs_t" ZB_ZCL_CLUSTER_ID_BASIC = "ZB_ZCL_CLUSTER_ID_BASIC" ZB_ZCL_CLUSTER_ID_IDENTIFY = "ZB_ZCL_CLUSTER_ID_IDENTIFY" ZB_ZCL_CLUSTER_ID_BINARY_INPUT = "ZB_ZCL_CLUSTER_ID_BINARY_INPUT" +ZB_ZCL_CLUSTER_ID_ANALOG_INPUT = "ZB_ZCL_CLUSTER_ID_ANALOG_INPUT" diff --git a/esphome/components/zigbee/zigbee_binary_sensor_zephyr.cpp b/esphome/components/zigbee/zigbee_binary_sensor_zephyr.cpp index 744d04adc51..8b7aff70a82 100644 --- a/esphome/components/zigbee/zigbee_binary_sensor_zephyr.cpp +++ b/esphome/components/zigbee/zigbee_binary_sensor_zephyr.cpp @@ -17,9 +17,9 @@ ZigbeeBinarySensor::ZigbeeBinarySensor(binary_sensor::BinarySensor *binary_senso void ZigbeeBinarySensor::setup() { this->binary_sensor_->add_on_state_callback([this](bool state) { this->cluster_attributes_->present_value = state ? ZB_TRUE : ZB_FALSE; - ESP_LOGD(TAG, "Set attribute end point: %d, present_value %d", this->end_point_, + ESP_LOGD(TAG, "Set attribute endpoint: %d, present_value %d", this->endpoint_, this->cluster_attributes_->present_value); - ZB_ZCL_SET_ATTRIBUTE(this->end_point_, ZB_ZCL_CLUSTER_ID_BINARY_INPUT, ZB_ZCL_CLUSTER_SERVER_ROLE, + ZB_ZCL_SET_ATTRIBUTE(this->endpoint_, ZB_ZCL_CLUSTER_ID_BINARY_INPUT, ZB_ZCL_CLUSTER_SERVER_ROLE, ZB_ZCL_ATTR_BINARY_INPUT_PRESENT_VALUE_ID, &this->cluster_attributes_->present_value, ZB_FALSE); this->parent_->flush(); @@ -29,8 +29,8 @@ void ZigbeeBinarySensor::setup() { void ZigbeeBinarySensor::dump_config() { ESP_LOGCONFIG(TAG, "Zigbee Binary Sensor\n" - " End point: %d, present_value %u", - this->end_point_, this->cluster_attributes_->present_value); + " Endpoint: %d, present_value %u", + this->endpoint_, this->cluster_attributes_->present_value); } } // namespace esphome::zigbee diff --git a/esphome/components/zigbee/zigbee_sensor_zephyr.cpp b/esphome/components/zigbee/zigbee_sensor_zephyr.cpp new file mode 100644 index 00000000000..74550d6487f --- /dev/null +++ b/esphome/components/zigbee/zigbee_sensor_zephyr.cpp @@ -0,0 +1,76 @@ +#include "zigbee_sensor_zephyr.h" +#if defined(USE_ZIGBEE) && defined(USE_NRF52) && defined(USE_SENSOR) +#include "esphome/core/log.h" +extern "C" { +#include +#include +#include +#include +#include +} +namespace esphome::zigbee { + +static const char *const TAG = "zigbee.sensor"; + +ZigbeeSensor::ZigbeeSensor(sensor::Sensor *sensor) : sensor_(sensor) {} + +void ZigbeeSensor::setup() { + this->sensor_->add_on_state_callback([this](float state) { + this->cluster_attributes_->present_value = state; + ESP_LOGD(TAG, "Set attribute endpoint: %d, present_value %f", this->endpoint_, state); + ZB_ZCL_SET_ATTRIBUTE(this->endpoint_, ZB_ZCL_CLUSTER_ID_ANALOG_INPUT, ZB_ZCL_CLUSTER_SERVER_ROLE, + ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID, + (zb_uint8_t *) &this->cluster_attributes_->present_value, ZB_FALSE); + this->parent_->flush(); + }); +} + +void ZigbeeSensor::dump_config() { + ESP_LOGCONFIG(TAG, + "Zigbee Sensor\n" + " Endpoint: %d, present_value %f", + this->endpoint_, this->cluster_attributes_->present_value); +} + +const zb_uint8_t ZB_ZCL_ANALOG_INPUT_STATUS_FLAG_MAX_VALUE = 0x0F; + +static zb_ret_t check_value_analog_server(zb_uint16_t attr_id, zb_uint8_t endpoint, + zb_uint8_t *value) { // NOLINT(readability-non-const-parameter) + zb_ret_t ret = RET_OK; + ZVUNUSED(endpoint); + + switch (attr_id) { + case ZB_ZCL_ATTR_ANALOG_INPUT_OUT_OF_SERVICE_ID: + ret = ZB_ZCL_CHECK_BOOL_VALUE(*value) ? RET_OK : RET_ERROR; + break; + case ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID: + break; + + case ZB_ZCL_ATTR_ANALOG_INPUT_STATUS_FLAG_ID: + if (*value > ZB_ZCL_ANALOG_INPUT_STATUS_FLAG_MAX_VALUE) { + ret = RET_ERROR; + } + break; + + default: + break; + } + + return ret; +} + +} // namespace esphome::zigbee + +void zb_zcl_analog_input_init_server() { + zb_zcl_add_cluster_handlers(ZB_ZCL_CLUSTER_ID_ANALOG_INPUT, ZB_ZCL_CLUSTER_SERVER_ROLE, + esphome::zigbee::check_value_analog_server, (zb_zcl_cluster_write_attr_hook_t) NULL, + (zb_zcl_cluster_handler_t) NULL); +} + +void zb_zcl_analog_input_init_client() { + zb_zcl_add_cluster_handlers(ZB_ZCL_CLUSTER_ID_ANALOG_INPUT, ZB_ZCL_CLUSTER_CLIENT_ROLE, + (zb_zcl_cluster_check_value_t) NULL, (zb_zcl_cluster_write_attr_hook_t) NULL, + (zb_zcl_cluster_handler_t) NULL); +} + +#endif diff --git a/esphome/components/zigbee/zigbee_sensor_zephyr.h b/esphome/components/zigbee/zigbee_sensor_zephyr.h new file mode 100644 index 00000000000..37406f21d06 --- /dev/null +++ b/esphome/components/zigbee/zigbee_sensor_zephyr.h @@ -0,0 +1,86 @@ +#pragma once + +#include "esphome/components/zigbee/zigbee_zephyr.h" +#if defined(USE_ZIGBEE) && defined(USE_NRF52) && defined(USE_SENSOR) +#include "esphome/core/component.h" +#include "esphome/components/sensor/sensor.h" +extern "C" { +#include +#include +} + +enum { + ZB_ZCL_ATTR_ANALOG_INPUT_DESCRIPTION_ID = 0x001C, + ZB_ZCL_ATTR_ANALOG_INPUT_OUT_OF_SERVICE_ID = 0x0051, + ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID = 0x0055, + ZB_ZCL_ATTR_ANALOG_INPUT_STATUS_FLAG_ID = 0x006F, + ZB_ZCL_ATTR_ANALOG_INPUT_ENGINEERING_UNITS_ID = 0x0075, +}; + +#define ZB_ZCL_ANALOG_INPUT_CLUSTER_REVISION_DEFAULT ((zb_uint16_t) 0x0001u) + +#define ZB_SET_ATTR_DESCR_WITH_ZB_ZCL_ATTR_ANALOG_INPUT_DESCRIPTION_ID(data_ptr) \ + { \ + ZB_ZCL_ATTR_ANALOG_INPUT_DESCRIPTION_ID, ZB_ZCL_ATTR_TYPE_CHAR_STRING, ZB_ZCL_ATTR_ACCESS_READ_ONLY, \ + (ZB_ZCL_NON_MANUFACTURER_SPECIFIC), (void *) (data_ptr) \ + } + +#define ZB_SET_ATTR_DESCR_WITH_ZB_ZCL_ATTR_ANALOG_INPUT_OUT_OF_SERVICE_ID(data_ptr) \ + { \ + ZB_ZCL_ATTR_ANALOG_INPUT_OUT_OF_SERVICE_ID, ZB_ZCL_ATTR_TYPE_BOOL, \ + ZB_ZCL_ATTR_ACCESS_READ_ONLY | ZB_ZCL_ATTR_ACCESS_WRITE_OPTIONAL, (ZB_ZCL_NON_MANUFACTURER_SPECIFIC), \ + (void *) (data_ptr) \ + } + +#define ZB_SET_ATTR_DESCR_WITH_ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID(data_ptr) \ + { \ + ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID, ZB_ZCL_ATTR_TYPE_SINGLE, \ + ZB_ZCL_ATTR_ACCESS_READ_ONLY | ZB_ZCL_ATTR_ACCESS_WRITE_OPTIONAL | ZB_ZCL_ATTR_ACCESS_REPORTING, \ + (ZB_ZCL_NON_MANUFACTURER_SPECIFIC), (void *) (data_ptr) \ + } + +#define ZB_SET_ATTR_DESCR_WITH_ZB_ZCL_ATTR_ANALOG_INPUT_STATUS_FLAG_ID(data_ptr) \ + { \ + ZB_ZCL_ATTR_ANALOG_INPUT_STATUS_FLAG_ID, ZB_ZCL_ATTR_TYPE_8BITMAP, \ + ZB_ZCL_ATTR_ACCESS_READ_ONLY | ZB_ZCL_ATTR_ACCESS_REPORTING, (ZB_ZCL_NON_MANUFACTURER_SPECIFIC), \ + (void *) (data_ptr) \ + } + +#define ZB_SET_ATTR_DESCR_WITH_ZB_ZCL_ATTR_ANALOG_INPUT_ENGINEERING_UNITS_ID(data_ptr) \ + { \ + ZB_ZCL_ATTR_ANALOG_INPUT_ENGINEERING_UNITS_ID, ZB_ZCL_ATTR_TYPE_16BIT_ENUM, ZB_ZCL_ATTR_ACCESS_READ_ONLY, \ + (ZB_ZCL_NON_MANUFACTURER_SPECIFIC), (void *) (data_ptr) \ + } + +#define ESPHOME_ZB_ZCL_DECLARE_ANALOG_INPUT_ATTRIB_LIST(attr_list, out_of_service, present_value, status_flag, \ + engineering_units, description) \ + ZB_ZCL_START_DECLARE_ATTRIB_LIST_CLUSTER_REVISION(attr_list, ZB_ZCL_ANALOG_INPUT) \ + ZB_ZCL_SET_ATTR_DESC(ZB_ZCL_ATTR_ANALOG_INPUT_OUT_OF_SERVICE_ID, (out_of_service)) \ + ZB_ZCL_SET_ATTR_DESC(ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID, (present_value)) \ + ZB_ZCL_SET_ATTR_DESC(ZB_ZCL_ATTR_ANALOG_INPUT_STATUS_FLAG_ID, (status_flag)) \ + ZB_ZCL_SET_ATTR_DESC(ZB_ZCL_ATTR_ANALOG_INPUT_ENGINEERING_UNITS_ID, (engineering_units)) \ + ZB_ZCL_SET_ATTR_DESC(ZB_ZCL_ATTR_ANALOG_INPUT_DESCRIPTION_ID, (description)) \ + ZB_ZCL_FINISH_DECLARE_ATTRIB_LIST + +void zb_zcl_analog_input_init_server(); +void zb_zcl_analog_input_init_client(); +#define ZB_ZCL_CLUSTER_ID_ANALOG_INPUT_SERVER_ROLE_INIT zb_zcl_analog_input_init_server +#define ZB_ZCL_CLUSTER_ID_ANALOG_INPUT_CLIENT_ROLE_INIT zb_zcl_analog_input_init_client + +namespace esphome::zigbee { + +class ZigbeeSensor : public ZigbeeEntity, public Component { + public: + explicit ZigbeeSensor(sensor::Sensor *sensor); + void set_cluster_attributes(AnalogAttrs &cluster_attributes) { this->cluster_attributes_ = &cluster_attributes; } + + void setup() override; + void dump_config() override; + + protected: + AnalogAttrs *cluster_attributes_{nullptr}; + sensor::Sensor *sensor_{nullptr}; +}; + +} // namespace esphome::zigbee +#endif diff --git a/esphome/components/zigbee/zigbee_zephyr.cpp b/esphome/components/zigbee/zigbee_zephyr.cpp index c9027d0a74a..9a421aaec14 100644 --- a/esphome/components/zigbee/zigbee_zephyr.cpp +++ b/esphome/components/zigbee/zigbee_zephyr.cpp @@ -138,9 +138,26 @@ void ZigbeeComponent::setup() { } #ifdef USE_ZIGBEE_WIPE_ON_BOOT - erase_flash_(FIXED_PARTITION_ID(ZBOSS_NVRAM)); - erase_flash_(FIXED_PARTITION_ID(ZBOSS_PRODUCT_CONFIG)); - erase_flash_(FIXED_PARTITION_ID(SETTINGS_STORAGE)); + bool wipe = true; +#ifdef USE_ZIGBEE_WIPE_ON_BOOT_MAGIC + // unique hash to store preferences for this component + uint32_t hash = 88498616UL; + uint32_t wipe_value = 0; + auto wipe_pref = global_preferences->make_preference(hash, true); + if (wipe_pref.load(&wipe_value)) { + wipe = wipe_value != USE_ZIGBEE_WIPE_ON_BOOT_MAGIC; + ESP_LOGD(TAG, "Wipe value in preferences %u, in firmware %u", wipe_value, USE_ZIGBEE_WIPE_ON_BOOT_MAGIC); + } +#endif + if (wipe) { + erase_flash_(FIXED_PARTITION_ID(ZBOSS_NVRAM)); + erase_flash_(FIXED_PARTITION_ID(ZBOSS_PRODUCT_CONFIG)); + erase_flash_(FIXED_PARTITION_ID(SETTINGS_STORAGE)); +#ifdef USE_ZIGBEE_WIPE_ON_BOOT_MAGIC + wipe_value = USE_ZIGBEE_WIPE_ON_BOOT_MAGIC; + wipe_pref.save(&wipe_value); +#endif + } #endif ZB_ZCL_REGISTER_DEVICE_CB(zcl_device_cb); @@ -152,15 +169,54 @@ void ZigbeeComponent::setup() { zigbee_enable(); } -void ZigbeeComponent::dump_config() { - bool wipe = false; +static const char *role() { + switch (zb_get_network_role()) { + case ZB_NWK_DEVICE_TYPE_COORDINATOR: + return "coordinator"; + case ZB_NWK_DEVICE_TYPE_ROUTER: + return "router"; + case ZB_NWK_DEVICE_TYPE_ED: + return "end device"; + } + return "unknown"; +} + +static const char *get_wipe_on_boot() { #ifdef USE_ZIGBEE_WIPE_ON_BOOT - wipe = true; +#ifdef USE_ZIGBEE_WIPE_ON_BOOT_MAGIC + return "ONCE"; +#else + return "YES"; #endif +#else + return "NO"; +#endif +} + +void ZigbeeComponent::dump_config() { + char ieee_addr_buf[IEEE_ADDR_BUF_SIZE] = {0}; + zb_ieee_addr_t addr; + zb_get_long_address(addr); + ieee_addr_to_str(ieee_addr_buf, sizeof(ieee_addr_buf), addr); + zb_ext_pan_id_t extended_pan_id; + char extended_pan_id_buf[IEEE_ADDR_BUF_SIZE] = {0}; + zb_get_extended_pan_id(extended_pan_id); + ieee_addr_to_str(extended_pan_id_buf, sizeof(extended_pan_id_buf), extended_pan_id); ESP_LOGCONFIG(TAG, "Zigbee\n" - " Wipe on boot: %s", - YESNO(wipe)); + " Wipe on boot: %s\n" + " Device is joined to the network: %s\n" + " Current channel: %d\n" + " Current page: %d\n" + " Sleep threshold: %ums\n" + " Role: %s\n" + " Long addr: 0x%s\n" + " Short addr: 0x%04X\n" + " Long pan id: 0x%s\n" + " Short pan id: 0x%04X", + get_wipe_on_boot(), YESNO(zb_zdo_joined()), zb_get_current_channel(), zb_get_current_page(), + zb_get_sleep_threshold(), role(), ieee_addr_buf, zb_get_short_address(), extended_pan_id_buf, + zb_get_pan_id()); } static void send_attribute_report(zb_bufid_t bufid, zb_uint16_t cmd_id) { diff --git a/esphome/components/zigbee/zigbee_zephyr.h b/esphome/components/zigbee/zigbee_zephyr.h index 853c6deb4d4..fa23907bf4b 100644 --- a/esphome/components/zigbee/zigbee_zephyr.h +++ b/esphome/components/zigbee/zigbee_zephyr.h @@ -28,16 +28,15 @@ extern "C" { ESPHOME_CAT7(zb_af_simple_desc_, ep_name, _, in_num, _, out_num, _t) // needed to use ESPHOME_ZB_DECLARE_SIMPLE_DESC -#define ESPHOME_ZB_ZCL_DECLARE_SIMPLE_DESC(ep_name, ep_id, in_clust_num, out_clust_num, ...) \ +#define ESPHOME_ZB_ZCL_DECLARE_SIMPLE_DESC(ep_name, ep_id, in_clust_num, out_clust_num, app_device_id, ...) \ ESPHOME_ZB_DECLARE_SIMPLE_DESC(ep_name, in_clust_num, out_clust_num); \ ESPHOME_ZB_AF_SIMPLE_DESC_TYPE(ep_name, in_clust_num, out_clust_num) \ - simple_desc_##ep_name = {ep_id, ZB_AF_HA_PROFILE_ID, ZB_HA_SIMPLE_SENSOR_DEVICE_ID, 0, 0, in_clust_num, \ - out_clust_num, {__VA_ARGS__}} + simple_desc_##ep_name = {ep_id, ZB_AF_HA_PROFILE_ID, app_device_id, 0, 0, in_clust_num, out_clust_num, {__VA_ARGS__}} // needed to use ESPHOME_ZB_ZCL_DECLARE_SIMPLE_DESC #define ESPHOME_ZB_HA_DECLARE_EP(ep_name, ep_id, cluster_list, in_cluster_num, out_cluster_num, report_attr_count, \ - ...) \ - ESPHOME_ZB_ZCL_DECLARE_SIMPLE_DESC(ep_name, ep_id, in_cluster_num, out_cluster_num, __VA_ARGS__); \ + app_device_id, ...) \ + ESPHOME_ZB_ZCL_DECLARE_SIMPLE_DESC(ep_name, ep_id, in_cluster_num, out_cluster_num, app_device_id, __VA_ARGS__); \ ZBOSS_DEVICE_DECLARE_REPORTING_CTX(reporting_info##ep_name, report_attr_count); \ ZB_AF_DECLARE_ENDPOINT_DESC(ep_name, ep_id, ZB_AF_HA_PROFILE_ID, 0, NULL, \ ZB_ZCL_ARRAY_SIZE(cluster_list, zb_zcl_cluster_desc_t), cluster_list, \ @@ -57,10 +56,8 @@ struct AnalogAttrs { zb_bool_t out_of_service; float present_value; zb_uint8_t status_flags; + zb_uint16_t engineering_units; zb_uchar_t description[ZB_ZCL_MAX_STRING_SIZE]; - float max_present_value; - float min_present_value; - float resolution; }; class ZigbeeComponent : public Component { @@ -93,10 +90,10 @@ class ZigbeeComponent : public Component { class ZigbeeEntity { public: void set_parent(ZigbeeComponent *parent) { this->parent_ = parent; } - void set_end_point(zb_uint8_t end_point) { this->end_point_ = end_point; } + void set_endpoint(zb_uint8_t endpoint) { this->endpoint_ = endpoint; } protected: - zb_uint8_t end_point_{0}; + zb_uint8_t endpoint_{0}; ZigbeeComponent *parent_{nullptr}; }; diff --git a/esphome/components/zigbee/zigbee_zephyr.py b/esphome/components/zigbee/zigbee_zephyr.py index ce55675c41d..d8a2716603c 100644 --- a/esphome/components/zigbee/zigbee_zephyr.py +++ b/esphome/components/zigbee/zigbee_zephyr.py @@ -1,10 +1,45 @@ from datetime import datetime +import random from esphome import automation import esphome.codegen as cg from esphome.components.zephyr import zephyr_add_prj_conf import esphome.config_validation as cv -from esphome.const import CONF_ID, CONF_NAME, __version__ +from esphome.const import ( + CONF_ID, + CONF_NAME, + CONF_UNIT_OF_MEASUREMENT, + UNIT_AMPERE, + UNIT_CELSIUS, + UNIT_CENTIMETER, + UNIT_DECIBEL, + UNIT_HECTOPASCAL, + UNIT_HERTZ, + UNIT_HOUR, + UNIT_KELVIN, + UNIT_KILOMETER, + UNIT_KILOWATT, + UNIT_KILOWATT_HOURS, + UNIT_LUX, + UNIT_METER, + UNIT_MICROGRAMS_PER_CUBIC_METER, + UNIT_MILLIAMP, + UNIT_MILLIGRAMS_PER_CUBIC_METER, + UNIT_MILLIMETER, + UNIT_MILLISECOND, + UNIT_MILLIVOLT, + UNIT_MINUTE, + UNIT_OHM, + UNIT_PARTS_PER_BILLION, + UNIT_PARTS_PER_MILLION, + UNIT_PASCAL, + UNIT_PERCENT, + UNIT_SECOND, + UNIT_VOLT, + UNIT_WATT, + UNIT_WATT_HOURS, + __version__, +) from esphome.core import CORE, CoroPriority, coroutine_with_priority from esphome.cpp_generator import ( AssignmentExpression, @@ -15,22 +50,63 @@ from esphome.types import ConfigType from .const_zephyr import ( CONF_ON_JOIN, + CONF_POWER_SOURCE, CONF_WIPE_ON_BOOT, CONF_ZIGBEE_BINARY_SENSOR, CONF_ZIGBEE_ID, + CONF_ZIGBEE_SENSOR, KEY_EP_NUMBER, KEY_ZIGBEE, + POWER_SOURCE, ZB_ZCL_BASIC_ATTRS_EXT_T, + ZB_ZCL_CLUSTER_ID_ANALOG_INPUT, ZB_ZCL_CLUSTER_ID_BASIC, ZB_ZCL_CLUSTER_ID_BINARY_INPUT, ZB_ZCL_CLUSTER_ID_IDENTIFY, ZB_ZCL_IDENTIFY_ATTRS_T, + AnalogAttrs, BinaryAttrs, ZigbeeComponent, zigbee_ns, ) ZigbeeBinarySensor = zigbee_ns.class_("ZigbeeBinarySensor", cg.Component) +ZigbeeSensor = zigbee_ns.class_("ZigbeeSensor", cg.Component) + +# BACnet engineering units mapping (ZCL uses BACnet unit codes) +# See: https://github.com/zigpy/zha/blob/dev/zha/application/platforms/number/bacnet.py +BACNET_UNITS = { + UNIT_CELSIUS: 62, + UNIT_KELVIN: 63, + UNIT_VOLT: 5, + UNIT_MILLIVOLT: 124, + UNIT_AMPERE: 3, + UNIT_MILLIAMP: 2, + UNIT_OHM: 4, + UNIT_WATT: 47, + UNIT_KILOWATT: 48, + UNIT_WATT_HOURS: 18, + UNIT_KILOWATT_HOURS: 19, + UNIT_PASCAL: 53, + UNIT_HECTOPASCAL: 133, + UNIT_HERTZ: 27, + UNIT_MILLIMETER: 30, + UNIT_CENTIMETER: 118, + UNIT_METER: 31, + UNIT_KILOMETER: 193, + UNIT_MILLISECOND: 159, + UNIT_SECOND: 73, + UNIT_MINUTE: 72, + UNIT_HOUR: 71, + UNIT_PARTS_PER_MILLION: 96, + UNIT_PARTS_PER_BILLION: 97, + UNIT_MICROGRAMS_PER_CUBIC_METER: 219, + UNIT_MILLIGRAMS_PER_CUBIC_METER: 218, + UNIT_LUX: 37, + UNIT_DECIBEL: 199, + UNIT_PERCENT: 98, +} +BACNET_UNIT_NO_UNITS = 95 zephyr_binary_sensor = cv.Schema( { @@ -41,6 +117,15 @@ zephyr_binary_sensor = cv.Schema( } ) +zephyr_sensor = cv.Schema( + { + cv.OnlyWith(CONF_ZIGBEE_ID, ["nrf52", "zigbee"]): cv.use_id(ZigbeeComponent), + cv.OnlyWith(CONF_ZIGBEE_SENSOR, ["nrf52", "zigbee"]): cv.declare_id( + ZigbeeSensor + ), + } +) + async def zephyr_to_code(config: ConfigType) -> None: zephyr_add_prj_conf("ZIGBEE", True) @@ -56,6 +141,10 @@ async def zephyr_to_code(config: ConfigType) -> None: zephyr_add_prj_conf("NET_UDP", False) if config[CONF_WIPE_ON_BOOT]: + if config[CONF_WIPE_ON_BOOT] == "once": + cg.add_define( + "USE_ZIGBEE_WIPE_ON_BOOT_MAGIC", random.randint(0x000001, 0xFFFFFF) + ) cg.add_define("USE_ZIGBEE_WIPE_ON_BOOT") var = cg.new_Pvariable(config[CONF_ID]) @@ -85,7 +174,7 @@ async def _attr_to_code(config: ConfigType) -> None: ), zigbee_assign( basic_attrs.power_source, - cg.RawExpression("ZB_ZCL_BASIC_POWER_SOURCE_DC_SOURCE"), + cg.RawExpression(POWER_SOURCE[config[CONF_POWER_SOURCE]]), ), zigbee_set_string(basic_attrs.location_id, ""), zigbee_assign( @@ -191,6 +280,7 @@ def zigbee_register_ep( report_attr_count: int, clusters: list[ZigbeeClusterDesc], slot_index: int, + app_device_id: str, ) -> None: """Register a Zigbee endpoint.""" in_cluster_num = sum(1 for c in clusters if c.has_attrs) @@ -204,7 +294,7 @@ def zigbee_register_ep( ep_id = slot_index + 1 # Endpoints are 1-indexed obj = cg.RawExpression( f"ESPHOME_ZB_HA_DECLARE_EP({ep_name}, {ep_id}, {cluster_list_name}, " - f"{in_cluster_num}, {out_cluster_num}, {report_attr_count}, {', '.join(cluster_ids)})" + f"{in_cluster_num}, {out_cluster_num}, {report_attr_count}, {app_device_id}, {', '.join(cluster_ids)})" ) CORE.add_global(obj) @@ -224,42 +314,102 @@ async def zephyr_setup_binary_sensor(entity: cg.MockObj, config: ConfigType) -> CORE.add_job(_add_binary_sensor, entity, config) -async def _add_binary_sensor(entity: cg.MockObj, config: ConfigType) -> None: - # Find the next available endpoint slot - slot_index = next( +async def zephyr_setup_sensor(entity: cg.MockObj, config: ConfigType) -> None: + CORE.add_job(_add_sensor, entity, config) + + +def _slot_index() -> int: + """Find the next available endpoint slot""" + slot = next( (i for i, v in enumerate(CORE.data[KEY_ZIGBEE][KEY_EP_NUMBER]) if v == ""), None ) + if slot is None: + raise cv.Invalid( + f"Not found empty slot, size ({len(CORE.data[KEY_ZIGBEE][KEY_EP_NUMBER])})" + ) + return slot + + +async def _add_zigbee_input( + entity: cg.MockObj, + config: ConfigType, + component_key, + attrs_type, + zcl_macro: str, + cluster_id: str, + app_device_id: str, + extra_field_values: dict[str, int] | None = None, +) -> None: + slot_index = _slot_index() - # Create unique names for this sensor's variables based on slot index prefix = f"zigbee_ep{slot_index + 1}" - attrs_name = f"{prefix}_binary_attrs" - attr_list_name = f"{prefix}_binary_input_attrib_list" + attrs_name = f"{prefix}_attrs" + attr_list_name = f"{prefix}_attrib_list" cluster_list_name = f"{prefix}_cluster_list" ep_name = f"{prefix}_ep" - # Create the binary attributes structure - binary_attrs = zigbee_new_variable(attrs_name, BinaryAttrs) - attr_list = zigbee_new_attr_list( - attr_list_name, - "ESPHOME_ZB_ZCL_DECLARE_BINARY_INPUT_ATTRIB_LIST", - zigbee_assign(binary_attrs.out_of_service, 0), - zigbee_assign(binary_attrs.present_value, 0), - zigbee_assign(binary_attrs.status_flags, 0), - zigbee_set_string(binary_attrs.description, config[CONF_NAME]), - ) + # Create attribute struct + attrs = zigbee_new_variable(attrs_name, attrs_type) + + # Build attribute list args + attr_args = [ + zigbee_assign(attrs.out_of_service, 0), + zigbee_assign(attrs.present_value, 0), + zigbee_assign(attrs.status_flags, 0), + ] + # Add extra field assignments (e.g., engineering_units for sensors) + if extra_field_values: + for field_name, value in extra_field_values.items(): + attr_args.append(zigbee_assign(getattr(attrs, field_name), value)) + attr_args.append(zigbee_set_string(attrs.description, config[CONF_NAME])) + + # Create attribute list + attr_list = zigbee_new_attr_list(attr_list_name, zcl_macro, *attr_args) # Create cluster list and register endpoint cluster_list_name, clusters = zigbee_new_cluster_list( cluster_list_name, - [ZigbeeClusterDesc(ZB_ZCL_CLUSTER_ID_BINARY_INPUT, attr_list)], + [ZigbeeClusterDesc(cluster_id, attr_list)], + ) + zigbee_register_ep( + ep_name, cluster_list_name, 2, clusters, slot_index, app_device_id ) - zigbee_register_ep(ep_name, cluster_list_name, 2, clusters, slot_index) - # Create the ZigbeeBinarySensor component - var = cg.new_Pvariable(config[CONF_ZIGBEE_BINARY_SENSOR], entity) - await cg.register_component(var, config) + # Create ESPHome component + var = cg.new_Pvariable(config[component_key], entity) + await cg.register_component(var, {}) + + cg.add(var.set_endpoint(slot_index + 1)) + cg.add(var.set_cluster_attributes(attrs)) - cg.add(var.set_end_point(slot_index + 1)) - cg.add(var.set_cluster_attributes(binary_attrs)) hub = await cg.get_variable(config[CONF_ZIGBEE_ID]) cg.add(var.set_parent(hub)) + + +async def _add_binary_sensor(entity: cg.MockObj, config: ConfigType) -> None: + await _add_zigbee_input( + entity, + config, + CONF_ZIGBEE_BINARY_SENSOR, + BinaryAttrs, + "ESPHOME_ZB_ZCL_DECLARE_BINARY_INPUT_ATTRIB_LIST", + ZB_ZCL_CLUSTER_ID_BINARY_INPUT, + "ZB_HA_SIMPLE_SENSOR_DEVICE_ID", + ) + + +async def _add_sensor(entity: cg.MockObj, config: ConfigType) -> None: + # Get BACnet engineering unit from unit_of_measurement + unit = config.get(CONF_UNIT_OF_MEASUREMENT, "") + bacnet_unit = BACNET_UNITS.get(unit, BACNET_UNIT_NO_UNITS) + + await _add_zigbee_input( + entity, + config, + CONF_ZIGBEE_SENSOR, + AnalogAttrs, + "ESPHOME_ZB_ZCL_DECLARE_ANALOG_INPUT_ATTRIB_LIST", + ZB_ZCL_CLUSTER_ID_ANALOG_INPUT, + "ZB_HA_CUSTOM_ATTR_DEVICE_ID", + extra_field_values={"engineering_units": bacnet_unit}, + ) diff --git a/esphome/components/zwave_proxy/zwave_proxy.cpp b/esphome/components/zwave_proxy/zwave_proxy.cpp index f8e0f580e98..8506b19e7f4 100644 --- a/esphome/components/zwave_proxy/zwave_proxy.cpp +++ b/esphome/components/zwave_proxy/zwave_proxy.cpp @@ -105,7 +105,7 @@ void ZWaveProxy::process_uart_() { this->buffer_[1] >= ZWAVE_MIN_GET_NETWORK_IDS_LENGTH && this->buffer_[0] == ZWAVE_FRAME_TYPE_START) { // Store the 4-byte Home ID, which starts at offset 4, and notify connected clients if it changed // The frame parser has already validated the checksum and ensured all bytes are present - if (this->set_home_id(&this->buffer_[4])) { + if (this->set_home_id_(&this->buffer_[4])) { this->send_homeid_changed_msg_(); } } @@ -165,7 +165,7 @@ void ZWaveProxy::zwave_proxy_request(api::APIConnection *api_connection, api::en } } -bool ZWaveProxy::set_home_id(const uint8_t *new_home_id) { +bool ZWaveProxy::set_home_id_(const uint8_t *new_home_id) { if (std::memcmp(this->home_id_.data(), new_home_id, this->home_id_.size()) == 0) { ESP_LOGV(TAG, "Home ID unchanged"); return false; // No change @@ -178,19 +178,28 @@ bool ZWaveProxy::set_home_id(const uint8_t *new_home_id) { } void ZWaveProxy::send_frame(const uint8_t *data, size_t length) { - if (length == 1 && data[0] == this->last_response_) { - ESP_LOGV(TAG, "Skipping sending duplicate response: 0x%02X", data[0]); + // Safety: validate pointer before any access + if (data == nullptr) { + ESP_LOGE(TAG, "Null data pointer"); return; } - if (length && data != nullptr) { -#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERY_VERBOSE - char hex_buf[format_hex_pretty_size(ZWAVE_MAX_LOG_BYTES)]; -#endif - ESP_LOGVV(TAG, "Sending: %s", format_hex_pretty_to(hex_buf, data, length)); - this->write_array(data, length); - } else { - ESP_LOGE(TAG, "Null pointer or length 0"); + if (length == 0) { + ESP_LOGE(TAG, "Length 0"); + return; } + + // Skip duplicate single-byte responses (ACK/NAK/CAN) + if (length == 1 && data[0] == this->last_response_) { + ESP_LOGV(TAG, "Response already sent: 0x%02X", data[0]); + return; + } + +#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERY_VERBOSE + char hex_buf[format_hex_pretty_size(ZWAVE_MAX_LOG_BYTES)]; +#endif + ESP_LOGVV(TAG, "Sending: %s", format_hex_pretty_to(hex_buf, data, length)); + + this->write_array(data, length); } void ZWaveProxy::send_homeid_changed_msg_(api::APIConnection *conn) { diff --git a/esphome/components/zwave_proxy/zwave_proxy.h b/esphome/components/zwave_proxy/zwave_proxy.h index f36287d32af..eb26316f492 100644 --- a/esphome/components/zwave_proxy/zwave_proxy.h +++ b/esphome/components/zwave_proxy/zwave_proxy.h @@ -60,11 +60,11 @@ class ZWaveProxy : public uart::UARTDevice, public Component { uint32_t get_home_id() { return encode_uint32(this->home_id_[0], this->home_id_[1], this->home_id_[2], this->home_id_[3]); } - bool set_home_id(const uint8_t *new_home_id); // Store a new home ID. Returns true if it changed. void send_frame(const uint8_t *data, size_t length); protected: + bool set_home_id_(const uint8_t *new_home_id); // Store a new home ID. Returns true if it changed. void send_homeid_changed_msg_(api::APIConnection *conn = nullptr); void send_simple_command_(uint8_t command_id); bool parse_byte_(uint8_t byte); // Returns true if frame parsing was completed (a frame is ready in the buffer) diff --git a/esphome/core/defines.h b/esphome/core/defines.h index 50def602cb8..ae94f6ef5f5 100644 --- a/esphome/core/defines.h +++ b/esphome/core/defines.h @@ -307,6 +307,7 @@ #define USE_SOFTDEVICE_VERSION 1 #define USE_ZIGBEE #define USE_ZIGBEE_WIPE_ON_BOOT +#define USE_ZIGBEE_WIPE_ON_BOOT_MAGIC 1 #define ZIGBEE_ENDPOINTS_COUNT 8 #endif diff --git a/esphome/core/helpers.h b/esphome/core/helpers.h index 10070dececc..ffbd5b9b785 100644 --- a/esphome/core/helpers.h +++ b/esphome/core/helpers.h @@ -418,8 +418,10 @@ inline uint32_t fnv1a_hash_extend(uint32_t hash, const std::string &str) { } /// Extend a FNV-1a hash with an integer (hashes each byte). template constexpr uint32_t fnv1a_hash_extend(uint32_t hash, T value) { + using UnsignedT = std::make_unsigned_t; + UnsignedT uvalue = static_cast(value); for (size_t i = 0; i < sizeof(T); i++) { - hash ^= (value >> (i * 8)) & 0xFF; + hash ^= (uvalue >> (i * 8)) & 0xFF; hash *= FNV1_PRIME; } return hash; diff --git a/esphome/platformio_api.py b/esphome/platformio_api.py index d7e2f830723..bc241d39e2f 100644 --- a/esphome/platformio_api.py +++ b/esphome/platformio_api.py @@ -430,6 +430,11 @@ class IDEData: else f"{path[:-3]}readelf" ) + @property + def defines(self) -> list[str]: + """Return the list of preprocessor defines from idedata.""" + return self.raw.get("defines", []) + def analyze_memory_usage(config: dict[str, Any]) -> None: """Analyze memory usage by component after compilation.""" diff --git a/tests/components/zigbee/common.yaml b/tests/components/zigbee/common.yaml index eb30205446a..c91569bdbe8 100644 --- a/tests/components/zigbee/common.yaml +++ b/tests/components/zigbee/common.yaml @@ -15,10 +15,14 @@ binary_sensor: - platform: template name: "Garage Door Open 7" internal: True + +sensor: - platform: template - name: "Garage Door Open 8" + name: "Analog 1" + lambda: return 10.0; - platform: template - name: "Garage Door Open 9" + name: "Analog 2" + lambda: return 11.0; zigbee: wipe_on_boot: true diff --git a/tests/components/zigbee/test.nrf52-xiao-ble.yaml b/tests/components/zigbee/test.nrf52-xiao-ble.yaml index dade44d145b..d2ce552de33 100644 --- a/tests/components/zigbee/test.nrf52-xiao-ble.yaml +++ b/tests/components/zigbee/test.nrf52-xiao-ble.yaml @@ -1 +1,5 @@ <<: !include common.yaml + +zigbee: + wipe_on_boot: once + power_source: battery diff --git a/tests/unit_tests/test_main.py b/tests/unit_tests/test_main.py index 1db79446539..fd8f04ded5e 100644 --- a/tests/unit_tests/test_main.py +++ b/tests/unit_tests/test_main.py @@ -2482,6 +2482,7 @@ def test_command_analyze_memory_success( "/path/to/objdump", "/path/to/readelf", set(), # No external components + idedata=mock_get_idedata.return_value, ) # Verify analysis was run @@ -2551,6 +2552,7 @@ def test_command_analyze_memory_with_external_components( "/path/to/objdump", "/path/to/readelf", {"my_custom_component"}, # External component detected + idedata=mock_get_idedata.return_value, ) @@ -2853,16 +2855,31 @@ class MockSerial: return 0 def read(self, size: int = 1) -> bytes: - """Read next chunk of data.""" + """Read up to size bytes from the current chunk. + + This method respects the size argument and keeps any unconsumed + bytes in the current chunk so that subsequent calls to in_waiting + and read see the remaining data. + """ if self.chunk_index < len(self.chunks): chunk = self.chunks[self.chunk_index] - self.chunk_index += 1 if chunk is MOCK_SERIAL_END: # Sentinel means we're done - simulate port closed import serial raise serial.SerialException("Port closed") - return chunk # type: ignore[return-value] + # Respect the requested size and keep any remaining bytes + if size <= 0: + return b"" + data = chunk[:size] # type: ignore[index] + remaining = chunk[size:] # type: ignore[index] + if remaining: + # Keep remaining bytes for the next read + self.chunks[self.chunk_index] = remaining # type: ignore[assignment] + else: + # Entire chunk consumed; advance to the next one + self.chunk_index += 1 + return data # type: ignore[return-value] import serial raise serial.SerialException("Port closed") @@ -3104,3 +3121,54 @@ def test_run_miniterm_baud_rate_zero_returns_early( assert result == 1 assert "UART logging is disabled" in caplog.text + + +def test_run_miniterm_buffer_limit_prevents_unbounded_growth() -> None: + """Test that buffer is limited to prevent unbounded memory growth. + + If a device sends data without newlines, the buffer should be truncated + to SERIAL_BUFFER_MAX_SIZE to prevent memory exhaustion. + """ + # Use a small buffer limit for testing + test_buffer_limit = 100 + + # Create data larger than the limit without newlines + large_data_no_newline = b"X" * 150 # 150 bytes, no newline + final_line = b"END\r\n" + + mock_serial = MockSerial([large_data_no_newline, final_line, MOCK_SERIAL_END]) + + config = { + CONF_LOGGER: { + CONF_BAUD_RATE: 115200, + "deassert_rts_dtr": False, + } + } + args = MockArgs() + + with ( + patch("serial.Serial", return_value=mock_serial), + patch.object(platformio_api, "process_stacktrace") as mock_bt, + patch("esphome.__main__.safe_print") as mock_print, + patch("esphome.__main__.SERIAL_BUFFER_MAX_SIZE", test_buffer_limit), + ): + mock_bt.return_value = False + run_miniterm(config, "/dev/ttyUSB0", args) + + # Should have printed exactly one line + assert mock_print.call_count == 1 + printed_line = mock_print.call_args[0][0] + + # The line should contain "END" and some X's, but not all 150 X's + # because the buffer was truncated + assert "END" in printed_line + assert "X" in printed_line + # Verify truncation happened - we shouldn't have all 150 X's + # The buffer logic is: + # 1. Add 150 X's -> buffer = 150 bytes -> truncate to last 100 = 100 X's + # 2. Add "END\r\n" (5 bytes) -> buffer = 105 bytes -> truncate to last 100 + # = 95 X's + "END\r\n" + # 3. Find newline, extract line = "95 X's + END" + x_count = printed_line.count("X") + assert x_count < 150, f"Expected truncation but got {x_count} X's" + assert x_count == 95, f"Expected 95 X's after truncation but got {x_count}"