[media_player][speaker][speaker_source] Centralize preferred format codegen (#14771)

This commit is contained in:
Kevin Ahrendt
2026-04-24 10:09:03 -04:00
committed by GitHub
parent baa6d5f96b
commit f132b7dc07
4 changed files with 156 additions and 200 deletions
+109 -2
View File
@@ -1,20 +1,31 @@
from collections.abc import Callable
from esphome import automation
import esphome.codegen as cg
from esphome.components import audio
import esphome.config_validation as cv
from esphome.const import (
CONF_ENTITY_CATEGORY,
CONF_FORMAT,
CONF_ICON,
CONF_ID,
CONF_NUM_CHANNELS,
CONF_ON_IDLE,
CONF_ON_STATE,
CONF_ON_TURN_OFF,
CONF_ON_TURN_ON,
CONF_SAMPLE_RATE,
CONF_VOLUME,
)
from esphome.core import CORE
from esphome.core.entity_helpers import entity_duplicate_validator, setup_entity
from esphome.core.entity_helpers import (
entity_duplicate_validator,
inherit_property_from,
setup_entity,
)
from esphome.coroutine import CoroPriority, coroutine_with_priority
from esphome.cpp_generator import MockObjClass
from esphome.cpp_generator import MockObj, MockObjClass
from esphome.types import ConfigType
CODEOWNERS = ["@jesserockz"]
@@ -34,6 +45,102 @@ MEDIA_PLAYER_FORMAT_PURPOSE_ENUM = {
"announcement": MediaPlayerFormatPurpose.PURPOSE_ANNOUNCEMENT,
}
# Public API for external components. Do not remove.
# Translates the uppercase format names found in a format config into the
# lowercase strings written to the "format" field of the supported-format
# struct. "NONE" intentionally has no entry here.
FORMAT_MAPPING = {
    "FLAC": "flac",
    "MP3": "mp3",
    "OPUS": "opus",
    "WAV": "wav",
}
def build_supported_format_struct(
    format_config: ConfigType, purpose: MockObj
) -> cg.StructInitializer:
    """Assemble the MediaPlayerSupportedFormat initializer for one format config.

    Public API for external components. Do not remove.

    Args:
        format_config: Mapping providing ``CONF_FORMAT``, ``CONF_SAMPLE_RATE``
            and ``CONF_NUM_CHANNELS``. The format must be a key of
            ``FORMAT_MAPPING``.
        purpose: Expression emitted as the struct's ``purpose`` field.

    Returns:
        A ``cg.StructInitializer`` carrying the format, sample rate, channel
        count and purpose, plus ``sample_bytes`` for every format except MP3.

    Raises:
        KeyError: If the configured format has no entry in ``FORMAT_MAPPING``
            or a required key is missing from ``format_config``.
    """
    codec = format_config[CONF_FORMAT]
    fields = [
        ("format", FORMAT_MAPPING[codec]),
        ("sample_rate", format_config[CONF_SAMPLE_RATE]),
        ("num_channels", format_config[CONF_NUM_CHANNELS]),
        ("purpose", purpose),
    ]

    # MP3 must not advertise a sample width: ffmpeg transcoding in Home
    # Assistant fails if the number of bytes per sample is specified for MP3.
    if codec != "MP3":
        fields.append(("sample_bytes", 2))

    return cg.StructInitializer(MediaPlayerSupportedFormat, *fields)
def validate_preferred_format(
    component_name: str, audio_device_key: str
) -> Callable[[ConfigType], ConfigType]:
    """Create a validator for a preferred-format config tied to an audio device.

    Public API for external components. Do not remove.

    Args:
        component_name: Name forwarded to ``audio.final_validate_audio_schema``.
        audio_device_key: Config key of the audio device whose channel count
            and sample rate are inherited and checked against.

    Returns:
        A callable that takes a config, validates it, and returns that same
        config object.
    """

    def validator(config: ConfigType) -> ConfigType:
        # Take over the audio device's settings for anything not set manually.
        for inherited_key in (CONF_NUM_CHANNELS, CONF_SAMPLE_RATE):
            inherit_property_from(inherited_key, audio_device_key)(config)

        channels = config.get(CONF_NUM_CHANNELS)
        sample_rate = config.get(CONF_SAMPLE_RATE)

        # Opus is restricted to 48 kHz.
        is_opus = config.get(CONF_FORMAT) == "OPUS"
        if is_opus and sample_rate != 48000:
            raise cv.Invalid("Opus only supports a sample rate of 48000 Hz")

        # Confirm the resulting settings are compatible with the audio device.
        check_device = audio.final_validate_audio_schema(
            component_name,
            audio_device=audio_device_key,
            bits_per_sample=16,
            channels=channels,
            sample_rate=sample_rate,
        )
        check_device(config)
        return config

    return validator
def request_codecs_for_format_configs(
    config: ConfigType, format_config_keys: list[str]
) -> None:
    """Request codec support for the formats named by the given format configs.

    Each key in ``format_config_keys`` is looked up in ``config``; missing or
    empty entries are ignored. A format of "NONE" means the config accepts any
    format, so FLAC, MP3 and Opus support are all requested. Otherwise only
    the codecs for the formats actually configured are requested.

    Public API for external components. Do not remove.
    """
    configured = {
        entry[CONF_FORMAT]
        for key in format_config_keys
        if (entry := config.get(key))
    }
    # "NONE" acts as a wildcard that pulls in every optional codec.
    accepts_any = "NONE" in configured

    if accepts_any or "FLAC" in configured:
        audio.request_flac_support()
    if accepts_any or "MP3" in configured:
        audio.request_mp3_support()
    if accepts_any or "OPUS" in configured:
        audio.request_opus_support()
# Local config key constants
CONF_ANNOUNCEMENT = "announcement"
CONF_ON_PLAY = "on_play"
@@ -32,7 +32,6 @@ from esphome.const import (
CONF_URL,
)
from esphome.core import CORE, HexInt
from esphome.core.entity_helpers import inherit_property_from
from esphome.external_files import download_content
_LOGGER = logging.getLogger(__name__)
@@ -44,16 +43,12 @@ DEPENDENCIES = ["network"]
CODEOWNERS = ["@kahrendt", "@synesthesiam"]
DOMAIN = "media_player"
CODEC_SUPPORT_ALL = "all"
CODEC_SUPPORT_NEEDED = "needed"
CODEC_SUPPORT_NONE = "none"
TYPE_LOCAL = "local"
TYPE_WEB = "web"
CONF_ANNOUNCEMENT = "announcement"
CONF_ANNOUNCEMENT_PIPELINE = "announcement_pipeline"
CONF_CODEC_SUPPORT_ENABLED = "codec_support_enabled"
CONF_CODEC_SUPPORT_ENABLED = "codec_support_enabled" # Remove before 2026.10.0
CONF_ENQUEUE = "enqueue"
CONF_MEDIA_FILE = "media_file"
CONF_MEDIA_PIPELINE = "media_pipeline"
@@ -106,43 +101,10 @@ def _download_web_file(value):
return value
# Returns a media_player.MediaPlayerSupportedFormat struct with the configured
# format, sample rate, number of channels, purpose, and bytes per sample
def _get_supported_format_struct(pipeline, type):
    """Build the MediaPlayerSupportedFormat initializer for a pipeline.

    ``pipeline`` supplies the format, sample rate and channel count; ``type``
    is "MEDIA" or "ANNOUNCEMENT" and selects the purpose. A format or type
    outside the known values simply leaves the corresponding field out.
    ``sample_bytes`` is set to 2 for every format except MP3.

    Note: the ``type`` parameter shadows the builtin of the same name; it is
    kept because it is part of the existing signature.
    """
    format_names = {"FLAC": "flac", "MP3": "mp3", "OPUS": "opus", "WAV": "wav"}
    purpose_keys = {"MEDIA": "default", "ANNOUNCEMENT": "announcement"}

    codec = pipeline[CONF_FORMAT]
    fields = []

    if codec in format_names:
        fields.append(("format", format_names[codec]))

    fields.append(("sample_rate", pipeline[CONF_SAMPLE_RATE]))
    fields.append(("num_channels", pipeline[CONF_NUM_CHANNELS]))

    if type in purpose_keys:
        purpose = media_player.MEDIA_PLAYER_FORMAT_PURPOSE_ENUM[purpose_keys[type]]
        fields.append(("purpose", purpose))

    if codec != "MP3":
        fields.append(("sample_bytes", 2))

    return cg.StructInitializer(media_player.MediaPlayerSupportedFormat, *fields)
# Maps the pipeline labels used in this module ("MEDIA", "ANNOUNCEMENT") to the
# matching entries of media_player.MEDIA_PLAYER_FORMAT_PURPOSE_ENUM.
_PURPOSE_MAP = {
    "MEDIA": media_player.MEDIA_PLAYER_FORMAT_PURPOSE_ENUM["default"],
    "ANNOUNCEMENT": media_player.MEDIA_PLAYER_FORMAT_PURPOSE_ENUM["announcement"],
}
def _file_schema(value):
@@ -210,25 +172,9 @@ def _validate_file_shorthand(value):
)
def _validate_pipeline(config):
    """Validate a pipeline's transcoder settings against its speaker.

    Channel count and sample rate are inherited from the speaker when not set
    manually, Opus is rejected at any sample rate other than 48000 Hz, and the
    result is handed to ``audio.final_validate_audio_schema`` for a 16-bit
    compatibility check. Returns the config that was passed in.
    """
    # Pull unset transcoder settings from the speaker.
    for inherited_key in (CONF_NUM_CHANNELS, CONF_SAMPLE_RATE):
        inherit_property_from(inherited_key, CONF_SPEAKER)(config)

    channels = config.get(CONF_NUM_CHANNELS)
    sample_rate = config.get(CONF_SAMPLE_RATE)

    # Opus is restricted to 48 kHz.
    uses_opus = config.get(CONF_FORMAT) == "OPUS"
    if uses_opus and sample_rate != 48000:
        raise cv.Invalid("Opus only supports a sample rate of 48000 Hz")

    # Make sure the transcoder settings are compatible with the speaker.
    speaker_check = audio.final_validate_audio_schema(
        "speaker media_player",
        audio_device=CONF_SPEAKER,
        bits_per_sample=16,
        channels=channels,
        sample_rate=sample_rate,
    )
    speaker_check(config)

    return config
# Pipeline validator built by the shared media_player helper: the component
# name is "speaker media_player" and the speaker is the audio device key.
_validate_pipeline = media_player.validate_preferred_format(
    "speaker media_player", CONF_SPEAKER
)
def _validate_repeated_speaker(config):
@@ -245,59 +191,34 @@ def _validate_repeated_speaker(config):
def _final_validate(config):
# Normalize boolean values to string equivalents
codec_mode = config[CONF_CODEC_SUPPORT_ENABLED]
if codec_mode is True:
codec_mode = CODEC_SUPPORT_ALL
elif codec_mode is False:
codec_mode = CODEC_SUPPORT_NONE
# Remove before 2026.10.0
if CONF_CODEC_SUPPORT_ENABLED in config:
_LOGGER.warning(
"'%s' is deprecated and will be removed in 2026.10.0. "
"Codec support is now automatically determined from the pipeline "
"'format' setting. Set format to 'NONE' to enable all codecs.",
CONF_CODEC_SUPPORT_ENABLED,
)
use_codec = codec_mode != CODEC_SUPPORT_NONE
# In "needed" mode, collect formats from pipelines and files
needed_formats = set()
need_all = False
if codec_mode == CODEC_SUPPORT_NEEDED:
for pipeline_key in (CONF_ANNOUNCEMENT_PIPELINE, CONF_MEDIA_PIPELINE):
if pipeline := config.get(pipeline_key):
fmt = pipeline[CONF_FORMAT]
if fmt == "NONE":
# No preferred format means any format could arrive
need_all = True
else:
needed_formats.add(fmt)
# Request codecs based on pipeline formats
media_player.request_codecs_for_format_configs(
config, [CONF_ANNOUNCEMENT_PIPELINE, CONF_MEDIA_PIPELINE]
)
# Validate local files and request any additional codecs they need
for file_config in config.get(CONF_FILES, []):
_, media_file_type = _read_audio_file_and_type(file_config)
if str(media_file_type) == str(audio.AUDIO_FILE_TYPE_ENUM["NONE"]):
raise cv.Invalid("Unsupported local media file")
if not use_codec and str(media_file_type) != str(
audio.AUDIO_FILE_TYPE_ENUM["WAV"]
):
# Only wav files are supported
raise cv.Invalid(
f"Unsupported local media file type, set {CONF_CODEC_SUPPORT_ENABLED} to true or convert the media file to wav"
)
# In "needed" mode, add file format to needed codecs
if codec_mode == CODEC_SUPPORT_NEEDED:
for fmt_name, fmt_enum in audio.AUDIO_FILE_TYPE_ENUM.items():
if str(media_file_type) == str(fmt_enum):
if fmt_name not in ("WAV", "NONE"):
needed_formats.add(fmt_name)
break
# Request codec support
if codec_mode == CODEC_SUPPORT_ALL or need_all:
audio.request_flac_support()
audio.request_mp3_support()
audio.request_opus_support()
elif codec_mode == CODEC_SUPPORT_NEEDED:
if "FLAC" in needed_formats:
audio.request_flac_support()
if "MP3" in needed_formats:
audio.request_mp3_support()
if "OPUS" in needed_formats:
audio.request_opus_support()
for fmt_name, fmt_enum in audio.AUDIO_FILE_TYPE_ENUM.items():
if str(media_file_type) == str(fmt_enum):
if fmt_name == "FLAC":
audio.request_flac_support()
elif fmt_name == "MP3":
audio.request_mp3_support()
elif fmt_name == "OPUS":
audio.request_opus_support()
break
return config
@@ -362,17 +283,8 @@ CONFIG_SCHEMA = cv.All(
cv.Optional(CONF_BUFFER_SIZE, default=1000000): cv.int_range(
min=4000, max=4000000
),
cv.Optional(
CONF_CODEC_SUPPORT_ENABLED, default=CODEC_SUPPORT_NEEDED
): cv.Any(
cv.boolean,
cv.one_of(
CODEC_SUPPORT_ALL,
CODEC_SUPPORT_NEEDED,
CODEC_SUPPORT_NONE,
lower=True,
),
),
# Remove before 2026.10.0
cv.Optional(CONF_CODEC_SUPPORT_ENABLED): cv.Any(cv.boolean, cv.string),
cv.Optional(CONF_FILES): cv.ensure_list(MEDIA_FILE_TYPE_SCHEMA),
cv.Optional(CONF_TASK_STACK_IN_PSRAM): cv.All(
cv.boolean, cv.requires_component(psram.DOMAIN)
@@ -432,8 +344,8 @@ async def to_code(config):
if announcement_pipeline_config[CONF_FORMAT] != "NONE":
cg.add(
var.set_announcement_format(
_get_supported_format_struct(
announcement_pipeline_config, "ANNOUNCEMENT"
media_player.build_supported_format_struct(
announcement_pipeline_config, _PURPOSE_MAP["ANNOUNCEMENT"]
)
)
)
@@ -444,7 +356,9 @@ async def to_code(config):
if media_pipeline_config[CONF_FORMAT] != "NONE":
cg.add(
var.set_media_format(
_get_supported_format_struct(media_pipeline_config, "MEDIA")
media_player.build_supported_format_struct(
media_pipeline_config, _PURPOSE_MAP["MEDIA"]
)
)
)
@@ -17,7 +17,6 @@ from esphome.const import (
CONF_SPEAKER,
)
from esphome.core import ID
from esphome.core.entity_helpers import inherit_property_from
from esphome.cpp_generator import MockObj, TemplateArgsType
from esphome.types import ConfigType
@@ -65,53 +64,9 @@ SetPlaylistDelayAction = speaker_source_ns.class_(
)
FORMAT_MAPPING = {
"FLAC": "flac",
"MP3": "mp3",
"OPUS": "opus",
"WAV": "wav",
}
# Returns a media_player.MediaPlayerSupportedFormat struct with the configured
# format, sample rate, number of channels, purpose, and bytes per sample
def _get_supported_format_struct(pipeline: ConfigType, purpose: MockObj):
    """Build the MediaPlayerSupportedFormat initializer for a pipeline.

    The struct carries the mapped format name, the pipeline's sample rate and
    channel count, and the given ``purpose``. ``sample_bytes`` is set to 2 for
    every format except MP3. The pipeline's format must be a key of
    ``FORMAT_MAPPING``.
    """
    codec = pipeline[CONF_FORMAT]
    fields = [
        ("format", FORMAT_MAPPING[codec]),
        ("sample_rate", pipeline[CONF_SAMPLE_RATE]),
        ("num_channels", pipeline[CONF_NUM_CHANNELS]),
        ("purpose", purpose),
    ]

    # MP3 must not advertise a sample width: ffmpeg transcoding in Home
    # Assistant fails if the number of bytes per sample is specified for MP3.
    if codec != "MP3":
        fields.append(("sample_bytes", 2))

    return cg.StructInitializer(media_player.MediaPlayerSupportedFormat, *fields)
def _validate_pipeline(config: ConfigType) -> ConfigType:
    """Validate a pipeline's format settings against its speaker.

    Channel count and sample rate are inherited from the speaker when not set
    manually, Opus is rejected at any sample rate other than 48000 Hz, and the
    result is handed to ``audio.final_validate_audio_schema`` for a 16-bit
    compatibility check. Returns the config that was passed in.
    """
    # Pull unset settings from the speaker.
    for inherited_key in (CONF_NUM_CHANNELS, CONF_SAMPLE_RATE):
        inherit_property_from(inherited_key, CONF_SPEAKER)(config)

    channels = config.get(CONF_NUM_CHANNELS)
    sample_rate = config.get(CONF_SAMPLE_RATE)

    # Opus is restricted to 48 kHz.
    uses_opus = config.get(CONF_FORMAT) == "OPUS"
    if uses_opus and sample_rate != 48000:
        raise cv.Invalid("Opus only supports a sample rate of 48000 Hz")

    speaker_check = audio.final_validate_audio_schema(
        "speaker_source media_player",
        audio_device=CONF_SPEAKER,
        bits_per_sample=16,
        channels=channels,
        sample_rate=sample_rate,
    )
    speaker_check(config)

    return config
# Pipeline validator built by the shared media_player helper: the component
# name is "speaker_source media_player" and the speaker is the audio device key.
_validate_pipeline = media_player.validate_preferred_format(
    "speaker_source media_player", CONF_SPEAKER
)
PIPELINE_SCHEMA = cv.Schema(
@@ -198,31 +153,9 @@ CONFIG_SCHEMA = cv.All(
def _final_validate_codecs(config: ConfigType) -> ConfigType:
# "NONE" means the pipeline accepts any format at runtime, so all optional codecs must be available.
# When a specific format is set, only that codec is requested.
needed_formats: set[str] = set()
need_all = False
for pipeline_key in (CONF_ANNOUNCEMENT_PIPELINE, CONF_MEDIA_PIPELINE):
if pipeline := config.get(pipeline_key):
fmt = pipeline[CONF_FORMAT]
if fmt == "NONE":
need_all = True
else:
needed_formats.add(fmt)
if need_all:
audio.request_flac_support()
audio.request_mp3_support()
audio.request_opus_support()
else:
if "FLAC" in needed_formats:
audio.request_flac_support()
if "MP3" in needed_formats:
audio.request_mp3_support()
if "OPUS" in needed_formats:
audio.request_opus_support()
media_player.request_codecs_for_format_configs(
config, [CONF_ANNOUNCEMENT_PIPELINE, CONF_MEDIA_PIPELINE]
)
return config
@@ -264,7 +197,9 @@ async def to_code(config: ConfigType) -> None:
cg.add(
var.set_format(
pipeline_enum,
_get_supported_format_struct(pipeline_config, purpose),
media_player.build_supported_format_struct(
pipeline_config, purpose
),
)
)
@@ -11,9 +11,9 @@ media_player:
id: speaker_media_player_id
announcement_pipeline:
speaker: speaker_id
format: NONE
buffer_size: 1000000
volume_increment: 0.02
volume_max: 0.95
volume_min: 0.0
task_stack_in_psram: true
codec_support_enabled: all