From 965b03c05346a1e1b3aa612f96ff28ba3163b3cb Mon Sep 17 00:00:00 2001 From: crosstyan Date: Thu, 9 Apr 2026 12:17:54 +0800 Subject: [PATCH] fix(stream): preserve live outputs and disable idle exit by default Preserve RTP/RTMP session continuity across upstream stream_reset events by forcing a keyframe on restart, remapping live timestamps, and keeping live outputs open when the runtime requests reset continuity. Disable idle auto-exit by default by changing ingest_idle_timeout_ms to 0, removing validation that rejected 0, and only enforcing idle shutdown when a positive timeout is configured in pipeline and ingest loops. Also suppress libavformat FLV trailer header backfill attempts on RTMP sockets and update the RTP output tester for the newer publisher create signature. Docs are updated to state that 0 disables the idle timeout. --- README.md | 2 +- .../cvmmap_streamer/config/runtime_config.hpp | 3 +- .../encode/encoder_backend.hpp | 1 + src/config/runtime_config.cpp | 3 - src/core/ingest_runtime.cpp | 3 +- src/encode/ffmpeg_encoder_backend.cpp | 6 + src/pipeline/pipeline_runtime.cpp | 488 ++++++++++++------ src/protocol/rtmp_output.cpp | 14 +- src/testers/rtp_output_tester.cpp | 2 +- 9 files changed, 357 insertions(+), 165 deletions(-) diff --git a/README.md b/README.md index a3d1f84..2a10425 100644 --- a/README.md +++ b/README.md @@ -512,7 +512,7 @@ Run the fault injection and latency validation suite. | Flag | Description | Default | |------|-------------|---------| | `--ingest-max-frames N` | Process at most N frames then exit | 0 (unlimited) | -| `--ingest-idle-timeout-ms MS` | Exit if idle for MS milliseconds | 0 (disabled) | +| `--ingest-idle-timeout-ms MS` | Exit if idle for MS milliseconds; 0 disables the timeout | 0 (disabled) | ## Architecture diff --git a/include/cvmmap_streamer/config/runtime_config.hpp b/include/cvmmap_streamer/config/runtime_config.hpp index 03bbe29..a722952 100644 --- a/include/cvmmap_streamer/config/runtime_config.hpp +++ b/include/cvmmap_streamer/config/runtime_config.hpp @@ -108,8 +108,9 @@ struct LatencyConfig { std::size_t queue_size{1}; bool realtime_sync{true}; bool force_idr_on_reset{true}; + bool keep_stream_on_reset{false}; std::uint32_t ingest_max_frames{0}; - std::uint32_t ingest_idle_timeout_ms{1000}; + std::uint32_t ingest_idle_timeout_ms{0}; std::uint32_t ingest_consumer_delay_ms{0}; std::uint32_t snapshot_copy_delay_us{0}; std::uint32_t emit_stall_ms{0}; diff --git a/include/cvmmap_streamer/encode/encoder_backend.hpp b/include/cvmmap_streamer/encode/encoder_backend.hpp index 423d0e4..ef53593 100644 --- a/include/cvmmap_streamer/encode/encoder_backend.hpp +++ b/include/cvmmap_streamer/encode/encoder_backend.hpp @@ -18,6 +18,7 @@ struct RawVideoFrame { ipc::FrameInfo info{}; std::uint64_t source_timestamp_ns{0}; std::size_t row_stride_bytes{0}; + bool force_keyframe{false}; std::span bytes{}; }; diff --git a/src/config/runtime_config.cpp b/src/config/runtime_config.cpp index 606ff89..3418c6e 100644 --- a/src/config/runtime_config.cpp +++ b/src/config/runtime_config.cpp @@ -968,9 +968,6 @@ std::expected validate_runtime_config(const RuntimeConfig &co if (config.encoder.b_frames > config.encoder.gop) { return std::unexpected("invalid encoder config: b_frames must be <= gop"); } - if (config.latency.ingest_idle_timeout_ms == 0) { - return std::unexpected("invalid ingest config: ingest_idle_timeout_ms must be >= 1"); - } return {}; } diff --git a/src/core/ingest_runtime.cpp b/src/core/ingest_runtime.cpp index 82de528..27dee63 100644 --- a/src/core/ingest_runtime.cpp +++ b/src/core/ingest_runtime.cpp @@ -275,6 +275,7 @@ int run_ingest_loop(const RuntimeConfig &config) { } }); + const auto idle_timeout_enabled = config.latency.ingest_idle_timeout_ms > 0; const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); auto last_event = std::chrono::steady_clock::now(); @@ -283,7 +284,7 @@ int run_ingest_loop(const RuntimeConfig &config) { const auto recv_result = subscriber.recv(message, zmq::recv_flags::none); if (!recv_result) { const auto now = std::chrono::steady_clock::now(); - if (now - last_event >= idle_timeout) { + if (idle_timeout_enabled && now - last_event >= idle_timeout) { spdlog::info( "ingest idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); diff --git a/src/encode/ffmpeg_encoder_backend.cpp b/src/encode/ffmpeg_encoder_backend.cpp index aa86dff..c58c50a 100644 --- a/src/encode/ffmpeg_encoder_backend.cpp +++ b/src/encode/ffmpeg_encoder_backend.cpp @@ -217,6 +217,12 @@ public: first_source_timestamp_ns_ = frame.source_timestamp_ns; } + frame_->pict_type = frame.force_keyframe ? AV_PICTURE_TYPE_I : AV_PICTURE_TYPE_NONE; + if (frame.force_keyframe) { + frame_->flags |= AV_FRAME_FLAG_KEY; + } else { + frame_->flags &= ~AV_FRAME_FLAG_KEY; + } frame_->pts = static_cast(frame.source_timestamp_ns - *first_source_timestamp_ns_); const auto send_result = avcodec_send_frame(context_, frame_); if (send_result < 0) { diff --git a/src/pipeline/pipeline_runtime.cpp b/src/pipeline/pipeline_runtime.cpp index 1e653d2..c39a04f 100644 --- a/src/pipeline/pipeline_runtime.cpp +++ b/src/pipeline/pipeline_runtime.cpp @@ -62,6 +62,10 @@ constexpr int exit_code(PipelineExitCode code) { } struct ResolvedInputEndpoints { + std::string instance_name; + std::string namespace_name; + std::string ipc_prefix; + std::string base_name; std::string shm_name; std::string zmq_endpoint; std::string nats_target_key; @@ -71,17 +75,21 @@ struct ResolvedInputEndpoints { std::expected resolve_input_endpoints(const RuntimeConfig &config) { try { auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.uri); - spdlog::info( - "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'", - config.input.uri, - target.shm_name, - target.zmq_addr, - target.nats_target_key); - return ResolvedInputEndpoints{ - .shm_name = target.shm_name, - .zmq_endpoint = target.zmq_addr, - .nats_target_key = target.nats_target_key, - }; + spdlog::info( + "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'", + config.input.uri, + target.shm_name, + target.zmq_addr, + target.nats_target_key); + return ResolvedInputEndpoints{ + .instance_name = target.instance, + .namespace_name = target.namespace_name, + .ipc_prefix = target.prefix, + .base_name = target.base_name, + .shm_name = target.shm_name, + .zmq_endpoint = target.zmq_addr, + .nats_target_key = target.nats_target_key, + }; } catch (const std::exception &e) { return std::unexpected(std::string("invalid cvmmap input URI: ") + e.what()); } @@ -327,6 +335,81 @@ bool stream_info_equal( lhs.bitstream_format == rhs.bitstream_format; } +struct LiveOutputContinuityState { + bool boundary_pending{false}; + bool encoded_keyframe_required{false}; + std::optional last_output_pts_ns{}; + std::optional segment_input_start_pts_ns{}; + std::uint64_t segment_output_start_pts_ns{0}; + std::uint64_t nominal_frame_interval_ns{33'333'333ull}; + + void reset() { + boundary_pending = false; + encoded_keyframe_required = false; + last_output_pts_ns.reset(); + segment_input_start_pts_ns.reset(); + segment_output_start_pts_ns = 0; + nominal_frame_interval_ns = 33'333'333ull; + } + + void note_reset_boundary() { + boundary_pending = true; + encoded_keyframe_required = true; + segment_input_start_pts_ns.reset(); + } + + void note_new_session(const encode::EncodedStreamInfo &stream_info) { + reset(); + update_nominal_frame_interval(stream_info); + } + + void update_nominal_frame_interval(const encode::EncodedStreamInfo &stream_info) { + if (stream_info.frame_rate_num == 0 || stream_info.frame_rate_den == 0) { + nominal_frame_interval_ns = 33'333'333ull; + return; + } + + const auto numerator = static_cast(stream_info.frame_rate_den) * 1'000'000'000ull; + const auto denominator = static_cast(stream_info.frame_rate_num); + if (denominator == 0) { + nominal_frame_interval_ns = 33'333'333ull; + return; + } + + const auto interval = numerator / denominator; + nominal_frame_interval_ns = interval == 0 ? 1ull : interval; + } + + [[nodiscard]] + std::uint64_t remap_pts(const std::uint64_t input_pts_ns) { + if (!last_output_pts_ns) { + last_output_pts_ns = input_pts_ns; + segment_input_start_pts_ns = input_pts_ns; + segment_output_start_pts_ns = input_pts_ns; + boundary_pending = false; + return input_pts_ns; + } + + if (boundary_pending || !segment_input_start_pts_ns) { + segment_input_start_pts_ns = input_pts_ns; + segment_output_start_pts_ns = *last_output_pts_ns + nominal_frame_interval_ns; + boundary_pending = false; + } + + const auto input_delta = + input_pts_ns >= *segment_input_start_pts_ns + ? input_pts_ns - *segment_input_start_pts_ns + : 0ull; + auto remapped_pts = segment_output_start_pts_ns + input_delta; + if (remapped_pts <= *last_output_pts_ns) { + remapped_pts = *last_output_pts_ns + nominal_frame_interval_ns; + } + + last_output_pts_ns = remapped_pts; + return remapped_pts; + } +}; + [[nodiscard]] std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) { if (frame.header.timestamp_ns != 0) { @@ -556,6 +639,7 @@ Status publish_access_units( protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, McapRecorderState *mcap_recorder, + LiveOutputContinuityState *live_output_continuity, metrics::IngestEmitLatencyTracker &latency_tracker) { for (auto &access_unit : access_units) { if (access_unit.annexb_bytes.empty()) { @@ -567,11 +651,29 @@ Status publish_access_units( latency_tracker.note_emit_stall(); } + const auto should_rewrite_live_timestamps = + live_output_continuity != nullptr && + (rtp_publisher != nullptr || rtmp_output != nullptr); + auto live_access_unit = access_unit; + const bool live_boundary_packet = + should_rewrite_live_timestamps && + live_output_continuity->boundary_pending; + if (should_rewrite_live_timestamps) { + live_access_unit.stream_pts_ns = live_output_continuity->remap_pts(access_unit.stream_pts_ns); + } + if (live_boundary_packet) { + spdlog::info( + "PIPELINE_RESET_CONTINUITY first live packet keyframe={} source_pts_ns={} remapped_pts_ns={}", + access_unit.keyframe ? "true" : "false", + access_unit.stream_pts_ns, + live_access_unit.stream_pts_ns); + } + if (rtp_publisher != nullptr) { - rtp_publisher->publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns); + rtp_publisher->publish_access_unit(live_access_unit.annexb_bytes, live_access_unit.stream_pts_ns); } if (rtmp_output != nullptr) { - auto publish = (*rtmp_output)->publish_access_unit(access_unit); + auto publish = (*rtmp_output)->publish_access_unit(live_access_unit); if (!publish) { return std::unexpected(publish.error()); } @@ -607,6 +709,7 @@ Status drain_encoder( protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, McapRecorderState *mcap_recorder, + LiveOutputContinuityState *live_output_continuity, metrics::IngestEmitLatencyTracker &latency_tracker) { auto drained = flushing ? backend->flush() : backend->drain(); if (!drained) { @@ -619,6 +722,7 @@ Status drain_encoder( rtp_publisher, rtmp_output, mcap_recorder, + live_output_continuity, latency_tracker); } @@ -678,9 +782,17 @@ int run_pipeline(const RuntimeConfig &config) { input_endpoints->nats_target_key, config.input.nats_url); cvmmap::NatsControlService recorder_service( - config.input.uri, - input_endpoints->nats_target_key, - config.input.nats_url); + cvmmap::NatsControlServiceOptions{ + .instance_name = input_endpoints->instance_name, + .namespace_name = input_endpoints->namespace_name, + .ipc_prefix = input_endpoints->ipc_prefix, + .base_name = input_endpoints->base_name, + .target_key = input_endpoints->nats_target_key, + .shm_name = input_endpoints->shm_name, + .zmq_addr = input_endpoints->zmq_endpoint, + .backend = std::string((*source)->backend_name()), + .nats_url = config.input.nats_url, + }); std::mutex nats_event_mutex{}; std::deque> pending_body_packets{}; std::deque pending_status_codes{}; @@ -737,6 +849,10 @@ int run_pipeline(const RuntimeConfig &config) { PipelineStats stats{}; metrics::IngestEmitLatencyTracker latency_tracker{}; + LiveOutputContinuityState live_output_continuity{}; + const bool keep_live_outputs_on_reset = + config.latency.keep_stream_on_reset && + (config.outputs.rtp.enabled || config.outputs.rtmp.enabled); bool producer_offline{false}; bool started{false}; bool using_encoded_input{false}; @@ -766,6 +882,7 @@ int run_pipeline(const RuntimeConfig &config) { warned_unknown_depth_unit = false; using_encoded_input = false; active_stream_info.reset(); + live_output_continuity.reset(); rtp_publisher.reset(); rtmp_output.reset(); }; @@ -784,26 +901,55 @@ int run_pipeline(const RuntimeConfig &config) { const auto start_outputs_from_stream_info = [&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status { - rtp_publisher.reset(); - rtmp_output.reset(); - if (config.outputs.rtp.enabled) { - auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec); - if (!created) { - return unexpected_error( - ERR_INTERNAL, - "pipeline RTP publisher init failed: " + created.error()); + const bool preserving_live_outputs = + keep_live_outputs_on_reset && + live_output_continuity.boundary_pending && + active_stream_info.has_value() && + stream_info_equal(*active_stream_info, stream_info) && + ((config.outputs.rtp.enabled && rtp_publisher.has_value()) || + (config.outputs.rtmp.enabled && rtmp_output.has_value())); + if (preserving_live_outputs) { + spdlog::info( + "PIPELINE_RESET_CONTINUITY preserving live outputs codec={} width={} height={}", + to_string(stream_info.codec), + stream_info.width, + stream_info.height); + live_output_continuity.update_nominal_frame_interval(stream_info); + } else { + if (keep_live_outputs_on_reset && live_output_continuity.boundary_pending && active_stream_info.has_value() && + !stream_info_equal(*active_stream_info, stream_info)) { + spdlog::warn( + "PIPELINE_RESET_CONTINUITY fallback to output rebuild reason='stream_info_change' old_codec={} new_codec={} old={}x{} new={}x{}", + to_string(active_stream_info->codec), + to_string(stream_info.codec), + active_stream_info->width, + active_stream_info->height, + stream_info.width, + stream_info.height); } - rtp_publisher.emplace(std::move(*created)); - } - if (config.outputs.rtmp.enabled) { - auto created = protocol::make_rtmp_output(config, stream_info); - if (!created) { - return unexpected_error( - created.error().code, - "pipeline RTMP output init failed: " + format_error(created.error())); + rtp_publisher.reset(); + rtmp_output.reset(); + if (config.outputs.rtp.enabled) { + auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec); + if (!created) { + return unexpected_error( + ERR_INTERNAL, + "pipeline RTP publisher init failed: " + created.error()); + } + rtp_publisher.emplace(std::move(*created)); } - rtmp_output.emplace(std::move(*created)); + if (config.outputs.rtmp.enabled) { + auto created = protocol::make_rtmp_output(config, stream_info); + if (!created) { + return unexpected_error( + created.error().code, + "pipeline RTMP output init failed: " + format_error(created.error())); + } + rtmp_output.emplace(std::move(*created)); + } + live_output_continuity.note_new_session(stream_info); } + update_mcap_stream_info(mcap_recorder, stream_info); if (config.record.mcap.enabled) { std::lock_guard lock(mcap_recorder.mutex); @@ -865,35 +1011,49 @@ int run_pipeline(const RuntimeConfig &config) { return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info); }; - const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); - auto last_event = std::chrono::steady_clock::now(); + const auto idle_timeout_enabled = config.latency.ingest_idle_timeout_ms > 0; + const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); + auto last_event = std::chrono::steady_clock::now(); - while (true) { - std::deque> body_packets; - std::deque status_codes; - { - std::lock_guard lock(nats_event_mutex); - body_packets.swap(pending_body_packets); - status_codes.swap(pending_status_codes); - } - const bool had_nats_events = !body_packets.empty() || !status_codes.empty(); - if (had_nats_events) { - last_event = std::chrono::steady_clock::now(); - } - for (const auto status_code : status_codes) { - stats.status_messages += 1; - switch (static_cast(status_code)) { - case cvmmap::ModuleStatus::Online: - spdlog::info("pipeline status event status=online"); - producer_offline = false; - break; - case cvmmap::ModuleStatus::Offline: - spdlog::info("pipeline status event status=offline"); - producer_offline = true; - break; - case cvmmap::ModuleStatus::StreamReset: - spdlog::info("pipeline status event status=stream_reset"); - stats.resets += 1; + while (true) { + std::deque> body_packets; + std::deque status_codes; + { + std::lock_guard lock(nats_event_mutex); + body_packets.swap(pending_body_packets); + status_codes.swap(pending_status_codes); + } + const bool had_nats_events = !body_packets.empty() || !status_codes.empty(); + if (had_nats_events) { + last_event = std::chrono::steady_clock::now(); + } + for (const auto status_code : status_codes) { + stats.status_messages += 1; + switch (static_cast(status_code)) { + case cvmmap::ModuleStatus::Online: + spdlog::info("pipeline status event status=online"); + producer_offline = false; + break; + case cvmmap::ModuleStatus::Offline: + spdlog::info("pipeline status event status=offline"); + producer_offline = true; + break; + case cvmmap::ModuleStatus::StreamReset: + spdlog::info("pipeline status event status=stream_reset"); + stats.resets += 1; + if (keep_live_outputs_on_reset) { + live_output_continuity.note_reset_boundary(); + if (!using_encoded_input && backend) { + (*backend)->shutdown(); + started = false; + } + restart_pending = false; + restart_target_info.reset(); + spdlog::info( + "PIPELINE_RESET_CONTINUITY armed outputs_active={} encoded_input={}", + (rtp_publisher.has_value() || rtmp_output.has_value()) ? "true" : "false", + using_encoded_input ? "true" : "false"); + } else { if (backend) { (*backend)->shutdown(); } @@ -905,86 +1065,88 @@ int run_pipeline(const RuntimeConfig &config) { active_info.reset(); rtp_publisher.reset(); rtmp_output.reset(); - break; } + break; } - for (const auto &body_bytes_vec : body_packets) { - const auto body_bytes = std::span( - body_bytes_vec.data(), - body_bytes_vec.size()); - if (body_bytes.empty()) { - continue; - } - - auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes); - if (!parsed_body) { - spdlog::warn("pipeline body packet parse error: {}", parsed_body.error()); - continue; - } - - auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{ - .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), - .bytes = body_bytes, - }); - if (!write_body) { - const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error()); - restart_backend(reason, active_info); - break; - } - } - - if (backend && !using_encoded_input) { - auto poll = (*backend)->poll(); - if (!poll) { - const auto reason = format_error(poll.error()); - restart_backend(reason, active_info); - } - } - - std::array poll_items{{ - {subscriber.handle(), 0, ZMQ_POLLIN, 0}, - }}; - try { - zmq::poll(poll_items, std::chrono::milliseconds{20}); - } catch (const zmq::error_t &e) { - spdlog::error("pipeline poll failed: {}", e.what()); - return exit_code(PipelineExitCode::SubscriberError); - } - - const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; - if (!frame_socket_ready) { - const auto now = std::chrono::steady_clock::now(); - if (restart_pending && restart_target_info && backend && !using_encoded_input) { - auto start_result = attempt_raw_backend_start(*restart_target_info); - if (!start_result) { - spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); - return exit_code(PipelineExitCode::RuntimeError); - } - } - if (now - last_event >= idle_timeout) { - spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); - break; - } - if (!producer_offline && started && backend && !using_encoded_input) { - auto drain = drain_encoder( - config, - *backend, - false, - stats, - rtp_publisher ? &*rtp_publisher : nullptr, - rtmp_output ? &*rtmp_output : nullptr, - &mcap_recorder, - latency_tracker); - if (!drain) { - const auto reason = format_error(drain.error()); - restart_backend(reason, active_info); - } - } + } + for (const auto &body_bytes_vec : body_packets) { + const auto body_bytes = std::span( + body_bytes_vec.data(), + body_bytes_vec.size()); + if (body_bytes.empty()) { continue; } - zmq::message_t message; - const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); + auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes); + if (!parsed_body) { + spdlog::warn("pipeline body packet parse error: {}", parsed_body.error()); + continue; + } + + auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{ + .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), + .bytes = body_bytes, + }); + if (!write_body) { + const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error()); + restart_backend(reason, active_info); + break; + } + } + + if (backend && !using_encoded_input) { + auto poll = (*backend)->poll(); + if (!poll) { + const auto reason = format_error(poll.error()); + restart_backend(reason, active_info); + } + } + + std::array poll_items{{ + {subscriber.handle(), 0, ZMQ_POLLIN, 0}, + }}; + try { + zmq::poll(poll_items, std::chrono::milliseconds{20}); + } catch (const zmq::error_t &e) { + spdlog::error("pipeline poll failed: {}", e.what()); + return exit_code(PipelineExitCode::SubscriberError); + } + + const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; + if (!frame_socket_ready) { + const auto now = std::chrono::steady_clock::now(); + if (restart_pending && restart_target_info && backend && !using_encoded_input) { + auto start_result = attempt_raw_backend_start(*restart_target_info); + if (!start_result) { + spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); + return exit_code(PipelineExitCode::RuntimeError); + } + } + if (idle_timeout_enabled && now - last_event >= idle_timeout) { + spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); + break; + } + if (!producer_offline && started && backend && !using_encoded_input) { + auto drain = drain_encoder( + config, + *backend, + false, + stats, + rtp_publisher ? &*rtp_publisher : nullptr, + rtmp_output ? &*rtmp_output : nullptr, + &mcap_recorder, + keep_live_outputs_on_reset ? &live_output_continuity : nullptr, + latency_tracker); + if (!drain) { + const auto reason = format_error(drain.error()); + restart_backend(reason, active_info); + } + } + continue; + } + + zmq::message_t message; + const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); if (!recv_result) { continue; } @@ -1075,6 +1237,14 @@ int run_pipeline(const RuntimeConfig &config) { restart_backend(reason, snapshot->metadata.info); continue; } + if (keep_live_outputs_on_reset && live_output_continuity.encoded_keyframe_required && !access_unit->keyframe) { + spdlog::info( + "PIPELINE_RESET_CONTINUITY dropping encoded access unit until keyframe pts_ns={} source_timestamp_ns={}", + access_unit->stream_pts_ns, + access_unit->source_timestamp_ns); + continue; + } + live_output_continuity.encoded_keyframe_required = false; std::vector access_units{}; access_units.push_back(std::move(*access_unit)); auto publish = publish_access_units( @@ -1084,6 +1254,7 @@ int run_pipeline(const RuntimeConfig &config) { rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, &mcap_recorder, + keep_live_outputs_on_reset ? &live_output_continuity : nullptr, latency_tracker); if (!publish) { const auto reason = format_error(publish.error()); @@ -1094,6 +1265,7 @@ int run_pipeline(const RuntimeConfig &config) { auto push = (*backend)->push_frame(encode::RawVideoFrame{ .info = snapshot->metadata.info, .source_timestamp_ns = snapshot->metadata.timestamp_ns, + .force_keyframe = keep_live_outputs_on_reset && live_output_continuity.boundary_pending, .bytes = std::span(snapshot_buffer.data(), snapshot->bytes_copied), }); if (!push) { @@ -1103,30 +1275,30 @@ int run_pipeline(const RuntimeConfig &config) { } } - if (!snapshot->depth.empty()) { - if (snapshot->depth_unit == ipc::DepthUnit::Unknown) { - if (!warned_unknown_depth_unit) { - spdlog::warn( - "pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata"); - warned_unknown_depth_unit = true; - } - } - - auto depth_map = make_depth_map_view(*snapshot); - if (!depth_map) { - const auto reason = "pipeline depth snapshot invalid: " + depth_map.error(); - restart_backend(reason, active_info); - continue; - } - - auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map); - if (!write_depth) { - const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error()); - restart_backend(reason, active_info); - continue; + if (!snapshot->depth.empty()) { + if (snapshot->depth_unit == ipc::DepthUnit::Unknown) { + if (!warned_unknown_depth_unit) { + spdlog::warn( + "pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata"); + warned_unknown_depth_unit = true; } } + auto depth_map = make_depth_map_view(*snapshot); + if (!depth_map) { + const auto reason = "pipeline depth snapshot invalid: " + depth_map.error(); + restart_backend(reason, active_info); + continue; + } + + auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map); + if (!write_depth) { + const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error()); + restart_backend(reason, active_info); + continue; + } + } + stats.pushed_frames += 1; if (!want_encoded_input) { auto drain = drain_encoder( @@ -1137,6 +1309,7 @@ int run_pipeline(const RuntimeConfig &config) { rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, &mcap_recorder, + keep_live_outputs_on_reset ? &live_output_continuity : nullptr, latency_tracker); if (!drain) { const auto reason = format_error(drain.error()); @@ -1166,6 +1339,7 @@ int run_pipeline(const RuntimeConfig &config) { rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, &mcap_recorder, + keep_live_outputs_on_reset ? &live_output_continuity : nullptr, latency_tracker); if (!drain) { spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error())); diff --git a/src/protocol/rtmp_output.cpp b/src/protocol/rtmp_output.cpp index 1e0bf62..a0fff6c 100644 --- a/src/protocol/rtmp_output.cpp +++ b/src/protocol/rtmp_output.cpp @@ -325,6 +325,7 @@ private: const encode::EncodedStreamInfo &stream_info) { Session session{}; session.url = url; + AVDictionary *muxer_options{nullptr}; const auto alloc_result = avformat_alloc_output_context2(&session.format_context, nullptr, "flv", url.c_str()); if (alloc_result < 0 || session.format_context == nullptr) { @@ -369,7 +370,18 @@ private: } } - const auto header_result = avformat_write_header(session.format_context, nullptr); + // RTMP sockets are non-seekable, so the FLV muxer must not try to backfill + // duration/filesize metadata during trailer write. + const auto options_result = av_dict_set(&muxer_options, "flvflags", "no_duration_filesize", 0); + if (options_result < 0) { + av_dict_free(&muxer_options); + close_session(session); + return unexpected_error( + ERR_ALLOCATION_FAILED, + "failed to configure FLV muxer flags for '" + url + "': " + av_error_string(options_result)); + } + const auto header_result = avformat_write_header(session.format_context, &muxer_options); + av_dict_free(&muxer_options); if (header_result < 0) { close_session(session); return unexpected_error( diff --git a/src/testers/rtp_output_tester.cpp b/src/testers/rtp_output_tester.cpp index a5cfece..1a57f41 100644 --- a/src/testers/rtp_output_tester.cpp +++ b/src/testers/rtp_output_tester.cpp @@ -163,7 +163,7 @@ int main(int argc, char **argv) { return exit_code(TesterExitCode::BackendInitError); } - auto publisher = cvmmap_streamer::protocol::UdpRtpPublisher::create(config); + auto publisher = cvmmap_streamer::protocol::UdpRtpPublisher::create(config, *codec); if (!publisher) { spdlog::error("failed to initialize RTP publisher: {}", publisher.error()); return exit_code(TesterExitCode::PublisherInitError);