Merge remote-tracking branch 'upstream/dev' into integration

This commit is contained in:
J. Nick Koston
2026-01-07 11:39:50 -10:00
27 changed files with 987 additions and 88 deletions
+10 -2
View File
@@ -62,6 +62,9 @@ from esphome.util import (
_LOGGER = logging.getLogger(__name__)
# Maximum buffer size for serial log reading to prevent unbounded memory growth
SERIAL_BUFFER_MAX_SIZE = 65536
# Special non-component keys that appear in configs
_NON_COMPONENT_KEYS = frozenset(
{
@@ -440,11 +443,15 @@ def run_miniterm(config: ConfigType, port: str, args) -> int:
if not chunk:
continue
time_ = datetime.now()
nanoseconds = time_.microsecond // 1000
time_str = f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]"
milliseconds = time_.microsecond // 1000
time_str = f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{milliseconds:03}]"
# Add to buffer and process complete lines
# Limit buffer size to prevent unbounded memory growth
# if device sends data without newlines
buffer += chunk
if len(buffer) > SERIAL_BUFFER_MAX_SIZE:
buffer = buffer[-SERIAL_BUFFER_MAX_SIZE:]
while b"\n" in buffer:
raw_line, buffer = buffer.split(b"\n", 1)
line = raw_line.replace(b"\r", b"").decode(
@@ -1032,6 +1039,7 @@ def command_analyze_memory(args: ArgsProtocol, config: ConfigType) -> int:
idedata.objdump_path,
idedata.readelf_path,
external_components,
idedata=idedata,
)
analyzer.analyze()
+267 -1
View File
@@ -22,6 +22,7 @@ from .helpers import (
map_section_name,
parse_symbol_line,
)
from .toolchain import find_tool, run_tool
if TYPE_CHECKING:
from esphome.platformio_api import IDEData
@@ -63,7 +64,20 @@ class MemorySection:
name: str
symbols: list[SymbolInfoType] = field(default_factory=list)
total_size: int = 0
total_size: int = 0 # Actual section size from ELF headers
symbol_size: int = 0 # Sum of symbol sizes (may be less than total_size)
@dataclass
class SDKSymbol:
"""Represents a symbol from an SDK library that's not in the ELF symbol table."""
name: str
size: int
library: str # Name of the .a file (e.g., "libpp.a")
section: str # ".bss" or ".data"
is_local: bool # True if static/local symbol (lowercase in nm output)
demangled: str = "" # Demangled name (populated after analysis)
@dataclass
@@ -121,6 +135,10 @@ class MemoryAnalyzer:
self.objdump_path = objdump_path or "objdump"
self.readelf_path = readelf_path or "readelf"
self.external_components = external_components or set()
self._idedata = idedata
# Derive nm path from objdump path using shared toolchain utility
self.nm_path = find_tool("nm", self.objdump_path)
self.sections: dict[str, MemorySection] = {}
self.components: dict[str, ComponentMemory] = defaultdict(
@@ -139,12 +157,17 @@ class MemoryAnalyzer:
self._ram_symbols: dict[str, list[tuple[str, str, int, str]]] = defaultdict(
list
)
# Track ELF symbol names for SDK cross-reference
self._elf_symbol_names: set[str] = set()
# SDK symbols not in ELF (static/local symbols from closed-source libs)
self._sdk_symbols: list[SDKSymbol] = []
def analyze(self) -> dict[str, ComponentMemory]:
"""Analyze the ELF file and return component memory usage."""
self._parse_sections()
self._parse_symbols()
self._categorize_symbols()
self._analyze_sdk_libraries()
return dict(self.components)
def _parse_sections(self) -> None:
@@ -198,6 +221,8 @@ class MemoryAnalyzer:
continue
self.sections[section].symbols.append((name, size, ""))
self.sections[section].symbol_size += size
self._elf_symbol_names.add(name)
seen_addresses.add(address)
def _categorize_symbols(self) -> None:
@@ -341,6 +366,247 @@ class MemoryAnalyzer:
return "Other Core"
def get_unattributed_ram(self) -> tuple[int, int, int]:
"""Get unattributed RAM sizes (SDK/framework overhead).
Returns:
Tuple of (unattributed_bss, unattributed_data, total_unattributed)
These are bytes in RAM sections that have no corresponding symbols.
"""
bss_section = self.sections.get(".bss")
data_section = self.sections.get(".data")
unattributed_bss = 0
unattributed_data = 0
if bss_section:
unattributed_bss = max(0, bss_section.total_size - bss_section.symbol_size)
if data_section:
unattributed_data = max(
0, data_section.total_size - data_section.symbol_size
)
return unattributed_bss, unattributed_data, unattributed_bss + unattributed_data
def _find_sdk_library_dirs(self) -> list[Path]:
"""Find SDK library directories based on platform.
Returns:
List of paths to SDK library directories containing .a files.
"""
sdk_dirs: list[Path] = []
if self._idedata is None:
return sdk_dirs
# Get the CC path to determine the framework location
cc_path = getattr(self._idedata, "cc_path", None)
if not cc_path:
return sdk_dirs
cc_path = Path(cc_path)
# For ESP8266 Arduino framework
# CC is like: ~/.platformio/packages/toolchain-xtensa/bin/xtensa-lx106-elf-gcc
# SDK libs are in: ~/.platformio/packages/framework-arduinoespressif8266/tools/sdk/lib/
if "xtensa-lx106" in str(cc_path):
platformio_dir = cc_path.parent.parent.parent
esp8266_sdk = (
platformio_dir
/ "framework-arduinoespressif8266"
/ "tools"
/ "sdk"
/ "lib"
)
if esp8266_sdk.exists():
sdk_dirs.append(esp8266_sdk)
# Also check for NONOSDK subdirectories (closed-source libs)
sdk_dirs.extend(
subdir
for subdir in esp8266_sdk.iterdir()
if subdir.is_dir() and subdir.name.startswith("NONOSDK")
)
# For ESP32 IDF framework
# CC is like: ~/.platformio/packages/toolchain-xtensa-esp-elf/bin/xtensa-esp32-elf-gcc
# or: ~/.platformio/packages/toolchain-riscv32-esp/bin/riscv32-esp-elf-gcc
elif "xtensa-esp" in str(cc_path) or "riscv32-esp" in str(cc_path):
# Detect ESP32 variant from CC path or defines
variant = self._detect_esp32_variant()
if variant:
platformio_dir = cc_path.parent.parent.parent
espidf_dir = platformio_dir / "framework-espidf" / "components"
if espidf_dir.exists():
# Find all directories named after the variant that contain .a files
# This handles various ESP-IDF library layouts:
# - components/*/lib/<variant>/
# - components/*/<variant>/
# - components/*/lib/lib/<variant>/
# - components/*/*/lib_*/<variant>/
sdk_dirs.extend(
variant_dir
for variant_dir in espidf_dir.rglob(variant)
if variant_dir.is_dir() and any(variant_dir.glob("*.a"))
)
return sdk_dirs
def _detect_esp32_variant(self) -> str | None:
"""Detect ESP32 variant from idedata defines.
Returns:
Variant string like 'esp32', 'esp32s2', 'esp32c3', etc. or None.
"""
if self._idedata is None:
return None
defines = getattr(self._idedata, "defines", [])
if not defines:
return None
# ESPHome always adds USE_ESP32_VARIANT_xxx defines
variant_prefix = "USE_ESP32_VARIANT_"
for define in defines:
if define.startswith(variant_prefix):
# Extract variant name and convert to lowercase
# USE_ESP32_VARIANT_ESP32 -> esp32
# USE_ESP32_VARIANT_ESP32S3 -> esp32s3
return define[len(variant_prefix) :].lower()
return None
def _parse_sdk_library(
self, lib_path: Path
) -> tuple[list[tuple[str, int, str, bool]], set[str]]:
"""Parse a single SDK library for symbols.
Args:
lib_path: Path to the .a library file
Returns:
Tuple of:
- List of BSS/DATA symbols: (symbol_name, size, section, is_local)
- Set of global BSS/DATA symbol names (for checking if RAM is linked)
"""
ram_symbols: list[tuple[str, int, str, bool]] = []
global_ram_symbols: set[str] = set()
result = run_tool([self.nm_path, "--size-sort", str(lib_path)], timeout=10)
if result is None:
return ram_symbols, global_ram_symbols
for line in result.stdout.splitlines():
parts = line.split()
if len(parts) < 3:
continue
try:
size = int(parts[0], 16)
sym_type = parts[1]
name = parts[2]
# Only collect BSS (b/B) and DATA (d/D) for RAM analysis
if sym_type in ("b", "B"):
section = ".bss"
is_local = sym_type == "b"
ram_symbols.append((name, size, section, is_local))
# Track global RAM symbols (B/D) for linking check
if sym_type == "B":
global_ram_symbols.add(name)
elif sym_type in ("d", "D"):
section = ".data"
is_local = sym_type == "d"
ram_symbols.append((name, size, section, is_local))
if sym_type == "D":
global_ram_symbols.add(name)
except (ValueError, IndexError):
continue
return ram_symbols, global_ram_symbols
def _analyze_sdk_libraries(self) -> None:
"""Analyze SDK libraries to find symbols not in the ELF.
This finds static/local symbols from closed-source SDK libraries
that consume RAM but don't appear in the final ELF symbol table.
Only includes symbols from libraries that have RAM actually linked
(at least one global BSS/DATA symbol in the ELF).
"""
sdk_dirs = self._find_sdk_library_dirs()
if not sdk_dirs:
_LOGGER.debug("No SDK library directories found")
return
_LOGGER.debug("Analyzing SDK libraries in %d directories", len(sdk_dirs))
# Track seen symbols to avoid duplicates from multiple SDK versions
seen_symbols: set[str] = set()
for sdk_dir in sdk_dirs:
for lib_path in sorted(sdk_dir.glob("*.a")):
lib_name = lib_path.name
ram_symbols, global_ram_symbols = self._parse_sdk_library(lib_path)
# Check if this library's RAM is actually linked by seeing if any
# of its global BSS/DATA symbols appear in the ELF
if not global_ram_symbols & self._elf_symbol_names:
# No RAM from this library is in the ELF - skip it
continue
for name, size, section, is_local in ram_symbols:
# Skip if already in ELF or already seen from another lib
if name in self._elf_symbol_names or name in seen_symbols:
continue
# Only track symbols with non-zero size
if size > 0:
self._sdk_symbols.append(
SDKSymbol(
name=name,
size=size,
library=lib_name,
section=section,
is_local=is_local,
)
)
seen_symbols.add(name)
# Demangle SDK symbols for better readability
if self._sdk_symbols:
sdk_names = [sym.name for sym in self._sdk_symbols]
demangled_map = batch_demangle(sdk_names, objdump_path=self.objdump_path)
for sym in self._sdk_symbols:
sym.demangled = demangled_map.get(sym.name, sym.name)
# Sort by size descending for reporting
self._sdk_symbols.sort(key=lambda s: s.size, reverse=True)
total_sdk_ram = sum(s.size for s in self._sdk_symbols)
_LOGGER.debug(
"Found %d SDK symbols not in ELF, totaling %d bytes",
len(self._sdk_symbols),
total_sdk_ram,
)
def get_sdk_ram_symbols(self) -> list[SDKSymbol]:
"""Get SDK symbols that consume RAM but aren't in the ELF symbol table.
Returns:
List of SDKSymbol objects sorted by size descending.
"""
return self._sdk_symbols
def get_sdk_ram_by_library(self) -> dict[str, list[SDKSymbol]]:
"""Get SDK RAM symbols grouped by library.
Returns:
Dictionary mapping library name to list of symbols.
"""
by_lib: dict[str, list[SDKSymbol]] = defaultdict(list)
for sym in self._sdk_symbols:
by_lib[sym.library].append(sym)
return dict(by_lib)
if __name__ == "__main__":
from .cli import main
+80 -7
View File
@@ -3,6 +3,7 @@
from __future__ import annotations
from collections import defaultdict
from collections.abc import Callable
import json
import sys
from typing import TYPE_CHECKING
@@ -27,6 +28,8 @@ class MemoryAnalyzerCLI(MemoryAnalyzer):
SYMBOL_SIZE_THRESHOLD: int = (
100 # Show symbols larger than this in detailed analysis
)
# Lower threshold for RAM symbols (RAM is more constrained)
RAM_SYMBOL_SIZE_THRESHOLD: int = 24
# Column width constants
COL_COMPONENT: int = 29
@@ -104,12 +107,23 @@ class MemoryAnalyzerCLI(MemoryAnalyzer):
lines: list[str],
title: str,
components: list[tuple[str, ComponentMemory]],
get_size: callable,
get_size: Callable[[ComponentMemory], int],
total: int,
memory_type: str,
limit: int = 25,
) -> None:
"""Add a top consumers list for flash or RAM."""
"""Add a formatted list of top memory consumers to the report.
Args:
lines: List of report lines to append the output to.
title: Section title to print before the list.
components: Sequence of (name, ComponentMemory) tuples to analyze.
get_size: Callable that takes a ComponentMemory and returns the
size in bytes to use for ranking and display.
total: Total size in bytes for computing percentage usage.
memory_type: Label for the memory region (e.g., "flash" or "RAM").
limit: Maximum number of components to include in the list.
"""
lines.append("")
lines.append(f"{title}:")
for i, (name, mem) in enumerate(components[:limit]):
@@ -123,7 +137,12 @@ class MemoryAnalyzerCLI(MemoryAnalyzer):
def _format_symbol_with_section(
self, demangled: str, size: int, section: str | None = None
) -> str:
"""Format a symbol entry, optionally with section label for RAM symbols."""
"""Format a symbol entry, optionally adding a RAM section label.
If section is one of the RAM sections (.data or .bss), a label like
" [data]" or " [bss]" is appended. For non-RAM sections or when
section is None, no section label is added.
"""
section_label = ""
if section in RAM_SECTIONS:
section_label = f" [{section[1:]}]" # .data -> [data], .bss -> [bss]
@@ -169,6 +188,47 @@ class MemoryAnalyzerCLI(MemoryAnalyzer):
f"{total_flash:>{self.COL_TOTAL_FLASH - 2},} B | {total_ram:>{self.COL_TOTAL_RAM - 2},} B"
)
# Show unattributed RAM (SDK/framework overhead)
unattributed_bss, unattributed_data, unattributed_total = (
self.get_unattributed_ram()
)
if unattributed_total > 0:
lines.append("")
lines.append(
f"Unattributed RAM: {unattributed_total:,} B (SDK/framework overhead)"
)
if unattributed_bss > 0 and unattributed_data > 0:
lines.append(
f" .bss: {unattributed_bss:,} B | .data: {unattributed_data:,} B"
)
# Show SDK symbol breakdown if available
sdk_by_lib = self.get_sdk_ram_by_library()
if sdk_by_lib:
lines.append("")
lines.append("SDK library breakdown (static symbols not in ELF):")
# Sort libraries by total size
lib_totals = [
(lib, sum(s.size for s in syms), syms)
for lib, syms in sdk_by_lib.items()
]
lib_totals.sort(key=lambda x: x[1], reverse=True)
for lib_name, lib_total, syms in lib_totals:
if lib_total == 0:
continue
lines.append(f" {lib_name}: {lib_total:,} B")
# Show top symbols from this library
for sym in sorted(syms, key=lambda s: s.size, reverse=True)[:3]:
section_label = sym.section.lstrip(".")
# Use demangled name (falls back to original if not demangled)
display_name = sym.demangled or sym.name
if len(display_name) > 50:
display_name = f"{display_name[:47]}..."
lines.append(
f" {sym.size:>6,} B [{section_label}] {display_name}"
)
# Top consumers
self._add_top_consumers(
lines,
@@ -243,6 +303,8 @@ class MemoryAnalyzerCLI(MemoryAnalyzer):
f"{_COMPONENT_CORE} Symbols > {self.SYMBOL_SIZE_THRESHOLD} B ({len(large_core_symbols)} symbols):"
)
for i, (symbol, demangled, size) in enumerate(large_core_symbols):
# Core symbols only track (symbol, demangled, size) without section info,
# so we don't show section labels here
lines.append(
f"{i + 1}. {self._format_symbol_with_section(demangled, size)}"
)
@@ -340,7 +402,9 @@ class MemoryAnalyzerCLI(MemoryAnalyzer):
# Sort by size descending
sorted_ram_syms = sorted(ram_syms, key=lambda x: x[2], reverse=True)
large_ram_syms = [s for s in sorted_ram_syms if s[2] > 50]
large_ram_syms = [
s for s in sorted_ram_syms if s[2] > self.RAM_SYMBOL_SIZE_THRESHOLD
]
lines.append(f"{name} ({mem.ram_total:,} B total RAM):")
@@ -351,10 +415,19 @@ class MemoryAnalyzerCLI(MemoryAnalyzer):
lines.append(f" .bss (uninitialized): {bss_size:,} B")
if large_ram_syms:
lines.append(f" Symbols > 50 B ({len(large_ram_syms)}):")
lines.append(
f" Symbols > {self.RAM_SYMBOL_SIZE_THRESHOLD} B ({len(large_ram_syms)}):"
)
for symbol, demangled, size, section in large_ram_syms[:10]:
section_label = "data" if section == ".data" else "bss"
lines.append(f" {size:>6,} B [{section_label}] {demangled[:70]}")
# Format section label consistently by stripping leading dot
section_label = section.lstrip(".") if section else ""
# Add ellipsis if name is truncated
demangled_display = (
f"{demangled[:70]}..." if len(demangled) > 70 else demangled
)
lines.append(
f" {size:>6,} B [{section_label}] {demangled_display}"
)
if len(large_ram_syms) > 10:
lines.append(f" ... and {len(large_ram_syms) - 10} more")
lines.append("")
+3 -1
View File
@@ -7,11 +7,13 @@ ESPHOME_COMPONENT_PATTERN = re.compile(r"esphome::([a-zA-Z0-9_]+)::")
# Section mapping for ELF file sections
# Maps standard section names to their various platform-specific variants
# Note: Order matters! More specific patterns (.bss) must come before general ones (.dram)
# because ESP-IDF uses names like ".dram0.bss" which would match ".dram" otherwise
SECTION_MAPPING = {
".text": frozenset([".text", ".iram"]),
".rodata": frozenset([".rodata"]),
".bss": frozenset([".bss"]), # Must be before .data to catch ".dram0.bss"
".data": frozenset([".data", ".dram"]),
".bss": frozenset([".bss"]),
}
# Section to ComponentMemory attribute mapping
+36
View File
@@ -5,6 +5,10 @@ from __future__ import annotations
import logging
from pathlib import Path
import subprocess
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Sequence
_LOGGER = logging.getLogger(__name__)
@@ -55,3 +59,35 @@ def find_tool(
_LOGGER.warning("Could not find %s tool", tool_name)
return None
def run_tool(
cmd: Sequence[str],
timeout: int = 30,
) -> subprocess.CompletedProcess[str] | None:
"""Run a toolchain command and return the result.
Args:
cmd: Command and arguments to run
timeout: Timeout in seconds
Returns:
CompletedProcess on success, None on failure
"""
try:
return subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
check=False,
)
except subprocess.TimeoutExpired:
_LOGGER.warning("Command timed out: %s", " ".join(cmd))
return None
except FileNotFoundError:
_LOGGER.warning("Command not found: %s", cmd[0])
return None
except OSError as e:
_LOGGER.warning("Failed to run command %s: %s", cmd[0], e)
return None
@@ -1828,6 +1828,10 @@ bool APIConnection::send_buffer(ProtoWriteBuffer buffer, uint8_t message_type) {
// Toggle Nagle's algorithm based on message type to prevent log messages from
// filling the TCP send buffer and crowding out important state updates.
//
// This honors the `no_delay` proto option - SubscribeLogsResponse is the only
// message with `option (no_delay) = false;` in api.proto, indicating it should
// allow Nagle coalescing. This option existed since 2019 but was never implemented.
//
// - Log messages: Enable Nagle (NODELAY=false) so small log packets coalesce
// into fewer, larger packets. They flush naturally via TCP delayed ACK timer
// (~200ms), buffer filling, or when a state update triggers a flush.
@@ -32,7 +32,9 @@ void BME68xBSEC2I2CComponent::dump_config() {
}
uint32_t BME68xBSEC2I2CComponent::get_hash() {
return fnv1_hash_extend(fnv1_hash("bme68x_bsec_state_"), static_cast<uint32_t>(this->address_));
char buf[22]; // "bme68x_bsec_state_" (18) + uint8_t max (3) + null
snprintf(buf, sizeof(buf), "bme68x_bsec_state_%u", this->address_);
return fnv1_hash(buf);
}
int8_t BME68xBSEC2I2CComponent::read_bytes_wrapper(uint8_t a_register, uint8_t *data, uint32_t len, void *intfPtr) {
+7 -1
View File
@@ -66,11 +66,17 @@ CONF_WAIT_FOR_SENT = "wait_for_sent"
MAX_ESPNOW_PACKET_SIZE = 250 # Maximum size of the payload in bytes
def validate_channel(value):
if value is None:
raise cv.Invalid("channel is required if wifi is not configured")
return wifi.validate_channel(value)
CONFIG_SCHEMA = cv.All(
cv.Schema(
{
cv.GenerateID(): cv.declare_id(ESPNowComponent),
cv.OnlyWithout(CONF_CHANNEL, CONF_WIFI): wifi.validate_channel,
cv.OnlyWithout(CONF_CHANNEL, CONF_WIFI): validate_channel,
cv.Optional(CONF_ENABLE_ON_BOOT, default=True): cv.boolean,
cv.Optional(CONF_AUTO_ADD_PEER, default=False): cv.boolean,
cv.Optional(CONF_PEERS): cv.ensure_list(cv.mac_address),
+5 -1
View File
@@ -3,7 +3,7 @@ import math
from esphome import automation
import esphome.codegen as cg
from esphome.components import mqtt, web_server
from esphome.components import mqtt, web_server, zigbee
import esphome.config_validation as cv
from esphome.const import (
CONF_ABOVE,
@@ -295,6 +295,7 @@ validate_device_class = cv.one_of(*DEVICE_CLASSES, lower=True, space="_")
_SENSOR_SCHEMA = (
cv.ENTITY_BASE_SCHEMA.extend(web_server.WEBSERVER_SORTING_SCHEMA)
.extend(cv.MQTT_COMPONENT_SCHEMA)
.extend(zigbee.SENSOR_SCHEMA)
.extend(
{
cv.OnlyWith(CONF_MQTT_ID, "mqtt"): cv.declare_id(mqtt.MQTTSensorComponent),
@@ -335,6 +336,7 @@ _SENSOR_SCHEMA = (
)
_SENSOR_SCHEMA.add_extra(entity_duplicate_validator("sensor"))
_SENSOR_SCHEMA.add_extra(zigbee.validate_sensor)
def sensor_schema(
@@ -918,6 +920,8 @@ async def setup_sensor_core_(var, config):
if web_server_config := config.get(CONF_WEB_SERVER):
await web_server.add_entity_config(var, web_server_config)
await zigbee.setup_sensor(var, config)
async def register_sensor(var, config):
if not CORE.has_id(config[CONF_ID]):
+1 -1
View File
@@ -36,7 +36,7 @@ class SunGTIL2 : public Component, public uart::UARTDevice {
void set_serial_number(text_sensor::TextSensor *text_sensor) { serial_number_ = text_sensor; }
#endif
static constexpr size_t STATE_BUFFER_SIZE = 16;
static constexpr size_t STATE_BUFFER_SIZE = 32;
protected:
const char *state_to_string_(uint8_t state, std::span<char, STATE_BUFFER_SIZE> buffer);
@@ -20,9 +20,10 @@ void TuyaTextSensor::setup() {
break;
}
case TuyaDatapointType::ENUM: {
std::string data = to_string(datapoint.value_enum);
ESP_LOGD(TAG, "MCU reported text sensor %u is: %s", datapoint.id, data.c_str());
this->publish_state(data);
char buf[4]; // uint8_t max is 3 digits + null
snprintf(buf, sizeof(buf), "%u", datapoint.value_enum);
ESP_LOGD(TAG, "MCU reported text sensor %u is: %s", datapoint.id, buf);
this->publish_state(buf);
break;
}
default:
+29 -3
View File
@@ -13,14 +13,16 @@ from esphome.types import ConfigType
from .const_zephyr import (
CONF_MAX_EP_NUMBER,
CONF_ON_JOIN,
CONF_POWER_SOURCE,
CONF_WIPE_ON_BOOT,
CONF_ZIGBEE_ID,
KEY_EP_NUMBER,
KEY_ZIGBEE,
POWER_SOURCE,
ZigbeeComponent,
zigbee_ns,
)
from .zigbee_zephyr import zephyr_binary_sensor
from .zigbee_zephyr import zephyr_binary_sensor, zephyr_sensor
CODEOWNERS = ["@tomaszduda23"]
@@ -35,6 +37,7 @@ def zigbee_set_core_data(config: ConfigType) -> ConfigType:
BINARY_SENSOR_SCHEMA = cv.Schema({}).extend(zephyr_binary_sensor)
SENSOR_SCHEMA = cv.Schema({}).extend(zephyr_sensor)
CONFIG_SCHEMA = cv.All(
cv.Schema(
@@ -42,9 +45,15 @@ CONFIG_SCHEMA = cv.All(
cv.GenerateID(CONF_ID): cv.declare_id(ZigbeeComponent),
cv.Optional(CONF_ON_JOIN): automation.validate_automation(single=True),
cv.Optional(CONF_WIPE_ON_BOOT, default=False): cv.All(
cv.boolean,
cv.Any(
cv.boolean,
cv.one_of(*["once"], lower=True),
),
cv.requires_component("nrf52"),
),
cv.Optional(CONF_POWER_SOURCE, default="DC_SOURCE"): cv.enum(
POWER_SOURCE, upper=True
),
}
).extend(cv.COMPONENT_SCHEMA),
zigbee_set_core_data,
@@ -86,7 +95,16 @@ async def setup_binary_sensor(entity: cg.MockObj, config: ConfigType) -> None:
await zephyr_setup_binary_sensor(entity, config)
def validate_binary_sensor(config: ConfigType) -> ConfigType:
async def setup_sensor(entity: cg.MockObj, config: ConfigType) -> None:
if not config.get(CONF_ZIGBEE_ID) or config.get(CONF_INTERNAL):
return
if CORE.using_zephyr:
from .zigbee_zephyr import zephyr_setup_sensor
await zephyr_setup_sensor(entity, config)
def consume_endpoint(config: ConfigType) -> ConfigType:
if not config.get(CONF_ZIGBEE_ID) or config.get(CONF_INTERNAL):
return config
data: dict[str, Any] = CORE.data.setdefault(KEY_ZIGBEE, {})
@@ -95,6 +113,14 @@ def validate_binary_sensor(config: ConfigType) -> ConfigType:
return config
def validate_binary_sensor(config: ConfigType) -> ConfigType:
return consume_endpoint(config)
def validate_sensor(config: ConfigType) -> ConfigType:
return consume_endpoint(config)
ZIGBEE_ACTION_SCHEMA = automation.maybe_simple_id(
cv.Schema(
{
+13
View File
@@ -3,12 +3,24 @@ import esphome.codegen as cg
zigbee_ns = cg.esphome_ns.namespace("zigbee")
ZigbeeComponent = zigbee_ns.class_("ZigbeeComponent", cg.Component)
BinaryAttrs = zigbee_ns.struct("BinaryAttrs")
AnalogAttrs = zigbee_ns.struct("AnalogAttrs")
CONF_MAX_EP_NUMBER = 8
CONF_ZIGBEE_ID = "zigbee_id"
CONF_ON_JOIN = "on_join"
CONF_WIPE_ON_BOOT = "wipe_on_boot"
CONF_ZIGBEE_BINARY_SENSOR = "zigbee_binary_sensor"
CONF_ZIGBEE_SENSOR = "zigbee_sensor"
CONF_POWER_SOURCE = "power_source"
POWER_SOURCE = {
"UNKNOWN": "ZB_ZCL_BASIC_POWER_SOURCE_UNKNOWN",
"MAINS_SINGLE_PHASE": "ZB_ZCL_BASIC_POWER_SOURCE_MAINS_SINGLE_PHASE",
"MAINS_THREE_PHASE": "ZB_ZCL_BASIC_POWER_SOURCE_MAINS_THREE_PHASE",
"BATTERY": "ZB_ZCL_BASIC_POWER_SOURCE_BATTERY",
"DC_SOURCE": "ZB_ZCL_BASIC_POWER_SOURCE_DC_SOURCE",
"EMERGENCY_MAINS_CONST": "ZB_ZCL_BASIC_POWER_SOURCE_EMERGENCY_MAINS_CONST",
"EMERGENCY_MAINS_TRANSF": "ZB_ZCL_BASIC_POWER_SOURCE_EMERGENCY_MAINS_TRANSF",
}
# Keys for CORE.data storage
KEY_ZIGBEE = "zigbee"
@@ -22,3 +34,4 @@ ZB_ZCL_IDENTIFY_ATTRS_T = "zb_zcl_identify_attrs_t"
ZB_ZCL_CLUSTER_ID_BASIC = "ZB_ZCL_CLUSTER_ID_BASIC"
ZB_ZCL_CLUSTER_ID_IDENTIFY = "ZB_ZCL_CLUSTER_ID_IDENTIFY"
ZB_ZCL_CLUSTER_ID_BINARY_INPUT = "ZB_ZCL_CLUSTER_ID_BINARY_INPUT"
ZB_ZCL_CLUSTER_ID_ANALOG_INPUT = "ZB_ZCL_CLUSTER_ID_ANALOG_INPUT"
@@ -17,9 +17,9 @@ ZigbeeBinarySensor::ZigbeeBinarySensor(binary_sensor::BinarySensor *binary_senso
void ZigbeeBinarySensor::setup() {
this->binary_sensor_->add_on_state_callback([this](bool state) {
this->cluster_attributes_->present_value = state ? ZB_TRUE : ZB_FALSE;
ESP_LOGD(TAG, "Set attribute end point: %d, present_value %d", this->end_point_,
ESP_LOGD(TAG, "Set attribute endpoint: %d, present_value %d", this->endpoint_,
this->cluster_attributes_->present_value);
ZB_ZCL_SET_ATTRIBUTE(this->end_point_, ZB_ZCL_CLUSTER_ID_BINARY_INPUT, ZB_ZCL_CLUSTER_SERVER_ROLE,
ZB_ZCL_SET_ATTRIBUTE(this->endpoint_, ZB_ZCL_CLUSTER_ID_BINARY_INPUT, ZB_ZCL_CLUSTER_SERVER_ROLE,
ZB_ZCL_ATTR_BINARY_INPUT_PRESENT_VALUE_ID, &this->cluster_attributes_->present_value,
ZB_FALSE);
this->parent_->flush();
@@ -29,8 +29,8 @@ void ZigbeeBinarySensor::setup() {
void ZigbeeBinarySensor::dump_config() {
ESP_LOGCONFIG(TAG,
"Zigbee Binary Sensor\n"
" End point: %d, present_value %u",
this->end_point_, this->cluster_attributes_->present_value);
" Endpoint: %d, present_value %u",
this->endpoint_, this->cluster_attributes_->present_value);
}
} // namespace esphome::zigbee
@@ -0,0 +1,76 @@
#include "zigbee_sensor_zephyr.h"
#if defined(USE_ZIGBEE) && defined(USE_NRF52) && defined(USE_SENSOR)
#include "esphome/core/log.h"
extern "C" {
#include <zboss_api.h>
#include <zboss_api_addons.h>
#include <zb_nrf_platform.h>
#include <zigbee/zigbee_app_utils.h>
#include <zb_error_to_string.h>
}
namespace esphome::zigbee {
static const char *const TAG = "zigbee.sensor";
ZigbeeSensor::ZigbeeSensor(sensor::Sensor *sensor) : sensor_(sensor) {}
void ZigbeeSensor::setup() {
this->sensor_->add_on_state_callback([this](float state) {
this->cluster_attributes_->present_value = state;
ESP_LOGD(TAG, "Set attribute endpoint: %d, present_value %f", this->endpoint_, state);
ZB_ZCL_SET_ATTRIBUTE(this->endpoint_, ZB_ZCL_CLUSTER_ID_ANALOG_INPUT, ZB_ZCL_CLUSTER_SERVER_ROLE,
ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID,
(zb_uint8_t *) &this->cluster_attributes_->present_value, ZB_FALSE);
this->parent_->flush();
});
}
void ZigbeeSensor::dump_config() {
ESP_LOGCONFIG(TAG,
"Zigbee Sensor\n"
" Endpoint: %d, present_value %f",
this->endpoint_, this->cluster_attributes_->present_value);
}
const zb_uint8_t ZB_ZCL_ANALOG_INPUT_STATUS_FLAG_MAX_VALUE = 0x0F;
static zb_ret_t check_value_analog_server(zb_uint16_t attr_id, zb_uint8_t endpoint,
zb_uint8_t *value) { // NOLINT(readability-non-const-parameter)
zb_ret_t ret = RET_OK;
ZVUNUSED(endpoint);
switch (attr_id) {
case ZB_ZCL_ATTR_ANALOG_INPUT_OUT_OF_SERVICE_ID:
ret = ZB_ZCL_CHECK_BOOL_VALUE(*value) ? RET_OK : RET_ERROR;
break;
case ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID:
break;
case ZB_ZCL_ATTR_ANALOG_INPUT_STATUS_FLAG_ID:
if (*value > ZB_ZCL_ANALOG_INPUT_STATUS_FLAG_MAX_VALUE) {
ret = RET_ERROR;
}
break;
default:
break;
}
return ret;
}
} // namespace esphome::zigbee
void zb_zcl_analog_input_init_server() {
zb_zcl_add_cluster_handlers(ZB_ZCL_CLUSTER_ID_ANALOG_INPUT, ZB_ZCL_CLUSTER_SERVER_ROLE,
esphome::zigbee::check_value_analog_server, (zb_zcl_cluster_write_attr_hook_t) NULL,
(zb_zcl_cluster_handler_t) NULL);
}
void zb_zcl_analog_input_init_client() {
zb_zcl_add_cluster_handlers(ZB_ZCL_CLUSTER_ID_ANALOG_INPUT, ZB_ZCL_CLUSTER_CLIENT_ROLE,
(zb_zcl_cluster_check_value_t) NULL, (zb_zcl_cluster_write_attr_hook_t) NULL,
(zb_zcl_cluster_handler_t) NULL);
}
#endif
@@ -0,0 +1,86 @@
#pragma once
#include "esphome/components/zigbee/zigbee_zephyr.h"
#if defined(USE_ZIGBEE) && defined(USE_NRF52) && defined(USE_SENSOR)
#include "esphome/core/component.h"
#include "esphome/components/sensor/sensor.h"
extern "C" {
#include <zboss_api.h>
#include <zboss_api_addons.h>
}
enum {
ZB_ZCL_ATTR_ANALOG_INPUT_DESCRIPTION_ID = 0x001C,
ZB_ZCL_ATTR_ANALOG_INPUT_OUT_OF_SERVICE_ID = 0x0051,
ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID = 0x0055,
ZB_ZCL_ATTR_ANALOG_INPUT_STATUS_FLAG_ID = 0x006F,
ZB_ZCL_ATTR_ANALOG_INPUT_ENGINEERING_UNITS_ID = 0x0075,
};
#define ZB_ZCL_ANALOG_INPUT_CLUSTER_REVISION_DEFAULT ((zb_uint16_t) 0x0001u)
#define ZB_SET_ATTR_DESCR_WITH_ZB_ZCL_ATTR_ANALOG_INPUT_DESCRIPTION_ID(data_ptr) \
{ \
ZB_ZCL_ATTR_ANALOG_INPUT_DESCRIPTION_ID, ZB_ZCL_ATTR_TYPE_CHAR_STRING, ZB_ZCL_ATTR_ACCESS_READ_ONLY, \
(ZB_ZCL_NON_MANUFACTURER_SPECIFIC), (void *) (data_ptr) \
}
#define ZB_SET_ATTR_DESCR_WITH_ZB_ZCL_ATTR_ANALOG_INPUT_OUT_OF_SERVICE_ID(data_ptr) \
{ \
ZB_ZCL_ATTR_ANALOG_INPUT_OUT_OF_SERVICE_ID, ZB_ZCL_ATTR_TYPE_BOOL, \
ZB_ZCL_ATTR_ACCESS_READ_ONLY | ZB_ZCL_ATTR_ACCESS_WRITE_OPTIONAL, (ZB_ZCL_NON_MANUFACTURER_SPECIFIC), \
(void *) (data_ptr) \
}
#define ZB_SET_ATTR_DESCR_WITH_ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID(data_ptr) \
{ \
ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID, ZB_ZCL_ATTR_TYPE_SINGLE, \
ZB_ZCL_ATTR_ACCESS_READ_ONLY | ZB_ZCL_ATTR_ACCESS_WRITE_OPTIONAL | ZB_ZCL_ATTR_ACCESS_REPORTING, \
(ZB_ZCL_NON_MANUFACTURER_SPECIFIC), (void *) (data_ptr) \
}
#define ZB_SET_ATTR_DESCR_WITH_ZB_ZCL_ATTR_ANALOG_INPUT_STATUS_FLAG_ID(data_ptr) \
{ \
ZB_ZCL_ATTR_ANALOG_INPUT_STATUS_FLAG_ID, ZB_ZCL_ATTR_TYPE_8BITMAP, \
ZB_ZCL_ATTR_ACCESS_READ_ONLY | ZB_ZCL_ATTR_ACCESS_REPORTING, (ZB_ZCL_NON_MANUFACTURER_SPECIFIC), \
(void *) (data_ptr) \
}
#define ZB_SET_ATTR_DESCR_WITH_ZB_ZCL_ATTR_ANALOG_INPUT_ENGINEERING_UNITS_ID(data_ptr) \
{ \
ZB_ZCL_ATTR_ANALOG_INPUT_ENGINEERING_UNITS_ID, ZB_ZCL_ATTR_TYPE_16BIT_ENUM, ZB_ZCL_ATTR_ACCESS_READ_ONLY, \
(ZB_ZCL_NON_MANUFACTURER_SPECIFIC), (void *) (data_ptr) \
}
#define ESPHOME_ZB_ZCL_DECLARE_ANALOG_INPUT_ATTRIB_LIST(attr_list, out_of_service, present_value, status_flag, \
engineering_units, description) \
ZB_ZCL_START_DECLARE_ATTRIB_LIST_CLUSTER_REVISION(attr_list, ZB_ZCL_ANALOG_INPUT) \
ZB_ZCL_SET_ATTR_DESC(ZB_ZCL_ATTR_ANALOG_INPUT_OUT_OF_SERVICE_ID, (out_of_service)) \
ZB_ZCL_SET_ATTR_DESC(ZB_ZCL_ATTR_ANALOG_INPUT_PRESENT_VALUE_ID, (present_value)) \
ZB_ZCL_SET_ATTR_DESC(ZB_ZCL_ATTR_ANALOG_INPUT_STATUS_FLAG_ID, (status_flag)) \
ZB_ZCL_SET_ATTR_DESC(ZB_ZCL_ATTR_ANALOG_INPUT_ENGINEERING_UNITS_ID, (engineering_units)) \
ZB_ZCL_SET_ATTR_DESC(ZB_ZCL_ATTR_ANALOG_INPUT_DESCRIPTION_ID, (description)) \
ZB_ZCL_FINISH_DECLARE_ATTRIB_LIST
void zb_zcl_analog_input_init_server();
void zb_zcl_analog_input_init_client();
#define ZB_ZCL_CLUSTER_ID_ANALOG_INPUT_SERVER_ROLE_INIT zb_zcl_analog_input_init_server
#define ZB_ZCL_CLUSTER_ID_ANALOG_INPUT_CLIENT_ROLE_INIT zb_zcl_analog_input_init_client
namespace esphome::zigbee {
class ZigbeeSensor : public ZigbeeEntity, public Component {
public:
explicit ZigbeeSensor(sensor::Sensor *sensor);
void set_cluster_attributes(AnalogAttrs &cluster_attributes) { this->cluster_attributes_ = &cluster_attributes; }
void setup() override;
void dump_config() override;
protected:
AnalogAttrs *cluster_attributes_{nullptr};
sensor::Sensor *sensor_{nullptr};
};
} // namespace esphome::zigbee
#endif
+64 -8
View File
@@ -138,9 +138,26 @@ void ZigbeeComponent::setup() {
}
#ifdef USE_ZIGBEE_WIPE_ON_BOOT
erase_flash_(FIXED_PARTITION_ID(ZBOSS_NVRAM));
erase_flash_(FIXED_PARTITION_ID(ZBOSS_PRODUCT_CONFIG));
erase_flash_(FIXED_PARTITION_ID(SETTINGS_STORAGE));
bool wipe = true;
#ifdef USE_ZIGBEE_WIPE_ON_BOOT_MAGIC
// unique hash to store preferences for this component
uint32_t hash = 88498616UL;
uint32_t wipe_value = 0;
auto wipe_pref = global_preferences->make_preference<uint32_t>(hash, true);
if (wipe_pref.load(&wipe_value)) {
wipe = wipe_value != USE_ZIGBEE_WIPE_ON_BOOT_MAGIC;
ESP_LOGD(TAG, "Wipe value in preferences %u, in firmware %u", wipe_value, USE_ZIGBEE_WIPE_ON_BOOT_MAGIC);
}
#endif
if (wipe) {
erase_flash_(FIXED_PARTITION_ID(ZBOSS_NVRAM));
erase_flash_(FIXED_PARTITION_ID(ZBOSS_PRODUCT_CONFIG));
erase_flash_(FIXED_PARTITION_ID(SETTINGS_STORAGE));
#ifdef USE_ZIGBEE_WIPE_ON_BOOT_MAGIC
wipe_value = USE_ZIGBEE_WIPE_ON_BOOT_MAGIC;
wipe_pref.save(&wipe_value);
#endif
}
#endif
ZB_ZCL_REGISTER_DEVICE_CB(zcl_device_cb);
@@ -152,15 +169,54 @@ void ZigbeeComponent::setup() {
zigbee_enable();
}
void ZigbeeComponent::dump_config() {
bool wipe = false;
static const char *role() {
switch (zb_get_network_role()) {
case ZB_NWK_DEVICE_TYPE_COORDINATOR:
return "coordinator";
case ZB_NWK_DEVICE_TYPE_ROUTER:
return "router";
case ZB_NWK_DEVICE_TYPE_ED:
return "end device";
}
return "unknown";
}
static const char *get_wipe_on_boot() {
#ifdef USE_ZIGBEE_WIPE_ON_BOOT
wipe = true;
#ifdef USE_ZIGBEE_WIPE_ON_BOOT_MAGIC
return "ONCE";
#else
return "YES";
#endif
#else
return "NO";
#endif
}
void ZigbeeComponent::dump_config() {
char ieee_addr_buf[IEEE_ADDR_BUF_SIZE] = {0};
zb_ieee_addr_t addr;
zb_get_long_address(addr);
ieee_addr_to_str(ieee_addr_buf, sizeof(ieee_addr_buf), addr);
zb_ext_pan_id_t extended_pan_id;
char extended_pan_id_buf[IEEE_ADDR_BUF_SIZE] = {0};
zb_get_extended_pan_id(extended_pan_id);
ieee_addr_to_str(extended_pan_id_buf, sizeof(extended_pan_id_buf), extended_pan_id);
ESP_LOGCONFIG(TAG,
"Zigbee\n"
" Wipe on boot: %s",
YESNO(wipe));
" Wipe on boot: %s\n"
" Device is joined to the network: %s\n"
" Current channel: %d\n"
" Current page: %d\n"
" Sleep threshold: %ums\n"
" Role: %s\n"
" Long addr: 0x%s\n"
" Short addr: 0x%04X\n"
" Long pan id: 0x%s\n"
" Short pan id: 0x%04X",
get_wipe_on_boot(), YESNO(zb_zdo_joined()), zb_get_current_channel(), zb_get_current_page(),
zb_get_sleep_threshold(), role(), ieee_addr_buf, zb_get_short_address(), extended_pan_id_buf,
zb_get_pan_id());
}
static void send_attribute_report(zb_bufid_t bufid, zb_uint16_t cmd_id) {
+7 -10
View File
@@ -28,16 +28,15 @@ extern "C" {
ESPHOME_CAT7(zb_af_simple_desc_, ep_name, _, in_num, _, out_num, _t)
// needed to use ESPHOME_ZB_DECLARE_SIMPLE_DESC
#define ESPHOME_ZB_ZCL_DECLARE_SIMPLE_DESC(ep_name, ep_id, in_clust_num, out_clust_num, ...) \
#define ESPHOME_ZB_ZCL_DECLARE_SIMPLE_DESC(ep_name, ep_id, in_clust_num, out_clust_num, app_device_id, ...) \
ESPHOME_ZB_DECLARE_SIMPLE_DESC(ep_name, in_clust_num, out_clust_num); \
ESPHOME_ZB_AF_SIMPLE_DESC_TYPE(ep_name, in_clust_num, out_clust_num) \
simple_desc_##ep_name = {ep_id, ZB_AF_HA_PROFILE_ID, ZB_HA_SIMPLE_SENSOR_DEVICE_ID, 0, 0, in_clust_num, \
out_clust_num, {__VA_ARGS__}}
simple_desc_##ep_name = {ep_id, ZB_AF_HA_PROFILE_ID, app_device_id, 0, 0, in_clust_num, out_clust_num, {__VA_ARGS__}}
// needed to use ESPHOME_ZB_ZCL_DECLARE_SIMPLE_DESC
#define ESPHOME_ZB_HA_DECLARE_EP(ep_name, ep_id, cluster_list, in_cluster_num, out_cluster_num, report_attr_count, \
...) \
ESPHOME_ZB_ZCL_DECLARE_SIMPLE_DESC(ep_name, ep_id, in_cluster_num, out_cluster_num, __VA_ARGS__); \
app_device_id, ...) \
ESPHOME_ZB_ZCL_DECLARE_SIMPLE_DESC(ep_name, ep_id, in_cluster_num, out_cluster_num, app_device_id, __VA_ARGS__); \
ZBOSS_DEVICE_DECLARE_REPORTING_CTX(reporting_info##ep_name, report_attr_count); \
ZB_AF_DECLARE_ENDPOINT_DESC(ep_name, ep_id, ZB_AF_HA_PROFILE_ID, 0, NULL, \
ZB_ZCL_ARRAY_SIZE(cluster_list, zb_zcl_cluster_desc_t), cluster_list, \
@@ -57,10 +56,8 @@ struct AnalogAttrs {
zb_bool_t out_of_service;
float present_value;
zb_uint8_t status_flags;
zb_uint16_t engineering_units;
zb_uchar_t description[ZB_ZCL_MAX_STRING_SIZE];
float max_present_value;
float min_present_value;
float resolution;
};
class ZigbeeComponent : public Component {
@@ -93,10 +90,10 @@ class ZigbeeComponent : public Component {
class ZigbeeEntity {
public:
void set_parent(ZigbeeComponent *parent) { this->parent_ = parent; }
void set_end_point(zb_uint8_t end_point) { this->end_point_ = end_point; }
void set_endpoint(zb_uint8_t endpoint) { this->endpoint_ = endpoint; }
protected:
zb_uint8_t end_point_{0};
zb_uint8_t endpoint_{0};
ZigbeeComponent *parent_{nullptr};
};
+176 -26
View File
@@ -1,10 +1,45 @@
from datetime import datetime
import random
from esphome import automation
import esphome.codegen as cg
from esphome.components.zephyr import zephyr_add_prj_conf
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_NAME, __version__
from esphome.const import (
CONF_ID,
CONF_NAME,
CONF_UNIT_OF_MEASUREMENT,
UNIT_AMPERE,
UNIT_CELSIUS,
UNIT_CENTIMETER,
UNIT_DECIBEL,
UNIT_HECTOPASCAL,
UNIT_HERTZ,
UNIT_HOUR,
UNIT_KELVIN,
UNIT_KILOMETER,
UNIT_KILOWATT,
UNIT_KILOWATT_HOURS,
UNIT_LUX,
UNIT_METER,
UNIT_MICROGRAMS_PER_CUBIC_METER,
UNIT_MILLIAMP,
UNIT_MILLIGRAMS_PER_CUBIC_METER,
UNIT_MILLIMETER,
UNIT_MILLISECOND,
UNIT_MILLIVOLT,
UNIT_MINUTE,
UNIT_OHM,
UNIT_PARTS_PER_BILLION,
UNIT_PARTS_PER_MILLION,
UNIT_PASCAL,
UNIT_PERCENT,
UNIT_SECOND,
UNIT_VOLT,
UNIT_WATT,
UNIT_WATT_HOURS,
__version__,
)
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.cpp_generator import (
AssignmentExpression,
@@ -15,22 +50,63 @@ from esphome.types import ConfigType
from .const_zephyr import (
CONF_ON_JOIN,
CONF_POWER_SOURCE,
CONF_WIPE_ON_BOOT,
CONF_ZIGBEE_BINARY_SENSOR,
CONF_ZIGBEE_ID,
CONF_ZIGBEE_SENSOR,
KEY_EP_NUMBER,
KEY_ZIGBEE,
POWER_SOURCE,
ZB_ZCL_BASIC_ATTRS_EXT_T,
ZB_ZCL_CLUSTER_ID_ANALOG_INPUT,
ZB_ZCL_CLUSTER_ID_BASIC,
ZB_ZCL_CLUSTER_ID_BINARY_INPUT,
ZB_ZCL_CLUSTER_ID_IDENTIFY,
ZB_ZCL_IDENTIFY_ATTRS_T,
AnalogAttrs,
BinaryAttrs,
ZigbeeComponent,
zigbee_ns,
)
ZigbeeBinarySensor = zigbee_ns.class_("ZigbeeBinarySensor", cg.Component)
ZigbeeSensor = zigbee_ns.class_("ZigbeeSensor", cg.Component)
# BACnet engineering units mapping (ZCL uses BACnet unit codes)
# See: https://github.com/zigpy/zha/blob/dev/zha/application/platforms/number/bacnet.py
BACNET_UNITS = {
UNIT_CELSIUS: 62,
UNIT_KELVIN: 63,
UNIT_VOLT: 5,
UNIT_MILLIVOLT: 124,
UNIT_AMPERE: 3,
UNIT_MILLIAMP: 2,
UNIT_OHM: 4,
UNIT_WATT: 47,
UNIT_KILOWATT: 48,
UNIT_WATT_HOURS: 18,
UNIT_KILOWATT_HOURS: 19,
UNIT_PASCAL: 53,
UNIT_HECTOPASCAL: 133,
UNIT_HERTZ: 27,
UNIT_MILLIMETER: 30,
UNIT_CENTIMETER: 118,
UNIT_METER: 31,
UNIT_KILOMETER: 193,
UNIT_MILLISECOND: 159,
UNIT_SECOND: 73,
UNIT_MINUTE: 72,
UNIT_HOUR: 71,
UNIT_PARTS_PER_MILLION: 96,
UNIT_PARTS_PER_BILLION: 97,
UNIT_MICROGRAMS_PER_CUBIC_METER: 219,
UNIT_MILLIGRAMS_PER_CUBIC_METER: 218,
UNIT_LUX: 37,
UNIT_DECIBEL: 199,
UNIT_PERCENT: 98,
}
BACNET_UNIT_NO_UNITS = 95
zephyr_binary_sensor = cv.Schema(
{
@@ -41,6 +117,15 @@ zephyr_binary_sensor = cv.Schema(
}
)
zephyr_sensor = cv.Schema(
{
cv.OnlyWith(CONF_ZIGBEE_ID, ["nrf52", "zigbee"]): cv.use_id(ZigbeeComponent),
cv.OnlyWith(CONF_ZIGBEE_SENSOR, ["nrf52", "zigbee"]): cv.declare_id(
ZigbeeSensor
),
}
)
async def zephyr_to_code(config: ConfigType) -> None:
zephyr_add_prj_conf("ZIGBEE", True)
@@ -56,6 +141,10 @@ async def zephyr_to_code(config: ConfigType) -> None:
zephyr_add_prj_conf("NET_UDP", False)
if config[CONF_WIPE_ON_BOOT]:
if config[CONF_WIPE_ON_BOOT] == "once":
cg.add_define(
"USE_ZIGBEE_WIPE_ON_BOOT_MAGIC", random.randint(0x000001, 0xFFFFFF)
)
cg.add_define("USE_ZIGBEE_WIPE_ON_BOOT")
var = cg.new_Pvariable(config[CONF_ID])
@@ -85,7 +174,7 @@ async def _attr_to_code(config: ConfigType) -> None:
),
zigbee_assign(
basic_attrs.power_source,
cg.RawExpression("ZB_ZCL_BASIC_POWER_SOURCE_DC_SOURCE"),
cg.RawExpression(POWER_SOURCE[config[CONF_POWER_SOURCE]]),
),
zigbee_set_string(basic_attrs.location_id, ""),
zigbee_assign(
@@ -191,6 +280,7 @@ def zigbee_register_ep(
report_attr_count: int,
clusters: list[ZigbeeClusterDesc],
slot_index: int,
app_device_id: str,
) -> None:
"""Register a Zigbee endpoint."""
in_cluster_num = sum(1 for c in clusters if c.has_attrs)
@@ -204,7 +294,7 @@ def zigbee_register_ep(
ep_id = slot_index + 1 # Endpoints are 1-indexed
obj = cg.RawExpression(
f"ESPHOME_ZB_HA_DECLARE_EP({ep_name}, {ep_id}, {cluster_list_name}, "
f"{in_cluster_num}, {out_cluster_num}, {report_attr_count}, {', '.join(cluster_ids)})"
f"{in_cluster_num}, {out_cluster_num}, {report_attr_count}, {app_device_id}, {', '.join(cluster_ids)})"
)
CORE.add_global(obj)
@@ -224,42 +314,102 @@ async def zephyr_setup_binary_sensor(entity: cg.MockObj, config: ConfigType) ->
CORE.add_job(_add_binary_sensor, entity, config)
async def _add_binary_sensor(entity: cg.MockObj, config: ConfigType) -> None:
# Find the next available endpoint slot
slot_index = next(
async def zephyr_setup_sensor(entity: cg.MockObj, config: ConfigType) -> None:
CORE.add_job(_add_sensor, entity, config)
def _slot_index() -> int:
"""Find the next available endpoint slot"""
slot = next(
(i for i, v in enumerate(CORE.data[KEY_ZIGBEE][KEY_EP_NUMBER]) if v == ""), None
)
if slot is None:
raise cv.Invalid(
f"Not found empty slot, size ({len(CORE.data[KEY_ZIGBEE][KEY_EP_NUMBER])})"
)
return slot
async def _add_zigbee_input(
entity: cg.MockObj,
config: ConfigType,
component_key,
attrs_type,
zcl_macro: str,
cluster_id: str,
app_device_id: str,
extra_field_values: dict[str, int] | None = None,
) -> None:
slot_index = _slot_index()
# Create unique names for this sensor's variables based on slot index
prefix = f"zigbee_ep{slot_index + 1}"
attrs_name = f"{prefix}_binary_attrs"
attr_list_name = f"{prefix}_binary_input_attrib_list"
attrs_name = f"{prefix}_attrs"
attr_list_name = f"{prefix}_attrib_list"
cluster_list_name = f"{prefix}_cluster_list"
ep_name = f"{prefix}_ep"
# Create the binary attributes structure
binary_attrs = zigbee_new_variable(attrs_name, BinaryAttrs)
attr_list = zigbee_new_attr_list(
attr_list_name,
"ESPHOME_ZB_ZCL_DECLARE_BINARY_INPUT_ATTRIB_LIST",
zigbee_assign(binary_attrs.out_of_service, 0),
zigbee_assign(binary_attrs.present_value, 0),
zigbee_assign(binary_attrs.status_flags, 0),
zigbee_set_string(binary_attrs.description, config[CONF_NAME]),
)
# Create attribute struct
attrs = zigbee_new_variable(attrs_name, attrs_type)
# Build attribute list args
attr_args = [
zigbee_assign(attrs.out_of_service, 0),
zigbee_assign(attrs.present_value, 0),
zigbee_assign(attrs.status_flags, 0),
]
# Add extra field assignments (e.g., engineering_units for sensors)
if extra_field_values:
for field_name, value in extra_field_values.items():
attr_args.append(zigbee_assign(getattr(attrs, field_name), value))
attr_args.append(zigbee_set_string(attrs.description, config[CONF_NAME]))
# Create attribute list
attr_list = zigbee_new_attr_list(attr_list_name, zcl_macro, *attr_args)
# Create cluster list and register endpoint
cluster_list_name, clusters = zigbee_new_cluster_list(
cluster_list_name,
[ZigbeeClusterDesc(ZB_ZCL_CLUSTER_ID_BINARY_INPUT, attr_list)],
[ZigbeeClusterDesc(cluster_id, attr_list)],
)
zigbee_register_ep(
ep_name, cluster_list_name, 2, clusters, slot_index, app_device_id
)
zigbee_register_ep(ep_name, cluster_list_name, 2, clusters, slot_index)
# Create the ZigbeeBinarySensor component
var = cg.new_Pvariable(config[CONF_ZIGBEE_BINARY_SENSOR], entity)
await cg.register_component(var, config)
# Create ESPHome component
var = cg.new_Pvariable(config[component_key], entity)
await cg.register_component(var, {})
cg.add(var.set_endpoint(slot_index + 1))
cg.add(var.set_cluster_attributes(attrs))
cg.add(var.set_end_point(slot_index + 1))
cg.add(var.set_cluster_attributes(binary_attrs))
hub = await cg.get_variable(config[CONF_ZIGBEE_ID])
cg.add(var.set_parent(hub))
async def _add_binary_sensor(entity: cg.MockObj, config: ConfigType) -> None:
await _add_zigbee_input(
entity,
config,
CONF_ZIGBEE_BINARY_SENSOR,
BinaryAttrs,
"ESPHOME_ZB_ZCL_DECLARE_BINARY_INPUT_ATTRIB_LIST",
ZB_ZCL_CLUSTER_ID_BINARY_INPUT,
"ZB_HA_SIMPLE_SENSOR_DEVICE_ID",
)
async def _add_sensor(entity: cg.MockObj, config: ConfigType) -> None:
# Get BACnet engineering unit from unit_of_measurement
unit = config.get(CONF_UNIT_OF_MEASUREMENT, "")
bacnet_unit = BACNET_UNITS.get(unit, BACNET_UNIT_NO_UNITS)
await _add_zigbee_input(
entity,
config,
CONF_ZIGBEE_SENSOR,
AnalogAttrs,
"ESPHOME_ZB_ZCL_DECLARE_ANALOG_INPUT_ATTRIB_LIST",
ZB_ZCL_CLUSTER_ID_ANALOG_INPUT,
"ZB_HA_CUSTOM_ATTR_DEVICE_ID",
extra_field_values={"engineering_units": bacnet_unit},
)
+21 -12
View File
@@ -105,7 +105,7 @@ void ZWaveProxy::process_uart_() {
this->buffer_[1] >= ZWAVE_MIN_GET_NETWORK_IDS_LENGTH && this->buffer_[0] == ZWAVE_FRAME_TYPE_START) {
// Store the 4-byte Home ID, which starts at offset 4, and notify connected clients if it changed
// The frame parser has already validated the checksum and ensured all bytes are present
if (this->set_home_id(&this->buffer_[4])) {
if (this->set_home_id_(&this->buffer_[4])) {
this->send_homeid_changed_msg_();
}
}
@@ -165,7 +165,7 @@ void ZWaveProxy::zwave_proxy_request(api::APIConnection *api_connection, api::en
}
}
bool ZWaveProxy::set_home_id(const uint8_t *new_home_id) {
bool ZWaveProxy::set_home_id_(const uint8_t *new_home_id) {
if (std::memcmp(this->home_id_.data(), new_home_id, this->home_id_.size()) == 0) {
ESP_LOGV(TAG, "Home ID unchanged");
return false; // No change
@@ -178,19 +178,28 @@ bool ZWaveProxy::set_home_id(const uint8_t *new_home_id) {
}
void ZWaveProxy::send_frame(const uint8_t *data, size_t length) {
if (length == 1 && data[0] == this->last_response_) {
ESP_LOGV(TAG, "Skipping sending duplicate response: 0x%02X", data[0]);
// Safety: validate pointer before any access
if (data == nullptr) {
ESP_LOGE(TAG, "Null data pointer");
return;
}
if (length && data != nullptr) {
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERY_VERBOSE
char hex_buf[format_hex_pretty_size(ZWAVE_MAX_LOG_BYTES)];
#endif
ESP_LOGVV(TAG, "Sending: %s", format_hex_pretty_to(hex_buf, data, length));
this->write_array(data, length);
} else {
ESP_LOGE(TAG, "Null pointer or length 0");
if (length == 0) {
ESP_LOGE(TAG, "Length 0");
return;
}
// Skip duplicate single-byte responses (ACK/NAK/CAN)
if (length == 1 && data[0] == this->last_response_) {
ESP_LOGV(TAG, "Response already sent: 0x%02X", data[0]);
return;
}
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERY_VERBOSE
char hex_buf[format_hex_pretty_size(ZWAVE_MAX_LOG_BYTES)];
#endif
ESP_LOGVV(TAG, "Sending: %s", format_hex_pretty_to(hex_buf, data, length));
this->write_array(data, length);
}
void ZWaveProxy::send_homeid_changed_msg_(api::APIConnection *conn) {
+1 -1
View File
@@ -60,11 +60,11 @@ class ZWaveProxy : public uart::UARTDevice, public Component {
uint32_t get_home_id() {
return encode_uint32(this->home_id_[0], this->home_id_[1], this->home_id_[2], this->home_id_[3]);
}
bool set_home_id(const uint8_t *new_home_id); // Store a new home ID. Returns true if it changed.
void send_frame(const uint8_t *data, size_t length);
protected:
bool set_home_id_(const uint8_t *new_home_id); // Store a new home ID. Returns true if it changed.
void send_homeid_changed_msg_(api::APIConnection *conn = nullptr);
void send_simple_command_(uint8_t command_id);
bool parse_byte_(uint8_t byte); // Returns true if frame parsing was completed (a frame is ready in the buffer)
+1
View File
@@ -307,6 +307,7 @@
#define USE_SOFTDEVICE_VERSION 1
#define USE_ZIGBEE
#define USE_ZIGBEE_WIPE_ON_BOOT
#define USE_ZIGBEE_WIPE_ON_BOOT_MAGIC 1
#define ZIGBEE_ENDPOINTS_COUNT 8
#endif
+3 -1
View File
@@ -418,8 +418,10 @@ inline uint32_t fnv1a_hash_extend(uint32_t hash, const std::string &str) {
}
/// Extend a FNV-1a hash with an integer (hashes each byte).
template<std::integral T> constexpr uint32_t fnv1a_hash_extend(uint32_t hash, T value) {
using UnsignedT = std::make_unsigned_t<T>;
UnsignedT uvalue = static_cast<UnsignedT>(value);
for (size_t i = 0; i < sizeof(T); i++) {
hash ^= (value >> (i * 8)) & 0xFF;
hash ^= (uvalue >> (i * 8)) & 0xFF;
hash *= FNV1_PRIME;
}
return hash;
+5
View File
@@ -430,6 +430,11 @@ class IDEData:
else f"{path[:-3]}readelf"
)
@property
def defines(self) -> list[str]:
"""Return the list of preprocessor defines from idedata."""
return self.raw.get("defines", [])
def analyze_memory_usage(config: dict[str, Any]) -> None:
"""Analyze memory usage by component after compilation."""
+6 -2
View File
@@ -15,10 +15,14 @@ binary_sensor:
- platform: template
name: "Garage Door Open 7"
internal: True
sensor:
- platform: template
name: "Garage Door Open 8"
name: "Analog 1"
lambda: return 10.0;
- platform: template
name: "Garage Door Open 9"
name: "Analog 2"
lambda: return 11.0;
zigbee:
wipe_on_boot: true
@@ -1 +1,5 @@
<<: !include common.yaml
zigbee:
wipe_on_boot: once
power_source: battery
+71 -3
View File
@@ -2482,6 +2482,7 @@ def test_command_analyze_memory_success(
"/path/to/objdump",
"/path/to/readelf",
set(), # No external components
idedata=mock_get_idedata.return_value,
)
# Verify analysis was run
@@ -2551,6 +2552,7 @@ def test_command_analyze_memory_with_external_components(
"/path/to/objdump",
"/path/to/readelf",
{"my_custom_component"}, # External component detected
idedata=mock_get_idedata.return_value,
)
@@ -2853,16 +2855,31 @@ class MockSerial:
return 0
def read(self, size: int = 1) -> bytes:
"""Read next chunk of data."""
"""Read up to size bytes from the current chunk.
This method respects the size argument and keeps any unconsumed
bytes in the current chunk so that subsequent calls to in_waiting
and read see the remaining data.
"""
if self.chunk_index < len(self.chunks):
chunk = self.chunks[self.chunk_index]
self.chunk_index += 1
if chunk is MOCK_SERIAL_END:
# Sentinel means we're done - simulate port closed
import serial
raise serial.SerialException("Port closed")
return chunk # type: ignore[return-value]
# Respect the requested size and keep any remaining bytes
if size <= 0:
return b""
data = chunk[:size] # type: ignore[index]
remaining = chunk[size:] # type: ignore[index]
if remaining:
# Keep remaining bytes for the next read
self.chunks[self.chunk_index] = remaining # type: ignore[assignment]
else:
# Entire chunk consumed; advance to the next one
self.chunk_index += 1
return data # type: ignore[return-value]
import serial
raise serial.SerialException("Port closed")
@@ -3104,3 +3121,54 @@ def test_run_miniterm_baud_rate_zero_returns_early(
assert result == 1
assert "UART logging is disabled" in caplog.text
def test_run_miniterm_buffer_limit_prevents_unbounded_growth() -> None:
"""Test that buffer is limited to prevent unbounded memory growth.
If a device sends data without newlines, the buffer should be truncated
to SERIAL_BUFFER_MAX_SIZE to prevent memory exhaustion.
"""
# Use a small buffer limit for testing
test_buffer_limit = 100
# Create data larger than the limit without newlines
large_data_no_newline = b"X" * 150 # 150 bytes, no newline
final_line = b"END\r\n"
mock_serial = MockSerial([large_data_no_newline, final_line, MOCK_SERIAL_END])
config = {
CONF_LOGGER: {
CONF_BAUD_RATE: 115200,
"deassert_rts_dtr": False,
}
}
args = MockArgs()
with (
patch("serial.Serial", return_value=mock_serial),
patch.object(platformio_api, "process_stacktrace") as mock_bt,
patch("esphome.__main__.safe_print") as mock_print,
patch("esphome.__main__.SERIAL_BUFFER_MAX_SIZE", test_buffer_limit),
):
mock_bt.return_value = False
run_miniterm(config, "/dev/ttyUSB0", args)
# Should have printed exactly one line
assert mock_print.call_count == 1
printed_line = mock_print.call_args[0][0]
# The line should contain "END" and some X's, but not all 150 X's
# because the buffer was truncated
assert "END" in printed_line
assert "X" in printed_line
# Verify truncation happened - we shouldn't have all 150 X's
# The buffer logic is:
# 1. Add 150 X's -> buffer = 150 bytes -> truncate to last 100 = 100 X's
# 2. Add "END\r\n" (5 bytes) -> buffer = 105 bytes -> truncate to last 100
# = 95 X's + "END\r\n"
# 3. Find newline, extract line = "95 X's + END"
x_count = printed_line.count("X")
assert x_count < 150, f"Expected truncation but got {x_count} X's"
assert x_count == 95, f"Expected 95 X's after truncation but got {x_count}"