Use RingBufferAudioSource in mixer speaker

This commit is contained in:
Kevin Ahrendt
2026-05-08 13:36:43 +00:00
parent efea921e60
commit 9dd5ac6172
2 changed files with 59 additions and 52 deletions
@@ -182,7 +182,7 @@ void SourceSpeaker::loop() {
break; break;
} }
case speaker::STATE_RUNNING: case speaker::STATE_RUNNING:
if (!this->transfer_buffer_->has_buffered_data() && if (!this->audio_source_->has_buffered_data() &&
(this->pending_playback_frames_.load(std::memory_order_acquire) == 0)) { (this->pending_playback_frames_.load(std::memory_order_acquire) == 0)) {
// No audio data in buffer waiting to get mixed and no frames are pending playback // No audio data in buffer waiting to get mixed and no frames are pending playback
if ((this->timeout_ms_.has_value() && ((millis() - this->last_seen_data_ms_) > this->timeout_ms_.value())) || if ((this->timeout_ms_.has_value() && ((millis() - this->last_seen_data_ms_) > this->timeout_ms_.value())) ||
@@ -254,15 +254,12 @@ void SourceSpeaker::send_command_(uint32_t command_bit, bool wake_loop) {
void SourceSpeaker::start() { this->send_command_(SOURCE_SPEAKER_COMMAND_START, true); } void SourceSpeaker::start() { this->send_command_(SOURCE_SPEAKER_COMMAND_START, true); }
esp_err_t SourceSpeaker::start_() { esp_err_t SourceSpeaker::start_() {
const size_t ring_buffer_size = this->audio_stream_info_.ms_to_bytes(this->buffer_duration_ms_); const size_t bytes_per_frame = this->audio_stream_info_.frames_to_bytes(1);
if (this->transfer_buffer_.use_count() == 0) { // Round the ring buffer size down to a multiple of bytes_per_frame so the wrap boundary stays frame-aligned and
this->transfer_buffer_ = // avoids unnecessary single-frame splices.
audio::AudioSourceTransferBuffer::create(this->audio_stream_info_.ms_to_bytes(TRANSFER_BUFFER_DURATION_MS)); const size_t ring_buffer_size =
(this->audio_stream_info_.ms_to_bytes(this->buffer_duration_ms_) / bytes_per_frame) * bytes_per_frame;
if (this->transfer_buffer_ == nullptr) { if (this->audio_source_.use_count() == 0) {
return ESP_ERR_NO_MEM;
}
std::shared_ptr<ring_buffer::RingBuffer> temp_ring_buffer = this->ring_buffer_.lock(); std::shared_ptr<ring_buffer::RingBuffer> temp_ring_buffer = this->ring_buffer_.lock();
if (!temp_ring_buffer) { if (!temp_ring_buffer) {
temp_ring_buffer = ring_buffer::RingBuffer::create(ring_buffer_size); temp_ring_buffer = ring_buffer::RingBuffer::create(ring_buffer_size);
@@ -271,9 +268,15 @@ esp_err_t SourceSpeaker::start_() {
if (!temp_ring_buffer) { if (!temp_ring_buffer) {
return ESP_ERR_NO_MEM; return ESP_ERR_NO_MEM;
} else {
this->transfer_buffer_->set_source(temp_ring_buffer);
} }
std::unique_ptr<audio::RingBufferAudioSource> source = audio::RingBufferAudioSource::create(
temp_ring_buffer, this->audio_stream_info_.ms_to_bytes(TRANSFER_BUFFER_DURATION_MS),
static_cast<uint8_t>(bytes_per_frame));
if (source == nullptr) {
return ESP_ERR_NO_MEM;
}
this->audio_source_ = std::move(source);
} }
return this->parent_->start(this->audio_stream_info_); return this->parent_->start(this->audio_stream_info_);
@@ -284,7 +287,7 @@ void SourceSpeaker::stop() { this->send_command_(SOURCE_SPEAKER_COMMAND_STOP); }
void SourceSpeaker::finish() { this->send_command_(SOURCE_SPEAKER_COMMAND_FINISH); } void SourceSpeaker::finish() { this->send_command_(SOURCE_SPEAKER_COMMAND_FINISH); }
bool SourceSpeaker::has_buffered_data() const { bool SourceSpeaker::has_buffered_data() const {
return ((this->transfer_buffer_.use_count() > 0) && this->transfer_buffer_->has_buffered_data()); return ((this->audio_source_.use_count() > 0) && this->audio_source_->has_buffered_data());
} }
void SourceSpeaker::set_mute_state(bool mute_state) { void SourceSpeaker::set_mute_state(bool mute_state) {
@@ -301,16 +304,18 @@ void SourceSpeaker::set_volume(float volume) {
float SourceSpeaker::get_volume() { return this->parent_->get_output_speaker()->get_volume(); } float SourceSpeaker::get_volume() { return this->parent_->get_output_speaker()->get_volume(); }
size_t SourceSpeaker::process_data_from_source(std::shared_ptr<audio::AudioSourceTransferBuffer> &transfer_buffer, size_t SourceSpeaker::process_data_from_source(std::shared_ptr<audio::RingBufferAudioSource> &audio_source,
TickType_t ticks_to_wait) { TickType_t ticks_to_wait) {
// Store current offset, as these samples are already ducked if (audio_source->available() > 0) {
const size_t current_length = transfer_buffer->available(); // Existing exposure was ducked when fill() promoted it; do not re-duck on partial-consume re-entry.
return 0;
}
size_t bytes_read = transfer_buffer->transfer_data_from_source(ticks_to_wait); size_t bytes_read = audio_source->fill(ticks_to_wait, false);
uint32_t samples_to_duck = this->audio_stream_info_.bytes_to_samples(bytes_read); uint32_t samples_to_duck = this->audio_stream_info_.bytes_to_samples(bytes_read);
if (samples_to_duck > 0) { if (samples_to_duck > 0) {
int16_t *current_buffer = reinterpret_cast<int16_t *>(transfer_buffer->get_buffer_start() + current_length); int16_t *current_buffer = reinterpret_cast<int16_t *>(audio_source->mutable_data());
duck_samples(current_buffer, samples_to_duck, &this->current_ducking_db_reduction_, duck_samples(current_buffer, samples_to_duck, &this->current_ducking_db_reduction_,
&this->ducking_transition_samples_remaining_, this->samples_per_ducking_step_, &this->ducking_transition_samples_remaining_, this->samples_per_ducking_step_,
@@ -406,7 +411,7 @@ void SourceSpeaker::duck_samples(int16_t *input_buffer, uint32_t input_samples_t
void SourceSpeaker::enter_stopping_state_() { void SourceSpeaker::enter_stopping_state_() {
this->state_ = speaker::STATE_STOPPING; this->state_ = speaker::STATE_STOPPING;
this->stopping_start_ms_ = millis(); this->stopping_start_ms_ = millis();
this->transfer_buffer_.reset(); this->audio_source_.reset();
} }
void MixerSpeaker::dump_config() { void MixerSpeaker::dump_config() {
@@ -612,9 +617,9 @@ void MixerSpeaker::audio_mixer_task(void *params) {
// Pre-allocate vectors to avoid heap allocation in the loop (max 8 source speakers per schema) // Pre-allocate vectors to avoid heap allocation in the loop (max 8 source speakers per schema)
FixedVector<SourceSpeaker *> speakers_with_data; FixedVector<SourceSpeaker *> speakers_with_data;
FixedVector<std::shared_ptr<audio::AudioSourceTransferBuffer>> transfer_buffers_with_data; FixedVector<std::shared_ptr<audio::RingBufferAudioSource>> audio_sources_with_data;
speakers_with_data.init(this_mixer->source_speakers_.size()); speakers_with_data.init(this_mixer->source_speakers_.size());
transfer_buffers_with_data.init(this_mixer->source_speakers_.size()); audio_sources_with_data.init(this_mixer->source_speakers_.size());
while (true) { while (true) {
uint32_t event_group_bits = xEventGroupGetBits(this_mixer->event_group_); uint32_t event_group_bits = xEventGroupGetBits(this_mixer->event_group_);
@@ -629,27 +634,27 @@ void MixerSpeaker::audio_mixer_task(void *params) {
this_mixer->audio_stream_info_.value().bytes_to_frames(output_transfer_buffer->free()); this_mixer->audio_stream_info_.value().bytes_to_frames(output_transfer_buffer->free());
speakers_with_data.clear(); speakers_with_data.clear();
transfer_buffers_with_data.clear(); audio_sources_with_data.clear();
for (auto &speaker : this_mixer->source_speakers_) { for (auto &speaker : this_mixer->source_speakers_) {
if (speaker->is_running() && !speaker->get_pause_state()) { if (speaker->is_running() && !speaker->get_pause_state()) {
// Speaker is running and not paused, so it possibly can provide audio data // Speaker is running and not paused, so it possibly can provide audio data
std::shared_ptr<audio::AudioSourceTransferBuffer> transfer_buffer = speaker->get_transfer_buffer().lock(); std::shared_ptr<audio::RingBufferAudioSource> audio_source = speaker->get_audio_source().lock();
if (transfer_buffer.use_count() == 0) { if (audio_source.use_count() == 0) {
// No transfer buffer allocated, so skip processing this speaker // No audio source allocated, so skip processing this speaker
continue; continue;
} }
speaker->process_data_from_source(transfer_buffer, 0); // Transfers and ducks audio from source ring buffers speaker->process_data_from_source(audio_source, 0); // Exposes and ducks audio from source ring buffers
if (transfer_buffer->available() > 0) { if (audio_source->available() > 0) {
// Store the locked transfer buffers in their own vector to avoid releasing ownership until after the loop // Retain shared ownership across the mixing pass so the source isn't released mid-mix
transfer_buffers_with_data.push_back(transfer_buffer); audio_sources_with_data.push_back(audio_source);
speakers_with_data.push_back(speaker); speakers_with_data.push_back(speaker);
} }
} }
} }
if (transfer_buffers_with_data.empty()) { if (audio_sources_with_data.empty()) {
// No audio available for transferring, block task temporarily // No audio available for transferring, block task temporarily
delay(TASK_DELAY_MS); delay(TASK_DELAY_MS);
continue; continue;
@@ -657,7 +662,7 @@ void MixerSpeaker::audio_mixer_task(void *params) {
uint32_t frames_to_mix = output_frames_free; uint32_t frames_to_mix = output_frames_free;
if ((transfer_buffers_with_data.size() == 1) || this_mixer->queue_mode_) { if ((audio_sources_with_data.size() == 1) || this_mixer->queue_mode_) {
// Only one speaker has audio data, just copy samples over // Only one speaker has audio data, just copy samples over
audio::AudioStreamInfo active_stream_info = speakers_with_data[0]->get_audio_stream_info(); audio::AudioStreamInfo active_stream_info = speakers_with_data[0]->get_audio_stream_info();
@@ -667,10 +672,10 @@ void MixerSpeaker::audio_mixer_task(void *params) {
// Speaker's sample rate matches the output speaker's, copy directly // Speaker's sample rate matches the output speaker's, copy directly
const uint32_t frames_available_in_buffer = const uint32_t frames_available_in_buffer =
active_stream_info.bytes_to_frames(transfer_buffers_with_data[0]->available()); active_stream_info.bytes_to_frames(audio_sources_with_data[0]->available());
frames_to_mix = std::min(frames_to_mix, frames_available_in_buffer); frames_to_mix = std::min(frames_to_mix, frames_available_in_buffer);
copy_frames(reinterpret_cast<int16_t *>(transfer_buffers_with_data[0]->get_buffer_start()), copy_frames(reinterpret_cast<const int16_t *>(audio_sources_with_data[0]->data()), active_stream_info,
active_stream_info, reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()), reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()),
this_mixer->audio_stream_info_.value(), frames_to_mix); this_mixer->audio_stream_info_.value(), frames_to_mix);
// Set playback delay for newly contributing source // Set playback delay for newly contributing source
@@ -682,7 +687,7 @@ void MixerSpeaker::audio_mixer_task(void *params) {
// Update source speaker pending frames // Update source speaker pending frames
speakers_with_data[0]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release); speakers_with_data[0]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release);
transfer_buffers_with_data[0]->decrease_buffer_length(active_stream_info.frames_to_bytes(frames_to_mix)); audio_sources_with_data[0]->consume(active_stream_info.frames_to_bytes(frames_to_mix));
// Update output transfer buffer length and pipeline frame count // Update output transfer buffer length and pipeline frame count
output_transfer_buffer->increase_buffer_length( output_transfer_buffer->increase_buffer_length(
@@ -709,25 +714,25 @@ void MixerSpeaker::audio_mixer_task(void *params) {
} }
} else { } else {
// Determine how many frames to mix // Determine how many frames to mix
for (size_t i = 0; i < transfer_buffers_with_data.size(); ++i) { for (size_t i = 0; i < audio_sources_with_data.size(); ++i) {
const uint32_t frames_available_in_buffer = speakers_with_data[i]->get_audio_stream_info().bytes_to_frames( const uint32_t frames_available_in_buffer =
transfer_buffers_with_data[i]->available()); speakers_with_data[i]->get_audio_stream_info().bytes_to_frames(audio_sources_with_data[i]->available());
frames_to_mix = std::min(frames_to_mix, frames_available_in_buffer); frames_to_mix = std::min(frames_to_mix, frames_available_in_buffer);
} }
int16_t *primary_buffer = reinterpret_cast<int16_t *>(transfer_buffers_with_data[0]->get_buffer_start()); const int16_t *primary_buffer = reinterpret_cast<const int16_t *>(audio_sources_with_data[0]->data());
audio::AudioStreamInfo primary_stream_info = speakers_with_data[0]->get_audio_stream_info(); audio::AudioStreamInfo primary_stream_info = speakers_with_data[0]->get_audio_stream_info();
// Mix two streams together // Mix two streams together
for (size_t i = 1; i < transfer_buffers_with_data.size(); ++i) { for (size_t i = 1; i < audio_sources_with_data.size(); ++i) {
mix_audio_samples(primary_buffer, primary_stream_info, mix_audio_samples(primary_buffer, primary_stream_info,
reinterpret_cast<int16_t *>(transfer_buffers_with_data[i]->get_buffer_start()), reinterpret_cast<const int16_t *>(audio_sources_with_data[i]->data()),
speakers_with_data[i]->get_audio_stream_info(), speakers_with_data[i]->get_audio_stream_info(),
reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()), reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()),
this_mixer->audio_stream_info_.value(), frames_to_mix); this_mixer->audio_stream_info_.value(), frames_to_mix);
if (i != transfer_buffers_with_data.size() - 1) { if (i != audio_sources_with_data.size() - 1) {
// Need to mix more streams together, point primary buffer and stream info to the already mixed output // Need to mix more streams together, point primary buffer and stream info to the already mixed output
primary_buffer = reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()); primary_buffer = reinterpret_cast<const int16_t *>(output_transfer_buffer->get_buffer_end());
primary_stream_info = this_mixer->audio_stream_info_.value(); primary_stream_info = this_mixer->audio_stream_info_.value();
} }
} }
@@ -735,8 +740,8 @@ void MixerSpeaker::audio_mixer_task(void *params) {
// Get current pipeline depth for delay calculation (before incrementing) // Get current pipeline depth for delay calculation (before incrementing)
uint32_t current_pipeline_frames = this_mixer->frames_in_pipeline_.load(std::memory_order_acquire); uint32_t current_pipeline_frames = this_mixer->frames_in_pipeline_.load(std::memory_order_acquire);
// Update source transfer buffer lengths and add new audio durations to the source speaker pending playbacks // Update source audio source consumption and add new audio durations to the source speaker pending playbacks
for (size_t i = 0; i < transfer_buffers_with_data.size(); ++i) { for (size_t i = 0; i < audio_sources_with_data.size(); ++i) {
// Set playback delay for newly contributing sources // Set playback delay for newly contributing sources
if (!speakers_with_data[i]->has_contributed_.load(std::memory_order_acquire)) { if (!speakers_with_data[i]->has_contributed_.load(std::memory_order_acquire)) {
speakers_with_data[i]->playback_delay_frames_.store(current_pipeline_frames, std::memory_order_release); speakers_with_data[i]->playback_delay_frames_.store(current_pipeline_frames, std::memory_order_release);
@@ -744,7 +749,7 @@ void MixerSpeaker::audio_mixer_task(void *params) {
} }
speakers_with_data[i]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release); speakers_with_data[i]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release);
transfer_buffers_with_data[i]->decrease_buffer_length( audio_sources_with_data[i]->consume(
speakers_with_data[i]->get_audio_stream_info().frames_to_bytes(frames_to_mix)); speakers_with_data[i]->get_audio_stream_info().frames_to_bytes(frames_to_mix));
} }
@@ -67,11 +67,13 @@ class SourceSpeaker : public speaker::Speaker, public Component {
void set_pause_state(bool pause_state) override { this->pause_state_ = pause_state; } void set_pause_state(bool pause_state) override { this->pause_state_ = pause_state; }
bool get_pause_state() const override { return this->pause_state_; } bool get_pause_state() const override { return this->pause_state_; }
/// @brief Transfers audio from the ring buffer into the transfer buffer. Ducks audio while transferring. /// @brief Exposes the next ring buffer chunk (zero-copy) and ducks the freshly exposed bytes in place.
/// @param transfer_buffer Locked shared_ptr to the transfer buffer (must be valid, not null) /// If the source still has bytes from a prior partial consume, this is a no-op (those bytes were already
/// ducked on the fill that exposed them).
/// @param audio_source Locked shared_ptr to the audio source (must be valid, not null)
/// @param ticks_to_wait FreeRTOS ticks to wait while waiting to read from the ring buffer. /// @param ticks_to_wait FreeRTOS ticks to wait while waiting to read from the ring buffer.
/// @return Number of bytes transferred from the ring buffer. /// @return Number of bytes newly exposed from the ring buffer.
size_t process_data_from_source(std::shared_ptr<audio::AudioSourceTransferBuffer> &transfer_buffer, size_t process_data_from_source(std::shared_ptr<audio::RingBufferAudioSource> &audio_source,
TickType_t ticks_to_wait); TickType_t ticks_to_wait);
/// @brief Sets the ducking level for the source speaker. /// @brief Sets the ducking level for the source speaker.
@@ -83,7 +85,7 @@ class SourceSpeaker : public speaker::Speaker, public Component {
void set_parent(MixerSpeaker *parent) { this->parent_ = parent; } void set_parent(MixerSpeaker *parent) { this->parent_ = parent; }
void set_timeout(uint32_t ms) { this->timeout_ms_ = ms; } void set_timeout(uint32_t ms) { this->timeout_ms_ = ms; }
std::weak_ptr<audio::AudioSourceTransferBuffer> get_transfer_buffer() { return this->transfer_buffer_; } std::weak_ptr<audio::RingBufferAudioSource> get_audio_source() { return this->audio_source_; }
protected: protected:
friend class MixerSpeaker; friend class MixerSpeaker;
@@ -106,7 +108,7 @@ class SourceSpeaker : public speaker::Speaker, public Component {
MixerSpeaker *parent_; MixerSpeaker *parent_;
std::shared_ptr<audio::AudioSourceTransferBuffer> transfer_buffer_; std::shared_ptr<audio::RingBufferAudioSource> audio_source_;
std::weak_ptr<ring_buffer::RingBuffer> ring_buffer_; std::weak_ptr<ring_buffer::RingBuffer> ring_buffer_;
uint32_t buffer_duration_ms_; uint32_t buffer_duration_ms_;