Add encoded SHM passthrough support

This commit is contained in:
2026-03-27 10:43:34 +08:00
parent 0c9f0a944f
commit bb3ace43b7
8 changed files with 480 additions and 98 deletions
+272 -87
View File
@@ -243,6 +243,90 @@ std::expected<record::RawDepthMapView, std::string> make_depth_map_view(const ip
};
}
[[nodiscard]]
bool snapshot_has_encoded_access_unit(const ipc::CoherentSnapshot &snapshot) {
return !snapshot.encoded_access_unit.empty() &&
snapshot.encoded_codec != ipc::EncodedCodec::Unknown &&
snapshot.encoded_bitstream_format == ipc::EncodedBitstreamFormat::AnnexB;
}
[[nodiscard]]
std::expected<CodecType, std::string> codec_type_from_snapshot(const ipc::CoherentSnapshot &snapshot) {
switch (snapshot.encoded_codec) {
case ipc::EncodedCodec::H264:
return CodecType::H264;
case ipc::EncodedCodec::H265:
return CodecType::H265;
case ipc::EncodedCodec::Unknown:
default:
return std::unexpected("encoded snapshot codec is unsupported");
}
}
[[nodiscard]]
std::expected<encode::EncodedStreamInfo, std::string> make_stream_info_from_snapshot(
const ipc::CoherentSnapshot &snapshot) {
auto codec = codec_type_from_snapshot(snapshot);
if (!codec) {
return std::unexpected(codec.error());
}
if (snapshot.encoded_bitstream_format != ipc::EncodedBitstreamFormat::AnnexB) {
return std::unexpected("encoded snapshot bitstream format is unsupported");
}
if (snapshot.metadata.info.width == 0 || snapshot.metadata.info.height == 0) {
return std::unexpected("encoded snapshot dimensions are invalid");
}
encode::EncodedStreamInfo stream_info{};
stream_info.codec = *codec;
stream_info.width = snapshot.metadata.info.width;
stream_info.height = snapshot.metadata.info.height;
stream_info.time_base_num = 1;
stream_info.time_base_den = 1'000'000'000u;
stream_info.frame_rate_num = snapshot.encoded_frame_rate_num == 0 ? 30u : snapshot.encoded_frame_rate_num;
stream_info.frame_rate_den = snapshot.encoded_frame_rate_den == 0 ? 1u : snapshot.encoded_frame_rate_den;
stream_info.bitstream_format = encode::EncodedBitstreamFormat::AnnexB;
stream_info.decoder_config.clear();
return stream_info;
}
[[nodiscard]]
std::expected<encode::EncodedAccessUnit, std::string> make_access_unit_from_snapshot(
const ipc::CoherentSnapshot &snapshot) {
auto codec = codec_type_from_snapshot(snapshot);
if (!codec) {
return std::unexpected(codec.error());
}
if (snapshot.encoded_access_unit.empty()) {
return std::unexpected("encoded snapshot access unit is empty");
}
encode::EncodedAccessUnit access_unit{};
access_unit.codec = *codec;
access_unit.source_timestamp_ns = snapshot.metadata.timestamp_ns;
access_unit.stream_pts_ns =
snapshot.encoded_stream_pts_ns == 0 ? snapshot.metadata.timestamp_ns : snapshot.encoded_stream_pts_ns;
access_unit.keyframe = (snapshot.encoded_flags & ipc::kEncodedFlagKeyframe) != 0;
access_unit.annexb_bytes.assign(
snapshot.encoded_access_unit.begin(),
snapshot.encoded_access_unit.end());
return access_unit;
}
[[nodiscard]]
bool stream_info_equal(
const encode::EncodedStreamInfo &lhs,
const encode::EncodedStreamInfo &rhs) {
return lhs.codec == rhs.codec &&
lhs.width == rhs.width &&
lhs.height == rhs.height &&
lhs.time_base_num == rhs.time_base_num &&
lhs.time_base_den == rhs.time_base_den &&
lhs.frame_rate_num == rhs.frame_rate_num &&
lhs.frame_rate_den == rhs.frame_rate_den &&
lhs.bitstream_format == rhs.bitstream_format;
}
[[nodiscard]]
std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) {
if (frame.header.timestamp_ns != 0) {
@@ -559,11 +643,7 @@ int run_pipeline(const RuntimeConfig &config) {
return exit_code(PipelineExitCode::InputError);
}
auto backend = encode::make_encoder_backend(config);
if (!backend) {
spdlog::error("pipeline encoder backend selection failed: {}", format_error(backend.error()));
return exit_code(PipelineExitCode::InitializationError);
}
std::optional<encode::EncoderBackend> backend{};
auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name);
if (!shm) {
@@ -668,7 +748,9 @@ int run_pipeline(const RuntimeConfig &config) {
metrics::IngestEmitLatencyTracker latency_tracker{};
bool producer_offline{false};
bool started{false};
bool using_encoded_input{false};
std::optional<ipc::FrameInfo> active_info{};
std::optional<encode::EncodedStreamInfo> active_stream_info{};
std::optional<ipc::FrameInfo> restart_target_info{};
bool restart_pending{false};
bool warned_unknown_depth_unit{false};
@@ -678,33 +760,41 @@ int run_pipeline(const RuntimeConfig &config) {
stats.supervised_restarts += 1;
}
spdlog::warn(
"PIPELINE_RESTART backend={} reason='{}' restarts={}",
(*backend)->backend_name(),
"PIPELINE_RESTART mode={} reason='{}' restarts={}",
using_encoded_input
? "encoded_passthrough"
: (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")),
reason,
stats.supervised_restarts);
(*backend)->shutdown();
if (backend) {
(*backend)->shutdown();
}
started = false;
restart_pending = true;
restart_target_info = target_info;
warned_unknown_depth_unit = false;
using_encoded_input = false;
active_stream_info.reset();
rtmp_output.reset();
};
const auto attempt_backend_start = [&](const ipc::FrameInfo &target_info) -> Status {
(*backend)->shutdown();
rtmp_output.reset();
auto init = (*backend)->init(config, target_info);
if (!init) {
return std::unexpected(init.error());
const auto ensure_encoder_backend = [&]() -> Status {
if (backend) {
return {};
}
auto created = encode::make_encoder_backend(config);
if (!created) {
return std::unexpected(created.error());
}
backend.emplace(std::move(*created));
return {};
};
const auto start_outputs_from_stream_info =
[&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status {
rtmp_output.reset();
if (config.outputs.rtmp.enabled) {
auto stream_info = (*backend)->stream_info();
if (!stream_info) {
return unexpected_error(
stream_info.error().code,
"pipeline RTMP output stream info unavailable: " + format_error(stream_info.error()));
}
auto created = protocol::make_rtmp_output(config, *stream_info);
auto created = protocol::make_rtmp_output(config, stream_info);
if (!created) {
return unexpected_error(
created.error().code,
@@ -712,17 +802,11 @@ int run_pipeline(const RuntimeConfig &config) {
}
rtmp_output.emplace(std::move(*created));
}
auto stream_info = (*backend)->stream_info();
if (!stream_info) {
return unexpected_error(
stream_info.error().code,
"pipeline encoder stream info unavailable: " + format_error(stream_info.error()));
}
update_mcap_stream_info(mcap_recorder, *stream_info);
update_mcap_stream_info(mcap_recorder, stream_info);
if (config.record.mcap.enabled) {
std::lock_guard lock(mcap_recorder.mutex);
if (!mcap_recorder.sink) {
auto created = record::McapRecordSink::create(config, *stream_info);
auto created = record::McapRecordSink::create(config, stream_info);
if (!created) {
return unexpected_error(
ERR_INTERNAL,
@@ -742,9 +826,43 @@ int run_pipeline(const RuntimeConfig &config) {
restart_target_info.reset();
warned_unknown_depth_unit = false;
active_info = target_info;
active_stream_info = stream_info;
return {};
};
const auto attempt_raw_backend_start = [&](const ipc::FrameInfo &target_info) -> Status {
auto ensure = ensure_encoder_backend();
if (!ensure) {
return ensure;
}
(*backend)->shutdown();
auto init = (*backend)->init(config, target_info);
if (!init) {
return std::unexpected(init.error());
}
auto stream_info = (*backend)->stream_info();
if (!stream_info) {
return unexpected_error(
stream_info.error().code,
"pipeline encoder stream info unavailable: " + format_error(stream_info.error()));
}
using_encoded_input = false;
return start_outputs_from_stream_info(*stream_info, target_info);
};
const auto attempt_encoded_passthrough_start = [&](
const ipc::CoherentSnapshot &snapshot) -> Status {
auto stream_info = make_stream_info_from_snapshot(snapshot);
if (!stream_info) {
return unexpected_error(ERR_PROTOCOL, stream_info.error());
}
if (backend) {
(*backend)->shutdown();
}
using_encoded_input = true;
return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info);
};
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
auto last_event = std::chrono::steady_clock::now();
@@ -774,10 +892,14 @@ int run_pipeline(const RuntimeConfig &config) {
case cvmmap::ModuleStatus::StreamReset:
spdlog::info("pipeline status event status=stream_reset");
stats.resets += 1;
(*backend)->shutdown();
if (backend) {
(*backend)->shutdown();
}
started = false;
restart_pending = false;
restart_target_info.reset();
active_stream_info.reset();
using_encoded_input = false;
active_info.reset();
rtmp_output.reset();
break;
@@ -808,10 +930,12 @@ int run_pipeline(const RuntimeConfig &config) {
}
}
auto poll = (*backend)->poll();
if (!poll) {
const auto reason = format_error(poll.error());
restart_backend(reason, active_info);
if (backend && !using_encoded_input) {
auto poll = (*backend)->poll();
if (!poll) {
const auto reason = format_error(poll.error());
restart_backend(reason, active_info);
}
}
std::array<zmq::pollitem_t, 1> poll_items{{
@@ -827,34 +951,34 @@ int run_pipeline(const RuntimeConfig &config) {
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
if (!frame_socket_ready) {
const auto now = std::chrono::steady_clock::now();
if (restart_pending && restart_target_info) {
auto start_result = attempt_backend_start(*restart_target_info);
if (!start_result) {
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::RuntimeError);
if (restart_pending && restart_target_info && backend && !using_encoded_input) {
auto start_result = attempt_raw_backend_start(*restart_target_info);
if (!start_result) {
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::RuntimeError);
}
}
}
if (now - last_event >= idle_timeout) {
spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms);
break;
}
if (!producer_offline && started) {
auto drain = drain_encoder(
config,
*backend,
false,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
latency_tracker);
if (!drain) {
const auto reason = format_error(drain.error());
restart_backend(reason, active_info);
if (now - last_event >= idle_timeout) {
spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms);
break;
}
if (!producer_offline && started && backend && !using_encoded_input) {
auto drain = drain_encoder(
config,
*backend,
false,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
latency_tracker);
if (!drain) {
const auto reason = format_error(drain.error());
restart_backend(reason, active_info);
}
}
continue;
}
continue;
}
zmq::message_t message;
const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait);
@@ -894,14 +1018,45 @@ int run_pipeline(const RuntimeConfig &config) {
continue;
}
const bool has_encoded_access_unit = snapshot_has_encoded_access_unit(*snapshot);
const bool want_encoded_input =
config.input.video_source == InputVideoSource::Encoded ||
(config.input.video_source == InputVideoSource::Auto && has_encoded_access_unit);
if (config.input.video_source == InputVideoSource::Encoded && !has_encoded_access_unit) {
spdlog::error("pipeline encoded input requested but SHM snapshot does not contain an encoded access unit");
return exit_code(PipelineExitCode::InitializationError);
}
if (active_info && !frame_info_equal(*active_info, snapshot->metadata.info)) {
stats.format_rebuilds += 1;
restart_backend("frame_info_change", snapshot->metadata.info);
}
if (!started || restart_pending) {
if (started && using_encoded_input != want_encoded_input) {
stats.format_rebuilds += 1;
restart_backend("input_video_source_switch", snapshot->metadata.info);
}
if (want_encoded_input) {
auto stream_info = make_stream_info_from_snapshot(*snapshot);
if (!stream_info) {
spdlog::error("pipeline encoded snapshot metadata invalid: {}", stream_info.error());
return exit_code(PipelineExitCode::InitializationError);
}
if (active_stream_info && !stream_info_equal(*active_stream_info, *stream_info)) {
stats.format_rebuilds += 1;
restart_backend("encoded_stream_info_change", snapshot->metadata.info);
}
if (!started || restart_pending) {
auto start_result = attempt_encoded_passthrough_start(*snapshot);
if (!start_result) {
spdlog::error("pipeline encoded passthrough init failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::InitializationError);
}
}
} else if (!started || restart_pending) {
const auto target_info = restart_target_info.value_or(snapshot->metadata.info);
auto start_result = attempt_backend_start(target_info);
auto start_result = attempt_raw_backend_start(target_info);
if (!start_result) {
spdlog::error("pipeline backend init failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::InitializationError);
@@ -910,15 +1065,39 @@ int run_pipeline(const RuntimeConfig &config) {
latency_tracker.note_ingest();
auto push = (*backend)->push_frame(encode::RawVideoFrame{
.info = snapshot->metadata.info,
.source_timestamp_ns = snapshot->metadata.timestamp_ns,
.bytes = std::span<const std::uint8_t>(snapshot_buffer.data(), snapshot->bytes_copied),
});
if (!push) {
const auto reason = format_error(push.error());
restart_backend(reason, active_info);
continue;
if (want_encoded_input) {
auto access_unit = make_access_unit_from_snapshot(*snapshot);
if (!access_unit) {
const auto reason = "pipeline encoded snapshot invalid: " + access_unit.error();
restart_backend(reason, snapshot->metadata.info);
continue;
}
std::vector<encode::EncodedAccessUnit> access_units{};
access_units.push_back(std::move(*access_unit));
auto publish = publish_access_units(
config,
std::move(access_units),
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
latency_tracker);
if (!publish) {
const auto reason = format_error(publish.error());
restart_backend(reason, active_info);
continue;
}
} else {
auto push = (*backend)->push_frame(encode::RawVideoFrame{
.info = snapshot->metadata.info,
.source_timestamp_ns = snapshot->metadata.timestamp_ns,
.bytes = std::span<const std::uint8_t>(snapshot_buffer.data(), snapshot->bytes_copied),
});
if (!push) {
const auto reason = format_error(push.error());
restart_backend(reason, active_info);
continue;
}
}
if (!snapshot->depth.empty()) {
@@ -946,19 +1125,21 @@ int run_pipeline(const RuntimeConfig &config) {
}
stats.pushed_frames += 1;
auto drain = drain_encoder(
config,
*backend,
false,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
latency_tracker);
if (!drain) {
const auto reason = format_error(drain.error());
restart_backend(reason, active_info);
continue;
if (!want_encoded_input) {
auto drain = drain_encoder(
config,
*backend,
false,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
latency_tracker);
if (!drain) {
const auto reason = format_error(drain.error());
restart_backend(reason, active_info);
continue;
}
}
if (config.latency.ingest_max_frames > 0 && stats.pushed_frames >= config.latency.ingest_max_frames) {
@@ -973,7 +1154,7 @@ int run_pipeline(const RuntimeConfig &config) {
nats_client.Stop();
if (started) {
if (started && backend && !using_encoded_input) {
auto drain = drain_encoder(
config,
*backend,
@@ -989,14 +1170,18 @@ int run_pipeline(const RuntimeConfig &config) {
}
}
(*backend)->shutdown();
if (backend) {
(*backend)->shutdown();
}
std::ignore = stop_mcap_recording(mcap_recorder);
recorder_service.Stop();
spdlog::info(
"PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}",
to_string(config.encoder.codec),
(*backend)->backend_name(),
active_stream_info ? to_string(active_stream_info->codec) : to_string(config.encoder.codec),
using_encoded_input
? std::string("encoded_passthrough")
: (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")),
stats.sync_messages,
stats.status_messages,
stats.torn_frames,