[schema] generator fixes (#15276)

This commit is contained in:
Guillermo Ruffino
2026-03-30 21:08:50 -03:00
committed by GitHub
parent 53b2a03c80
commit ef65e47bc5
3 changed files with 151 additions and 63 deletions
+4
View File
@@ -113,6 +113,7 @@ from esphome.core.entity_helpers import (
setup_unit_of_measurement,
)
from esphome.cpp_generator import MockObj, MockObjClass
from esphome.schema_extractors import SCHEMA_EXTRACT, schema_extractor
from esphome.util import Registry
CODEOWNERS = ["@esphome/core"]
@@ -229,7 +230,10 @@ _SENSOR_ENTITY_CATEGORIES = {
}
@schema_extractor("enum")
def sensor_entity_category(value):
if value == SCHEMA_EXTRACT:
return _SENSOR_ENTITY_CATEGORIES
return cv.enum(_SENSOR_ENTITY_CATEGORIES, lower=True)(value)
+4
View File
@@ -417,10 +417,14 @@ def icon(value):
return value
@schema_extractor("use_id")
def sub_device_id(value: str | None) -> core.ID | None:
# Lazy import to avoid circular imports
from esphome.core.config import Device
if value == SCHEMA_EXTRACT:
return Device
if not value:
return None
+143 -63
View File
@@ -67,19 +67,16 @@ def get_component_names():
# pylint: disable-next=redefined-outer-name,reimported
from esphome.loader import CORE_COMPONENTS_PATH
component_names = ["esphome", "sensor", "esp32", "esp8266"]
skip_components = []
for d in CORE_COMPONENTS_PATH.iterdir():
if (
not d.name.startswith("__")
and d.is_dir()
and d.name not in component_names
and d.name not in skip_components
):
component_names.append(d.name)
return sorted(component_names)
# return sorted(
# ["esphome", "sensor", "esp32", "esp8266", "adc", "touchscreen", "xpt2046"]
# )
return sorted(
[
d.name
for d in CORE_COMPONENTS_PATH.iterdir()
if not d.name.startswith("__") and d.is_dir()
]
)
def load_components():
@@ -120,39 +117,57 @@ from esphome.util import Registry # noqa: E402
# pylint: enable=wrong-import-position
def sort_obj(obj):
if isinstance(obj, dict):
return {k: sort_obj(v) for k, v in sorted(obj.items(), key=lambda x: str(x[0]))}
if isinstance(obj, list):
return [sort_obj(item) for item in obj]
return obj
def write_file(name, obj):
full_path = Path(args.output_path) / f"{name}.json"
sorted_obj = sort_obj(obj)
if JSON_DUMP_PRETTY:
json_str = json.dumps(obj, indent=2)
json_str = json.dumps(sorted_obj, indent=2)
else:
json_str = json.dumps(obj, separators=(",", ":"))
json_str = json.dumps(sorted_obj, separators=(",", ":"))
write_file_if_changed(full_path, json_str)
print(f"Wrote {full_path}")
def delete_extra_files(keep_names):
output_path = Path(args.output_path)
count = 0
for d in output_path.iterdir():
if d.suffix == ".json" and d.stem not in keep_names:
count += 1
d.unlink()
print(f"Deleted {d}")
return count
def register_module_schemas(key, module, manifest=None):
count = 0
for name, schema in module_schemas(module):
count += 1
register_known_schema(key, name, schema)
if manifest and manifest.multi_conf and S_CONFIG_SCHEMA in output[key][S_SCHEMAS]:
if (
manifest
and manifest.multi_conf
and key in output
and S_CONFIG_SCHEMA in output[key][S_SCHEMAS]
):
# Multi conf should allow list of components
# not sure about 2nd part of the if, might be useless config (e.g. as3935)
output[key][S_SCHEMAS][S_CONFIG_SCHEMA]["is_list"] = True
return count
def register_known_schema(module, name, schema):
if module not in output:
output[module] = {S_SCHEMAS: {}}
config = convert_config(schema, f"{module}/{name}")
if S_TYPE not in config:
if S_TYPE not in config and name != "FINAL_VALIDATE_SCHEMA" and module != "core":
print(f"Config var without type: {module}.{name}")
output[module][S_SCHEMAS][name] = config
@@ -175,14 +190,23 @@ def module_schemas(module):
except OSError:
# some empty __init__ files
module_str = ""
schemas = {}
schemas = []
for m_attr_name in dir(module):
m_attr_obj = getattr(module, m_attr_name)
if is_convertible_schema(m_attr_obj):
schemas[module_str.find(m_attr_name)] = [m_attr_name, m_attr_obj]
# Find where the name is assigned in the module source to preserve
# definition order. Using ^NAME\s*= (multiline) targets assignments
# at column 0, so "CONFIG_SCHEMA" won't collide with "CONFIG_SCHEMA_BASE".
match = re.search(
r"^" + re.escape(m_attr_name) + r"\s*=",
module_str,
re.MULTILINE,
)
pos = match.start() if match else -1
schemas.append((pos, m_attr_name, m_attr_obj))
for pos in sorted(schemas.keys()):
yield schemas[pos]
for _, name, obj in sorted(schemas, key=lambda x: x[0]):
yield name, obj
found_registries = {}
@@ -240,9 +264,16 @@ def add_module_registries(domain, module):
if len(parts) == 2:
reg_domain = parts[0]
reg_entry_name = parts[1]
else:
reg_domain = ".".join([parts[1], parts[0]])
reg_entry_name = parts[2]
elif len(parts) == 3:
# is a platform or a component?
if parts[0] in schema_core[S_PLATFORMS]:
reg_domain = ".".join([parts[1], parts[0]])
reg_entry_name = parts[2]
elif parts[0] in schema_core[S_COMPONENTS]:
reg_domain = parts[0]
reg_entry_name = ".".join([parts[1], parts[2]])
else:
print(f"registry {name} is unknown")
if reg_domain not in output:
output[reg_domain] = {}
@@ -252,8 +283,6 @@ def add_module_registries(domain, module):
attr_obj[name].schema, f"{reg_domain}/{reg_type}/{reg_entry_name}"
)
# print(f"{domain} - {attr_name} - {name}")
def do_pins():
# do pin registries
@@ -330,6 +359,35 @@ def fix_font():
)
def fix_globals():
if "globals" not in output:
return
from esphome.components.globals import _NON_RESTORING_SCHEMA
config = convert_config(_NON_RESTORING_SCHEMA, "globals/CONFIG_SCHEMA")
config["is_list"] = True
output["globals"][S_SCHEMAS][S_CONFIG_SCHEMA] = config
def fix_mapping():
if "mapping" not in output:
return
from esphome.components.mapping import BASE_SCHEMA
config = convert_config(BASE_SCHEMA, "mapping/CONFIG_SCHEMA")
output["mapping"][S_SCHEMAS][S_CONFIG_SCHEMA] = config
def fix_image():
if "image" not in output:
return
from esphome.components.image import IMAGE_SCHEMA
config = convert_config(IMAGE_SCHEMA, "image/CONFIG_SCHEMA")
config["is_list"] = True
output["image"][S_SCHEMAS][S_CONFIG_SCHEMA] = config
def fix_menu():
if "display_menu_base" not in output:
return
@@ -355,7 +413,7 @@ def fix_menu():
# 4. Configure menu items inside as recursive
menu = schemas["MENU_TYPES"][S_SCHEMA][S_CONFIG_VARS]["items"]["types"]["menu"]
menu[S_CONFIG_VARS].pop("items")
menu[S_EXTENDS] = ["display_menu_base.MENU_TYPES"]
menu[S_EXTENDS].append("display_menu_base.MENU_TYPES")
def get_logger_tags():
@@ -531,7 +589,6 @@ def shrink():
else:
arr_s.pop(S_EXTENDS)
arr_s |= key_s[S_SCHEMA]
print(x)
# simple types should be spread on each component,
# for enums so far these are logger.is_log_level, cover.validate_cover_state and pulse_counter.sensor.COUNT_MODE_SCHEMA
@@ -580,6 +637,10 @@ def shrink():
domain_schemas[S_SCHEMAS].pop(schema_name)
def is_cv_invalid(schema):
return repr(schema).startswith("<function invalid.<locals>.validator")
def build_schema():
print("Building schema")
@@ -610,7 +671,8 @@ def build_schema():
output[domain] = {S_COMPONENTS: {}, S_SCHEMAS: {}}
platforms[domain] = {}
elif manifest.config_schema is not None:
# e.g. dallas
if is_cv_invalid(manifest.config_schema):
continue
output[domain] = {S_SCHEMAS: {S_CONFIG_SCHEMA: {}}}
# Generate platforms (e.g. sensor, binary_sensor, climate )
@@ -621,7 +683,9 @@ def build_schema():
# Generate components
for domain, manifest in components.items():
if domain not in platforms:
if manifest.config_schema is not None:
if manifest.config_schema is not None and not is_cv_invalid(
manifest.config_schema
):
core_components[domain] = {}
if len(manifest.dependencies) > 0:
core_components[domain]["dependencies"] = manifest.dependencies
@@ -630,14 +694,15 @@ def build_schema():
for platform in platforms:
platform_manifest = get_platform(domain=platform, platform=domain)
if platform_manifest is not None:
output[platform][S_COMPONENTS][domain] = {}
if len(platform_manifest.dependencies) > 0:
output[platform][S_COMPONENTS][domain]["dependencies"] = (
platform_manifest.dependencies
)
register_module_schemas(
count = register_module_schemas(
f"{domain}.{platform}", platform_manifest.module, platform_manifest
)
if count > 0:
output[platform][S_COMPONENTS].setdefault(domain, {})
if len(platform_manifest.dependencies) > 0:
output[platform][S_COMPONENTS][domain]["dependencies"] = (
platform_manifest.dependencies
)
# Do registries
add_module_registries("core", automation)
@@ -657,6 +722,9 @@ def build_schema():
fix_remote_receiver()
fix_script()
fix_font()
fix_globals()
fix_mapping()
fix_image()
add_logger_tags()
shrink()
fix_menu()
@@ -677,12 +745,19 @@ def build_schema():
# bundle core inside esphome
data["esphome"]["core"] = data.pop("core")["core"]
if GENERATED_ID_TYPES:
print(
"Unconsumed id_type matchers:",
[id_type for _, id_type in GENERATED_ID_TYPES],
)
if args.check: # do not gen files
return
for c, s in data.items():
write_file(c, s)
delete_extra_files(data.keys())
deleted = delete_extra_files(data.keys())
print(f"Written {len(data.items())} deleted {deleted} files.")
def is_convertible_schema(schema):
@@ -711,6 +786,30 @@ def convert_config(schema, path):
return converted
GENERATED_ID_TYPES = [
(
lambda p: p.startswith("i2c/CONFIG_SCHEMA/") and p.endswith("/id"),
{"class": "i2c::I2CBus", "parents": ["Component"]},
),
(
lambda p: p == "uart/CONFIG_SCHEMA/val 1/ext0/all/id",
{"class": "uart::UARTComponent", "parents": ["Component"]},
),
(
lambda p: p == "http_request/CONFIG_SCHEMA/val 1/ext0/all/id",
{"class": "http_request::HttpRequestComponent", "parents": ["Component"]},
),
(
lambda p: (
p
== "uptime.sensor/CONFIG_SCHEMA/type_timestamp/ext0/ext1/all/time_id/val 1"
),
{},
),
(lambda p: p == "esp_ldo/action/voltage.adjust/all/all/id", {}),
]
def convert(schema, config_var, path):
"""config_var can be a config_var or a schema: both are dicts
config_var has a S_TYPE property, if this is S_SCHEMA, then it has a S_SCHEMA property
@@ -718,9 +817,6 @@ def convert(schema, config_var, path):
"""
repr_schema = repr(schema)
if path.startswith("ads1115.sensor") and path.endswith("gain"):
print(path)
if repr_schema in known_schemas:
schema_info = known_schemas[(repr_schema)]
for schema_instance, name in schema_info:
@@ -841,8 +937,6 @@ def convert(schema, config_var, path):
schema({"delay": "1s"})
except cv.Invalid:
config_var["has_required_var"] = True
else:
print("figure out " + path)
elif schema_type == "effects":
config_var[S_TYPE] = "registry"
config_var["registry"] = "light.effects"
@@ -879,8 +973,6 @@ def convert(schema, config_var, path):
"id"
]["id_type"]["class"]
config_var[S_TYPE] = "use_id"
else:
print("TODO deferred?")
elif isinstance(data, str):
# TODO: Figure out why pipsolar does this
config_var["use_id_type"] = data
@@ -890,23 +982,11 @@ def convert(schema, config_var, path):
else:
raise TypeError("Unknown extracted schema type")
elif config_var.get("key") == "GeneratedID":
if path.startswith("i2c/CONFIG_SCHEMA/") and path.endswith("/id"):
config_var["id_type"] = {
"class": "i2c::I2CBus",
"parents": ["Component"],
}
elif path == "uart/CONFIG_SCHEMA/val 1/ext0/all/id":
config_var["id_type"] = {
"class": "uart::UARTComponent",
"parents": ["Component"],
}
elif path == "http_request/CONFIG_SCHEMA/val 1/ext0/all/id":
config_var["id_type"] = {
"class": "http_request::HttpRequestComponent",
"parents": ["Component"],
}
elif path == "pins/esp32/val 1/id":
config_var["id_type"] = "pin"
for i, (matcher, id_type) in enumerate(GENERATED_ID_TYPES):
if matcher(path):
config_var["id_type"] = id_type
GENERATED_ID_TYPES.pop(i)
break
else:
print("Cannot determine id_type for " + path)