[mixer] Fix memory leak in mixer task on stop/start cycles (#15185)

This commit is contained in:
Kevin Ahrendt
2026-03-31 19:06:48 -04:00
committed by Jesse Hills
parent dc634b8c7b
commit 514c0c8331
+137 -137
View File
@@ -597,173 +597,173 @@ void MixerSpeaker::audio_mixer_task(void *params) {
xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_STARTING);
std::unique_ptr<audio::AudioSinkTransferBuffer> output_transfer_buffer = audio::AudioSinkTransferBuffer::create(
this_mixer->audio_stream_info_.value().ms_to_bytes(TRANSFER_BUFFER_DURATION_MS));
{ // Ensure C++ objects fall out of scope to ensure proper cleanup before stopping the task
std::unique_ptr<audio::AudioSinkTransferBuffer> output_transfer_buffer = audio::AudioSinkTransferBuffer::create(
this_mixer->audio_stream_info_.value().ms_to_bytes(TRANSFER_BUFFER_DURATION_MS));
if (output_transfer_buffer == nullptr) {
xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_STOPPED | MIXER_TASK_ERR_ESP_NO_MEM);
if (output_transfer_buffer == nullptr) {
xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_STOPPED | MIXER_TASK_ERR_ESP_NO_MEM);
vTaskSuspend(nullptr); // Suspend this task indefinitely until the loop method deletes it
}
output_transfer_buffer->set_sink(this_mixer->output_speaker_);
xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_RUNNING);
bool sent_finished = false;
// Pre-allocate vectors to avoid heap allocation in the loop (max 8 source speakers per schema)
FixedVector<SourceSpeaker *> speakers_with_data;
FixedVector<std::shared_ptr<audio::AudioSourceTransferBuffer>> transfer_buffers_with_data;
speakers_with_data.init(this_mixer->source_speakers_.size());
transfer_buffers_with_data.init(this_mixer->source_speakers_.size());
while (true) {
uint32_t event_group_bits = xEventGroupGetBits(this_mixer->event_group_);
if (event_group_bits & MIXER_TASK_COMMAND_STOP) {
break;
vTaskSuspend(nullptr); // Suspend this task indefinitely until the loop method deletes it
}
// Never shift the data in the output transfer buffer to avoid unnecessary, slow data moves
output_transfer_buffer->transfer_data_to_sink(pdMS_TO_TICKS(TASK_DELAY_MS), false);
output_transfer_buffer->set_sink(this_mixer->output_speaker_);
const uint32_t output_frames_free =
this_mixer->audio_stream_info_.value().bytes_to_frames(output_transfer_buffer->free());
xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_RUNNING);
speakers_with_data.clear();
transfer_buffers_with_data.clear();
bool sent_finished = false;
for (auto &speaker : this_mixer->source_speakers_) {
if (speaker->is_running() && !speaker->get_pause_state()) {
// Speaker is running and not paused, so it possibly can provide audio data
std::shared_ptr<audio::AudioSourceTransferBuffer> transfer_buffer = speaker->get_transfer_buffer().lock();
if (transfer_buffer.use_count() == 0) {
// No transfer buffer allocated, so skip processing this speaker
continue;
}
speaker->process_data_from_source(transfer_buffer, 0); // Transfers and ducks audio from source ring buffers
// Pre-allocate vectors to avoid heap allocation in the loop (max 8 source speakers per schema)
FixedVector<SourceSpeaker *> speakers_with_data;
FixedVector<std::shared_ptr<audio::AudioSourceTransferBuffer>> transfer_buffers_with_data;
speakers_with_data.init(this_mixer->source_speakers_.size());
transfer_buffers_with_data.init(this_mixer->source_speakers_.size());
if (transfer_buffer->available() > 0) {
// Store the locked transfer buffers in their own vector to avoid releasing ownership until after the loop
transfer_buffers_with_data.push_back(transfer_buffer);
speakers_with_data.push_back(speaker);
while (true) {
uint32_t event_group_bits = xEventGroupGetBits(this_mixer->event_group_);
if (event_group_bits & MIXER_TASK_COMMAND_STOP) {
break;
}
// Never shift the data in the output transfer buffer to avoid unnecessary, slow data moves
output_transfer_buffer->transfer_data_to_sink(pdMS_TO_TICKS(TASK_DELAY_MS), false);
const uint32_t output_frames_free =
this_mixer->audio_stream_info_.value().bytes_to_frames(output_transfer_buffer->free());
speakers_with_data.clear();
transfer_buffers_with_data.clear();
for (auto &speaker : this_mixer->source_speakers_) {
if (speaker->is_running() && !speaker->get_pause_state()) {
// Speaker is running and not paused, so it possibly can provide audio data
std::shared_ptr<audio::AudioSourceTransferBuffer> transfer_buffer = speaker->get_transfer_buffer().lock();
if (transfer_buffer.use_count() == 0) {
// No transfer buffer allocated, so skip processing this speaker
continue;
}
speaker->process_data_from_source(transfer_buffer, 0); // Transfers and ducks audio from source ring buffers
if (transfer_buffer->available() > 0) {
// Store the locked transfer buffers in their own vector to avoid releasing ownership until after the loop
transfer_buffers_with_data.push_back(transfer_buffer);
speakers_with_data.push_back(speaker);
}
}
}
}
if (transfer_buffers_with_data.empty()) {
// No audio available for transferring, block task temporarily
delay(TASK_DELAY_MS);
continue;
}
if (transfer_buffers_with_data.empty()) {
// No audio available for transferring, block task temporarily
delay(TASK_DELAY_MS);
continue;
}
uint32_t frames_to_mix = output_frames_free;
uint32_t frames_to_mix = output_frames_free;
if ((transfer_buffers_with_data.size() == 1) || this_mixer->queue_mode_) {
// Only one speaker has audio data, just copy samples over
if ((transfer_buffers_with_data.size() == 1) || this_mixer->queue_mode_) {
// Only one speaker has audio data, just copy samples over
audio::AudioStreamInfo active_stream_info = speakers_with_data[0]->get_audio_stream_info();
audio::AudioStreamInfo active_stream_info = speakers_with_data[0]->get_audio_stream_info();
if (active_stream_info.get_sample_rate() ==
this_mixer->output_speaker_->get_audio_stream_info().get_sample_rate()) {
// Speaker's sample rate matches the output speaker's, copy directly
if (active_stream_info.get_sample_rate() ==
this_mixer->output_speaker_->get_audio_stream_info().get_sample_rate()) {
// Speaker's sample rate matches the output speaker's, copy directly
const uint32_t frames_available_in_buffer =
active_stream_info.bytes_to_frames(transfer_buffers_with_data[0]->available());
frames_to_mix = std::min(frames_to_mix, frames_available_in_buffer);
copy_frames(reinterpret_cast<int16_t *>(transfer_buffers_with_data[0]->get_buffer_start()), active_stream_info,
reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()),
this_mixer->audio_stream_info_.value(), frames_to_mix);
const uint32_t frames_available_in_buffer =
active_stream_info.bytes_to_frames(transfer_buffers_with_data[0]->available());
frames_to_mix = std::min(frames_to_mix, frames_available_in_buffer);
copy_frames(reinterpret_cast<int16_t *>(transfer_buffers_with_data[0]->get_buffer_start()),
active_stream_info, reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()),
this_mixer->audio_stream_info_.value(), frames_to_mix);
// Set playback delay for newly contributing source
if (!speakers_with_data[0]->has_contributed_.load(std::memory_order_acquire)) {
speakers_with_data[0]->playback_delay_frames_.store(
this_mixer->frames_in_pipeline_.load(std::memory_order_acquire), std::memory_order_release);
speakers_with_data[0]->has_contributed_.store(true, std::memory_order_release);
// Set playback delay for newly contributing source
if (!speakers_with_data[0]->has_contributed_.load(std::memory_order_acquire)) {
speakers_with_data[0]->playback_delay_frames_.store(
this_mixer->frames_in_pipeline_.load(std::memory_order_acquire), std::memory_order_release);
speakers_with_data[0]->has_contributed_.store(true, std::memory_order_release);
}
// Update source speaker pending frames
speakers_with_data[0]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release);
transfer_buffers_with_data[0]->decrease_buffer_length(active_stream_info.frames_to_bytes(frames_to_mix));
// Update output transfer buffer length and pipeline frame count
output_transfer_buffer->increase_buffer_length(
this_mixer->audio_stream_info_.value().frames_to_bytes(frames_to_mix));
this_mixer->frames_in_pipeline_.fetch_add(frames_to_mix, std::memory_order_release);
} else {
// Speaker's stream info doesn't match the output speaker's, so it's a new source speaker
if (!this_mixer->output_speaker_->is_stopped()) {
if (!sent_finished) {
this_mixer->output_speaker_->finish();
sent_finished = true; // Avoid repeatedly sending the finish command
}
} else {
// Speaker has finished writing the current audio, update the stream information and restart the speaker
this_mixer->audio_stream_info_ =
audio::AudioStreamInfo(active_stream_info.get_bits_per_sample(), this_mixer->output_channels_,
active_stream_info.get_sample_rate());
this_mixer->output_speaker_->set_audio_stream_info(this_mixer->audio_stream_info_.value());
this_mixer->output_speaker_->start();
// Reset pipeline frame count since we're starting fresh with a new sample rate
this_mixer->frames_in_pipeline_.store(0, std::memory_order_release);
sent_finished = false;
}
}
} else {
// Determine how many frames to mix
for (size_t i = 0; i < transfer_buffers_with_data.size(); ++i) {
const uint32_t frames_available_in_buffer = speakers_with_data[i]->get_audio_stream_info().bytes_to_frames(
transfer_buffers_with_data[i]->available());
frames_to_mix = std::min(frames_to_mix, frames_available_in_buffer);
}
int16_t *primary_buffer = reinterpret_cast<int16_t *>(transfer_buffers_with_data[0]->get_buffer_start());
audio::AudioStreamInfo primary_stream_info = speakers_with_data[0]->get_audio_stream_info();
// Mix two streams together
for (size_t i = 1; i < transfer_buffers_with_data.size(); ++i) {
mix_audio_samples(primary_buffer, primary_stream_info,
reinterpret_cast<int16_t *>(transfer_buffers_with_data[i]->get_buffer_start()),
speakers_with_data[i]->get_audio_stream_info(),
reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()),
this_mixer->audio_stream_info_.value(), frames_to_mix);
if (i != transfer_buffers_with_data.size() - 1) {
// Need to mix more streams together, point primary buffer and stream info to the already mixed output
primary_buffer = reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end());
primary_stream_info = this_mixer->audio_stream_info_.value();
}
}
// Update source speaker pending frames
speakers_with_data[0]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release);
transfer_buffers_with_data[0]->decrease_buffer_length(active_stream_info.frames_to_bytes(frames_to_mix));
// Get current pipeline depth for delay calculation (before incrementing)
uint32_t current_pipeline_frames = this_mixer->frames_in_pipeline_.load(std::memory_order_acquire);
// Update output transfer buffer length and pipeline frame count
// Update source transfer buffer lengths and add new audio durations to the source speaker pending playbacks
for (size_t i = 0; i < transfer_buffers_with_data.size(); ++i) {
// Set playback delay for newly contributing sources
if (!speakers_with_data[i]->has_contributed_.load(std::memory_order_acquire)) {
speakers_with_data[i]->playback_delay_frames_.store(current_pipeline_frames, std::memory_order_release);
speakers_with_data[i]->has_contributed_.store(true, std::memory_order_release);
}
speakers_with_data[i]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release);
transfer_buffers_with_data[i]->decrease_buffer_length(
speakers_with_data[i]->get_audio_stream_info().frames_to_bytes(frames_to_mix));
}
// Update output transfer buffer length and pipeline frame count (once, not per source)
output_transfer_buffer->increase_buffer_length(
this_mixer->audio_stream_info_.value().frames_to_bytes(frames_to_mix));
this_mixer->frames_in_pipeline_.fetch_add(frames_to_mix, std::memory_order_release);
} else {
// Speaker's stream info doesn't match the output speaker's, so it's a new source speaker
if (!this_mixer->output_speaker_->is_stopped()) {
if (!sent_finished) {
this_mixer->output_speaker_->finish();
sent_finished = true; // Avoid repeatedly sending the finish command
}
} else {
// Speaker has finished writing the current audio, update the stream information and restart the speaker
this_mixer->audio_stream_info_ =
audio::AudioStreamInfo(active_stream_info.get_bits_per_sample(), this_mixer->output_channels_,
active_stream_info.get_sample_rate());
this_mixer->output_speaker_->set_audio_stream_info(this_mixer->audio_stream_info_.value());
this_mixer->output_speaker_->start();
// Reset pipeline frame count since we're starting fresh with a new sample rate
this_mixer->frames_in_pipeline_.store(0, std::memory_order_release);
sent_finished = false;
}
}
} else {
// Determine how many frames to mix
for (size_t i = 0; i < transfer_buffers_with_data.size(); ++i) {
const uint32_t frames_available_in_buffer =
speakers_with_data[i]->get_audio_stream_info().bytes_to_frames(transfer_buffers_with_data[i]->available());
frames_to_mix = std::min(frames_to_mix, frames_available_in_buffer);
}
int16_t *primary_buffer = reinterpret_cast<int16_t *>(transfer_buffers_with_data[0]->get_buffer_start());
audio::AudioStreamInfo primary_stream_info = speakers_with_data[0]->get_audio_stream_info();
// Mix two streams together
for (size_t i = 1; i < transfer_buffers_with_data.size(); ++i) {
mix_audio_samples(primary_buffer, primary_stream_info,
reinterpret_cast<int16_t *>(transfer_buffers_with_data[i]->get_buffer_start()),
speakers_with_data[i]->get_audio_stream_info(),
reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end()),
this_mixer->audio_stream_info_.value(), frames_to_mix);
if (i != transfer_buffers_with_data.size() - 1) {
// Need to mix more streams together, point primary buffer and stream info to the already mixed output
primary_buffer = reinterpret_cast<int16_t *>(output_transfer_buffer->get_buffer_end());
primary_stream_info = this_mixer->audio_stream_info_.value();
}
}
// Get current pipeline depth for delay calculation (before incrementing)
uint32_t current_pipeline_frames = this_mixer->frames_in_pipeline_.load(std::memory_order_acquire);
// Update source transfer buffer lengths and add new audio durations to the source speaker pending playbacks
for (size_t i = 0; i < transfer_buffers_with_data.size(); ++i) {
// Set playback delay for newly contributing sources
if (!speakers_with_data[i]->has_contributed_.load(std::memory_order_acquire)) {
speakers_with_data[i]->playback_delay_frames_.store(current_pipeline_frames, std::memory_order_release);
speakers_with_data[i]->has_contributed_.store(true, std::memory_order_release);
}
speakers_with_data[i]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release);
transfer_buffers_with_data[i]->decrease_buffer_length(
speakers_with_data[i]->get_audio_stream_info().frames_to_bytes(frames_to_mix));
}
// Update output transfer buffer length and pipeline frame count (once, not per source)
output_transfer_buffer->increase_buffer_length(
this_mixer->audio_stream_info_.value().frames_to_bytes(frames_to_mix));
this_mixer->frames_in_pipeline_.fetch_add(frames_to_mix, std::memory_order_release);
}
}
xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_STOPPING);
xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_STOPPING);
}
// Reset pipeline frame count since the task is stopping
this_mixer->frames_in_pipeline_.store(0, std::memory_order_release);
output_transfer_buffer.reset();
xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_STOPPED);
vTaskSuspend(nullptr); // Suspend this task indefinitely until the loop method deletes it