From bb3ace43b7608f6760d9ff6fecce963fef6e4eb9 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Fri, 27 Mar 2026 10:43:34 +0800 Subject: [PATCH] Add encoded SHM passthrough support --- CMakeLists.txt | 8 +- docs/encoded_passthrough.md | 81 ++++ .../cvmmap_streamer/config/runtime_config.hpp | 8 + include/cvmmap_streamer/ipc/contracts.hpp | 27 ++ src/config/runtime_config.cpp | 42 ++ src/ipc/contracts.cpp | 43 ++- src/pipeline/pipeline_runtime.cpp | 359 +++++++++++++----- src/protocol/rtmp_output.cpp | 10 +- 8 files changed, 480 insertions(+), 98 deletions(-) create mode 100644 docs/encoded_passthrough.md diff --git a/CMakeLists.txt b/CMakeLists.txt index bdbf1fe..2b3dcee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -58,7 +58,10 @@ set(CVMMAP_LOCAL_CORE_DIR "${CVMMAP_LOCAL_ROOT}/build/core" CACHE PATH "Path to set(CVMMAP_LOCAL_NATS_STATIC "${CVMMAP_LOCAL_ROOT}/build/lib/libnats_static.a" CACHE PATH "Path to local cnats static library") if (CVMMAP_CNATS_PROVIDER STREQUAL "system") find_package(cnats CONFIG REQUIRED) - find_package(cvmmap-core CONFIG QUIET) + find_package(cvmmap-core CONFIG QUIET PATHS "${CVMMAP_LOCAL_CORE_DIR}" NO_DEFAULT_PATH) + if (NOT TARGET cvmmap::client) + find_package(cvmmap-core CONFIG QUIET) + endif() else() if (NOT EXISTS "${CVMMAP_LOCAL_NATS_STATIC}") message(FATAL_ERROR @@ -256,7 +259,8 @@ add_library(cvmmap_streamer_common STATIC target_include_directories(cvmmap_streamer_common PUBLIC "${CMAKE_CURRENT_LIST_DIR}/include" - "${CMAKE_CURRENT_BINARY_DIR}") + "${CMAKE_CURRENT_BINARY_DIR}" + "${CVMMAP_LOCAL_ROOT}/core/include") set(CVMMAP_STREAMER_LINK_DEPS Threads::Threads diff --git a/docs/encoded_passthrough.md b/docs/encoded_passthrough.md new file mode 100644 index 0000000..77eeae1 --- /dev/null +++ b/docs/encoded_passthrough.md @@ -0,0 +1,81 @@ +# Encoded Passthrough From `cv-mmap` ABI v2.1 + +## Summary + +`cvmmap-streamer` can now consume encoded access units directly from a `cv-mmap` shared-memory snapshot when the producer exposes the ABI v2.1 encoded plane. + +This is intended for producers such as `cv-mmap` with `video.backend = "udp_rtp"`: + +- `cv-mmap` receives RTP H.265 +- `cv-mmap` parses once and publishes raw BGR plus encoded AU in the same snapshot +- `cvmmap-streamer` can forward or mux the encoded AU directly + +The video no longer needs a decode -> encode round trip inside the streamer. + +## Input selection + +`input.video_source` controls how the pipeline uses the snapshot: + +```toml +[input] +uri = "cvmmap://default" +nats_url = "nats://localhost:4222" +video_source = "auto" +``` + +Valid values: + +- `auto`: prefer encoded passthrough when the encoded AU plane is present, otherwise use raw encoding +- `raw`: always use the raw left plane and the local encoder backend +- `encoded`: require the encoded AU plane; fail if it is missing + +CLI equivalent: + +```text +--input-video-source auto|raw|encoded +``` + +## Encoded passthrough behavior + +When `video_source` resolves to encoded mode: + +- no encoder backend is created +- the streamer reads codec, bitstream format, keyframe flag, frame rate, and stream PTS from the SHM metadata +- width and height still come from the raw left-plane frame metadata +- the encoded AU is forwarded as-is + +Current assumptions: + +- codec is H.265 or H.264 +- bitstream format is Annex B +- one SHM encoded plane contains one AU +- time base is treated as `1/1e9` + +## Outputs + +### RTMP + +RTMP output accepts passthrough access units directly. + +- If decoder config is present, it is used. +- If decoder config is absent, the output relies on in-band parameter sets from keyframes. + +For `cv-mmap udp_rtp`, this works because the producer republishes keyframes with VPS/SPS/PPS in-band. + +### MCAP + +MCAP recording writes the compressed video directly. + +- no local encoder is used +- keyframes are written exactly from the encoded AU payload +- depth and body-tracking recording continue to use the same raw/depth/NATS paths as before + +## Fallback to raw mode + +If the producer does not expose the encoded plane, `auto` mode falls back to the existing raw pipeline: + +- read raw BGR from SHM +- push frames through the configured encoder backend +- publish or record the encoder output + +This keeps existing producers compatible. diff --git a/include/cvmmap_streamer/config/runtime_config.hpp b/include/cvmmap_streamer/config/runtime_config.hpp index bc5486d..03bbe29 100644 --- a/include/cvmmap_streamer/config/runtime_config.hpp +++ b/include/cvmmap_streamer/config/runtime_config.hpp @@ -40,6 +40,12 @@ enum class EncoderDeviceType { Software, }; +enum class InputVideoSource { + Auto, + Raw, + Encoded, +}; + enum class McapCompression { None, Lz4, @@ -49,6 +55,7 @@ enum class McapCompression { struct InputConfig { std::string uri{"cvmmap://default"}; std::string nats_url{"nats://localhost:4222"}; + InputVideoSource video_source{InputVideoSource::Auto}; }; struct EncoderConfig { @@ -125,6 +132,7 @@ std::string_view to_string(RtmpMode mode); std::string_view to_string(RtmpTransportType transport); std::string_view to_string(EncoderBackendType backend); std::string_view to_string(EncoderDeviceType device); +std::string_view to_string(InputVideoSource source); std::string_view to_string(McapCompression compression); std::expected parse_mcap_compression(std::string_view raw); diff --git a/include/cvmmap_streamer/ipc/contracts.hpp b/include/cvmmap_streamer/ipc/contracts.hpp index cae4012..15540c7 100644 --- a/include/cvmmap_streamer/ipc/contracts.hpp +++ b/include/cvmmap_streamer/ipc/contracts.hpp @@ -51,6 +51,19 @@ enum class DepthUnit : std::uint8_t { Meter = 2, }; +enum class EncodedCodec : std::uint8_t { + Unknown = 0, + H264 = 1, + H265 = 2, +}; + +enum class EncodedBitstreamFormat : std::uint8_t { + Unknown = 0, + AnnexB = 1, +}; + +constexpr std::uint16_t kEncodedFlagKeyframe = 0x0001u; + enum class ModuleStatus : std::int32_t { Online = 0xa1, Offline = 0xa0, @@ -163,22 +176,36 @@ struct ControlResponseMessage { struct ValidatedShmView { FrameMetadata metadata; DepthUnit depth_unit{DepthUnit::Unknown}; + EncodedCodec encoded_codec{EncodedCodec::Unknown}; + EncodedBitstreamFormat encoded_bitstream_format{EncodedBitstreamFormat::Unknown}; + std::uint16_t encoded_flags{0}; + std::uint16_t encoded_frame_rate_num{0}; + std::uint16_t encoded_frame_rate_den{0}; + std::uint64_t encoded_stream_pts_ns{0}; std::span payload; std::span left; std::optional depth_info{}; std::span depth{}; std::optional confidence_info{}; std::span confidence{}; + std::span encoded_access_unit{}; }; struct CoherentSnapshot { FrameMetadata metadata; DepthUnit depth_unit{DepthUnit::Unknown}; + EncodedCodec encoded_codec{EncodedCodec::Unknown}; + EncodedBitstreamFormat encoded_bitstream_format{EncodedBitstreamFormat::Unknown}; + std::uint16_t encoded_flags{0}; + std::uint16_t encoded_frame_rate_num{0}; + std::uint16_t encoded_frame_rate_den{0}; + std::uint64_t encoded_stream_pts_ns{0}; std::span left; std::optional depth_info{}; std::span depth{}; std::optional confidence_info{}; std::span confidence{}; + std::span encoded_access_unit{}; std::size_t bytes_copied; }; diff --git a/src/config/runtime_config.cpp b/src/config/runtime_config.cpp index 6324c23..606ff89 100644 --- a/src/config/runtime_config.cpp +++ b/src/config/runtime_config.cpp @@ -155,6 +155,19 @@ std::expected parse_encoder_device(std::string_v return std::unexpected("invalid encoder device: '" + std::string(raw) + "' (expected: auto|nvidia|software)"); } +std::expected parse_input_video_source(std::string_view raw) { + if (raw == "auto") { + return InputVideoSource::Auto; + } + if (raw == "raw") { + return InputVideoSource::Raw; + } + if (raw == "encoded") { + return InputVideoSource::Encoded; + } + return std::unexpected("invalid input video source: '" + std::string(raw) + "' (expected: auto|raw|encoded)"); +} + std::expected parse_mcap_compression_impl(std::string_view raw) { if (raw == "none") { return McapCompression::None; @@ -263,6 +276,13 @@ std::expected apply_toml_file(RuntimeConfig &config, const st if (auto value = toml_value(table, "input.nats_url")) { config.input.nats_url = *value; } + if (auto value = toml_value(table, "input.video_source")) { + auto parsed = parse_input_video_source(*value); + if (!parsed) { + return std::unexpected(parsed.error()); + } + config.input.video_source = *parsed; + } if (auto value = toml_value(table, "run_mode")) { auto parsed = parse_run_mode(*value); if (!parsed) { @@ -553,6 +573,18 @@ std::string_view to_string(EncoderDeviceType device) { return "unknown"; } +std::string_view to_string(InputVideoSource source) { + switch (source) { + case InputVideoSource::Auto: + return "auto"; + case InputVideoSource::Raw: + return "raw"; + case InputVideoSource::Encoded: + return "encoded"; + } + return "unknown"; +} + std::string_view to_string(McapCompression compression) { switch (compression) { case McapCompression::None: @@ -571,6 +603,7 @@ std::expected parse_runtime_config(int argc, char ** std::string config_path_raw{}; std::string input_uri_raw{}; std::string input_nats_url_raw{}; + std::string input_video_source_raw{}; std::string run_mode_raw{}; std::string codec_raw{}; std::string encoder_backend_raw{}; @@ -612,6 +645,7 @@ std::expected parse_runtime_config(int argc, char ** app.add_option("--config", config_path_raw); app.add_option("--input-uri", input_uri_raw); app.add_option("--nats-url", input_nats_url_raw); + app.add_option("--input-video-source", input_video_source_raw); app.add_option("--run-mode", run_mode_raw); app.add_option("--codec", codec_raw); app.add_option("--encoder-backend", encoder_backend_raw); @@ -670,6 +704,13 @@ std::expected parse_runtime_config(int argc, char ** if (!input_nats_url_raw.empty()) { config.input.nats_url = input_nats_url_raw; } + if (!input_video_source_raw.empty()) { + auto parsed = parse_input_video_source(input_video_source_raw); + if (!parsed) { + return std::unexpected(parsed.error()); + } + config.input.video_source = *parsed; + } if (!run_mode_raw.empty()) { auto parsed = parse_run_mode(run_mode_raw); if (!parsed) { @@ -938,6 +979,7 @@ std::string summarize_runtime_config(const RuntimeConfig &config) { std::ostringstream ss; ss << "input.uri=" << config.input.uri; ss << ", input.nats_url=" << config.input.nats_url; + ss << ", input.video_source=" << to_string(config.input.video_source); ss << ", run_mode=" << to_string(config.run_mode); ss << ", encoder.backend=" << to_string(config.encoder.backend); ss << ", encoder.device=" << to_string(config.encoder.device); diff --git a/src/ipc/contracts.cpp b/src/ipc/contracts.cpp index af84d93..e983396 100644 --- a/src/ipc/contracts.cpp +++ b/src/ipc/contracts.cpp @@ -173,6 +173,31 @@ namespace { return converted; } + [[nodiscard]] + EncodedCodec from_core_encoded_codec(const cvmmap::EncodedCodec codec) { + switch (codec) { + case cvmmap::EncodedCodec::H264: + return EncodedCodec::H264; + case cvmmap::EncodedCodec::H265: + return EncodedCodec::H265; + case cvmmap::EncodedCodec::Unknown: + default: + return EncodedCodec::Unknown; + } + } + + [[nodiscard]] + EncodedBitstreamFormat from_core_encoded_bitstream_format( + const cvmmap::EncodedBitstreamFormat format) { + switch (format) { + case cvmmap::EncodedBitstreamFormat::AnnexB: + return EncodedBitstreamFormat::AnnexB; + case cvmmap::EncodedBitstreamFormat::Unknown: + default: + return EncodedBitstreamFormat::Unknown; + } + } + [[nodiscard]] std::size_t span_offset( const std::span outer, @@ -198,6 +223,7 @@ namespace { }; consider(metadata.depth_plane); consider(metadata.confidence_plane); + consider(metadata.encoded_access_unit); return payload_bytes; } @@ -420,6 +446,13 @@ std::expected validate_shm_region(std::spannormalized_metadata), .depth_unit = from_core_depth_unit(metadata_result->depth_unit), + .encoded_codec = from_core_encoded_codec(metadata_result->encoded_codec), + .encoded_bitstream_format = + from_core_encoded_bitstream_format(metadata_result->encoded_bitstream_format), + .encoded_flags = metadata_result->encoded_flags, + .encoded_frame_rate_num = metadata_result->encoded_frame_rate_num, + .encoded_frame_rate_den = metadata_result->encoded_frame_rate_den, + .encoded_stream_pts_ns = metadata_result->encoded_stream_pts_ns, .payload = payload_region.first(payload_bytes), .left = metadata_result->left_plane, .depth_info = metadata_result->depth_info @@ -429,7 +462,8 @@ std::expected validate_shm_region(std::spanconfidence_info ? std::optional(from_core_frame_info(*metadata_result->confidence_info)) : std::nullopt, - .confidence = metadata_result->confidence_plane}; + .confidence = metadata_result->confidence_plane, + .encoded_access_unit = metadata_result->encoded_access_unit}; } std::expected read_coherent_snapshot( @@ -466,11 +500,18 @@ std::expected read_coherent_snapshot( return CoherentSnapshot{ .metadata = first->metadata, .depth_unit = first->depth_unit, + .encoded_codec = first->encoded_codec, + .encoded_bitstream_format = first->encoded_bitstream_format, + .encoded_flags = first->encoded_flags, + .encoded_frame_rate_num = first->encoded_frame_rate_num, + .encoded_frame_rate_den = first->encoded_frame_rate_den, + .encoded_stream_pts_ns = first->encoded_stream_pts_ns, .left = translate_span(first->payload, copied_payload, first->left), .depth_info = first->depth_info, .depth = translate_span(first->payload, copied_payload, first->depth), .confidence_info = first->confidence_info, .confidence = translate_span(first->payload, copied_payload, first->confidence), + .encoded_access_unit = translate_span(first->payload, copied_payload, first->encoded_access_unit), .bytes_copied = first->payload.size()}; } diff --git a/src/pipeline/pipeline_runtime.cpp b/src/pipeline/pipeline_runtime.cpp index 02078ba..9af9ae3 100644 --- a/src/pipeline/pipeline_runtime.cpp +++ b/src/pipeline/pipeline_runtime.cpp @@ -243,6 +243,90 @@ std::expected make_depth_map_view(const ip }; } +[[nodiscard]] +bool snapshot_has_encoded_access_unit(const ipc::CoherentSnapshot &snapshot) { + return !snapshot.encoded_access_unit.empty() && + snapshot.encoded_codec != ipc::EncodedCodec::Unknown && + snapshot.encoded_bitstream_format == ipc::EncodedBitstreamFormat::AnnexB; +} + +[[nodiscard]] +std::expected codec_type_from_snapshot(const ipc::CoherentSnapshot &snapshot) { + switch (snapshot.encoded_codec) { + case ipc::EncodedCodec::H264: + return CodecType::H264; + case ipc::EncodedCodec::H265: + return CodecType::H265; + case ipc::EncodedCodec::Unknown: + default: + return std::unexpected("encoded snapshot codec is unsupported"); + } +} + +[[nodiscard]] +std::expected make_stream_info_from_snapshot( + const ipc::CoherentSnapshot &snapshot) { + auto codec = codec_type_from_snapshot(snapshot); + if (!codec) { + return std::unexpected(codec.error()); + } + if (snapshot.encoded_bitstream_format != ipc::EncodedBitstreamFormat::AnnexB) { + return std::unexpected("encoded snapshot bitstream format is unsupported"); + } + if (snapshot.metadata.info.width == 0 || snapshot.metadata.info.height == 0) { + return std::unexpected("encoded snapshot dimensions are invalid"); + } + + encode::EncodedStreamInfo stream_info{}; + stream_info.codec = *codec; + stream_info.width = snapshot.metadata.info.width; + stream_info.height = snapshot.metadata.info.height; + stream_info.time_base_num = 1; + stream_info.time_base_den = 1'000'000'000u; + stream_info.frame_rate_num = snapshot.encoded_frame_rate_num == 0 ? 30u : snapshot.encoded_frame_rate_num; + stream_info.frame_rate_den = snapshot.encoded_frame_rate_den == 0 ? 1u : snapshot.encoded_frame_rate_den; + stream_info.bitstream_format = encode::EncodedBitstreamFormat::AnnexB; + stream_info.decoder_config.clear(); + return stream_info; +} + +[[nodiscard]] +std::expected make_access_unit_from_snapshot( + const ipc::CoherentSnapshot &snapshot) { + auto codec = codec_type_from_snapshot(snapshot); + if (!codec) { + return std::unexpected(codec.error()); + } + if (snapshot.encoded_access_unit.empty()) { + return std::unexpected("encoded snapshot access unit is empty"); + } + + encode::EncodedAccessUnit access_unit{}; + access_unit.codec = *codec; + access_unit.source_timestamp_ns = snapshot.metadata.timestamp_ns; + access_unit.stream_pts_ns = + snapshot.encoded_stream_pts_ns == 0 ? snapshot.metadata.timestamp_ns : snapshot.encoded_stream_pts_ns; + access_unit.keyframe = (snapshot.encoded_flags & ipc::kEncodedFlagKeyframe) != 0; + access_unit.annexb_bytes.assign( + snapshot.encoded_access_unit.begin(), + snapshot.encoded_access_unit.end()); + return access_unit; +} + +[[nodiscard]] +bool stream_info_equal( + const encode::EncodedStreamInfo &lhs, + const encode::EncodedStreamInfo &rhs) { + return lhs.codec == rhs.codec && + lhs.width == rhs.width && + lhs.height == rhs.height && + lhs.time_base_num == rhs.time_base_num && + lhs.time_base_den == rhs.time_base_den && + lhs.frame_rate_num == rhs.frame_rate_num && + lhs.frame_rate_den == rhs.frame_rate_den && + lhs.bitstream_format == rhs.bitstream_format; +} + [[nodiscard]] std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) { if (frame.header.timestamp_ns != 0) { @@ -559,11 +643,7 @@ int run_pipeline(const RuntimeConfig &config) { return exit_code(PipelineExitCode::InputError); } - auto backend = encode::make_encoder_backend(config); - if (!backend) { - spdlog::error("pipeline encoder backend selection failed: {}", format_error(backend.error())); - return exit_code(PipelineExitCode::InitializationError); - } + std::optional backend{}; auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name); if (!shm) { @@ -668,7 +748,9 @@ int run_pipeline(const RuntimeConfig &config) { metrics::IngestEmitLatencyTracker latency_tracker{}; bool producer_offline{false}; bool started{false}; + bool using_encoded_input{false}; std::optional active_info{}; + std::optional active_stream_info{}; std::optional restart_target_info{}; bool restart_pending{false}; bool warned_unknown_depth_unit{false}; @@ -678,33 +760,41 @@ int run_pipeline(const RuntimeConfig &config) { stats.supervised_restarts += 1; } spdlog::warn( - "PIPELINE_RESTART backend={} reason='{}' restarts={}", - (*backend)->backend_name(), + "PIPELINE_RESTART mode={} reason='{}' restarts={}", + using_encoded_input + ? "encoded_passthrough" + : (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")), reason, stats.supervised_restarts); - (*backend)->shutdown(); + if (backend) { + (*backend)->shutdown(); + } started = false; restart_pending = true; restart_target_info = target_info; warned_unknown_depth_unit = false; + using_encoded_input = false; + active_stream_info.reset(); rtmp_output.reset(); }; - const auto attempt_backend_start = [&](const ipc::FrameInfo &target_info) -> Status { - (*backend)->shutdown(); - rtmp_output.reset(); - auto init = (*backend)->init(config, target_info); - if (!init) { - return std::unexpected(init.error()); + const auto ensure_encoder_backend = [&]() -> Status { + if (backend) { + return {}; } + auto created = encode::make_encoder_backend(config); + if (!created) { + return std::unexpected(created.error()); + } + backend.emplace(std::move(*created)); + return {}; + }; + + const auto start_outputs_from_stream_info = + [&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status { + rtmp_output.reset(); if (config.outputs.rtmp.enabled) { - auto stream_info = (*backend)->stream_info(); - if (!stream_info) { - return unexpected_error( - stream_info.error().code, - "pipeline RTMP output stream info unavailable: " + format_error(stream_info.error())); - } - auto created = protocol::make_rtmp_output(config, *stream_info); + auto created = protocol::make_rtmp_output(config, stream_info); if (!created) { return unexpected_error( created.error().code, @@ -712,17 +802,11 @@ int run_pipeline(const RuntimeConfig &config) { } rtmp_output.emplace(std::move(*created)); } - auto stream_info = (*backend)->stream_info(); - if (!stream_info) { - return unexpected_error( - stream_info.error().code, - "pipeline encoder stream info unavailable: " + format_error(stream_info.error())); - } - update_mcap_stream_info(mcap_recorder, *stream_info); + update_mcap_stream_info(mcap_recorder, stream_info); if (config.record.mcap.enabled) { std::lock_guard lock(mcap_recorder.mutex); if (!mcap_recorder.sink) { - auto created = record::McapRecordSink::create(config, *stream_info); + auto created = record::McapRecordSink::create(config, stream_info); if (!created) { return unexpected_error( ERR_INTERNAL, @@ -742,9 +826,43 @@ int run_pipeline(const RuntimeConfig &config) { restart_target_info.reset(); warned_unknown_depth_unit = false; active_info = target_info; + active_stream_info = stream_info; return {}; }; + const auto attempt_raw_backend_start = [&](const ipc::FrameInfo &target_info) -> Status { + auto ensure = ensure_encoder_backend(); + if (!ensure) { + return ensure; + } + (*backend)->shutdown(); + auto init = (*backend)->init(config, target_info); + if (!init) { + return std::unexpected(init.error()); + } + auto stream_info = (*backend)->stream_info(); + if (!stream_info) { + return unexpected_error( + stream_info.error().code, + "pipeline encoder stream info unavailable: " + format_error(stream_info.error())); + } + using_encoded_input = false; + return start_outputs_from_stream_info(*stream_info, target_info); + }; + + const auto attempt_encoded_passthrough_start = [&]( + const ipc::CoherentSnapshot &snapshot) -> Status { + auto stream_info = make_stream_info_from_snapshot(snapshot); + if (!stream_info) { + return unexpected_error(ERR_PROTOCOL, stream_info.error()); + } + if (backend) { + (*backend)->shutdown(); + } + using_encoded_input = true; + return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info); + }; + const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); auto last_event = std::chrono::steady_clock::now(); @@ -774,10 +892,14 @@ int run_pipeline(const RuntimeConfig &config) { case cvmmap::ModuleStatus::StreamReset: spdlog::info("pipeline status event status=stream_reset"); stats.resets += 1; - (*backend)->shutdown(); + if (backend) { + (*backend)->shutdown(); + } started = false; restart_pending = false; restart_target_info.reset(); + active_stream_info.reset(); + using_encoded_input = false; active_info.reset(); rtmp_output.reset(); break; @@ -808,10 +930,12 @@ int run_pipeline(const RuntimeConfig &config) { } } - auto poll = (*backend)->poll(); - if (!poll) { - const auto reason = format_error(poll.error()); - restart_backend(reason, active_info); + if (backend && !using_encoded_input) { + auto poll = (*backend)->poll(); + if (!poll) { + const auto reason = format_error(poll.error()); + restart_backend(reason, active_info); + } } std::array poll_items{{ @@ -827,34 +951,34 @@ int run_pipeline(const RuntimeConfig &config) { const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; if (!frame_socket_ready) { const auto now = std::chrono::steady_clock::now(); - if (restart_pending && restart_target_info) { - auto start_result = attempt_backend_start(*restart_target_info); - if (!start_result) { - spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); - return exit_code(PipelineExitCode::RuntimeError); + if (restart_pending && restart_target_info && backend && !using_encoded_input) { + auto start_result = attempt_raw_backend_start(*restart_target_info); + if (!start_result) { + spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); + return exit_code(PipelineExitCode::RuntimeError); + } } - } - if (now - last_event >= idle_timeout) { - spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); - break; - } - if (!producer_offline && started) { - auto drain = drain_encoder( - config, - *backend, - false, - stats, - rtp_publisher ? &*rtp_publisher : nullptr, - rtmp_output ? &*rtmp_output : nullptr, - &mcap_recorder, - latency_tracker); - if (!drain) { - const auto reason = format_error(drain.error()); - restart_backend(reason, active_info); + if (now - last_event >= idle_timeout) { + spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); + break; } + if (!producer_offline && started && backend && !using_encoded_input) { + auto drain = drain_encoder( + config, + *backend, + false, + stats, + rtp_publisher ? &*rtp_publisher : nullptr, + rtmp_output ? &*rtmp_output : nullptr, + &mcap_recorder, + latency_tracker); + if (!drain) { + const auto reason = format_error(drain.error()); + restart_backend(reason, active_info); + } + } + continue; } - continue; - } zmq::message_t message; const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); @@ -894,14 +1018,45 @@ int run_pipeline(const RuntimeConfig &config) { continue; } + const bool has_encoded_access_unit = snapshot_has_encoded_access_unit(*snapshot); + const bool want_encoded_input = + config.input.video_source == InputVideoSource::Encoded || + (config.input.video_source == InputVideoSource::Auto && has_encoded_access_unit); + if (config.input.video_source == InputVideoSource::Encoded && !has_encoded_access_unit) { + spdlog::error("pipeline encoded input requested but SHM snapshot does not contain an encoded access unit"); + return exit_code(PipelineExitCode::InitializationError); + } + if (active_info && !frame_info_equal(*active_info, snapshot->metadata.info)) { stats.format_rebuilds += 1; restart_backend("frame_info_change", snapshot->metadata.info); } - if (!started || restart_pending) { + if (started && using_encoded_input != want_encoded_input) { + stats.format_rebuilds += 1; + restart_backend("input_video_source_switch", snapshot->metadata.info); + } + + if (want_encoded_input) { + auto stream_info = make_stream_info_from_snapshot(*snapshot); + if (!stream_info) { + spdlog::error("pipeline encoded snapshot metadata invalid: {}", stream_info.error()); + return exit_code(PipelineExitCode::InitializationError); + } + if (active_stream_info && !stream_info_equal(*active_stream_info, *stream_info)) { + stats.format_rebuilds += 1; + restart_backend("encoded_stream_info_change", snapshot->metadata.info); + } + if (!started || restart_pending) { + auto start_result = attempt_encoded_passthrough_start(*snapshot); + if (!start_result) { + spdlog::error("pipeline encoded passthrough init failed: {}", format_error(start_result.error())); + return exit_code(PipelineExitCode::InitializationError); + } + } + } else if (!started || restart_pending) { const auto target_info = restart_target_info.value_or(snapshot->metadata.info); - auto start_result = attempt_backend_start(target_info); + auto start_result = attempt_raw_backend_start(target_info); if (!start_result) { spdlog::error("pipeline backend init failed: {}", format_error(start_result.error())); return exit_code(PipelineExitCode::InitializationError); @@ -910,15 +1065,39 @@ int run_pipeline(const RuntimeConfig &config) { latency_tracker.note_ingest(); - auto push = (*backend)->push_frame(encode::RawVideoFrame{ - .info = snapshot->metadata.info, - .source_timestamp_ns = snapshot->metadata.timestamp_ns, - .bytes = std::span(snapshot_buffer.data(), snapshot->bytes_copied), - }); - if (!push) { - const auto reason = format_error(push.error()); - restart_backend(reason, active_info); - continue; + if (want_encoded_input) { + auto access_unit = make_access_unit_from_snapshot(*snapshot); + if (!access_unit) { + const auto reason = "pipeline encoded snapshot invalid: " + access_unit.error(); + restart_backend(reason, snapshot->metadata.info); + continue; + } + std::vector access_units{}; + access_units.push_back(std::move(*access_unit)); + auto publish = publish_access_units( + config, + std::move(access_units), + stats, + rtp_publisher ? &*rtp_publisher : nullptr, + rtmp_output ? &*rtmp_output : nullptr, + &mcap_recorder, + latency_tracker); + if (!publish) { + const auto reason = format_error(publish.error()); + restart_backend(reason, active_info); + continue; + } + } else { + auto push = (*backend)->push_frame(encode::RawVideoFrame{ + .info = snapshot->metadata.info, + .source_timestamp_ns = snapshot->metadata.timestamp_ns, + .bytes = std::span(snapshot_buffer.data(), snapshot->bytes_copied), + }); + if (!push) { + const auto reason = format_error(push.error()); + restart_backend(reason, active_info); + continue; + } } if (!snapshot->depth.empty()) { @@ -946,19 +1125,21 @@ int run_pipeline(const RuntimeConfig &config) { } stats.pushed_frames += 1; - auto drain = drain_encoder( - config, - *backend, - false, - stats, - rtp_publisher ? &*rtp_publisher : nullptr, - rtmp_output ? &*rtmp_output : nullptr, - &mcap_recorder, - latency_tracker); - if (!drain) { - const auto reason = format_error(drain.error()); - restart_backend(reason, active_info); - continue; + if (!want_encoded_input) { + auto drain = drain_encoder( + config, + *backend, + false, + stats, + rtp_publisher ? &*rtp_publisher : nullptr, + rtmp_output ? &*rtmp_output : nullptr, + &mcap_recorder, + latency_tracker); + if (!drain) { + const auto reason = format_error(drain.error()); + restart_backend(reason, active_info); + continue; + } } if (config.latency.ingest_max_frames > 0 && stats.pushed_frames >= config.latency.ingest_max_frames) { @@ -973,7 +1154,7 @@ int run_pipeline(const RuntimeConfig &config) { nats_client.Stop(); - if (started) { + if (started && backend && !using_encoded_input) { auto drain = drain_encoder( config, *backend, @@ -989,14 +1170,18 @@ int run_pipeline(const RuntimeConfig &config) { } } - (*backend)->shutdown(); + if (backend) { + (*backend)->shutdown(); + } std::ignore = stop_mcap_recording(mcap_recorder); recorder_service.Stop(); spdlog::info( "PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}", - to_string(config.encoder.codec), - (*backend)->backend_name(), + active_stream_info ? to_string(active_stream_info->codec) : to_string(config.encoder.codec), + using_encoded_input + ? std::string("encoded_passthrough") + : (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")), stats.sync_messages, stats.status_messages, stats.torn_frames, diff --git a/src/protocol/rtmp_output.cpp b/src/protocol/rtmp_output.cpp index 9b8ff10..1e0bf62 100644 --- a/src/protocol/rtmp_output.cpp +++ b/src/protocol/rtmp_output.cpp @@ -191,7 +191,7 @@ Result> decoder_config_to_annexb( CodecType codec, std::span decoder_config) { if (decoder_config.empty()) { - return unexpected_error(ERR_PROTOCOL, "decoder config is required"); + return std::vector{}; } if (looks_like_annexb(decoder_config)) { return std::vector(decoder_config.begin(), decoder_config.end()); @@ -248,10 +248,6 @@ public: static Result create( const RuntimeConfig &config, const encode::EncodedStreamInfo &stream_info) { - if (stream_info.decoder_config.empty()) { - return unexpected_error(ERR_PROTOCOL, "libavformat RTMP requires encoder decoder_config/extradata"); - } - avformat_network_init(); LibavformatRtmpOutput output{}; @@ -442,9 +438,7 @@ public: }; auto decoder_config_annexb = decoder_config_to_annexb(stream_info.codec, stream_info.decoder_config); if (!decoder_config_annexb) { - return unexpected_error( - ERR_PROTOCOL, - "ffmpeg_process RTMP requires decoder config: " + format_error(decoder_config_annexb.error())); + return std::unexpected(decoder_config_annexb.error()); } output.decoder_config_annexb_ = std::move(*decoder_config_annexb);