[mixer] Use RingBufferAudioSource (#16316)

This commit is contained in:
Kevin Ahrendt
2026-05-12 19:58:32 -04:00
committed by GitHub
parent 8b6cbc9f2b
commit 7f37ee3c53
2 changed files with 59 additions and 52 deletions
@@ -182,7 +182,7 @@ void SourceSpeaker::loop() {
break;
}
case speaker::STATE_RUNNING:
if (!this->transfer_buffer_->has_buffered_data() &&
if (!this->audio_source_->has_buffered_data() &&
(this->pending_playback_frames_.load(std::memory_order_acquire) == 0)) {
// No audio data in buffer waiting to get mixed and no frames are pending playback
if ((this->timeout_ms_.has_value() && ((millis() - this->last_seen_data_ms_) > this->timeout_ms_.value())) ||
@@ -254,15 +254,12 @@ void SourceSpeaker::send_command_(uint32_t command_bit, bool wake_loop) {
void SourceSpeaker::start() { this->send_command_(SOURCE_SPEAKER_COMMAND_START, true); }
esp_err_t SourceSpeaker::start_() {
const size_t ring_buffer_size = this->audio_stream_info_.ms_to_bytes(this->buffer_duration_ms_);
if (this->transfer_buffer_.use_count() == 0) {
this->transfer_buffer_ =
audio::AudioSourceTransferBuffer::create(this->audio_stream_info_.ms_to_bytes(TRANSFER_BUFFER_DURATION_MS));
if (this->transfer_buffer_ == nullptr) {
return ESP_ERR_NO_MEM;
}
const size_t bytes_per_frame = this->audio_stream_info_.frames_to_bytes(1);
// Round the ring buffer size down to a multiple of bytes_per_frame so the wrap boundary stays frame-aligned and
// avoids unnecessary single-frame splices.
const size_t ring_buffer_size =
(this->audio_stream_info_.ms_to_bytes(this->buffer_duration_ms_) / bytes_per_frame) * bytes_per_frame;
if (this->audio_source_.use_count() == 0) {
std::shared_ptr<ring_buffer::RingBuffer> temp_ring_buffer = this->ring_buffer_.lock();
if (!temp_ring_buffer) {
temp_ring_buffer = ring_buffer::RingBuffer::create(ring_buffer_size);
@@ -271,9 +268,15 @@ esp_err_t SourceSpeaker::start_() {
if (!temp_ring_buffer) {
return ESP_ERR_NO_MEM;
} else {
this->transfer_buffer_->set_source(temp_ring_buffer);
}
std::unique_ptr<audio::RingBufferAudioSource> source = audio::RingBufferAudioSource::create(
temp_ring_buffer, this->audio_stream_info_.ms_to_bytes(TRANSFER_BUFFER_DURATION_MS),
static_cast<uint8_t>(bytes_per_frame));
if (source == nullptr) {
return ESP_ERR_NO_MEM;
}
this->audio_source_ = std::move(source);
}
return this->parent_->start(this->audio_stream_info_);
@@ -284,7 +287,7 @@ void SourceSpeaker::stop() { this->send_command_(SOURCE_SPEAKER_COMMAND_STOP); }
void SourceSpeaker::finish() { this->send_command_(SOURCE_SPEAKER_COMMAND_FINISH); }
bool SourceSpeaker::has_buffered_data() const {
return ((this->transfer_buffer_.use_count() > 0) && this->transfer_buffer_->has_buffered_data());
return ((this->audio_source_.use_count() > 0) && this->audio_source_->has_buffered_data());
}
void SourceSpeaker::set_mute_state(bool mute_state) {
@@ -301,16 +304,18 @@ void SourceSpeaker::set_volume(float volume) {
float SourceSpeaker::get_volume() { return this->parent_->get_output_speaker()->get_volume(); }
size_t SourceSpeaker::process_data_from_source(std::shared_ptr<audio::AudioSourceTransferBuffer> &transfer_buffer,
size_t SourceSpeaker::process_data_from_source(std::shared_ptr<audio::RingBufferAudioSource> &audio_source,
TickType_t ticks_to_wait) {
// Store current offset, as these samples are already ducked
const size_t current_length = transfer_buffer->available();
if (audio_source->available() > 0) {
// Existing exposure was ducked when fill() promoted it; do not re-duck on partial-consume re-entry.
return 0;
}
size_t bytes_read = transfer_buffer->transfer_data_from_source(ticks_to_wait);
size_t bytes_read = audio_source->fill(ticks_to_wait, false);
uint32_t samples_to_duck = this->audio_stream_info_.bytes_to_samples(bytes_read);
if (samples_to_duck > 0) {
int16_t *current_buffer = reinterpret_cast<int16_t *>(transfer_buffer->get_buffer_start() + current_length);
int16_t *current_buffer = reinterpret_cast<int16_t *>(audio_source->mutable_data());
duck_samples(current_buffer, samples_to_duck, &this->current_ducking_db_reduction_,
&this->ducking_transition_samples_remaining_, this->samples_per_ducking_step_,
@@ -406,7 +411,7 @@ void SourceSpeaker::duck_samples(int16_t *input_buffer, uint32_t input_samples_t
void SourceSpeaker::enter_stopping_state_() {
this->state_ = speaker::STATE_STOPPING;
this->stopping_start_ms_ = millis();
this->transfer_buffer_.reset();
this->audio_source_.reset();
}
void MixerSpeaker::dump_config() {
@@ -612,9 +617,9 @@ void MixerSpeaker::audio_mixer_task(void *params) {
// Pre-allocate vectors to avoid heap allocation in the loop (max 8 source speakers per schema)
FixedVector<SourceSpeaker *> speakers_with_data;
FixedVector<std::shared_ptr<audio::AudioSourceTransferBuffer>> transfer_buffers_with_data;
FixedVector<std::shared_ptr<audio::RingBufferAudioSource>> audio_sources_with_data;
speakers_with_data.init(this_mixer->source_speakers_.size());
transfer_buffers_with_data.init(this_mixer->source_speakers_.size());
audio_sources_with_data.init(this_mixer->source_speakers_.size());
while (true) {
uint32_t event_group_bits = xEventGroupGetBits(this_mixer->event_group_);
@@ -629,27 +634,27 @@ void MixerSpeaker::audio_mixer_task(void *params) {
this_mixer->audio_stream_info_.value().bytes_to_frames(output_transfer_buffer->free());
speakers_with_data.clear();
transfer_buffers_with_data.clear();
audio_sources_with_data.clear();
for (auto &speaker : this_mixer->source_speakers_) {
if (speaker->is_running() && !speaker->get_pause_state()) {
// Speaker is running and not paused, so it possibly can provide audio data
std::shared_ptr<audio::AudioSourceTransferBuffer> transfer_buffer = speaker->get_transfer_buffer().lock();
if (transfer_buffer.use_count() == 0) {
// No transfer buffer allocated, so skip processing this speaker
std::shared_ptr<audio::RingBufferAudioSource> audio_source = speaker->get_audio_source().lock();
if (audio_source.use_count() == 0) {
// No audio source allocated, so skip processing this speaker
continue;
}
speaker->process_data_from_source(transfer_buffer, 0); // Transfers and ducks audio from source ring buffers
speaker->process_data_from_source(audio_source, 0); // Exposes and ducks audio from source ring buffers
if (transfer_buffer->available() > 0) {
// Store the locked transfer buffers in their own vector to avoid releasing ownership until after the loop
transfer_buffers_with_data.push_back(transfer_buffer);
if (audio_source->available() > 0) {
// Retain shared ownership across the mixing pass so the source isn't released mid-mix
audio_sources_with_data.push_back(audio_source);
speakers_with_data.push_back(speaker);
}
}
}
if (transfer_buffers_with_data.empty()) {
if (audio_sources_with_data.empty()) {
// No audio available for transferring, block task temporarily
delay(TASK_DELAY_MS);
continue;
@@ -657,7 +662,7 @@ void MixerSpeaker::audio_mixer_task(void *params) {
uint32_t frames_to_mix = output_frames_free;
if ((transfer_buffers_with_data.size() == 1) || this_mixer->queue_mode_) {
if ((audio_sources_with_data.size() == 1) || this_mixer->queue_mode_) {
// Only one speaker has audio data, just copy samples over
audio::AudioStreamInfo active_stream_info = speakers_with_data[0]->get_audio_stream_info();
@@ -667,10 +672,10 @@ void MixerSpeaker::audio_mixer_task(void *params) {
// Speaker's sample rate matches the output speaker's, copy directly
const uint32_t frames_available_in_buffer =
active_stream_info.bytes_to_frames(transfer_buffers_with_data[0]->available());
active_stream_info.bytes_to_frames(audio_sources_with_data[0]->available());
frames_to_mix = std::min(frames_to_mix, frames_available_in_buffer);
copy_frames(reinterpret_cast<int16_t *>(transfer_buffers_with_data[0]->get_buffer_start()),
active_stream_info, reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()),
copy_frames(reinterpret_cast<const int16_t *>(audio_sources_with_data[0]->data()), active_stream_info,
reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()),
this_mixer->audio_stream_info_.value(), frames_to_mix);
// Set playback delay for newly contributing source
@@ -682,7 +687,7 @@ void MixerSpeaker::audio_mixer_task(void *params) {
// Update source speaker pending frames
speakers_with_data[0]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release);
transfer_buffers_with_data[0]->decrease_buffer_length(active_stream_info.frames_to_bytes(frames_to_mix));
audio_sources_with_data[0]->consume(active_stream_info.frames_to_bytes(frames_to_mix));
// Update output transfer buffer length and pipeline frame count
output_transfer_buffer->increase_buffer_length(
@@ -709,25 +714,25 @@ void MixerSpeaker::audio_mixer_task(void *params) {
}
} else {
// Determine how many frames to mix
for (size_t i = 0; i < transfer_buffers_with_data.size(); ++i) {
const uint32_t frames_available_in_buffer = speakers_with_data[i]->get_audio_stream_info().bytes_to_frames(
transfer_buffers_with_data[i]->available());
for (size_t i = 0; i < audio_sources_with_data.size(); ++i) {
const uint32_t frames_available_in_buffer =
speakers_with_data[i]->get_audio_stream_info().bytes_to_frames(audio_sources_with_data[i]->available());
frames_to_mix = std::min(frames_to_mix, frames_available_in_buffer);
}
int16_t *primary_buffer = reinterpret_cast<int16_t *>(transfer_buffers_with_data[0]->get_buffer_start());
const int16_t *primary_buffer = reinterpret_cast<const int16_t *>(audio_sources_with_data[0]->data());
audio::AudioStreamInfo primary_stream_info = speakers_with_data[0]->get_audio_stream_info();
// Mix two streams together
for (size_t i = 1; i < transfer_buffers_with_data.size(); ++i) {
for (size_t i = 1; i < audio_sources_with_data.size(); ++i) {
mix_audio_samples(primary_buffer, primary_stream_info,
reinterpret_cast<int16_t *>(transfer_buffers_with_data[i]->get_buffer_start()),
reinterpret_cast<const int16_t *>(audio_sources_with_data[i]->data()),
speakers_with_data[i]->get_audio_stream_info(),
reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()),
this_mixer->audio_stream_info_.value(), frames_to_mix);
if (i != transfer_buffers_with_data.size() - 1) {
if (i != audio_sources_with_data.size() - 1) {
// Need to mix more streams together, point primary buffer and stream info to the already mixed output
primary_buffer = reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end());
primary_buffer = reinterpret_cast<const int16_t *>(output_transfer_buffer->get_buffer_end());
primary_stream_info = this_mixer->audio_stream_info_.value();
}
}
@@ -735,8 +740,8 @@ void MixerSpeaker::audio_mixer_task(void *params) {
// Get current pipeline depth for delay calculation (before incrementing)
uint32_t current_pipeline_frames = this_mixer->frames_in_pipeline_.load(std::memory_order_acquire);
// Update source transfer buffer lengths and add new audio durations to the source speaker pending playbacks
for (size_t i = 0; i < transfer_buffers_with_data.size(); ++i) {
// Update source audio source consumption and add new audio durations to the source speaker pending playbacks
for (size_t i = 0; i < audio_sources_with_data.size(); ++i) {
// Set playback delay for newly contributing sources
if (!speakers_with_data[i]->has_contributed_.load(std::memory_order_acquire)) {
speakers_with_data[i]->playback_delay_frames_.store(current_pipeline_frames, std::memory_order_release);
@@ -744,7 +749,7 @@ void MixerSpeaker::audio_mixer_task(void *params) {
}
speakers_with_data[i]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release);
transfer_buffers_with_data[i]->decrease_buffer_length(
audio_sources_with_data[i]->consume(
speakers_with_data[i]->get_audio_stream_info().frames_to_bytes(frames_to_mix));
}
@@ -67,11 +67,13 @@ class SourceSpeaker : public speaker::Speaker, public Component {
void set_pause_state(bool pause_state) override { this->pause_state_ = pause_state; }
bool get_pause_state() const override { return this->pause_state_; }
/// @brief Transfers audio from the ring buffer into the transfer buffer. Ducks audio while transferring.
/// @param transfer_buffer Locked shared_ptr to the transfer buffer (must be valid, not null)
/// @brief Exposes the next ring buffer chunk (zero-copy) and ducks the freshly exposed bytes in place.
/// If the source still has bytes from a prior partial consume, this is a no-op (those bytes were already
/// ducked on the fill that exposed them).
/// @param audio_source Locked shared_ptr to the audio source (must be valid, not null)
/// @param ticks_to_wait FreeRTOS ticks to wait while waiting to read from the ring buffer.
/// @return Number of bytes transferred from the ring buffer.
size_t process_data_from_source(std::shared_ptr<audio::AudioSourceTransferBuffer> &transfer_buffer,
/// @return Number of bytes newly exposed from the ring buffer.
size_t process_data_from_source(std::shared_ptr<audio::RingBufferAudioSource> &audio_source,
TickType_t ticks_to_wait);
/// @brief Sets the ducking level for the source speaker.
@@ -83,7 +85,7 @@ class SourceSpeaker : public speaker::Speaker, public Component {
void set_parent(MixerSpeaker *parent) { this->parent_ = parent; }
void set_timeout(uint32_t ms) { this->timeout_ms_ = ms; }
std::weak_ptr<audio::AudioSourceTransferBuffer> get_transfer_buffer() { return this->transfer_buffer_; }
std::weak_ptr<audio::RingBufferAudioSource> get_audio_source() { return this->audio_source_; }
protected:
friend class MixerSpeaker;
@@ -106,7 +108,7 @@ class SourceSpeaker : public speaker::Speaker, public Component {
MixerSpeaker *parent_;
std::shared_ptr<audio::AudioSourceTransferBuffer> transfer_buffer_;
std::shared_ptr<audio::RingBufferAudioSource> audio_source_;
std::weak_ptr<ring_buffer::RingBuffer> ring_buffer_;
uint32_t buffer_duration_ms_;