diff --git a/.versioning/changes/nBpZUnmkTX.patch.md b/.versioning/changes/nBpZUnmkTX.patch.md new file mode 100644 index 0000000..82ab4f9 --- /dev/null +++ b/.versioning/changes/nBpZUnmkTX.patch.md @@ -0,0 +1 @@ +Fixes an A/V desync issue with the opus encoder diff --git a/src/av/sample_format.cpp b/src/av/sample_format.cpp index 163c519..1e69a52 100644 --- a/src/av/sample_format.cpp +++ b/src/av/sample_format.cpp @@ -46,4 +46,36 @@ auto is_sample_rate_supported(std::uint32_t requested, return false; } +auto sample_format_name(SampleFormat fmt) noexcept -> char const* +{ + switch (fmt) { + case SampleFormat::u8_interleaved: + return "u8_interleaved"; + case SampleFormat::s16_interleaved: + return "s16_interleaved"; + case SampleFormat::s32_interleaved: + return "s32_interleaved"; + case SampleFormat::float_interleaved: + return "float_interleaved"; + case SampleFormat::double_interleaved: + return "double_interleaved"; + case SampleFormat::u8_planar: + return "u8_planar"; + case SampleFormat::s16_planar: + return "s16_planar"; + case SampleFormat::s32_planar: + return "s32_planar"; + case SampleFormat::float_planar: + return "float_planar"; + case SampleFormat::double_planar: + return "double_planar"; + case SampleFormat::s64_interleaved: + return "s64_interleaved"; + case SampleFormat::s64_planar: + return "s64_planar"; + default: + return "unknown"; + } +} + } // namespace sc diff --git a/src/av/sample_format.hpp b/src/av/sample_format.hpp index 9a42ec1..2ab6409 100644 --- a/src/av/sample_format.hpp +++ b/src/av/sample_format.hpp @@ -182,6 +182,8 @@ auto constexpr convert_to_pipewire_format(SampleFormat fmt) -> spa_audio_format throw std::runtime_error { "No viable Pipewire sample format conversion " }; } +auto sample_format_name(SampleFormat fmt) noexcept -> char const*; + auto find_supported_formats(sc::BorrowedPtr codec) -> std::vector; diff --git a/src/handlers/audio_chunk_writer.cpp b/src/handlers/audio_chunk_writer.cpp index a287d91..a63b9b8 100644 --- 
a/src/handlers/audio_chunk_writer.cpp +++ b/src/handlers/audio_chunk_writer.cpp @@ -14,10 +14,12 @@ namespace sc ChunkWriter::ChunkWriter(AVCodecContext* codec_context, AVStream* stream, - Encoder encoder) noexcept + Encoder encoder, + std::size_t frame_size) noexcept : codec_context_ { codec_context } , stream_ { stream } , encoder_ { encoder } + , frame_size_ { frame_size } , frame_ { av_frame_alloc() } , total_samples_written_ { 0 } { @@ -25,9 +27,7 @@ ChunkWriter::ChunkWriter(AVCodecContext* codec_context, auto ChunkWriter::operator()(MediaChunk const& chunk) -> void { - SC_EXPECT(!codec_context_->frame_size || - static_cast(chunk.sample_count) == - codec_context_->frame_size); + SC_EXPECT(chunk.sample_count >= frame_size_); sc::SampleFormat const sample_format = sc::convert_from_libav_format(codec_context_->sample_fmt); @@ -38,7 +38,7 @@ auto ChunkWriter::operator()(MediaChunk const& chunk) -> void encoder_.prepare_frame(codec_context_.get(), stream_.get()); auto* frame = encoder_frame->frame.get(); - frame->nb_samples = chunk.sample_count; + frame->nb_samples = frame_size_; frame->format = codec_context_->sample_fmt; frame->sample_rate = codec_context_->sample_rate; #if LIBAVCODEC_VERSION_MAJOR < 60 diff --git a/src/handlers/audio_chunk_writer.hpp b/src/handlers/audio_chunk_writer.hpp index 917235f..fdf2430 100644 --- a/src/handlers/audio_chunk_writer.hpp +++ b/src/handlers/audio_chunk_writer.hpp @@ -12,7 +12,8 @@ struct ChunkWriter { explicit ChunkWriter(AVCodecContext* codec_context, AVStream* stream, - Encoder encoder) noexcept; + Encoder encoder, + std::size_t frame_size) noexcept; auto operator()(MediaChunk const& chunk) -> void; @@ -20,6 +21,7 @@ struct ChunkWriter BorrowedPtr codec_context_; BorrowedPtr stream_; Encoder encoder_; + std::size_t frame_size_; FramePtr frame_; std::size_t total_samples_written_ { 0 }; }; diff --git a/src/main.cpp b/src/main.cpp index 9b2b7fa..7715e8c 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,4 +1,5 @@ #include 
"./shadow_cast.hpp" +#include "utils/contracts.hpp" #include "utils/frame_time.hpp" #include @@ -9,9 +10,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -79,6 +82,15 @@ struct DestroyCaptureSessionGuard sc::NvFBC nvfbc; }; +auto apply_audio_codec_modifiers(AVCodec const& codec, AVDictionary*& output) + -> void +{ + if (std::string_view { codec.name } == "libopus") { + av_dict_set_int(&output, "frame_duration", 40, 0); + av_dict_set_int(&output, "vbr", 0, 0); + } +} + auto run_loop(sc::Context& main, sc::Context& media, sc::Context& audio, @@ -213,8 +225,11 @@ auto run_wayland(sc::Parameters const& params, sc::wayland::DisplayPtr display) audio_encoder_context->time_base = AVRational { 1, params.sample_rate }; audio_encoder_context->flags |= AV_CODEC_FLAG_GLOBAL_HEADER; + AVDictionary* options = nullptr; + apply_audio_codec_modifiers(*encoder, options); + if (auto const ret = - avcodec_open2(audio_encoder_context.get(), encoder.get(), nullptr); + avcodec_open2(audio_encoder_context.get(), encoder.get(), &options); ret < 0) { throw sc::CodecError { "Failed to open audio codec: " + sc::av_error_to_string(ret) }; @@ -292,6 +307,10 @@ auto run_wayland(sc::Parameters const& params, sc::wayland::DisplayPtr display) sc::Context audio_ctx { params.frame_time }; sc::Context media_ctx { params.frame_time }; + std::size_t const frame_size = audio_encoder_context->frame_size + ? audio_encoder_context->frame_size + : 2048; + ctx.services().add(sc::SignalService {}); add_signal_handler(ctx, SIGINT, [&](std::uint32_t) { ctx.request_stop(); @@ -300,11 +319,7 @@ auto run_wayland(sc::Parameters const& params, sc::wayland::DisplayPtr display) audio_ctx.services().add_from_factory([&] { return std::make_unique( - supported_formats.front(), - params.sample_rate, - audio_encoder_context->frame_size - ? 
audio_encoder_context->frame_size - : 2048); + supported_formats.front(), params.sample_rate, frame_size); }); ctx.services().add_from_factory([&] { @@ -321,7 +336,8 @@ auto run_wayland(sc::Parameters const& params, sc::wayland::DisplayPtr display) set_audio_chunk_handler(audio_ctx, sc::ChunkWriter { audio_encoder_context.get(), stream.get(), - media_writer }); + media_writer, + frame_size }); set_drm_video_frame_handler( ctx, sc::DRMVideoFrameWriter { @@ -407,8 +423,11 @@ auto run(sc::Parameters const& params) -> void audio_encoder_context->time_base = AVRational { 1, params.sample_rate }; audio_encoder_context->flags |= AV_CODEC_FLAG_GLOBAL_HEADER; + AVDictionary* options = nullptr; + apply_audio_codec_modifiers(*encoder, options); + if (auto const ret = - avcodec_open2(audio_encoder_context.get(), encoder.get(), nullptr); + avcodec_open2(audio_encoder_context.get(), encoder.get(), &options); ret < 0) { throw sc::CodecError { "Failed to open audio codec: " + sc::av_error_to_string(ret) }; @@ -496,6 +515,10 @@ auto run(sc::Parameters const& params) -> void sc::Context audio_ctx { params.frame_time }; sc::Context media_ctx { params.frame_time }; + std::size_t const frame_size = audio_encoder_context->frame_size + ? audio_encoder_context->frame_size + : 2048; + ctx.services().add(sc::SignalService {}); add_signal_handler(ctx, SIGINT, [&](std::uint32_t) { ctx.request_stop(); @@ -504,11 +527,7 @@ auto run(sc::Parameters const& params) -> void audio_ctx.services().add_from_factory([&] { return std::make_unique( - supported_formats.front(), - params.sample_rate, - audio_encoder_context->frame_size - ? 
audio_encoder_context->frame_size - : 2048); + supported_formats.front(), params.sample_rate, frame_size); }); ctx.services().add_from_factory([&] { @@ -525,7 +544,8 @@ auto run(sc::Parameters const& params) -> void set_audio_chunk_handler(audio_ctx, sc::ChunkWriter { audio_encoder_context.get(), stream.get(), - media_writer }); + media_writer, + frame_size }); set_video_frame_handler(ctx, sc::VideoFrameWriter { video_encoder_context.get(), video_stream.get(), diff --git a/src/services/audio_service.cpp b/src/services/audio_service.cpp index 46cde11..e611032 100644 --- a/src/services/audio_service.cpp +++ b/src/services/audio_service.cpp @@ -1,13 +1,19 @@ #include "services/audio_service.hpp" +#include "av/media_chunk.hpp" +#include "av/sample_format.hpp" #include "utils/contracts.hpp" #include "utils/elapsed.hpp" #include #include #include #include +#include +#include #include #include +#include #include +#include #include #include @@ -20,32 +26,23 @@ using namespace std::literals::string_literals; namespace { -auto transfer_chunk_n(sc::MediaChunk& source, - std::size_t num_bytes, - sc::MediaChunk& dest) -> void +auto prepare_buffer_channels(sc::MediaChunk& buffer, + sc::SampleFormat sample_format, + std::size_t num_channels = 2) -> void { - auto channels_required = - std::max(0, - static_cast(source.channel_buffers().size() - - dest.channel_buffers().size())); + buffer.channel_buffers().clear(); + buffer.channel_buffers().push_back(sc::DynamicBuffer {}); - while (channels_required--) - dest.channel_buffers().push_back(sc::DynamicBuffer {}); - - auto it = source.channel_buffers().begin(); - auto out_it = dest.channel_buffers().begin(); - - for (; it != source.channel_buffers().end(); ++it) { - auto& src_buf = *it; - auto& dst_buf = *out_it++; - - assert(src_buf.size() >= num_bytes); - - auto dst_data = dst_buf.prepare(num_bytes); - std::copy_n(src_buf.data().begin(), num_bytes, dst_data.begin()); - dst_buf.commit(num_bytes); - src_buf.consume(num_bytes); + if 
(!sc::is_interleaved_format(sample_format)) { + while (num_channels != buffer.channel_buffers().size()) { + buffer.channel_buffers().push_back(sc::DynamicBuffer {}); + } } + + SC_EXPECT(buffer.channel_buffers().size() == + (sc::is_interleaved_format(sample_format) + ? 1 + : num_channels)); } } // namespace @@ -70,56 +67,6 @@ struct RequeueBufferGuard * * pw_stream_queue_buffer(stream, b); */ -static void on_process(void* userdata) -{ - sc::AudioLoopData* data = reinterpret_cast(userdata); - struct pw_buffer* b; - - if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL) { - pw_log_warn("out of buffers: %m"); - return; - } - - RequeueBufferGuard scope_guard { b, data->stream }; - - spa_buffer* buf = b->buffer; - - if (!buf || !buf->datas[0].data) { - pw_log_warn("No data in buffer\n"); - return; - } - - auto const sample_size = sample_format_size(data->required_sample_format); - auto const num_bytes = buf->datas[0].chunk->size; - auto const num_samples = is_interleaved_format(data->required_sample_format) - ? (num_bytes / 2) / sample_size - : num_bytes / sample_size; - - auto pool_chunk = data->service->chunk_pool().get(); - - sc::MediaChunk& chunk = *pool_chunk; - chunk.timestamp_ms = sc::global_elapsed.value(); - chunk.sample_count = num_samples; - std::span channel_data { buf->datas, buf->n_datas }; - while (chunk.channel_buffers().size() < channel_data.size()) - chunk.channel_buffers().push_back(sc::DynamicBuffer {}); - - auto out_buffer_it = chunk.channel_buffers().begin(); - - for (auto const& d : channel_data) { - sc::DynamicBuffer& chunk_buffer = *out_buffer_it++; - auto target_bytes = chunk_buffer.prepare(num_bytes); - - std::span source_bytes { reinterpret_cast(d.data), - num_bytes }; - - std::copy(begin(source_bytes), end(source_bytes), begin(target_bytes)); - chunk_buffer.commit(num_bytes); - } - - add_chunk(*data->service, std::move(pool_chunk)); -} - /* Be notified when the stream param changes. We're only looking at the * format changes. 
*/ @@ -145,7 +92,7 @@ on_stream_param_changed(void* _data, uint32_t id, const struct spa_pod* param) spa_format_audio_raw_parse(param, &data->format.info.raw); fprintf(stdout, - "capturing rate: %d, channels: %d\n", + "Audio capturing rate: %d, channels: %d\n", data->format.info.raw.rate, data->format.info.raw.channels); } @@ -159,7 +106,7 @@ constexpr pw_stream_events stream_events = { .version = on_stream_param_changed, .add_buffer = nullptr, .remove_buffer = nullptr, - .process = on_process, + .process = sc::on_process, .drained = nullptr, .command = nullptr, .trigger_done = nullptr }; @@ -208,6 +155,9 @@ auto start_pipewire(sc::AudioLoopData& data) * value). We leave the channels and rate empty to accept the native * graph rate and channels. */ spa_audio_info_raw raw_init = {}; + fprintf(stderr, + "Audio sample format: %s\n", + sample_format_name(data.required_sample_format)); raw_init.format = convert_to_pipewire_format(data.required_sample_format); raw_init.rate = data.required_sample_rate; params[0] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &raw_init); @@ -242,6 +192,59 @@ auto stop_pipewire(sc::AudioLoopData& data) noexcept -> void namespace sc { +auto on_process(void* userdata) -> void +{ + sc::AudioLoopData* data = reinterpret_cast(userdata); + struct pw_buffer* b; + + if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL) { + pw_log_warn("out of buffers: %m"); + return; + } + + RequeueBufferGuard scope_guard { b, data->stream }; + + spa_buffer* buf = b->buffer; + + if (!buf || !buf->datas[0].data) { + pw_log_warn("No data in buffer\n"); + return; + } + + auto const sample_size = sample_format_size(data->required_sample_format); + auto const num_bytes = buf->datas[0].chunk->size; + auto const num_samples = is_interleaved_format(data->required_sample_format) + ? 
(num_bytes / 2) / sample_size + : num_bytes / sample_size; + + std::span channel_data { buf->datas, buf->n_datas }; + + auto lock = std::lock_guard { data->service->data_mutex_ }; + + auto& input_buffer = data->service->input_buffer_; + SC_EXPECT(channel_data.size() == input_buffer.channel_buffers().size()); + + auto out_buffer_it = input_buffer.channel_buffers().begin(); + + for (auto const& d : channel_data) { + sc::DynamicBuffer& chunk_buffer = *out_buffer_it++; + auto target_bytes = chunk_buffer.prepare(num_bytes); + + std::span source_bytes { reinterpret_cast(d.data), + num_bytes }; + + std::copy(begin(source_bytes), end(source_bytes), begin(target_bytes)); + chunk_buffer.commit(num_bytes); + } + + auto const pending = input_buffer.sample_count / data->service->frame_size_; + input_buffer.sample_count += num_samples; + auto const ready = input_buffer.sample_count / data->service->frame_size_; + if (ready > pending) { + data->service->notify(ready - pending); + } +} + AudioService::AudioService(SampleFormat sample_format, std::size_t sample_rate, std::size_t frame_size) @@ -251,19 +254,21 @@ AudioService::AudioService(SampleFormat sample_format, { } -AudioService::~AudioService() -{ - ReturnToPoolGuard return_to_pool_guard { available_chunks_, chunk_pool_ }; -} +AudioService::~AudioService() {} -auto AudioService::chunk_pool() noexcept -> SynchronizedPool& +auto AudioService::notify(std::size_t frames) noexcept -> void { - return chunk_pool_; + std::uint64_t const val = frames; + ::write(event_fd_, &val, sizeof(val)); } auto AudioService::on_init(ReadinessRegister reg) -> void { - reg(FrameTimeRatio(1), &dispatch_chunks); + event_fd_ = eventfd(0, EFD_NONBLOCK); + SC_EXPECT(event_fd_ >= 0); + reg(event_fd_, &dispatch_chunks); + + prepare_buffer_channels(input_buffer_, sample_format_); loop_data_ = {}; loop_data_.service = this; @@ -274,6 +279,8 @@ auto AudioService::on_init(ReadinessRegister reg) -> void auto AudioService::on_uninit() noexcept -> void { stop_pipewire(loop_data_); + ::close(event_fd_); + event_fd_ = -1; if 
(stream_end_listener_) @@ -283,34 +290,40 @@ auto AudioService::on_uninit() noexcept -> void auto dispatch_chunks(sc::Service& svc) -> void { auto& self = static_cast(svc); + std::uint64_t val = 0; + if (::read(self.event_fd_, &val, sizeof(val)) < 0) + return; + std::size_t num_frames = val; + SC_EXPECT(num_frames); #ifdef SHADOW_CAST_ENABLE_HISTOGRAMS auto const frame_start = global_elapsed.nanosecond_value(); #endif - decltype(self.available_chunks_) tmp_chunks; - ReturnToPoolGuard return_to_pool_guard { tmp_chunks, self.chunk_pool() }; - - { - std::lock_guard data_lock { self.data_mutex_ }; - auto it = std::find_if(self.available_chunks_.begin(), - self.available_chunks_.end(), - [&](auto const& chunk) { - return self.frame_size_ && - chunk.sample_count < self.frame_size_; - }); - - tmp_chunks.splice(tmp_chunks.begin(), - self.available_chunks_, - self.available_chunks_.begin(), - it); - } - if (auto& listener = self.chunk_listener_; listener) { - for (auto const& chunk : tmp_chunks) { - SC_EXPECT(!self.frame_size_ || - chunk.sample_count == self.frame_size_); - (*listener)(chunk); + while (num_frames--) { + auto lock = std::lock_guard { self.data_mutex_ }; + SC_EXPECT(self.input_buffer_.sample_count >= self.frame_size_); + + (*listener)(self.input_buffer_); + auto const sample_size = + sc::sample_format_size(self.sample_format_); + auto const interleaved = + sc::is_interleaved_format(self.sample_format_); + + for (auto it = self.input_buffer_.channel_buffers().begin(); + it != self.input_buffer_.channel_buffers().end(); + ++it) { + + if (interleaved) { + it->consume(sample_size * self.frame_size_ * 2); + break; + } + + it->consume(sample_size * self.frame_size_); + } + + self.input_buffer_.sample_count -= self.frame_size_; } } @@ -320,58 +333,4 @@ auto dispatch_chunks(sc::Service& svc) -> void #endif } -auto add_chunk(AudioService& svc, - sc::SynchronizedPool::ItemPtr chunk) -> void -{ - auto const sample_size = sc::sample_format_size(svc.sample_format_); - auto const interleaved 
= sc::is_interleaved_format(svc.sample_format_); - - using namespace std::literals::string_literals; - - { - std::lock_guard data_lock { svc.data_mutex_ }; - - if (!svc.frame_size_) { - svc.available_chunks_.push_back(chunk.release()); - return; - } - - if (!svc.available_chunks_.empty()) { - auto it = std::next(svc.available_chunks_.end(), -1); - - if (svc.frame_size_ > it->sample_count) { - auto const samples_to_copy = std::min( - chunk->sample_count, svc.frame_size_ - it->sample_count); - - if (samples_to_copy) { - transfer_chunk_n(*chunk, - sample_size * samples_to_copy * - (interleaved ? 2 : 1), - *it); - it->sample_count += samples_to_copy; - chunk->sample_count -= samples_to_copy; - } - } - } - - while (chunk->sample_count > svc.frame_size_) { - auto new_chunk = svc.chunk_pool().get(); - - transfer_chunk_n(*chunk, - sample_size * svc.frame_size_ * - (interleaved ? 2 : 1), - *new_chunk); - - chunk->sample_count -= svc.frame_size_; - new_chunk->sample_count += svc.frame_size_; - - svc.available_chunks_.push_back(new_chunk.release()); - } - - if (chunk->sample_count) { - svc.available_chunks_.push_back(chunk.release()); - } - } -} - } // namespace sc diff --git a/src/services/audio_service.hpp b/src/services/audio_service.hpp index 755b423..e4c8af7 100644 --- a/src/services/audio_service.hpp +++ b/src/services/audio_service.hpp @@ -32,6 +32,8 @@ struct AudioLoopData std::size_t required_sample_rate; }; +auto on_process(void* userdata) -> void; + struct AudioService final : Service { friend auto dispatch_chunks(Service&) -> void; @@ -52,7 +54,7 @@ struct AudioService final : Service AudioService(AudioService const&) = delete; auto operator=(AudioService const&) -> AudioService& = delete; - auto chunk_pool() noexcept -> SynchronizedPool&; + friend auto on_process(void* userdata) -> void; protected: auto on_init(ReadinessRegister) -> void override; @@ -73,19 +75,20 @@ struct AudioService final : Service } private: + auto notify(std::size_t frames) noexcept -> 
void; + std::optional chunk_listener_; std::optional stream_end_listener_; - IntrusiveList available_chunks_; std::mutex data_mutex_; AudioLoopData loop_data_ {}; SampleFormat sample_format_; std::size_t sample_rate_; std::size_t frame_size_; - SynchronizedPool chunk_pool_; + MediaChunk input_buffer_; + int event_fd_ { -1 }; }; auto dispatch_chunks(sc::Service&) -> void; -auto add_chunk(AudioService&, SynchronizedPool::ItemPtr) -> void; } // namespace sc