Compare commits

...

2 Commits

Author SHA1 Message Date
crosstyan c320bf01af Use source codec for encoded RTP passthrough 2026-03-27 11:19:03 +08:00
crosstyan bb3ace43b7 Add encoded SHM passthrough support 2026-03-27 10:43:34 +08:00
10 changed files with 502 additions and 109 deletions
+5 -1
View File
@@ -58,7 +58,10 @@ set(CVMMAP_LOCAL_CORE_DIR "${CVMMAP_LOCAL_ROOT}/build/core" CACHE PATH "Path to
set(CVMMAP_LOCAL_NATS_STATIC "${CVMMAP_LOCAL_ROOT}/build/lib/libnats_static.a" CACHE PATH "Path to local cnats static library") set(CVMMAP_LOCAL_NATS_STATIC "${CVMMAP_LOCAL_ROOT}/build/lib/libnats_static.a" CACHE PATH "Path to local cnats static library")
if (CVMMAP_CNATS_PROVIDER STREQUAL "system") if (CVMMAP_CNATS_PROVIDER STREQUAL "system")
find_package(cnats CONFIG REQUIRED) find_package(cnats CONFIG REQUIRED)
find_package(cvmmap-core CONFIG QUIET PATHS "${CVMMAP_LOCAL_CORE_DIR}" NO_DEFAULT_PATH)
if (NOT TARGET cvmmap::client)
find_package(cvmmap-core CONFIG QUIET) find_package(cvmmap-core CONFIG QUIET)
endif()
else() else()
if (NOT EXISTS "${CVMMAP_LOCAL_NATS_STATIC}") if (NOT EXISTS "${CVMMAP_LOCAL_NATS_STATIC}")
message(FATAL_ERROR message(FATAL_ERROR
@@ -256,7 +259,8 @@ add_library(cvmmap_streamer_common STATIC
target_include_directories(cvmmap_streamer_common target_include_directories(cvmmap_streamer_common
PUBLIC PUBLIC
"${CMAKE_CURRENT_LIST_DIR}/include" "${CMAKE_CURRENT_LIST_DIR}/include"
"${CMAKE_CURRENT_BINARY_DIR}") "${CMAKE_CURRENT_BINARY_DIR}"
"${CVMMAP_LOCAL_ROOT}/core/include")
set(CVMMAP_STREAMER_LINK_DEPS set(CVMMAP_STREAMER_LINK_DEPS
Threads::Threads Threads::Threads
+89
View File
@@ -0,0 +1,89 @@
# Encoded Passthrough From `cv-mmap` ABI v2.1
## Summary
`cvmmap-streamer` can now consume encoded access units directly from a `cv-mmap` shared-memory snapshot when the producer exposes the ABI v2.1 encoded plane.
This is intended for producers such as `cv-mmap` with `video.backend = "udp_rtp"`:
- `cv-mmap` receives RTP H.265
- `cv-mmap` parses once and publishes raw BGR plus encoded AU in the same snapshot
- `cvmmap-streamer` can forward or mux the encoded AU directly
The video no longer needs a decode -> encode round trip inside the streamer.
## Input selection
`input.video_source` controls how the pipeline uses the snapshot:
```toml
[input]
uri = "cvmmap://default"
nats_url = "nats://localhost:4222"
video_source = "auto"
```
Valid values:
- `auto`: prefer encoded passthrough when the encoded AU plane is present, otherwise use raw encoding
- `raw`: always use the raw left plane and the local encoder backend
- `encoded`: require the encoded AU plane; fail if it is missing
CLI equivalent:
```text
--input-video-source auto|raw|encoded
```
## Encoded passthrough behavior
When `video_source` resolves to encoded mode:
- no encoder backend is created
- the streamer reads codec, bitstream format, keyframe flag, frame rate, and stream PTS from the SHM metadata
- width and height still come from the raw left-plane frame metadata
- the encoded AU is forwarded as-is
Current assumptions:
- codec is H.265 or H.264
- bitstream format is Annex B
- one SHM encoded plane contains one AU
- time base is treated as `1/1e9`
## Outputs
### RTP
RTP output accepts passthrough access units directly.
- in encoded mode, SDP signaling and RTP packetization follow the codec from SHM metadata
- `config.encoder.codec` is ignored for RTP while passthrough is active
- raw mode still uses the configured local encoder and its codec
### RTMP
RTMP output accepts passthrough access units directly.
- If decoder config is present, it is used.
- If decoder config is absent, the output relies on in-band parameter sets from keyframes.
For `cv-mmap udp_rtp`, this works because the producer republishes keyframes with VPS/SPS/PPS in-band.
### MCAP
MCAP recording writes the compressed video directly.
- no local encoder is used
- keyframes are written exactly from the encoded AU payload
- depth and body-tracking recording continue to use the same raw/depth/NATS paths as before
## Fallback to raw mode
If the producer does not expose the encoded plane, `auto` mode falls back to the existing raw pipeline:
- read raw BGR from SHM
- push frames through the configured encoder backend
- publish or record the encoder output
This keeps existing producers compatible.
@@ -40,6 +40,12 @@ enum class EncoderDeviceType {
Software, Software,
}; };
// Selects which SHM snapshot plane feeds the video pipeline.
// Set via `input.video_source` in TOML or `--input-video-source` on the CLI.
enum class InputVideoSource {
Auto,    // prefer encoded passthrough when the encoded AU plane is present, else raw
Raw,     // always use the raw left plane and the local encoder backend
Encoded, // require the encoded AU plane; fail if it is missing
};
enum class McapCompression { enum class McapCompression {
None, None,
Lz4, Lz4,
@@ -49,6 +55,7 @@ enum class McapCompression {
struct InputConfig { struct InputConfig {
std::string uri{"cvmmap://default"}; std::string uri{"cvmmap://default"};
std::string nats_url{"nats://localhost:4222"}; std::string nats_url{"nats://localhost:4222"};
InputVideoSource video_source{InputVideoSource::Auto};
}; };
struct EncoderConfig { struct EncoderConfig {
@@ -125,6 +132,7 @@ std::string_view to_string(RtmpMode mode);
std::string_view to_string(RtmpTransportType transport); std::string_view to_string(RtmpTransportType transport);
std::string_view to_string(EncoderBackendType backend); std::string_view to_string(EncoderBackendType backend);
std::string_view to_string(EncoderDeviceType device); std::string_view to_string(EncoderDeviceType device);
std::string_view to_string(InputVideoSource source);
std::string_view to_string(McapCompression compression); std::string_view to_string(McapCompression compression);
std::expected<McapCompression, std::string> parse_mcap_compression(std::string_view raw); std::expected<McapCompression, std::string> parse_mcap_compression(std::string_view raw);
+27
View File
@@ -51,6 +51,19 @@ enum class DepthUnit : std::uint8_t {
Meter = 2, Meter = 2,
}; };
// Codec tag of the encoded access unit published in the SHM snapshot (ABI v2.1).
enum class EncodedCodec : std::uint8_t {
Unknown = 0,
H264 = 1,
H265 = 2,
};
// Bitstream framing of the encoded access unit; AnnexB is the only
// non-unknown value currently defined.
enum class EncodedBitstreamFormat : std::uint8_t {
Unknown = 0,
AnnexB = 1,
};
// Bit set in the snapshot's encoded_flags when the access unit is a keyframe.
constexpr std::uint16_t kEncodedFlagKeyframe = 0x0001u;
enum class ModuleStatus : std::int32_t { enum class ModuleStatus : std::int32_t {
Online = 0xa1, Online = 0xa1,
Offline = 0xa0, Offline = 0xa0,
@@ -163,22 +176,36 @@ struct ControlResponseMessage {
struct ValidatedShmView { struct ValidatedShmView {
FrameMetadata metadata; FrameMetadata metadata;
DepthUnit depth_unit{DepthUnit::Unknown}; DepthUnit depth_unit{DepthUnit::Unknown};
EncodedCodec encoded_codec{EncodedCodec::Unknown};
EncodedBitstreamFormat encoded_bitstream_format{EncodedBitstreamFormat::Unknown};
std::uint16_t encoded_flags{0};
std::uint16_t encoded_frame_rate_num{0};
std::uint16_t encoded_frame_rate_den{0};
std::uint64_t encoded_stream_pts_ns{0};
std::span<const std::uint8_t> payload; std::span<const std::uint8_t> payload;
std::span<const std::uint8_t> left; std::span<const std::uint8_t> left;
std::optional<FrameInfo> depth_info{}; std::optional<FrameInfo> depth_info{};
std::span<const std::uint8_t> depth{}; std::span<const std::uint8_t> depth{};
std::optional<FrameInfo> confidence_info{}; std::optional<FrameInfo> confidence_info{};
std::span<const std::uint8_t> confidence{}; std::span<const std::uint8_t> confidence{};
std::span<const std::uint8_t> encoded_access_unit{};
}; };
struct CoherentSnapshot { struct CoherentSnapshot {
FrameMetadata metadata; FrameMetadata metadata;
DepthUnit depth_unit{DepthUnit::Unknown}; DepthUnit depth_unit{DepthUnit::Unknown};
EncodedCodec encoded_codec{EncodedCodec::Unknown};
EncodedBitstreamFormat encoded_bitstream_format{EncodedBitstreamFormat::Unknown};
std::uint16_t encoded_flags{0};
std::uint16_t encoded_frame_rate_num{0};
std::uint16_t encoded_frame_rate_den{0};
std::uint64_t encoded_stream_pts_ns{0};
std::span<const std::uint8_t> left; std::span<const std::uint8_t> left;
std::optional<FrameInfo> depth_info{}; std::optional<FrameInfo> depth_info{};
std::span<const std::uint8_t> depth{}; std::span<const std::uint8_t> depth{};
std::optional<FrameInfo> confidence_info{}; std::optional<FrameInfo> confidence_info{};
std::span<const std::uint8_t> confidence{}; std::span<const std::uint8_t> confidence{};
std::span<const std::uint8_t> encoded_access_unit{};
std::size_t bytes_copied; std::size_t bytes_copied;
}; };
@@ -33,7 +33,7 @@ public:
UdpRtpPublisher &operator=(UdpRtpPublisher &&other) noexcept; UdpRtpPublisher &operator=(UdpRtpPublisher &&other) noexcept;
[[nodiscard]] [[nodiscard]]
static std::expected<UdpRtpPublisher, std::string> create(const RuntimeConfig &config); static std::expected<UdpRtpPublisher, std::string> create(const RuntimeConfig &config, CodecType codec);
void publish_access_unit(std::span<const std::uint8_t> access_unit, std::uint64_t pts_ns); void publish_access_unit(std::span<const std::uint8_t> access_unit, std::uint64_t pts_ns);
+42
View File
@@ -155,6 +155,19 @@ std::expected<EncoderDeviceType, std::string> parse_encoder_device(std::string_v
return std::unexpected("invalid encoder device: '" + std::string(raw) + "' (expected: auto|nvidia|software)"); return std::unexpected("invalid encoder device: '" + std::string(raw) + "' (expected: auto|nvidia|software)");
} }
// Parse the `input.video_source` option. Accepts exactly "auto", "raw" or
// "encoded" (case-sensitive); any other value yields a descriptive error.
std::expected<InputVideoSource, std::string> parse_input_video_source(std::string_view raw) {
if (raw == "encoded") {
return InputVideoSource::Encoded;
}
if (raw == "raw") {
return InputVideoSource::Raw;
}
if (raw == "auto") {
return InputVideoSource::Auto;
}
return std::unexpected("invalid input video source: '" + std::string(raw) + "' (expected: auto|raw|encoded)");
}
std::expected<McapCompression, std::string> parse_mcap_compression_impl(std::string_view raw) { std::expected<McapCompression, std::string> parse_mcap_compression_impl(std::string_view raw) {
if (raw == "none") { if (raw == "none") {
return McapCompression::None; return McapCompression::None;
@@ -263,6 +276,13 @@ std::expected<void, std::string> apply_toml_file(RuntimeConfig &config, const st
if (auto value = toml_value<std::string>(table, "input.nats_url")) { if (auto value = toml_value<std::string>(table, "input.nats_url")) {
config.input.nats_url = *value; config.input.nats_url = *value;
} }
if (auto value = toml_value<std::string>(table, "input.video_source")) {
auto parsed = parse_input_video_source(*value);
if (!parsed) {
return std::unexpected(parsed.error());
}
config.input.video_source = *parsed;
}
if (auto value = toml_value<std::string>(table, "run_mode")) { if (auto value = toml_value<std::string>(table, "run_mode")) {
auto parsed = parse_run_mode(*value); auto parsed = parse_run_mode(*value);
if (!parsed) { if (!parsed) {
@@ -553,6 +573,18 @@ std::string_view to_string(EncoderDeviceType device) {
return "unknown"; return "unknown";
} }
// Render the input video source selection for logs and config summaries.
std::string_view to_string(InputVideoSource source) {
if (source == InputVideoSource::Auto) {
return "auto";
}
if (source == InputVideoSource::Raw) {
return "raw";
}
if (source == InputVideoSource::Encoded) {
return "encoded";
}
return "unknown";
}
std::string_view to_string(McapCompression compression) { std::string_view to_string(McapCompression compression) {
switch (compression) { switch (compression) {
case McapCompression::None: case McapCompression::None:
@@ -571,6 +603,7 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
std::string config_path_raw{}; std::string config_path_raw{};
std::string input_uri_raw{}; std::string input_uri_raw{};
std::string input_nats_url_raw{}; std::string input_nats_url_raw{};
std::string input_video_source_raw{};
std::string run_mode_raw{}; std::string run_mode_raw{};
std::string codec_raw{}; std::string codec_raw{};
std::string encoder_backend_raw{}; std::string encoder_backend_raw{};
@@ -612,6 +645,7 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
app.add_option("--config", config_path_raw); app.add_option("--config", config_path_raw);
app.add_option("--input-uri", input_uri_raw); app.add_option("--input-uri", input_uri_raw);
app.add_option("--nats-url", input_nats_url_raw); app.add_option("--nats-url", input_nats_url_raw);
app.add_option("--input-video-source", input_video_source_raw);
app.add_option("--run-mode", run_mode_raw); app.add_option("--run-mode", run_mode_raw);
app.add_option("--codec", codec_raw); app.add_option("--codec", codec_raw);
app.add_option("--encoder-backend", encoder_backend_raw); app.add_option("--encoder-backend", encoder_backend_raw);
@@ -670,6 +704,13 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
if (!input_nats_url_raw.empty()) { if (!input_nats_url_raw.empty()) {
config.input.nats_url = input_nats_url_raw; config.input.nats_url = input_nats_url_raw;
} }
if (!input_video_source_raw.empty()) {
auto parsed = parse_input_video_source(input_video_source_raw);
if (!parsed) {
return std::unexpected(parsed.error());
}
config.input.video_source = *parsed;
}
if (!run_mode_raw.empty()) { if (!run_mode_raw.empty()) {
auto parsed = parse_run_mode(run_mode_raw); auto parsed = parse_run_mode(run_mode_raw);
if (!parsed) { if (!parsed) {
@@ -938,6 +979,7 @@ std::string summarize_runtime_config(const RuntimeConfig &config) {
std::ostringstream ss; std::ostringstream ss;
ss << "input.uri=" << config.input.uri; ss << "input.uri=" << config.input.uri;
ss << ", input.nats_url=" << config.input.nats_url; ss << ", input.nats_url=" << config.input.nats_url;
ss << ", input.video_source=" << to_string(config.input.video_source);
ss << ", run_mode=" << to_string(config.run_mode); ss << ", run_mode=" << to_string(config.run_mode);
ss << ", encoder.backend=" << to_string(config.encoder.backend); ss << ", encoder.backend=" << to_string(config.encoder.backend);
ss << ", encoder.device=" << to_string(config.encoder.device); ss << ", encoder.device=" << to_string(config.encoder.device);
+42 -1
View File
@@ -173,6 +173,31 @@ namespace {
return converted; return converted;
} }
// Translate the core library's codec tag into the streamer's local enum.
// Any value other than H264/H265 collapses to Unknown.
[[nodiscard]]
EncodedCodec from_core_encoded_codec(const cvmmap::EncodedCodec codec) {
if (codec == cvmmap::EncodedCodec::H264) {
return EncodedCodec::H264;
}
if (codec == cvmmap::EncodedCodec::H265) {
return EncodedCodec::H265;
}
return EncodedCodec::Unknown;
}
// Translate the core library's bitstream-format tag into the streamer's
// local enum; anything that is not AnnexB collapses to Unknown.
[[nodiscard]]
EncodedBitstreamFormat from_core_encoded_bitstream_format(
const cvmmap::EncodedBitstreamFormat format) {
if (format == cvmmap::EncodedBitstreamFormat::AnnexB) {
return EncodedBitstreamFormat::AnnexB;
}
return EncodedBitstreamFormat::Unknown;
}
[[nodiscard]] [[nodiscard]]
std::size_t span_offset( std::size_t span_offset(
const std::span<const std::uint8_t> outer, const std::span<const std::uint8_t> outer,
@@ -198,6 +223,7 @@ namespace {
}; };
consider(metadata.depth_plane); consider(metadata.depth_plane);
consider(metadata.confidence_plane); consider(metadata.confidence_plane);
consider(metadata.encoded_access_unit);
return payload_bytes; return payload_bytes;
} }
@@ -420,6 +446,13 @@ std::expected<ValidatedShmView, ParseError> validate_shm_region(std::span<const
return ValidatedShmView{ return ValidatedShmView{
.metadata = from_core_metadata(metadata_result->normalized_metadata), .metadata = from_core_metadata(metadata_result->normalized_metadata),
.depth_unit = from_core_depth_unit(metadata_result->depth_unit), .depth_unit = from_core_depth_unit(metadata_result->depth_unit),
.encoded_codec = from_core_encoded_codec(metadata_result->encoded_codec),
.encoded_bitstream_format =
from_core_encoded_bitstream_format(metadata_result->encoded_bitstream_format),
.encoded_flags = metadata_result->encoded_flags,
.encoded_frame_rate_num = metadata_result->encoded_frame_rate_num,
.encoded_frame_rate_den = metadata_result->encoded_frame_rate_den,
.encoded_stream_pts_ns = metadata_result->encoded_stream_pts_ns,
.payload = payload_region.first(payload_bytes), .payload = payload_region.first(payload_bytes),
.left = metadata_result->left_plane, .left = metadata_result->left_plane,
.depth_info = metadata_result->depth_info .depth_info = metadata_result->depth_info
@@ -429,7 +462,8 @@ std::expected<ValidatedShmView, ParseError> validate_shm_region(std::span<const
.confidence_info = metadata_result->confidence_info .confidence_info = metadata_result->confidence_info
? std::optional<FrameInfo>(from_core_frame_info(*metadata_result->confidence_info)) ? std::optional<FrameInfo>(from_core_frame_info(*metadata_result->confidence_info))
: std::nullopt, : std::nullopt,
.confidence = metadata_result->confidence_plane}; .confidence = metadata_result->confidence_plane,
.encoded_access_unit = metadata_result->encoded_access_unit};
} }
std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot( std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot(
@@ -466,11 +500,18 @@ std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot(
return CoherentSnapshot{ return CoherentSnapshot{
.metadata = first->metadata, .metadata = first->metadata,
.depth_unit = first->depth_unit, .depth_unit = first->depth_unit,
.encoded_codec = first->encoded_codec,
.encoded_bitstream_format = first->encoded_bitstream_format,
.encoded_flags = first->encoded_flags,
.encoded_frame_rate_num = first->encoded_frame_rate_num,
.encoded_frame_rate_den = first->encoded_frame_rate_den,
.encoded_stream_pts_ns = first->encoded_stream_pts_ns,
.left = translate_span(first->payload, copied_payload, first->left), .left = translate_span(first->payload, copied_payload, first->left),
.depth_info = first->depth_info, .depth_info = first->depth_info,
.depth = translate_span(first->payload, copied_payload, first->depth), .depth = translate_span(first->payload, copied_payload, first->depth),
.confidence_info = first->confidence_info, .confidence_info = first->confidence_info,
.confidence = translate_span(first->payload, copied_payload, first->confidence), .confidence = translate_span(first->payload, copied_payload, first->confidence),
.encoded_access_unit = translate_span(first->payload, copied_payload, first->encoded_access_unit),
.bytes_copied = first->payload.size()}; .bytes_copied = first->payload.size()};
} }
+231 -43
View File
@@ -243,6 +243,90 @@ std::expected<record::RawDepthMapView, std::string> make_depth_map_view(const ip
}; };
} }
// True when the snapshot carries a usable encoded access unit: the plane is
// non-empty, the codec tag is known, and the framing is Annex B.
[[nodiscard]]
bool snapshot_has_encoded_access_unit(const ipc::CoherentSnapshot &snapshot) {
if (snapshot.encoded_access_unit.empty()) {
return false;
}
if (snapshot.encoded_codec == ipc::EncodedCodec::Unknown) {
return false;
}
return snapshot.encoded_bitstream_format == ipc::EncodedBitstreamFormat::AnnexB;
}
// Map the SHM snapshot's codec tag onto the streamer's CodecType.
// Unknown (or any unrecognized) tag is reported as an error string.
[[nodiscard]]
std::expected<CodecType, std::string> codec_type_from_snapshot(const ipc::CoherentSnapshot &snapshot) {
if (snapshot.encoded_codec == ipc::EncodedCodec::H264) {
return CodecType::H264;
}
if (snapshot.encoded_codec == ipc::EncodedCodec::H265) {
return CodecType::H265;
}
return std::unexpected("encoded snapshot codec is unsupported");
}
// Build the EncodedStreamInfo used to start outputs (RTP/RTMP/MCAP) from the
// snapshot's encoded-plane metadata. Codec, frame rate and bitstream format
// come from the SHM encoded fields; width/height come from the raw left-plane
// frame metadata. Returns an error string when codec, framing or dimensions
// are unusable.
[[nodiscard]]
std::expected<encode::EncodedStreamInfo, std::string> make_stream_info_from_snapshot(
const ipc::CoherentSnapshot &snapshot) {
auto codec = codec_type_from_snapshot(snapshot);
if (!codec) {
return std::unexpected(codec.error());
}
if (snapshot.encoded_bitstream_format != ipc::EncodedBitstreamFormat::AnnexB) {
return std::unexpected("encoded snapshot bitstream format is unsupported");
}
if (snapshot.metadata.info.width == 0 || snapshot.metadata.info.height == 0) {
return std::unexpected("encoded snapshot dimensions are invalid");
}
encode::EncodedStreamInfo stream_info{};
stream_info.codec = *codec;
stream_info.width = snapshot.metadata.info.width;
stream_info.height = snapshot.metadata.info.height;
// Producer PTS values are nanoseconds, so the time base is fixed at 1/1e9.
stream_info.time_base_num = 1;
stream_info.time_base_den = 1'000'000'000u;
// Fall back to 30/1 fps when the producer did not publish a frame rate.
stream_info.frame_rate_num = snapshot.encoded_frame_rate_num == 0 ? 30u : snapshot.encoded_frame_rate_num;
stream_info.frame_rate_den = snapshot.encoded_frame_rate_den == 0 ? 1u : snapshot.encoded_frame_rate_den;
stream_info.bitstream_format = encode::EncodedBitstreamFormat::AnnexB;
// No out-of-band decoder config is provided in passthrough mode; outputs
// rely on in-band parameter sets carried by keyframes.
stream_info.decoder_config.clear();
return stream_info;
}
// Convert the snapshot's encoded plane into an owning EncodedAccessUnit.
// The bytes are copied out of the snapshot payload so the access unit stays
// valid independently of the snapshot buffer's lifetime.
[[nodiscard]]
std::expected<encode::EncodedAccessUnit, std::string> make_access_unit_from_snapshot(
const ipc::CoherentSnapshot &snapshot) {
auto codec = codec_type_from_snapshot(snapshot);
if (!codec) {
return std::unexpected(codec.error());
}
if (snapshot.encoded_access_unit.empty()) {
return std::unexpected("encoded snapshot access unit is empty");
}
encode::EncodedAccessUnit access_unit{};
access_unit.codec = *codec;
access_unit.source_timestamp_ns = snapshot.metadata.timestamp_ns;
// Prefer the producer's stream PTS; fall back to the capture timestamp when
// the producer left it at zero.
access_unit.stream_pts_ns =
snapshot.encoded_stream_pts_ns == 0 ? snapshot.metadata.timestamp_ns : snapshot.encoded_stream_pts_ns;
access_unit.keyframe = (snapshot.encoded_flags & ipc::kEncodedFlagKeyframe) != 0;
access_unit.annexb_bytes.assign(
snapshot.encoded_access_unit.begin(),
snapshot.encoded_access_unit.end());
return access_unit;
}
// Field-by-field equality of two encoded stream descriptions. Note that
// decoder_config is not part of the comparison.
[[nodiscard]]
bool stream_info_equal(
const encode::EncodedStreamInfo &lhs,
const encode::EncodedStreamInfo &rhs) {
if (lhs.codec != rhs.codec) {
return false;
}
if (lhs.width != rhs.width || lhs.height != rhs.height) {
return false;
}
if (lhs.time_base_num != rhs.time_base_num || lhs.time_base_den != rhs.time_base_den) {
return false;
}
if (lhs.frame_rate_num != rhs.frame_rate_num || lhs.frame_rate_den != rhs.frame_rate_den) {
return false;
}
return lhs.bitstream_format == rhs.bitstream_format;
}
[[nodiscard]] [[nodiscard]]
std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) { std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) {
if (frame.header.timestamp_ns != 0) { if (frame.header.timestamp_ns != 0) {
@@ -559,11 +643,7 @@ int run_pipeline(const RuntimeConfig &config) {
return exit_code(PipelineExitCode::InputError); return exit_code(PipelineExitCode::InputError);
} }
auto backend = encode::make_encoder_backend(config); std::optional<encode::EncoderBackend> backend{};
if (!backend) {
spdlog::error("pipeline encoder backend selection failed: {}", format_error(backend.error()));
return exit_code(PipelineExitCode::InitializationError);
}
auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name); auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name);
if (!shm) { if (!shm) {
@@ -655,20 +735,13 @@ int run_pipeline(const RuntimeConfig &config) {
return exit_code(PipelineExitCode::SubscriberError); return exit_code(PipelineExitCode::SubscriberError);
} }
if (config.outputs.rtp.enabled) {
auto created = protocol::UdpRtpPublisher::create(config);
if (!created) {
spdlog::error("pipeline RTP publisher init failed: {}", created.error());
return exit_code(PipelineExitCode::InitializationError);
}
rtp_publisher.emplace(std::move(*created));
}
PipelineStats stats{}; PipelineStats stats{};
metrics::IngestEmitLatencyTracker latency_tracker{}; metrics::IngestEmitLatencyTracker latency_tracker{};
bool producer_offline{false}; bool producer_offline{false};
bool started{false}; bool started{false};
bool using_encoded_input{false};
std::optional<ipc::FrameInfo> active_info{}; std::optional<ipc::FrameInfo> active_info{};
std::optional<encode::EncodedStreamInfo> active_stream_info{};
std::optional<ipc::FrameInfo> restart_target_info{}; std::optional<ipc::FrameInfo> restart_target_info{};
bool restart_pending{false}; bool restart_pending{false};
bool warned_unknown_depth_unit{false}; bool warned_unknown_depth_unit{false};
@@ -678,33 +751,52 @@ int run_pipeline(const RuntimeConfig &config) {
stats.supervised_restarts += 1; stats.supervised_restarts += 1;
} }
spdlog::warn( spdlog::warn(
"PIPELINE_RESTART backend={} reason='{}' restarts={}", "PIPELINE_RESTART mode={} reason='{}' restarts={}",
(*backend)->backend_name(), using_encoded_input
? "encoded_passthrough"
: (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")),
reason, reason,
stats.supervised_restarts); stats.supervised_restarts);
if (backend) {
(*backend)->shutdown(); (*backend)->shutdown();
}
started = false; started = false;
restart_pending = true; restart_pending = true;
restart_target_info = target_info; restart_target_info = target_info;
warned_unknown_depth_unit = false; warned_unknown_depth_unit = false;
using_encoded_input = false;
active_stream_info.reset();
rtp_publisher.reset();
rtmp_output.reset(); rtmp_output.reset();
}; };
const auto attempt_backend_start = [&](const ipc::FrameInfo &target_info) -> Status { const auto ensure_encoder_backend = [&]() -> Status {
(*backend)->shutdown(); if (backend) {
return {};
}
auto created = encode::make_encoder_backend(config);
if (!created) {
return std::unexpected(created.error());
}
backend.emplace(std::move(*created));
return {};
};
const auto start_outputs_from_stream_info =
[&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status {
rtp_publisher.reset();
rtmp_output.reset(); rtmp_output.reset();
auto init = (*backend)->init(config, target_info); if (config.outputs.rtp.enabled) {
if (!init) { auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec);
return std::unexpected(init.error()); if (!created) {
return unexpected_error(
ERR_INTERNAL,
"pipeline RTP publisher init failed: " + created.error());
}
rtp_publisher.emplace(std::move(*created));
} }
if (config.outputs.rtmp.enabled) { if (config.outputs.rtmp.enabled) {
auto stream_info = (*backend)->stream_info(); auto created = protocol::make_rtmp_output(config, stream_info);
if (!stream_info) {
return unexpected_error(
stream_info.error().code,
"pipeline RTMP output stream info unavailable: " + format_error(stream_info.error()));
}
auto created = protocol::make_rtmp_output(config, *stream_info);
if (!created) { if (!created) {
return unexpected_error( return unexpected_error(
created.error().code, created.error().code,
@@ -712,17 +804,11 @@ int run_pipeline(const RuntimeConfig &config) {
} }
rtmp_output.emplace(std::move(*created)); rtmp_output.emplace(std::move(*created));
} }
auto stream_info = (*backend)->stream_info(); update_mcap_stream_info(mcap_recorder, stream_info);
if (!stream_info) {
return unexpected_error(
stream_info.error().code,
"pipeline encoder stream info unavailable: " + format_error(stream_info.error()));
}
update_mcap_stream_info(mcap_recorder, *stream_info);
if (config.record.mcap.enabled) { if (config.record.mcap.enabled) {
std::lock_guard lock(mcap_recorder.mutex); std::lock_guard lock(mcap_recorder.mutex);
if (!mcap_recorder.sink) { if (!mcap_recorder.sink) {
auto created = record::McapRecordSink::create(config, *stream_info); auto created = record::McapRecordSink::create(config, stream_info);
if (!created) { if (!created) {
return unexpected_error( return unexpected_error(
ERR_INTERNAL, ERR_INTERNAL,
@@ -742,9 +828,43 @@ int run_pipeline(const RuntimeConfig &config) {
restart_target_info.reset(); restart_target_info.reset();
warned_unknown_depth_unit = false; warned_unknown_depth_unit = false;
active_info = target_info; active_info = target_info;
active_stream_info = stream_info;
return {}; return {};
}; };
const auto attempt_raw_backend_start = [&](const ipc::FrameInfo &target_info) -> Status {
auto ensure = ensure_encoder_backend();
if (!ensure) {
return ensure;
}
(*backend)->shutdown();
auto init = (*backend)->init(config, target_info);
if (!init) {
return std::unexpected(init.error());
}
auto stream_info = (*backend)->stream_info();
if (!stream_info) {
return unexpected_error(
stream_info.error().code,
"pipeline encoder stream info unavailable: " + format_error(stream_info.error()));
}
using_encoded_input = false;
return start_outputs_from_stream_info(*stream_info, target_info);
};
const auto attempt_encoded_passthrough_start = [&](
const ipc::CoherentSnapshot &snapshot) -> Status {
auto stream_info = make_stream_info_from_snapshot(snapshot);
if (!stream_info) {
return unexpected_error(ERR_PROTOCOL, stream_info.error());
}
if (backend) {
(*backend)->shutdown();
}
using_encoded_input = true;
return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info);
};
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
auto last_event = std::chrono::steady_clock::now(); auto last_event = std::chrono::steady_clock::now();
@@ -774,11 +894,16 @@ int run_pipeline(const RuntimeConfig &config) {
case cvmmap::ModuleStatus::StreamReset: case cvmmap::ModuleStatus::StreamReset:
spdlog::info("pipeline status event status=stream_reset"); spdlog::info("pipeline status event status=stream_reset");
stats.resets += 1; stats.resets += 1;
if (backend) {
(*backend)->shutdown(); (*backend)->shutdown();
}
started = false; started = false;
restart_pending = false; restart_pending = false;
restart_target_info.reset(); restart_target_info.reset();
active_stream_info.reset();
using_encoded_input = false;
active_info.reset(); active_info.reset();
rtp_publisher.reset();
rtmp_output.reset(); rtmp_output.reset();
break; break;
} }
@@ -808,11 +933,13 @@ int run_pipeline(const RuntimeConfig &config) {
} }
} }
if (backend && !using_encoded_input) {
auto poll = (*backend)->poll(); auto poll = (*backend)->poll();
if (!poll) { if (!poll) {
const auto reason = format_error(poll.error()); const auto reason = format_error(poll.error());
restart_backend(reason, active_info); restart_backend(reason, active_info);
} }
}
std::array<zmq::pollitem_t, 1> poll_items{{ std::array<zmq::pollitem_t, 1> poll_items{{
{subscriber.handle(), 0, ZMQ_POLLIN, 0}, {subscriber.handle(), 0, ZMQ_POLLIN, 0},
@@ -827,8 +954,8 @@ int run_pipeline(const RuntimeConfig &config) {
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
if (!frame_socket_ready) { if (!frame_socket_ready) {
const auto now = std::chrono::steady_clock::now(); const auto now = std::chrono::steady_clock::now();
if (restart_pending && restart_target_info) { if (restart_pending && restart_target_info && backend && !using_encoded_input) {
auto start_result = attempt_backend_start(*restart_target_info); auto start_result = attempt_raw_backend_start(*restart_target_info);
if (!start_result) { if (!start_result) {
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::RuntimeError); return exit_code(PipelineExitCode::RuntimeError);
@@ -838,7 +965,7 @@ int run_pipeline(const RuntimeConfig &config) {
spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms);
break; break;
} }
if (!producer_offline && started) { if (!producer_offline && started && backend && !using_encoded_input) {
auto drain = drain_encoder( auto drain = drain_encoder(
config, config,
*backend, *backend,
@@ -894,14 +1021,45 @@ int run_pipeline(const RuntimeConfig &config) {
continue; continue;
} }
const bool has_encoded_access_unit = snapshot_has_encoded_access_unit(*snapshot);
const bool want_encoded_input =
config.input.video_source == InputVideoSource::Encoded ||
(config.input.video_source == InputVideoSource::Auto && has_encoded_access_unit);
if (config.input.video_source == InputVideoSource::Encoded && !has_encoded_access_unit) {
spdlog::error("pipeline encoded input requested but SHM snapshot does not contain an encoded access unit");
return exit_code(PipelineExitCode::InitializationError);
}
if (active_info && !frame_info_equal(*active_info, snapshot->metadata.info)) { if (active_info && !frame_info_equal(*active_info, snapshot->metadata.info)) {
stats.format_rebuilds += 1; stats.format_rebuilds += 1;
restart_backend("frame_info_change", snapshot->metadata.info); restart_backend("frame_info_change", snapshot->metadata.info);
} }
if (started && using_encoded_input != want_encoded_input) {
stats.format_rebuilds += 1;
restart_backend("input_video_source_switch", snapshot->metadata.info);
}
if (want_encoded_input) {
auto stream_info = make_stream_info_from_snapshot(*snapshot);
if (!stream_info) {
spdlog::error("pipeline encoded snapshot metadata invalid: {}", stream_info.error());
return exit_code(PipelineExitCode::InitializationError);
}
if (active_stream_info && !stream_info_equal(*active_stream_info, *stream_info)) {
stats.format_rebuilds += 1;
restart_backend("encoded_stream_info_change", snapshot->metadata.info);
}
if (!started || restart_pending) { if (!started || restart_pending) {
auto start_result = attempt_encoded_passthrough_start(*snapshot);
if (!start_result) {
spdlog::error("pipeline encoded passthrough init failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::InitializationError);
}
}
} else if (!started || restart_pending) {
const auto target_info = restart_target_info.value_or(snapshot->metadata.info); const auto target_info = restart_target_info.value_or(snapshot->metadata.info);
auto start_result = attempt_backend_start(target_info); auto start_result = attempt_raw_backend_start(target_info);
if (!start_result) { if (!start_result) {
spdlog::error("pipeline backend init failed: {}", format_error(start_result.error())); spdlog::error("pipeline backend init failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::InitializationError); return exit_code(PipelineExitCode::InitializationError);
@@ -910,6 +1068,29 @@ int run_pipeline(const RuntimeConfig &config) {
latency_tracker.note_ingest(); latency_tracker.note_ingest();
if (want_encoded_input) {
auto access_unit = make_access_unit_from_snapshot(*snapshot);
if (!access_unit) {
const auto reason = "pipeline encoded snapshot invalid: " + access_unit.error();
restart_backend(reason, snapshot->metadata.info);
continue;
}
std::vector<encode::EncodedAccessUnit> access_units{};
access_units.push_back(std::move(*access_unit));
auto publish = publish_access_units(
config,
std::move(access_units),
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
&mcap_recorder,
latency_tracker);
if (!publish) {
const auto reason = format_error(publish.error());
restart_backend(reason, active_info);
continue;
}
} else {
auto push = (*backend)->push_frame(encode::RawVideoFrame{ auto push = (*backend)->push_frame(encode::RawVideoFrame{
.info = snapshot->metadata.info, .info = snapshot->metadata.info,
.source_timestamp_ns = snapshot->metadata.timestamp_ns, .source_timestamp_ns = snapshot->metadata.timestamp_ns,
@@ -920,6 +1101,7 @@ int run_pipeline(const RuntimeConfig &config) {
restart_backend(reason, active_info); restart_backend(reason, active_info);
continue; continue;
} }
}
if (!snapshot->depth.empty()) { if (!snapshot->depth.empty()) {
if (snapshot->depth_unit == ipc::DepthUnit::Unknown) { if (snapshot->depth_unit == ipc::DepthUnit::Unknown) {
@@ -946,6 +1128,7 @@ int run_pipeline(const RuntimeConfig &config) {
} }
stats.pushed_frames += 1; stats.pushed_frames += 1;
if (!want_encoded_input) {
auto drain = drain_encoder( auto drain = drain_encoder(
config, config,
*backend, *backend,
@@ -960,6 +1143,7 @@ int run_pipeline(const RuntimeConfig &config) {
restart_backend(reason, active_info); restart_backend(reason, active_info);
continue; continue;
} }
}
if (config.latency.ingest_max_frames > 0 && stats.pushed_frames >= config.latency.ingest_max_frames) { if (config.latency.ingest_max_frames > 0 && stats.pushed_frames >= config.latency.ingest_max_frames) {
spdlog::info("pipeline reached ingest_max_frames={}, stopping", config.latency.ingest_max_frames); spdlog::info("pipeline reached ingest_max_frames={}, stopping", config.latency.ingest_max_frames);
@@ -973,7 +1157,7 @@ int run_pipeline(const RuntimeConfig &config) {
nats_client.Stop(); nats_client.Stop();
if (started) { if (started && backend && !using_encoded_input) {
auto drain = drain_encoder( auto drain = drain_encoder(
config, config,
*backend, *backend,
@@ -989,14 +1173,18 @@ int run_pipeline(const RuntimeConfig &config) {
} }
} }
if (backend) {
(*backend)->shutdown(); (*backend)->shutdown();
}
std::ignore = stop_mcap_recording(mcap_recorder); std::ignore = stop_mcap_recording(mcap_recorder);
recorder_service.Stop(); recorder_service.Stop();
spdlog::info( spdlog::info(
"PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}", "PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}",
to_string(config.encoder.codec), active_stream_info ? to_string(active_stream_info->codec) : to_string(config.encoder.codec),
(*backend)->backend_name(), using_encoded_input
? std::string("encoded_passthrough")
: (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")),
stats.sync_messages, stats.sync_messages,
stats.status_messages, stats.status_messages,
stats.torn_frames, stats.torn_frames,
+2 -8
View File
@@ -191,7 +191,7 @@ Result<std::vector<std::uint8_t>> decoder_config_to_annexb(
CodecType codec, CodecType codec,
std::span<const std::uint8_t> decoder_config) { std::span<const std::uint8_t> decoder_config) {
if (decoder_config.empty()) { if (decoder_config.empty()) {
return unexpected_error(ERR_PROTOCOL, "decoder config is required"); return std::vector<std::uint8_t>{};
} }
if (looks_like_annexb(decoder_config)) { if (looks_like_annexb(decoder_config)) {
return std::vector<std::uint8_t>(decoder_config.begin(), decoder_config.end()); return std::vector<std::uint8_t>(decoder_config.begin(), decoder_config.end());
@@ -248,10 +248,6 @@ public:
static Result<RtmpOutput> create( static Result<RtmpOutput> create(
const RuntimeConfig &config, const RuntimeConfig &config,
const encode::EncodedStreamInfo &stream_info) { const encode::EncodedStreamInfo &stream_info) {
if (stream_info.decoder_config.empty()) {
return unexpected_error(ERR_PROTOCOL, "libavformat RTMP requires encoder decoder_config/extradata");
}
avformat_network_init(); avformat_network_init();
LibavformatRtmpOutput output{}; LibavformatRtmpOutput output{};
@@ -442,9 +438,7 @@ public:
}; };
auto decoder_config_annexb = decoder_config_to_annexb(stream_info.codec, stream_info.decoder_config); auto decoder_config_annexb = decoder_config_to_annexb(stream_info.codec, stream_info.decoder_config);
if (!decoder_config_annexb) { if (!decoder_config_annexb) {
return unexpected_error( return std::unexpected(decoder_config_annexb.error());
ERR_PROTOCOL,
"ffmpeg_process RTMP requires decoder config: " + format_error(decoder_config_annexb.error()));
} }
output.decoder_config_annexb_ = std::move(*decoder_config_annexb); output.decoder_config_annexb_ = std::move(*decoder_config_annexb);
+2 -2
View File
@@ -179,7 +179,7 @@ UdpRtpPublisher &UdpRtpPublisher::operator=(UdpRtpPublisher &&other) noexcept {
return *this; return *this;
} }
std::expected<UdpRtpPublisher, std::string> UdpRtpPublisher::create(const RuntimeConfig &config) { std::expected<UdpRtpPublisher, std::string> UdpRtpPublisher::create(const RuntimeConfig &config, const CodecType codec) {
if (!config.outputs.rtp.enabled) { if (!config.outputs.rtp.enabled) {
return std::unexpected("invalid RTP publisher init: RTP output disabled"); return std::unexpected("invalid RTP publisher init: RTP output disabled");
} }
@@ -191,7 +191,7 @@ std::expected<UdpRtpPublisher, std::string> UdpRtpPublisher::create(const Runtim
publisher.destination_host_ = *config.outputs.rtp.host; publisher.destination_host_ = *config.outputs.rtp.host;
publisher.destination_port_ = *config.outputs.rtp.port; publisher.destination_port_ = *config.outputs.rtp.port;
publisher.payload_type_ = config.outputs.rtp.payload_type; publisher.payload_type_ = config.outputs.rtp.payload_type;
publisher.codec_ = config.encoder.codec; publisher.codec_ = codec;
publisher.sequence_ = compute_initial_sequence(); publisher.sequence_ = compute_initial_sequence();
publisher.ssrc_ = compute_ssrc( publisher.ssrc_ = compute_ssrc(
publisher.destination_host_, publisher.destination_host_,