fix(stream): preserve live outputs and disable idle exit by default

Preserve RTP/RTMP session continuity across upstream stream_reset events by
forcing a keyframe on restart, remapping live timestamps, and keeping live
outputs open when the runtime requests reset continuity.

Disable idle auto-exit by default: change the ingest_idle_timeout_ms default
to 0, remove the validation that rejected 0, and enforce idle shutdown in the
pipeline and ingest loops only when a positive timeout is configured.

Also suppress libavformat FLV trailer header backfill attempts on RTMP sockets
and update the RTP output tester for the newer publisher create signature.

Docs are updated to state that 0 disables the idle timeout.
This commit is contained in:
2026-04-09 12:17:54 +08:00
parent 0a3da46f19
commit 965b03c053
9 changed files with 357 additions and 165 deletions
+331 -157
View File
@@ -62,6 +62,10 @@ constexpr int exit_code(PipelineExitCode code) {
}
struct ResolvedInputEndpoints {
std::string instance_name;
std::string namespace_name;
std::string ipc_prefix;
std::string base_name;
std::string shm_name;
std::string zmq_endpoint;
std::string nats_target_key;
@@ -71,17 +75,21 @@ struct ResolvedInputEndpoints {
/// Resolves the cvmmap input URI from `config` into the identity and endpoint
/// strings the pipeline needs.
///
/// On success the returned value has every field of ResolvedInputEndpoints
/// populated from the resolved target (instance/namespace/prefix/base name as
/// well as the shm name, ZMQ endpoint and NATS target key). The identity fields
/// must be filled in here because run_pipeline forwards them when it builds the
/// recorder control service options.
///
/// @param config Runtime configuration; only `config.input.uri` is read.
/// @return The resolved endpoints, or an error string prefixed with
///         "invalid cvmmap input URI: " when resolution throws.
std::expected<ResolvedInputEndpoints, std::string> resolve_input_endpoints(const RuntimeConfig &config) {
    try {
        const auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.uri);
        spdlog::info(
            "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'",
            config.input.uri,
            target.shm_name,
            target.zmq_addr,
            target.nats_target_key);
        // Single return path: an earlier partial return here left the identity
        // fields empty and made the complete initializer unreachable.
        return ResolvedInputEndpoints{
            .instance_name = target.instance,
            .namespace_name = target.namespace_name,
            .ipc_prefix = target.prefix,
            .base_name = target.base_name,
            .shm_name = target.shm_name,
            .zmq_endpoint = target.zmq_addr,
            .nats_target_key = target.nats_target_key,
        };
    } catch (const std::exception &e) {
        // The resolver reports failures by throwing; translate to the
        // std::expected error channel used by callers.
        return std::unexpected(std::string("invalid cvmmap input URI: ") + e.what());
    }
}
@@ -327,6 +335,81 @@ bool stream_info_equal(
lhs.bitstream_format == rhs.bitstream_format;
}
/// Bookkeeping that keeps live-output timestamps strictly increasing across an
/// upstream stream reset.
///
/// Input timestamps are grouped into "segments". The first timestamp ever seen
/// passes through unchanged; after note_reset_boundary() the next timestamp
/// starts a new segment that is placed one nominal frame interval after the
/// last emitted output timestamp, and later timestamps in that segment keep
/// their spacing relative to the segment start.
struct LiveOutputContinuityState {
    /// Fallback frame spacing (1/30 s in nanoseconds) used when the stream
    /// reports no usable frame rate.
    static constexpr std::uint64_t kDefaultFrameIntervalNs{33'333'333ull};

    /// Set by note_reset_boundary(); cleared by the next remap_pts() call.
    bool boundary_pending{false};
    /// Set by note_reset_boundary(); this struct only clears it in reset(),
    /// so the caller is responsible for clearing it once a keyframe arrives.
    bool encoded_keyframe_required{false};
    /// Most recent timestamp returned by remap_pts(), if any.
    std::optional<std::uint64_t> last_output_pts_ns{};
    /// Input timestamp that opened the current segment, if one is open.
    std::optional<std::uint64_t> segment_input_start_pts_ns{};
    /// Output timestamp assigned to the start of the current segment.
    std::uint64_t segment_output_start_pts_ns{0};
    /// Gap inserted at segment boundaries and when forcing monotonic output.
    std::uint64_t nominal_frame_interval_ns{kDefaultFrameIntervalNs};

    /// Restores every field to its default-constructed value.
    void reset() {
        boundary_pending = false;
        encoded_keyframe_required = false;
        last_output_pts_ns.reset();
        segment_input_start_pts_ns.reset();
        segment_output_start_pts_ns = 0;
        nominal_frame_interval_ns = kDefaultFrameIntervalNs;
    }

    /// Marks that the upstream stream was reset: the next remapped timestamp
    /// opens a new segment and a keyframe is requested. The last output
    /// timestamp is deliberately kept so the new segment continues after it.
    void note_reset_boundary() {
        boundary_pending = true;
        encoded_keyframe_required = true;
        segment_input_start_pts_ns.reset();
    }

    /// Starts from a clean slate for freshly created outputs and adopts the
    /// frame interval implied by `stream_info`.
    void note_new_session(const encode::EncodedStreamInfo &stream_info) {
        reset();
        update_nominal_frame_interval(stream_info);
    }

    /// Derives nominal_frame_interval_ns as frame_rate_den / frame_rate_num
    /// seconds, expressed in nanoseconds.
    ///
    /// Falls back to kDefaultFrameIntervalNs when either rate component is
    /// zero, and never stores an interval below 1 ns so that the monotonic
    /// bump in remap_pts() always advances.
    void update_nominal_frame_interval(const encode::EncodedStreamInfo &stream_info) {
        const auto rate_num = static_cast<std::uint64_t>(stream_info.frame_rate_num);
        const auto rate_den = static_cast<std::uint64_t>(stream_info.frame_rate_den);
        // One guard on the converted values covers both the "rate unknown"
        // case and the division below.
        if (rate_num == 0 || rate_den == 0) {
            nominal_frame_interval_ns = kDefaultFrameIntervalNs;
            return;
        }
        const auto interval = (rate_den * 1'000'000'000ull) / rate_num;
        nominal_frame_interval_ns = interval == 0 ? 1ull : interval;
    }

    /// Maps an input timestamp onto the continuous output timeline.
    ///
    /// @param input_pts_ns Timestamp from the (possibly restarted) source.
    /// @return A timestamp strictly greater than the previously returned one,
    ///         except for the very first call, which returns the input as-is.
    [[nodiscard]]
    std::uint64_t remap_pts(const std::uint64_t input_pts_ns) {
        if (!last_output_pts_ns) {
            // First timestamp ever: identity mapping anchors the timeline.
            last_output_pts_ns = input_pts_ns;
            segment_input_start_pts_ns = input_pts_ns;
            segment_output_start_pts_ns = input_pts_ns;
            boundary_pending = false;
            return input_pts_ns;
        }

        if (boundary_pending || !segment_input_start_pts_ns) {
            // Open a new segment directly after the last emitted timestamp.
            segment_input_start_pts_ns = input_pts_ns;
            segment_output_start_pts_ns = *last_output_pts_ns + nominal_frame_interval_ns;
            boundary_pending = false;
        }

        // Inputs that precede the segment start are clamped to a zero offset
        // instead of wrapping around in unsigned arithmetic.
        const auto input_delta =
            input_pts_ns >= *segment_input_start_pts_ns
                ? input_pts_ns - *segment_input_start_pts_ns
                : 0ull;

        auto remapped_pts = segment_output_start_pts_ns + input_delta;
        if (remapped_pts <= *last_output_pts_ns) {
            // Enforce strictly increasing output even for repeated or
            // backwards input timestamps.
            remapped_pts = *last_output_pts_ns + nominal_frame_interval_ns;
        }
        last_output_pts_ns = remapped_pts;
        return remapped_pts;
    }
};
[[nodiscard]]
std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) {
if (frame.header.timestamp_ns != 0) {
@@ -556,6 +639,7 @@ Status publish_access_units(
protocol::UdpRtpPublisher *rtp_publisher,
protocol::RtmpOutput *rtmp_output,
McapRecorderState *mcap_recorder,
LiveOutputContinuityState *live_output_continuity,
metrics::IngestEmitLatencyTracker &latency_tracker) {
for (auto &access_unit : access_units) {
if (access_unit.annexb_bytes.empty()) {
@@ -567,11 +651,29 @@ Status publish_access_units(
latency_tracker.note_emit_stall();
}
const auto should_rewrite_live_timestamps =
live_output_continuity != nullptr &&
(rtp_publisher != nullptr || rtmp_output != nullptr);
auto live_access_unit = access_unit;
const bool live_boundary_packet =
should_rewrite_live_timestamps &&
live_output_continuity->boundary_pending;
if (should_rewrite_live_timestamps) {
live_access_unit.stream_pts_ns = live_output_continuity->remap_pts(access_unit.stream_pts_ns);
}
if (live_boundary_packet) {
spdlog::info(
"PIPELINE_RESET_CONTINUITY first live packet keyframe={} source_pts_ns={} remapped_pts_ns={}",
access_unit.keyframe ? "true" : "false",
access_unit.stream_pts_ns,
live_access_unit.stream_pts_ns);
}
if (rtp_publisher != nullptr) {
rtp_publisher->publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns);
rtp_publisher->publish_access_unit(live_access_unit.annexb_bytes, live_access_unit.stream_pts_ns);
}
if (rtmp_output != nullptr) {
auto publish = (*rtmp_output)->publish_access_unit(access_unit);
auto publish = (*rtmp_output)->publish_access_unit(live_access_unit);
if (!publish) {
return std::unexpected(publish.error());
}
@@ -607,6 +709,7 @@ Status drain_encoder(
protocol::UdpRtpPublisher *rtp_publisher,
protocol::RtmpOutput *rtmp_output,
McapRecorderState *mcap_recorder,
LiveOutputContinuityState *live_output_continuity,
metrics::IngestEmitLatencyTracker &latency_tracker) {
auto drained = flushing ? backend->flush() : backend->drain();
if (!drained) {
@@ -619,6 +722,7 @@ Status drain_encoder(
rtp_publisher,
rtmp_output,
mcap_recorder,
live_output_continuity,
latency_tracker);
}
@@ -678,9 +782,17 @@ int run_pipeline(const RuntimeConfig &config) {
input_endpoints->nats_target_key,
config.input.nats_url);
cvmmap::NatsControlService recorder_service(
config.input.uri,
input_endpoints->nats_target_key,
config.input.nats_url);
cvmmap::NatsControlServiceOptions{
.instance_name = input_endpoints->instance_name,
.namespace_name = input_endpoints->namespace_name,
.ipc_prefix = input_endpoints->ipc_prefix,
.base_name = input_endpoints->base_name,
.target_key = input_endpoints->nats_target_key,
.shm_name = input_endpoints->shm_name,
.zmq_addr = input_endpoints->zmq_endpoint,
.backend = std::string((*source)->backend_name()),
.nats_url = config.input.nats_url,
});
std::mutex nats_event_mutex{};
std::deque<std::vector<std::uint8_t>> pending_body_packets{};
std::deque<int32_t> pending_status_codes{};
@@ -737,6 +849,10 @@ int run_pipeline(const RuntimeConfig &config) {
PipelineStats stats{};
metrics::IngestEmitLatencyTracker latency_tracker{};
LiveOutputContinuityState live_output_continuity{};
const bool keep_live_outputs_on_reset =
config.latency.keep_stream_on_reset &&
(config.outputs.rtp.enabled || config.outputs.rtmp.enabled);
bool producer_offline{false};
bool started{false};
bool using_encoded_input{false};
@@ -766,6 +882,7 @@ int run_pipeline(const RuntimeConfig &config) {
warned_unknown_depth_unit = false;
using_encoded_input = false;
active_stream_info.reset();
live_output_continuity.reset();
rtp_publisher.reset();
rtmp_output.reset();
};
@@ -784,26 +901,55 @@ int run_pipeline(const RuntimeConfig &config) {
const auto start_outputs_from_stream_info =
[&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status {
rtp_publisher.reset();
rtmp_output.reset();
if (config.outputs.rtp.enabled) {
auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec);
if (!created) {
return unexpected_error(
ERR_INTERNAL,
"pipeline RTP publisher init failed: " + created.error());
const bool preserving_live_outputs =
keep_live_outputs_on_reset &&
live_output_continuity.boundary_pending &&
active_stream_info.has_value() &&
stream_info_equal(*active_stream_info, stream_info) &&
((config.outputs.rtp.enabled && rtp_publisher.has_value()) ||
(config.outputs.rtmp.enabled && rtmp_output.has_value()));
if (preserving_live_outputs) {
spdlog::info(
"PIPELINE_RESET_CONTINUITY preserving live outputs codec={} width={} height={}",
to_string(stream_info.codec),
stream_info.width,
stream_info.height);
live_output_continuity.update_nominal_frame_interval(stream_info);
} else {
if (keep_live_outputs_on_reset && live_output_continuity.boundary_pending && active_stream_info.has_value() &&
!stream_info_equal(*active_stream_info, stream_info)) {
spdlog::warn(
"PIPELINE_RESET_CONTINUITY fallback to output rebuild reason='stream_info_change' old_codec={} new_codec={} old={}x{} new={}x{}",
to_string(active_stream_info->codec),
to_string(stream_info.codec),
active_stream_info->width,
active_stream_info->height,
stream_info.width,
stream_info.height);
}
rtp_publisher.emplace(std::move(*created));
}
if (config.outputs.rtmp.enabled) {
auto created = protocol::make_rtmp_output(config, stream_info);
if (!created) {
return unexpected_error(
created.error().code,
"pipeline RTMP output init failed: " + format_error(created.error()));
rtp_publisher.reset();
rtmp_output.reset();
if (config.outputs.rtp.enabled) {
auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec);
if (!created) {
return unexpected_error(
ERR_INTERNAL,
"pipeline RTP publisher init failed: " + created.error());
}
rtp_publisher.emplace(std::move(*created));
}
rtmp_output.emplace(std::move(*created));
if (config.outputs.rtmp.enabled) {
auto created = protocol::make_rtmp_output(config, stream_info);
if (!created) {
return unexpected_error(
created.error().code,
"pipeline RTMP output init failed: " + format_error(created.error()));
}
rtmp_output.emplace(std::move(*created));
}
live_output_continuity.note_new_session(stream_info);
}
update_mcap_stream_info(mcap_recorder, stream_info);
if (config.record.mcap.enabled) {
std::lock_guard lock(mcap_recorder.mutex);
@@ -865,35 +1011,49 @@ int run_pipeline(const RuntimeConfig &config) {
return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info);
};
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
auto last_event = std::chrono::steady_clock::now();
const auto idle_timeout_enabled = config.latency.ingest_idle_timeout_ms > 0;
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
auto last_event = std::chrono::steady_clock::now();
while (true) {
std::deque<std::vector<std::uint8_t>> body_packets;
std::deque<int32_t> status_codes;
{
std::lock_guard lock(nats_event_mutex);
body_packets.swap(pending_body_packets);
status_codes.swap(pending_status_codes);
}
const bool had_nats_events = !body_packets.empty() || !status_codes.empty();
if (had_nats_events) {
last_event = std::chrono::steady_clock::now();
}
for (const auto status_code : status_codes) {
stats.status_messages += 1;
switch (static_cast<cvmmap::ModuleStatus>(status_code)) {
case cvmmap::ModuleStatus::Online:
spdlog::info("pipeline status event status=online");
producer_offline = false;
break;
case cvmmap::ModuleStatus::Offline:
spdlog::info("pipeline status event status=offline");
producer_offline = true;
break;
case cvmmap::ModuleStatus::StreamReset:
spdlog::info("pipeline status event status=stream_reset");
stats.resets += 1;
while (true) {
std::deque<std::vector<std::uint8_t>> body_packets;
std::deque<int32_t> status_codes;
{
std::lock_guard lock(nats_event_mutex);
body_packets.swap(pending_body_packets);
status_codes.swap(pending_status_codes);
}
const bool had_nats_events = !body_packets.empty() || !status_codes.empty();
if (had_nats_events) {
last_event = std::chrono::steady_clock::now();
}
for (const auto status_code : status_codes) {
stats.status_messages += 1;
switch (static_cast<cvmmap::ModuleStatus>(status_code)) {
case cvmmap::ModuleStatus::Online:
spdlog::info("pipeline status event status=online");
producer_offline = false;
break;
case cvmmap::ModuleStatus::Offline:
spdlog::info("pipeline status event status=offline");
producer_offline = true;
break;
case cvmmap::ModuleStatus::StreamReset:
spdlog::info("pipeline status event status=stream_reset");
stats.resets += 1;
if (keep_live_outputs_on_reset) {
live_output_continuity.note_reset_boundary();
if (!using_encoded_input && backend) {
(*backend)->shutdown();
started = false;
}
restart_pending = false;
restart_target_info.reset();
spdlog::info(
"PIPELINE_RESET_CONTINUITY armed outputs_active={} encoded_input={}",
(rtp_publisher.has_value() || rtmp_output.has_value()) ? "true" : "false",
using_encoded_input ? "true" : "false");
} else {
if (backend) {
(*backend)->shutdown();
}
@@ -905,86 +1065,88 @@ int run_pipeline(const RuntimeConfig &config) {
active_info.reset();
rtp_publisher.reset();
rtmp_output.reset();
break;
}
break;
}
for (const auto &body_bytes_vec : body_packets) {
const auto body_bytes = std::span<const std::uint8_t>(
body_bytes_vec.data(),
body_bytes_vec.size());
if (body_bytes.empty()) {
continue;
}
auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes);
if (!parsed_body) {
spdlog::warn("pipeline body packet parse error: {}", parsed_body.error());
continue;
}
auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{
.timestamp_ns = body_tracking_timestamp_ns(*parsed_body),
.bytes = body_bytes,
});
if (!write_body) {
const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error());
restart_backend(reason, active_info);
break;
}
}
if (backend && !using_encoded_input) {
auto poll = (*backend)->poll();
if (!poll) {
const auto reason = format_error(poll.error());
restart_backend(reason, active_info);
}
}
std::array<zmq::pollitem_t, 1> poll_items{{
{subscriber.handle(), 0, ZMQ_POLLIN, 0},
}};
try {
zmq::poll(poll_items, std::chrono::milliseconds{20});
} catch (const zmq::error_t &e) {
spdlog::error("pipeline poll failed: {}", e.what());
return exit_code(PipelineExitCode::SubscriberError);
}
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
if (!frame_socket_ready) {
const auto now = std::chrono::steady_clock::now();
if (restart_pending && restart_target_info && backend && !using_encoded_input) {
auto start_result = attempt_raw_backend_start(*restart_target_info);
if (!start_result) {
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::RuntimeError);
}
}
if (now - last_event >= idle_timeout) {
spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms);
break;
}
if (!producer_offline && started && backend && !using_encoded_input) {
auto drain = drain_encoder(
config,
*backend,
false,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
latency_tracker);
if (!drain) {
const auto reason = format_error(drain.error());
restart_backend(reason, active_info);
}
}
}
for (const auto &body_bytes_vec : body_packets) {
const auto body_bytes = std::span<const std::uint8_t>(
body_bytes_vec.data(),
body_bytes_vec.size());
if (body_bytes.empty()) {
continue;
}
zmq::message_t message;
const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait);
auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes);
if (!parsed_body) {
spdlog::warn("pipeline body packet parse error: {}", parsed_body.error());
continue;
}
auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{
.timestamp_ns = body_tracking_timestamp_ns(*parsed_body),
.bytes = body_bytes,
});
if (!write_body) {
const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error());
restart_backend(reason, active_info);
break;
}
}
if (backend && !using_encoded_input) {
auto poll = (*backend)->poll();
if (!poll) {
const auto reason = format_error(poll.error());
restart_backend(reason, active_info);
}
}
std::array<zmq::pollitem_t, 1> poll_items{{
{subscriber.handle(), 0, ZMQ_POLLIN, 0},
}};
try {
zmq::poll(poll_items, std::chrono::milliseconds{20});
} catch (const zmq::error_t &e) {
spdlog::error("pipeline poll failed: {}", e.what());
return exit_code(PipelineExitCode::SubscriberError);
}
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
if (!frame_socket_ready) {
const auto now = std::chrono::steady_clock::now();
if (restart_pending && restart_target_info && backend && !using_encoded_input) {
auto start_result = attempt_raw_backend_start(*restart_target_info);
if (!start_result) {
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::RuntimeError);
}
}
if (idle_timeout_enabled && now - last_event >= idle_timeout) {
spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms);
break;
}
if (!producer_offline && started && backend && !using_encoded_input) {
auto drain = drain_encoder(
config,
*backend,
false,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
keep_live_outputs_on_reset ? &live_output_continuity : nullptr,
latency_tracker);
if (!drain) {
const auto reason = format_error(drain.error());
restart_backend(reason, active_info);
}
}
continue;
}
zmq::message_t message;
const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait);
if (!recv_result) {
continue;
}
@@ -1075,6 +1237,14 @@ int run_pipeline(const RuntimeConfig &config) {
restart_backend(reason, snapshot->metadata.info);
continue;
}
if (keep_live_outputs_on_reset && live_output_continuity.encoded_keyframe_required && !access_unit->keyframe) {
spdlog::info(
"PIPELINE_RESET_CONTINUITY dropping encoded access unit until keyframe pts_ns={} source_timestamp_ns={}",
access_unit->stream_pts_ns,
access_unit->source_timestamp_ns);
continue;
}
live_output_continuity.encoded_keyframe_required = false;
std::vector<encode::EncodedAccessUnit> access_units{};
access_units.push_back(std::move(*access_unit));
auto publish = publish_access_units(
@@ -1084,6 +1254,7 @@ int run_pipeline(const RuntimeConfig &config) {
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
keep_live_outputs_on_reset ? &live_output_continuity : nullptr,
latency_tracker);
if (!publish) {
const auto reason = format_error(publish.error());
@@ -1094,6 +1265,7 @@ int run_pipeline(const RuntimeConfig &config) {
auto push = (*backend)->push_frame(encode::RawVideoFrame{
.info = snapshot->metadata.info,
.source_timestamp_ns = snapshot->metadata.timestamp_ns,
.force_keyframe = keep_live_outputs_on_reset && live_output_continuity.boundary_pending,
.bytes = std::span<const std::uint8_t>(snapshot_buffer.data(), snapshot->bytes_copied),
});
if (!push) {
@@ -1103,30 +1275,30 @@ int run_pipeline(const RuntimeConfig &config) {
}
}
if (!snapshot->depth.empty()) {
if (snapshot->depth_unit == ipc::DepthUnit::Unknown) {
if (!warned_unknown_depth_unit) {
spdlog::warn(
"pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata");
warned_unknown_depth_unit = true;
}
}
auto depth_map = make_depth_map_view(*snapshot);
if (!depth_map) {
const auto reason = "pipeline depth snapshot invalid: " + depth_map.error();
restart_backend(reason, active_info);
continue;
}
auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map);
if (!write_depth) {
const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error());
restart_backend(reason, active_info);
continue;
if (!snapshot->depth.empty()) {
if (snapshot->depth_unit == ipc::DepthUnit::Unknown) {
if (!warned_unknown_depth_unit) {
spdlog::warn(
"pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata");
warned_unknown_depth_unit = true;
}
}
auto depth_map = make_depth_map_view(*snapshot);
if (!depth_map) {
const auto reason = "pipeline depth snapshot invalid: " + depth_map.error();
restart_backend(reason, active_info);
continue;
}
auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map);
if (!write_depth) {
const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error());
restart_backend(reason, active_info);
continue;
}
}
stats.pushed_frames += 1;
if (!want_encoded_input) {
auto drain = drain_encoder(
@@ -1137,6 +1309,7 @@ int run_pipeline(const RuntimeConfig &config) {
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
keep_live_outputs_on_reset ? &live_output_continuity : nullptr,
latency_tracker);
if (!drain) {
const auto reason = format_error(drain.error());
@@ -1166,6 +1339,7 @@ int run_pipeline(const RuntimeConfig &config) {
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
keep_live_outputs_on_reset ? &live_output_continuity : nullptr,
latency_tracker);
if (!drain) {
spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error()));