diff --git a/README.md b/README.md index a3d1f84..2a10425 100644 --- a/README.md +++ b/README.md @@ -512,7 +512,7 @@ Run the fault injection and latency validation suite. | Flag | Description | Default | |------|-------------|---------| | `--ingest-max-frames N` | Process at most N frames then exit | 0 (unlimited) | -| `--ingest-idle-timeout-ms MS` | Exit if idle for MS milliseconds | 0 (disabled) | +| `--ingest-idle-timeout-ms MS` | Exit if idle for MS milliseconds; 0 disables the timeout | 0 (disabled) | ## Architecture diff --git a/include/cvmmap_streamer/config/runtime_config.hpp b/include/cvmmap_streamer/config/runtime_config.hpp index 03bbe29..a722952 100644 --- a/include/cvmmap_streamer/config/runtime_config.hpp +++ b/include/cvmmap_streamer/config/runtime_config.hpp @@ -108,8 +108,9 @@ struct LatencyConfig { std::size_t queue_size{1}; bool realtime_sync{true}; bool force_idr_on_reset{true}; + bool keep_stream_on_reset{false}; std::uint32_t ingest_max_frames{0}; - std::uint32_t ingest_idle_timeout_ms{1000}; + std::uint32_t ingest_idle_timeout_ms{0}; std::uint32_t ingest_consumer_delay_ms{0}; std::uint32_t snapshot_copy_delay_us{0}; std::uint32_t emit_stall_ms{0}; diff --git a/include/cvmmap_streamer/encode/encoder_backend.hpp b/include/cvmmap_streamer/encode/encoder_backend.hpp index 423d0e4..ef53593 100644 --- a/include/cvmmap_streamer/encode/encoder_backend.hpp +++ b/include/cvmmap_streamer/encode/encoder_backend.hpp @@ -18,6 +18,7 @@ struct RawVideoFrame { ipc::FrameInfo info{}; std::uint64_t source_timestamp_ns{0}; std::size_t row_stride_bytes{0}; + bool force_keyframe{false}; std::span bytes{}; }; diff --git a/src/config/runtime_config.cpp b/src/config/runtime_config.cpp index 606ff89..3418c6e 100644 --- a/src/config/runtime_config.cpp +++ b/src/config/runtime_config.cpp @@ -968,9 +968,6 @@ std::expected validate_runtime_config(const RuntimeConfig &co if (config.encoder.b_frames > config.encoder.gop) { return std::unexpected("invalid encoder config: b_frames must be <= gop"); } - if (config.latency.ingest_idle_timeout_ms == 0) { - return std::unexpected("invalid ingest config: ingest_idle_timeout_ms must be >= 1"); - } return {}; } diff --git a/src/core/ingest_runtime.cpp b/src/core/ingest_runtime.cpp index 82de528..27dee63 100644 --- a/src/core/ingest_runtime.cpp +++ b/src/core/ingest_runtime.cpp @@ -275,6 +275,7 @@ int run_ingest_loop(const RuntimeConfig &config) { } }); + const auto idle_timeout_enabled = config.latency.ingest_idle_timeout_ms > 0; const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); auto last_event = std::chrono::steady_clock::now(); @@ -283,7 +284,7 @@ int run_ingest_loop(const RuntimeConfig &config) { const auto recv_result = subscriber.recv(message, zmq::recv_flags::none); if (!recv_result) { const auto now = std::chrono::steady_clock::now(); - if (now - last_event >= idle_timeout) { + if (idle_timeout_enabled && now - last_event >= idle_timeout) { spdlog::info( "ingest idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); diff --git a/src/encode/ffmpeg_encoder_backend.cpp b/src/encode/ffmpeg_encoder_backend.cpp index aa86dff..c58c50a 100644 --- a/src/encode/ffmpeg_encoder_backend.cpp +++ b/src/encode/ffmpeg_encoder_backend.cpp @@ -217,6 +217,12 @@ public: first_source_timestamp_ns_ = frame.source_timestamp_ns; } + frame_->pict_type = frame.force_keyframe ? AV_PICTURE_TYPE_I : AV_PICTURE_TYPE_NONE; + if (frame.force_keyframe) { + frame_->flags |= AV_FRAME_FLAG_KEY; + } else { + frame_->flags &= ~AV_FRAME_FLAG_KEY; + } frame_->pts = static_cast(frame.source_timestamp_ns - *first_source_timestamp_ns_); const auto send_result = avcodec_send_frame(context_, frame_); if (send_result < 0) { diff --git a/src/pipeline/pipeline_runtime.cpp b/src/pipeline/pipeline_runtime.cpp index 1e653d2..c39a04f 100644 --- a/src/pipeline/pipeline_runtime.cpp +++ b/src/pipeline/pipeline_runtime.cpp @@ -62,6 +62,10 @@ constexpr int exit_code(PipelineExitCode code) { } struct ResolvedInputEndpoints { + std::string instance_name; + std::string namespace_name; + std::string ipc_prefix; + std::string base_name; std::string shm_name; std::string zmq_endpoint; std::string nats_target_key; @@ -71,17 +75,21 @@ struct ResolvedInputEndpoints { std::expected resolve_input_endpoints(const RuntimeConfig &config) { try { auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.uri); - spdlog::info( - "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'", - config.input.uri, - target.shm_name, - target.zmq_addr, - target.nats_target_key); - return ResolvedInputEndpoints{ - .shm_name = target.shm_name, - .zmq_endpoint = target.zmq_addr, - .nats_target_key = target.nats_target_key, - }; + spdlog::info( + "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'", + config.input.uri, + target.shm_name, + target.zmq_addr, + target.nats_target_key); + return ResolvedInputEndpoints{ + .instance_name = target.instance, + .namespace_name = target.namespace_name, + .ipc_prefix = target.prefix, + .base_name = target.base_name, + .shm_name = target.shm_name, + .zmq_endpoint = target.zmq_addr, + .nats_target_key = target.nats_target_key, + }; } catch (const std::exception &e) { return std::unexpected(std::string("invalid cvmmap input URI: ") + e.what()); } @@ -327,6 +335,81 @@ bool stream_info_equal( lhs.bitstream_format == rhs.bitstream_format; } +struct LiveOutputContinuityState { + bool boundary_pending{false}; + bool encoded_keyframe_required{false}; + std::optional last_output_pts_ns{}; + std::optional segment_input_start_pts_ns{}; + std::uint64_t segment_output_start_pts_ns{0}; + std::uint64_t nominal_frame_interval_ns{33'333'333ull}; + + void reset() { + boundary_pending = false; + encoded_keyframe_required = false; + last_output_pts_ns.reset(); + segment_input_start_pts_ns.reset(); + segment_output_start_pts_ns = 0; + nominal_frame_interval_ns = 33'333'333ull; + } + + void note_reset_boundary() { + boundary_pending = true; + encoded_keyframe_required = true; + segment_input_start_pts_ns.reset(); + } + + void note_new_session(const encode::EncodedStreamInfo &stream_info) { + reset(); + update_nominal_frame_interval(stream_info); + } + + void update_nominal_frame_interval(const encode::EncodedStreamInfo &stream_info) { + if (stream_info.frame_rate_num == 0 || stream_info.frame_rate_den == 0) { + nominal_frame_interval_ns = 33'333'333ull; + return; + } + + const auto numerator = static_cast(stream_info.frame_rate_den) * 1'000'000'000ull; + const auto denominator = static_cast(stream_info.frame_rate_num); + if (denominator == 0) { + nominal_frame_interval_ns = 33'333'333ull; + return; + } + + const auto interval = numerator / denominator; + nominal_frame_interval_ns = interval == 0 ? 1ull : interval; + } + + [[nodiscard]] + std::uint64_t remap_pts(const std::uint64_t input_pts_ns) { + if (!last_output_pts_ns) { + last_output_pts_ns = input_pts_ns; + segment_input_start_pts_ns = input_pts_ns; + segment_output_start_pts_ns = input_pts_ns; + boundary_pending = false; + return input_pts_ns; + } + + if (boundary_pending || !segment_input_start_pts_ns) { + segment_input_start_pts_ns = input_pts_ns; + segment_output_start_pts_ns = *last_output_pts_ns + nominal_frame_interval_ns; + boundary_pending = false; + } + + const auto input_delta = + input_pts_ns >= *segment_input_start_pts_ns + ? input_pts_ns - *segment_input_start_pts_ns + : 0ull; + auto remapped_pts = segment_output_start_pts_ns + input_delta; + if (remapped_pts <= *last_output_pts_ns) { + remapped_pts = *last_output_pts_ns + nominal_frame_interval_ns; + } + + last_output_pts_ns = remapped_pts; + return remapped_pts; + } +}; + [[nodiscard]] std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) { if (frame.header.timestamp_ns != 0) { @@ -556,6 +639,7 @@ Status publish_access_units( protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, McapRecorderState *mcap_recorder, + LiveOutputContinuityState *live_output_continuity, metrics::IngestEmitLatencyTracker &latency_tracker) { for (auto &access_unit : access_units) { if (access_unit.annexb_bytes.empty()) { @@ -567,11 +651,29 @@ Status publish_access_units( latency_tracker.note_emit_stall(); } + const auto should_rewrite_live_timestamps = + live_output_continuity != nullptr && + (rtp_publisher != nullptr || rtmp_output != nullptr); + auto live_access_unit = access_unit; + const bool live_boundary_packet = + should_rewrite_live_timestamps && + live_output_continuity->boundary_pending; + if (should_rewrite_live_timestamps) { + live_access_unit.stream_pts_ns = live_output_continuity->remap_pts(access_unit.stream_pts_ns); + } + if (live_boundary_packet) { + spdlog::info( + "PIPELINE_RESET_CONTINUITY first live packet keyframe={} source_pts_ns={} remapped_pts_ns={}", + access_unit.keyframe ? "true" : "false", + access_unit.stream_pts_ns, + live_access_unit.stream_pts_ns); + } + if (rtp_publisher != nullptr) { - rtp_publisher->publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns); + rtp_publisher->publish_access_unit(live_access_unit.annexb_bytes, live_access_unit.stream_pts_ns); } if (rtmp_output != nullptr) { - auto publish = (*rtmp_output)->publish_access_unit(access_unit); + auto publish = (*rtmp_output)->publish_access_unit(live_access_unit); if (!publish) { return std::unexpected(publish.error()); } @@ -607,6 +709,7 @@ Status drain_encoder( protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, McapRecorderState *mcap_recorder, + LiveOutputContinuityState *live_output_continuity, metrics::IngestEmitLatencyTracker &latency_tracker) { auto drained = flushing ? backend->flush() : backend->drain(); if (!drained) { @@ -619,6 +722,7 @@ Status drain_encoder( rtp_publisher, rtmp_output, mcap_recorder, + live_output_continuity, latency_tracker); } @@ -678,9 +782,17 @@ int run_pipeline(const RuntimeConfig &config) { input_endpoints->nats_target_key, config.input.nats_url); cvmmap::NatsControlService recorder_service( - config.input.uri, - input_endpoints->nats_target_key, - config.input.nats_url); + cvmmap::NatsControlServiceOptions{ + .instance_name = input_endpoints->instance_name, + .namespace_name = input_endpoints->namespace_name, + .ipc_prefix = input_endpoints->ipc_prefix, + .base_name = input_endpoints->base_name, + .target_key = input_endpoints->nats_target_key, + .shm_name = input_endpoints->shm_name, + .zmq_addr = input_endpoints->zmq_endpoint, + .backend = std::string((*source)->backend_name()), + .nats_url = config.input.nats_url, + }); std::mutex nats_event_mutex{}; std::deque> pending_body_packets{}; std::deque pending_status_codes{}; @@ -737,6 +849,10 @@ int run_pipeline(const RuntimeConfig &config) { PipelineStats stats{}; metrics::IngestEmitLatencyTracker latency_tracker{}; + LiveOutputContinuityState live_output_continuity{}; + const bool keep_live_outputs_on_reset = + config.latency.keep_stream_on_reset && + (config.outputs.rtp.enabled || config.outputs.rtmp.enabled); bool producer_offline{false}; bool started{false}; bool using_encoded_input{false}; @@ -766,6 +882,7 @@ int run_pipeline(const RuntimeConfig &config) { warned_unknown_depth_unit = false; using_encoded_input = false; active_stream_info.reset(); + live_output_continuity.reset(); rtp_publisher.reset(); rtmp_output.reset(); }; @@ -784,26 +901,55 @@ int run_pipeline(const RuntimeConfig &config) { const auto start_outputs_from_stream_info = [&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status { - rtp_publisher.reset(); - rtmp_output.reset(); - if (config.outputs.rtp.enabled) { - auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec); - if (!created) { - return unexpected_error( - ERR_INTERNAL, - "pipeline RTP publisher init failed: " + created.error()); + const bool preserving_live_outputs = + keep_live_outputs_on_reset && + live_output_continuity.boundary_pending && + active_stream_info.has_value() && + stream_info_equal(*active_stream_info, stream_info) && + ((config.outputs.rtp.enabled && rtp_publisher.has_value()) || + (config.outputs.rtmp.enabled && rtmp_output.has_value())); + if (preserving_live_outputs) { + spdlog::info( + "PIPELINE_RESET_CONTINUITY preserving live outputs codec={} width={} height={}", + to_string(stream_info.codec), + stream_info.width, + stream_info.height); + live_output_continuity.update_nominal_frame_interval(stream_info); + } else { + if (keep_live_outputs_on_reset && live_output_continuity.boundary_pending && active_stream_info.has_value() && + !stream_info_equal(*active_stream_info, stream_info)) { + spdlog::warn( + "PIPELINE_RESET_CONTINUITY fallback to output rebuild reason='stream_info_change' old_codec={} new_codec={} old={}x{} new={}x{}", + to_string(active_stream_info->codec), + to_string(stream_info.codec), + active_stream_info->width, + active_stream_info->height, + stream_info.width, + stream_info.height); } - rtp_publisher.emplace(std::move(*created)); - } - if (config.outputs.rtmp.enabled) { - auto created = protocol::make_rtmp_output(config, stream_info); - if (!created) { - return unexpected_error( - created.error().code, - "pipeline RTMP output init failed: " + format_error(created.error())); + rtp_publisher.reset(); + rtmp_output.reset(); + if (config.outputs.rtp.enabled) { + auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec); + if (!created) { + return unexpected_error( + ERR_INTERNAL, + "pipeline RTP publisher init failed: " + created.error()); + } + rtp_publisher.emplace(std::move(*created)); } - rtmp_output.emplace(std::move(*created)); + if (config.outputs.rtmp.enabled) { + auto created = protocol::make_rtmp_output(config, stream_info); + if (!created) { + return unexpected_error( + created.error().code, + "pipeline RTMP output init failed: " + format_error(created.error())); + } + rtmp_output.emplace(std::move(*created)); + } + live_output_continuity.note_new_session(stream_info); } + update_mcap_stream_info(mcap_recorder, stream_info); if (config.record.mcap.enabled) { std::lock_guard lock(mcap_recorder.mutex); @@ -865,35 +1011,49 @@ int run_pipeline(const RuntimeConfig &config) { return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info); }; - const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); - auto last_event = std::chrono::steady_clock::now(); + const auto idle_timeout_enabled = config.latency.ingest_idle_timeout_ms > 0; + const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); + auto last_event = std::chrono::steady_clock::now(); - while (true) { - std::deque> body_packets; - std::deque status_codes; - { - std::lock_guard lock(nats_event_mutex); - body_packets.swap(pending_body_packets); - status_codes.swap(pending_status_codes); - } - const bool had_nats_events = !body_packets.empty() || !status_codes.empty(); - if (had_nats_events) { - last_event = std::chrono::steady_clock::now(); - } - for (const auto status_code : status_codes) { - stats.status_messages += 1; - switch (static_cast(status_code)) { - case cvmmap::ModuleStatus::Online: - spdlog::info("pipeline status event status=online"); - producer_offline = false; - break; - case cvmmap::ModuleStatus::Offline: - spdlog::info("pipeline status event status=offline"); - producer_offline = true; - break; - case cvmmap::ModuleStatus::StreamReset: - spdlog::info("pipeline status event status=stream_reset"); - stats.resets += 1; + while (true) { + std::deque> body_packets; + std::deque status_codes; + { + std::lock_guard lock(nats_event_mutex); + body_packets.swap(pending_body_packets); + status_codes.swap(pending_status_codes); + } + const bool had_nats_events = !body_packets.empty() || !status_codes.empty(); + if (had_nats_events) { + last_event = std::chrono::steady_clock::now(); + } + for (const auto status_code : status_codes) { + stats.status_messages += 1; + switch (static_cast(status_code)) { + case cvmmap::ModuleStatus::Online: + spdlog::info("pipeline status event status=online"); + producer_offline = false; + break; + case cvmmap::ModuleStatus::Offline: + spdlog::info("pipeline status event status=offline"); + producer_offline = true; + break; + case cvmmap::ModuleStatus::StreamReset: + spdlog::info("pipeline status event status=stream_reset"); + stats.resets += 1; + if (keep_live_outputs_on_reset) { + live_output_continuity.note_reset_boundary(); + if (!using_encoded_input && backend) { + (*backend)->shutdown(); + started = false; + } + restart_pending = false; + restart_target_info.reset(); + spdlog::info( + "PIPELINE_RESET_CONTINUITY armed outputs_active={} encoded_input={}", + (rtp_publisher.has_value() || rtmp_output.has_value()) ? "true" : "false", + using_encoded_input ? "true" : "false"); + } else { if (backend) { (*backend)->shutdown(); } @@ -905,86 +1065,88 @@ int run_pipeline(const RuntimeConfig &config) { active_info.reset(); rtp_publisher.reset(); rtmp_output.reset(); - break; } + break; } - for (const auto &body_bytes_vec : body_packets) { - const auto body_bytes = std::span( - body_bytes_vec.data(), - body_bytes_vec.size()); - if (body_bytes.empty()) { - continue; - } - - auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes); - if (!parsed_body) { - spdlog::warn("pipeline body packet parse error: {}", parsed_body.error()); - continue; - } - - auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{ - .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), - .bytes = body_bytes, - }); - if (!write_body) { - const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error()); - restart_backend(reason, active_info); - break; - } - } - - if (backend && !using_encoded_input) { - auto poll = (*backend)->poll(); - if (!poll) { - const auto reason = format_error(poll.error()); - restart_backend(reason, active_info); - } - } - - std::array poll_items{{ - {subscriber.handle(), 0, ZMQ_POLLIN, 0}, - }}; - try { - zmq::poll(poll_items, std::chrono::milliseconds{20}); - } catch (const zmq::error_t &e) { - spdlog::error("pipeline poll failed: {}", e.what()); - return exit_code(PipelineExitCode::SubscriberError); - } - - const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; - if (!frame_socket_ready) { - const auto now = std::chrono::steady_clock::now(); - if (restart_pending && restart_target_info && backend && !using_encoded_input) { - auto start_result = attempt_raw_backend_start(*restart_target_info); - if (!start_result) { - spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); - return exit_code(PipelineExitCode::RuntimeError); - } - } - if (now - last_event >= idle_timeout) { - spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); - break; - } - if (!producer_offline && started && backend && !using_encoded_input) { - auto drain = drain_encoder( - config, - *backend, - false, - stats, - rtp_publisher ? &*rtp_publisher : nullptr, - rtmp_output ? &*rtmp_output : nullptr, - &mcap_recorder, - latency_tracker); - if (!drain) { - const auto reason = format_error(drain.error()); - restart_backend(reason, active_info); - } - } + } + for (const auto &body_bytes_vec : body_packets) { + const auto body_bytes = std::span( + body_bytes_vec.data(), + body_bytes_vec.size()); + if (body_bytes.empty()) { continue; } - zmq::message_t message; - const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); + auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes); + if (!parsed_body) { + spdlog::warn("pipeline body packet parse error: {}", parsed_body.error()); + continue; + } + + auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{ + .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), + .bytes = body_bytes, + }); + if (!write_body) { + const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error()); + restart_backend(reason, active_info); + break; + } + } + + if (backend && !using_encoded_input) { + auto poll = (*backend)->poll(); + if (!poll) { + const auto reason = format_error(poll.error()); + restart_backend(reason, active_info); + } + } + + std::array poll_items{{ + {subscriber.handle(), 0, ZMQ_POLLIN, 0}, + }}; + try { + zmq::poll(poll_items, std::chrono::milliseconds{20}); + } catch (const zmq::error_t &e) { + spdlog::error("pipeline poll failed: {}", e.what()); + return exit_code(PipelineExitCode::SubscriberError); + } + + const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; + if (!frame_socket_ready) { + const auto now = std::chrono::steady_clock::now(); + if (restart_pending && restart_target_info && backend && !using_encoded_input) { + auto start_result = attempt_raw_backend_start(*restart_target_info); + if (!start_result) { + spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); + return exit_code(PipelineExitCode::RuntimeError); + } + } + if (idle_timeout_enabled && now - last_event >= idle_timeout) { + spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); + break; + } + if (!producer_offline && started && backend && !using_encoded_input) { + auto drain = drain_encoder( + config, + *backend, + false, + stats, + rtp_publisher ? &*rtp_publisher : nullptr, + rtmp_output ? &*rtmp_output : nullptr, + &mcap_recorder, + keep_live_outputs_on_reset ? &live_output_continuity : nullptr, + latency_tracker); + if (!drain) { + const auto reason = format_error(drain.error()); + restart_backend(reason, active_info); + } + } + continue; + } + + zmq::message_t message; + const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); if (!recv_result) { continue; } @@ -1075,6 +1237,14 @@ int run_pipeline(const RuntimeConfig &config) { restart_backend(reason, snapshot->metadata.info); continue; } + if (keep_live_outputs_on_reset && live_output_continuity.encoded_keyframe_required && !access_unit->keyframe) { + spdlog::info( + "PIPELINE_RESET_CONTINUITY dropping encoded access unit until keyframe pts_ns={} source_timestamp_ns={}", + access_unit->stream_pts_ns, + access_unit->source_timestamp_ns); + continue; + } + live_output_continuity.encoded_keyframe_required = false; std::vector access_units{}; access_units.push_back(std::move(*access_unit)); auto publish = publish_access_units( @@ -1084,6 +1254,7 @@ int run_pipeline(const RuntimeConfig &config) { rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, &mcap_recorder, + keep_live_outputs_on_reset ? &live_output_continuity : nullptr, latency_tracker); if (!publish) { const auto reason = format_error(publish.error()); @@ -1094,6 +1265,7 @@ int run_pipeline(const RuntimeConfig &config) { auto push = (*backend)->push_frame(encode::RawVideoFrame{ .info = snapshot->metadata.info, .source_timestamp_ns = snapshot->metadata.timestamp_ns, + .force_keyframe = keep_live_outputs_on_reset && live_output_continuity.boundary_pending, .bytes = std::span(snapshot_buffer.data(), snapshot->bytes_copied), }); if (!push) { @@ -1103,30 +1275,30 @@ int run_pipeline(const RuntimeConfig &config) { } } - if (!snapshot->depth.empty()) { - if (snapshot->depth_unit == ipc::DepthUnit::Unknown) { - if (!warned_unknown_depth_unit) { - spdlog::warn( - "pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata"); - warned_unknown_depth_unit = true; - } - } - - auto depth_map = make_depth_map_view(*snapshot); - if (!depth_map) { - const auto reason = "pipeline depth snapshot invalid: " + depth_map.error(); - restart_backend(reason, active_info); - continue; - } - - auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map); - if (!write_depth) { - const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error()); - restart_backend(reason, active_info); - continue; + if (!snapshot->depth.empty()) { + if (snapshot->depth_unit == ipc::DepthUnit::Unknown) { + if (!warned_unknown_depth_unit) { + spdlog::warn( + "pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata"); + warned_unknown_depth_unit = true; } } + auto depth_map = make_depth_map_view(*snapshot); + if (!depth_map) { + const auto reason = "pipeline depth snapshot invalid: " + depth_map.error(); + restart_backend(reason, active_info); + continue; + } + + auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map); + if (!write_depth) { + const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error()); + restart_backend(reason, active_info); + continue; + } + } + stats.pushed_frames += 1; if (!want_encoded_input) { auto drain = drain_encoder( @@ -1137,6 +1309,7 @@ int run_pipeline(const RuntimeConfig &config) { rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, &mcap_recorder, + keep_live_outputs_on_reset ? &live_output_continuity : nullptr, latency_tracker); if (!drain) { const auto reason = format_error(drain.error()); @@ -1166,6 +1339,7 @@ int run_pipeline(const RuntimeConfig &config) { rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, &mcap_recorder, + keep_live_outputs_on_reset ? &live_output_continuity : nullptr, latency_tracker); if (!drain) { spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error())); diff --git a/src/protocol/rtmp_output.cpp b/src/protocol/rtmp_output.cpp index 1e0bf62..a0fff6c 100644 --- a/src/protocol/rtmp_output.cpp +++ b/src/protocol/rtmp_output.cpp @@ -325,6 +325,7 @@ private: const encode::EncodedStreamInfo &stream_info) { Session session{}; session.url = url; + AVDictionary *muxer_options{nullptr}; const auto alloc_result = avformat_alloc_output_context2(&session.format_context, nullptr, "flv", url.c_str()); if (alloc_result < 0 || session.format_context == nullptr) { @@ -369,7 +370,18 @@ private: } } - const auto header_result = avformat_write_header(session.format_context, nullptr); + // RTMP sockets are non-seekable, so the FLV muxer must not try to backfill + // duration/filesize metadata during trailer write. + const auto options_result = av_dict_set(&muxer_options, "flvflags", "no_duration_filesize", 0); + if (options_result < 0) { + av_dict_free(&muxer_options); + close_session(session); + return unexpected_error( + ERR_ALLOCATION_FAILED, + "failed to configure FLV muxer flags for '" + url + "': " + av_error_string(options_result)); + } + const auto header_result = avformat_write_header(session.format_context, &muxer_options); + av_dict_free(&muxer_options); if (header_result < 0) { close_session(session); return unexpected_error( diff --git a/src/testers/rtp_output_tester.cpp b/src/testers/rtp_output_tester.cpp index a5cfece..1a57f41 100644 --- a/src/testers/rtp_output_tester.cpp +++ b/src/testers/rtp_output_tester.cpp @@ -163,7 +163,7 @@ int main(int argc, char **argv) { return exit_code(TesterExitCode::BackendInitError); } - auto publisher = cvmmap_streamer::protocol::UdpRtpPublisher::create(config); + auto publisher = cvmmap_streamer::protocol::UdpRtpPublisher::create(config, *codec); if (!publisher) { spdlog::error("failed to initialize RTP publisher: {}", publisher.error()); return exit_code(TesterExitCode::PublisherInitError);