Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c320bf01af | |||
| bb3ace43b7 |
+5
-1
@@ -58,7 +58,10 @@ set(CVMMAP_LOCAL_CORE_DIR "${CVMMAP_LOCAL_ROOT}/build/core" CACHE PATH "Path to
|
||||
set(CVMMAP_LOCAL_NATS_STATIC "${CVMMAP_LOCAL_ROOT}/build/lib/libnats_static.a" CACHE PATH "Path to local cnats static library")
|
||||
if (CVMMAP_CNATS_PROVIDER STREQUAL "system")
|
||||
find_package(cnats CONFIG REQUIRED)
|
||||
find_package(cvmmap-core CONFIG QUIET PATHS "${CVMMAP_LOCAL_CORE_DIR}" NO_DEFAULT_PATH)
|
||||
if (NOT TARGET cvmmap::client)
|
||||
find_package(cvmmap-core CONFIG QUIET)
|
||||
endif()
|
||||
else()
|
||||
if (NOT EXISTS "${CVMMAP_LOCAL_NATS_STATIC}")
|
||||
message(FATAL_ERROR
|
||||
@@ -256,7 +259,8 @@ add_library(cvmmap_streamer_common STATIC
|
||||
target_include_directories(cvmmap_streamer_common
|
||||
PUBLIC
|
||||
"${CMAKE_CURRENT_LIST_DIR}/include"
|
||||
"${CMAKE_CURRENT_BINARY_DIR}")
|
||||
"${CMAKE_CURRENT_BINARY_DIR}"
|
||||
"${CVMMAP_LOCAL_ROOT}/core/include")
|
||||
|
||||
set(CVMMAP_STREAMER_LINK_DEPS
|
||||
Threads::Threads
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
# Encoded Passthrough From `cv-mmap` ABI v2.1
|
||||
|
||||
## Summary
|
||||
|
||||
`cvmmap-streamer` can now consume encoded access units directly from a `cv-mmap` shared-memory snapshot when the producer exposes the ABI v2.1 encoded plane.
|
||||
|
||||
This is intended for producers such as `cv-mmap` with `video.backend = "udp_rtp"`:
|
||||
|
||||
- `cv-mmap` receives RTP H.265
|
||||
- `cv-mmap` parses once and publishes raw BGR plus encoded AU in the same snapshot
|
||||
- `cvmmap-streamer` can forward or mux the encoded AU directly
|
||||
|
||||
The video no longer needs a decode -> encode round trip inside the streamer.
|
||||
|
||||
## Input selection
|
||||
|
||||
`input.video_source` controls how the pipeline uses the snapshot:
|
||||
|
||||
```toml
|
||||
[input]
|
||||
uri = "cvmmap://default"
|
||||
nats_url = "nats://localhost:4222"
|
||||
video_source = "auto"
|
||||
```
|
||||
|
||||
Valid values:
|
||||
|
||||
- `auto`: prefer encoded passthrough when the encoded AU plane is present, otherwise use raw encoding
|
||||
- `raw`: always use the raw left plane and the local encoder backend
|
||||
- `encoded`: require the encoded AU plane; fail if it is missing
|
||||
|
||||
CLI equivalent:
|
||||
|
||||
```text
|
||||
--input-video-source auto|raw|encoded
|
||||
```
|
||||
|
||||
## Encoded passthrough behavior
|
||||
|
||||
When `video_source` resolves to encoded mode:
|
||||
|
||||
- no encoder backend is created
|
||||
- the streamer reads codec, bitstream format, keyframe flag, frame rate, and stream PTS from the SHM metadata
|
||||
- width and height still come from the raw left-plane frame metadata
|
||||
- the encoded AU is forwarded as-is
|
||||
|
||||
Current assumptions:
|
||||
|
||||
- codec is H.265 or H.264
|
||||
- bitstream format is Annex B
|
||||
- one SHM encoded plane contains one AU
|
||||
- time base is treated as `1/1e9`
|
||||
|
||||
## Outputs
|
||||
|
||||
### RTP
|
||||
|
||||
RTP output accepts passthrough access units directly.
|
||||
|
||||
- in encoded mode, SDP signaling and RTP packetization follow the codec from SHM metadata
|
||||
- `config.encoder.codec` is ignored for RTP while passthrough is active
|
||||
- raw mode still uses the configured local encoder and its codec
|
||||
|
||||
### RTMP
|
||||
|
||||
RTMP output accepts passthrough access units directly.
|
||||
|
||||
- If decoder config is present, it is used.
|
||||
- If decoder config is absent, the output relies on in-band parameter sets from keyframes.
|
||||
|
||||
For `cv-mmap udp_rtp`, this works because the producer republishes keyframes with VPS/SPS/PPS in-band.
|
||||
|
||||
### MCAP
|
||||
|
||||
MCAP recording writes the compressed video directly.
|
||||
|
||||
- no local encoder is used
|
||||
- keyframes are written exactly from the encoded AU payload
|
||||
- depth and body-tracking recording continue to use the same raw/depth/NATS paths as before
|
||||
|
||||
## Fallback to raw mode
|
||||
|
||||
If the producer does not expose the encoded plane, `auto` mode falls back to the existing raw pipeline:
|
||||
|
||||
- read raw BGR from SHM
|
||||
- push frames through the configured encoder backend
|
||||
- publish or record the encoder output
|
||||
|
||||
This keeps existing producers compatible.
|
||||
@@ -40,6 +40,12 @@ enum class EncoderDeviceType {
|
||||
Software,
|
||||
};
|
||||
|
||||
enum class InputVideoSource {
|
||||
Auto,
|
||||
Raw,
|
||||
Encoded,
|
||||
};
|
||||
|
||||
enum class McapCompression {
|
||||
None,
|
||||
Lz4,
|
||||
@@ -49,6 +55,7 @@ enum class McapCompression {
|
||||
struct InputConfig {
|
||||
std::string uri{"cvmmap://default"};
|
||||
std::string nats_url{"nats://localhost:4222"};
|
||||
InputVideoSource video_source{InputVideoSource::Auto};
|
||||
};
|
||||
|
||||
struct EncoderConfig {
|
||||
@@ -125,6 +132,7 @@ std::string_view to_string(RtmpMode mode);
|
||||
std::string_view to_string(RtmpTransportType transport);
|
||||
std::string_view to_string(EncoderBackendType backend);
|
||||
std::string_view to_string(EncoderDeviceType device);
|
||||
std::string_view to_string(InputVideoSource source);
|
||||
std::string_view to_string(McapCompression compression);
|
||||
std::expected<McapCompression, std::string> parse_mcap_compression(std::string_view raw);
|
||||
|
||||
|
||||
@@ -51,6 +51,19 @@ enum class DepthUnit : std::uint8_t {
|
||||
Meter = 2,
|
||||
};
|
||||
|
||||
enum class EncodedCodec : std::uint8_t {
|
||||
Unknown = 0,
|
||||
H264 = 1,
|
||||
H265 = 2,
|
||||
};
|
||||
|
||||
enum class EncodedBitstreamFormat : std::uint8_t {
|
||||
Unknown = 0,
|
||||
AnnexB = 1,
|
||||
};
|
||||
|
||||
constexpr std::uint16_t kEncodedFlagKeyframe = 0x0001u;
|
||||
|
||||
enum class ModuleStatus : std::int32_t {
|
||||
Online = 0xa1,
|
||||
Offline = 0xa0,
|
||||
@@ -163,22 +176,36 @@ struct ControlResponseMessage {
|
||||
struct ValidatedShmView {
|
||||
FrameMetadata metadata;
|
||||
DepthUnit depth_unit{DepthUnit::Unknown};
|
||||
EncodedCodec encoded_codec{EncodedCodec::Unknown};
|
||||
EncodedBitstreamFormat encoded_bitstream_format{EncodedBitstreamFormat::Unknown};
|
||||
std::uint16_t encoded_flags{0};
|
||||
std::uint16_t encoded_frame_rate_num{0};
|
||||
std::uint16_t encoded_frame_rate_den{0};
|
||||
std::uint64_t encoded_stream_pts_ns{0};
|
||||
std::span<const std::uint8_t> payload;
|
||||
std::span<const std::uint8_t> left;
|
||||
std::optional<FrameInfo> depth_info{};
|
||||
std::span<const std::uint8_t> depth{};
|
||||
std::optional<FrameInfo> confidence_info{};
|
||||
std::span<const std::uint8_t> confidence{};
|
||||
std::span<const std::uint8_t> encoded_access_unit{};
|
||||
};
|
||||
|
||||
struct CoherentSnapshot {
|
||||
FrameMetadata metadata;
|
||||
DepthUnit depth_unit{DepthUnit::Unknown};
|
||||
EncodedCodec encoded_codec{EncodedCodec::Unknown};
|
||||
EncodedBitstreamFormat encoded_bitstream_format{EncodedBitstreamFormat::Unknown};
|
||||
std::uint16_t encoded_flags{0};
|
||||
std::uint16_t encoded_frame_rate_num{0};
|
||||
std::uint16_t encoded_frame_rate_den{0};
|
||||
std::uint64_t encoded_stream_pts_ns{0};
|
||||
std::span<const std::uint8_t> left;
|
||||
std::optional<FrameInfo> depth_info{};
|
||||
std::span<const std::uint8_t> depth{};
|
||||
std::optional<FrameInfo> confidence_info{};
|
||||
std::span<const std::uint8_t> confidence{};
|
||||
std::span<const std::uint8_t> encoded_access_unit{};
|
||||
std::size_t bytes_copied;
|
||||
};
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ public:
|
||||
UdpRtpPublisher &operator=(UdpRtpPublisher &&other) noexcept;
|
||||
|
||||
[[nodiscard]]
|
||||
static std::expected<UdpRtpPublisher, std::string> create(const RuntimeConfig &config);
|
||||
static std::expected<UdpRtpPublisher, std::string> create(const RuntimeConfig &config, CodecType codec);
|
||||
|
||||
void publish_access_unit(std::span<const std::uint8_t> access_unit, std::uint64_t pts_ns);
|
||||
|
||||
|
||||
@@ -155,6 +155,19 @@ std::expected<EncoderDeviceType, std::string> parse_encoder_device(std::string_v
|
||||
return std::unexpected("invalid encoder device: '" + std::string(raw) + "' (expected: auto|nvidia|software)");
|
||||
}
|
||||
|
||||
std::expected<InputVideoSource, std::string> parse_input_video_source(std::string_view raw) {
|
||||
if (raw == "auto") {
|
||||
return InputVideoSource::Auto;
|
||||
}
|
||||
if (raw == "raw") {
|
||||
return InputVideoSource::Raw;
|
||||
}
|
||||
if (raw == "encoded") {
|
||||
return InputVideoSource::Encoded;
|
||||
}
|
||||
return std::unexpected("invalid input video source: '" + std::string(raw) + "' (expected: auto|raw|encoded)");
|
||||
}
|
||||
|
||||
std::expected<McapCompression, std::string> parse_mcap_compression_impl(std::string_view raw) {
|
||||
if (raw == "none") {
|
||||
return McapCompression::None;
|
||||
@@ -263,6 +276,13 @@ std::expected<void, std::string> apply_toml_file(RuntimeConfig &config, const st
|
||||
if (auto value = toml_value<std::string>(table, "input.nats_url")) {
|
||||
config.input.nats_url = *value;
|
||||
}
|
||||
if (auto value = toml_value<std::string>(table, "input.video_source")) {
|
||||
auto parsed = parse_input_video_source(*value);
|
||||
if (!parsed) {
|
||||
return std::unexpected(parsed.error());
|
||||
}
|
||||
config.input.video_source = *parsed;
|
||||
}
|
||||
if (auto value = toml_value<std::string>(table, "run_mode")) {
|
||||
auto parsed = parse_run_mode(*value);
|
||||
if (!parsed) {
|
||||
@@ -553,6 +573,18 @@ std::string_view to_string(EncoderDeviceType device) {
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
std::string_view to_string(InputVideoSource source) {
|
||||
switch (source) {
|
||||
case InputVideoSource::Auto:
|
||||
return "auto";
|
||||
case InputVideoSource::Raw:
|
||||
return "raw";
|
||||
case InputVideoSource::Encoded:
|
||||
return "encoded";
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
std::string_view to_string(McapCompression compression) {
|
||||
switch (compression) {
|
||||
case McapCompression::None:
|
||||
@@ -571,6 +603,7 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
|
||||
std::string config_path_raw{};
|
||||
std::string input_uri_raw{};
|
||||
std::string input_nats_url_raw{};
|
||||
std::string input_video_source_raw{};
|
||||
std::string run_mode_raw{};
|
||||
std::string codec_raw{};
|
||||
std::string encoder_backend_raw{};
|
||||
@@ -612,6 +645,7 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
|
||||
app.add_option("--config", config_path_raw);
|
||||
app.add_option("--input-uri", input_uri_raw);
|
||||
app.add_option("--nats-url", input_nats_url_raw);
|
||||
app.add_option("--input-video-source", input_video_source_raw);
|
||||
app.add_option("--run-mode", run_mode_raw);
|
||||
app.add_option("--codec", codec_raw);
|
||||
app.add_option("--encoder-backend", encoder_backend_raw);
|
||||
@@ -670,6 +704,13 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
|
||||
if (!input_nats_url_raw.empty()) {
|
||||
config.input.nats_url = input_nats_url_raw;
|
||||
}
|
||||
if (!input_video_source_raw.empty()) {
|
||||
auto parsed = parse_input_video_source(input_video_source_raw);
|
||||
if (!parsed) {
|
||||
return std::unexpected(parsed.error());
|
||||
}
|
||||
config.input.video_source = *parsed;
|
||||
}
|
||||
if (!run_mode_raw.empty()) {
|
||||
auto parsed = parse_run_mode(run_mode_raw);
|
||||
if (!parsed) {
|
||||
@@ -938,6 +979,7 @@ std::string summarize_runtime_config(const RuntimeConfig &config) {
|
||||
std::ostringstream ss;
|
||||
ss << "input.uri=" << config.input.uri;
|
||||
ss << ", input.nats_url=" << config.input.nats_url;
|
||||
ss << ", input.video_source=" << to_string(config.input.video_source);
|
||||
ss << ", run_mode=" << to_string(config.run_mode);
|
||||
ss << ", encoder.backend=" << to_string(config.encoder.backend);
|
||||
ss << ", encoder.device=" << to_string(config.encoder.device);
|
||||
|
||||
+42
-1
@@ -173,6 +173,31 @@ namespace {
|
||||
return converted;
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
EncodedCodec from_core_encoded_codec(const cvmmap::EncodedCodec codec) {
|
||||
switch (codec) {
|
||||
case cvmmap::EncodedCodec::H264:
|
||||
return EncodedCodec::H264;
|
||||
case cvmmap::EncodedCodec::H265:
|
||||
return EncodedCodec::H265;
|
||||
case cvmmap::EncodedCodec::Unknown:
|
||||
default:
|
||||
return EncodedCodec::Unknown;
|
||||
}
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
EncodedBitstreamFormat from_core_encoded_bitstream_format(
|
||||
const cvmmap::EncodedBitstreamFormat format) {
|
||||
switch (format) {
|
||||
case cvmmap::EncodedBitstreamFormat::AnnexB:
|
||||
return EncodedBitstreamFormat::AnnexB;
|
||||
case cvmmap::EncodedBitstreamFormat::Unknown:
|
||||
default:
|
||||
return EncodedBitstreamFormat::Unknown;
|
||||
}
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
std::size_t span_offset(
|
||||
const std::span<const std::uint8_t> outer,
|
||||
@@ -198,6 +223,7 @@ namespace {
|
||||
};
|
||||
consider(metadata.depth_plane);
|
||||
consider(metadata.confidence_plane);
|
||||
consider(metadata.encoded_access_unit);
|
||||
return payload_bytes;
|
||||
}
|
||||
|
||||
@@ -420,6 +446,13 @@ std::expected<ValidatedShmView, ParseError> validate_shm_region(std::span<const
|
||||
return ValidatedShmView{
|
||||
.metadata = from_core_metadata(metadata_result->normalized_metadata),
|
||||
.depth_unit = from_core_depth_unit(metadata_result->depth_unit),
|
||||
.encoded_codec = from_core_encoded_codec(metadata_result->encoded_codec),
|
||||
.encoded_bitstream_format =
|
||||
from_core_encoded_bitstream_format(metadata_result->encoded_bitstream_format),
|
||||
.encoded_flags = metadata_result->encoded_flags,
|
||||
.encoded_frame_rate_num = metadata_result->encoded_frame_rate_num,
|
||||
.encoded_frame_rate_den = metadata_result->encoded_frame_rate_den,
|
||||
.encoded_stream_pts_ns = metadata_result->encoded_stream_pts_ns,
|
||||
.payload = payload_region.first(payload_bytes),
|
||||
.left = metadata_result->left_plane,
|
||||
.depth_info = metadata_result->depth_info
|
||||
@@ -429,7 +462,8 @@ std::expected<ValidatedShmView, ParseError> validate_shm_region(std::span<const
|
||||
.confidence_info = metadata_result->confidence_info
|
||||
? std::optional<FrameInfo>(from_core_frame_info(*metadata_result->confidence_info))
|
||||
: std::nullopt,
|
||||
.confidence = metadata_result->confidence_plane};
|
||||
.confidence = metadata_result->confidence_plane,
|
||||
.encoded_access_unit = metadata_result->encoded_access_unit};
|
||||
}
|
||||
|
||||
std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot(
|
||||
@@ -466,11 +500,18 @@ std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot(
|
||||
return CoherentSnapshot{
|
||||
.metadata = first->metadata,
|
||||
.depth_unit = first->depth_unit,
|
||||
.encoded_codec = first->encoded_codec,
|
||||
.encoded_bitstream_format = first->encoded_bitstream_format,
|
||||
.encoded_flags = first->encoded_flags,
|
||||
.encoded_frame_rate_num = first->encoded_frame_rate_num,
|
||||
.encoded_frame_rate_den = first->encoded_frame_rate_den,
|
||||
.encoded_stream_pts_ns = first->encoded_stream_pts_ns,
|
||||
.left = translate_span(first->payload, copied_payload, first->left),
|
||||
.depth_info = first->depth_info,
|
||||
.depth = translate_span(first->payload, copied_payload, first->depth),
|
||||
.confidence_info = first->confidence_info,
|
||||
.confidence = translate_span(first->payload, copied_payload, first->confidence),
|
||||
.encoded_access_unit = translate_span(first->payload, copied_payload, first->encoded_access_unit),
|
||||
.bytes_copied = first->payload.size()};
|
||||
}
|
||||
|
||||
|
||||
@@ -243,6 +243,90 @@ std::expected<record::RawDepthMapView, std::string> make_depth_map_view(const ip
|
||||
};
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
bool snapshot_has_encoded_access_unit(const ipc::CoherentSnapshot &snapshot) {
|
||||
return !snapshot.encoded_access_unit.empty() &&
|
||||
snapshot.encoded_codec != ipc::EncodedCodec::Unknown &&
|
||||
snapshot.encoded_bitstream_format == ipc::EncodedBitstreamFormat::AnnexB;
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
std::expected<CodecType, std::string> codec_type_from_snapshot(const ipc::CoherentSnapshot &snapshot) {
|
||||
switch (snapshot.encoded_codec) {
|
||||
case ipc::EncodedCodec::H264:
|
||||
return CodecType::H264;
|
||||
case ipc::EncodedCodec::H265:
|
||||
return CodecType::H265;
|
||||
case ipc::EncodedCodec::Unknown:
|
||||
default:
|
||||
return std::unexpected("encoded snapshot codec is unsupported");
|
||||
}
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
std::expected<encode::EncodedStreamInfo, std::string> make_stream_info_from_snapshot(
|
||||
const ipc::CoherentSnapshot &snapshot) {
|
||||
auto codec = codec_type_from_snapshot(snapshot);
|
||||
if (!codec) {
|
||||
return std::unexpected(codec.error());
|
||||
}
|
||||
if (snapshot.encoded_bitstream_format != ipc::EncodedBitstreamFormat::AnnexB) {
|
||||
return std::unexpected("encoded snapshot bitstream format is unsupported");
|
||||
}
|
||||
if (snapshot.metadata.info.width == 0 || snapshot.metadata.info.height == 0) {
|
||||
return std::unexpected("encoded snapshot dimensions are invalid");
|
||||
}
|
||||
|
||||
encode::EncodedStreamInfo stream_info{};
|
||||
stream_info.codec = *codec;
|
||||
stream_info.width = snapshot.metadata.info.width;
|
||||
stream_info.height = snapshot.metadata.info.height;
|
||||
stream_info.time_base_num = 1;
|
||||
stream_info.time_base_den = 1'000'000'000u;
|
||||
stream_info.frame_rate_num = snapshot.encoded_frame_rate_num == 0 ? 30u : snapshot.encoded_frame_rate_num;
|
||||
stream_info.frame_rate_den = snapshot.encoded_frame_rate_den == 0 ? 1u : snapshot.encoded_frame_rate_den;
|
||||
stream_info.bitstream_format = encode::EncodedBitstreamFormat::AnnexB;
|
||||
stream_info.decoder_config.clear();
|
||||
return stream_info;
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
std::expected<encode::EncodedAccessUnit, std::string> make_access_unit_from_snapshot(
|
||||
const ipc::CoherentSnapshot &snapshot) {
|
||||
auto codec = codec_type_from_snapshot(snapshot);
|
||||
if (!codec) {
|
||||
return std::unexpected(codec.error());
|
||||
}
|
||||
if (snapshot.encoded_access_unit.empty()) {
|
||||
return std::unexpected("encoded snapshot access unit is empty");
|
||||
}
|
||||
|
||||
encode::EncodedAccessUnit access_unit{};
|
||||
access_unit.codec = *codec;
|
||||
access_unit.source_timestamp_ns = snapshot.metadata.timestamp_ns;
|
||||
access_unit.stream_pts_ns =
|
||||
snapshot.encoded_stream_pts_ns == 0 ? snapshot.metadata.timestamp_ns : snapshot.encoded_stream_pts_ns;
|
||||
access_unit.keyframe = (snapshot.encoded_flags & ipc::kEncodedFlagKeyframe) != 0;
|
||||
access_unit.annexb_bytes.assign(
|
||||
snapshot.encoded_access_unit.begin(),
|
||||
snapshot.encoded_access_unit.end());
|
||||
return access_unit;
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
bool stream_info_equal(
|
||||
const encode::EncodedStreamInfo &lhs,
|
||||
const encode::EncodedStreamInfo &rhs) {
|
||||
return lhs.codec == rhs.codec &&
|
||||
lhs.width == rhs.width &&
|
||||
lhs.height == rhs.height &&
|
||||
lhs.time_base_num == rhs.time_base_num &&
|
||||
lhs.time_base_den == rhs.time_base_den &&
|
||||
lhs.frame_rate_num == rhs.frame_rate_num &&
|
||||
lhs.frame_rate_den == rhs.frame_rate_den &&
|
||||
lhs.bitstream_format == rhs.bitstream_format;
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) {
|
||||
if (frame.header.timestamp_ns != 0) {
|
||||
@@ -559,11 +643,7 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
return exit_code(PipelineExitCode::InputError);
|
||||
}
|
||||
|
||||
auto backend = encode::make_encoder_backend(config);
|
||||
if (!backend) {
|
||||
spdlog::error("pipeline encoder backend selection failed: {}", format_error(backend.error()));
|
||||
return exit_code(PipelineExitCode::InitializationError);
|
||||
}
|
||||
std::optional<encode::EncoderBackend> backend{};
|
||||
|
||||
auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name);
|
||||
if (!shm) {
|
||||
@@ -655,20 +735,13 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
return exit_code(PipelineExitCode::SubscriberError);
|
||||
}
|
||||
|
||||
if (config.outputs.rtp.enabled) {
|
||||
auto created = protocol::UdpRtpPublisher::create(config);
|
||||
if (!created) {
|
||||
spdlog::error("pipeline RTP publisher init failed: {}", created.error());
|
||||
return exit_code(PipelineExitCode::InitializationError);
|
||||
}
|
||||
rtp_publisher.emplace(std::move(*created));
|
||||
}
|
||||
|
||||
PipelineStats stats{};
|
||||
metrics::IngestEmitLatencyTracker latency_tracker{};
|
||||
bool producer_offline{false};
|
||||
bool started{false};
|
||||
bool using_encoded_input{false};
|
||||
std::optional<ipc::FrameInfo> active_info{};
|
||||
std::optional<encode::EncodedStreamInfo> active_stream_info{};
|
||||
std::optional<ipc::FrameInfo> restart_target_info{};
|
||||
bool restart_pending{false};
|
||||
bool warned_unknown_depth_unit{false};
|
||||
@@ -678,33 +751,52 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
stats.supervised_restarts += 1;
|
||||
}
|
||||
spdlog::warn(
|
||||
"PIPELINE_RESTART backend={} reason='{}' restarts={}",
|
||||
(*backend)->backend_name(),
|
||||
"PIPELINE_RESTART mode={} reason='{}' restarts={}",
|
||||
using_encoded_input
|
||||
? "encoded_passthrough"
|
||||
: (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")),
|
||||
reason,
|
||||
stats.supervised_restarts);
|
||||
if (backend) {
|
||||
(*backend)->shutdown();
|
||||
}
|
||||
started = false;
|
||||
restart_pending = true;
|
||||
restart_target_info = target_info;
|
||||
warned_unknown_depth_unit = false;
|
||||
using_encoded_input = false;
|
||||
active_stream_info.reset();
|
||||
rtp_publisher.reset();
|
||||
rtmp_output.reset();
|
||||
};
|
||||
|
||||
const auto attempt_backend_start = [&](const ipc::FrameInfo &target_info) -> Status {
|
||||
(*backend)->shutdown();
|
||||
const auto ensure_encoder_backend = [&]() -> Status {
|
||||
if (backend) {
|
||||
return {};
|
||||
}
|
||||
auto created = encode::make_encoder_backend(config);
|
||||
if (!created) {
|
||||
return std::unexpected(created.error());
|
||||
}
|
||||
backend.emplace(std::move(*created));
|
||||
return {};
|
||||
};
|
||||
|
||||
const auto start_outputs_from_stream_info =
|
||||
[&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status {
|
||||
rtp_publisher.reset();
|
||||
rtmp_output.reset();
|
||||
auto init = (*backend)->init(config, target_info);
|
||||
if (!init) {
|
||||
return std::unexpected(init.error());
|
||||
if (config.outputs.rtp.enabled) {
|
||||
auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec);
|
||||
if (!created) {
|
||||
return unexpected_error(
|
||||
ERR_INTERNAL,
|
||||
"pipeline RTP publisher init failed: " + created.error());
|
||||
}
|
||||
rtp_publisher.emplace(std::move(*created));
|
||||
}
|
||||
if (config.outputs.rtmp.enabled) {
|
||||
auto stream_info = (*backend)->stream_info();
|
||||
if (!stream_info) {
|
||||
return unexpected_error(
|
||||
stream_info.error().code,
|
||||
"pipeline RTMP output stream info unavailable: " + format_error(stream_info.error()));
|
||||
}
|
||||
auto created = protocol::make_rtmp_output(config, *stream_info);
|
||||
auto created = protocol::make_rtmp_output(config, stream_info);
|
||||
if (!created) {
|
||||
return unexpected_error(
|
||||
created.error().code,
|
||||
@@ -712,17 +804,11 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
}
|
||||
rtmp_output.emplace(std::move(*created));
|
||||
}
|
||||
auto stream_info = (*backend)->stream_info();
|
||||
if (!stream_info) {
|
||||
return unexpected_error(
|
||||
stream_info.error().code,
|
||||
"pipeline encoder stream info unavailable: " + format_error(stream_info.error()));
|
||||
}
|
||||
update_mcap_stream_info(mcap_recorder, *stream_info);
|
||||
update_mcap_stream_info(mcap_recorder, stream_info);
|
||||
if (config.record.mcap.enabled) {
|
||||
std::lock_guard lock(mcap_recorder.mutex);
|
||||
if (!mcap_recorder.sink) {
|
||||
auto created = record::McapRecordSink::create(config, *stream_info);
|
||||
auto created = record::McapRecordSink::create(config, stream_info);
|
||||
if (!created) {
|
||||
return unexpected_error(
|
||||
ERR_INTERNAL,
|
||||
@@ -742,9 +828,43 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
restart_target_info.reset();
|
||||
warned_unknown_depth_unit = false;
|
||||
active_info = target_info;
|
||||
active_stream_info = stream_info;
|
||||
return {};
|
||||
};
|
||||
|
||||
const auto attempt_raw_backend_start = [&](const ipc::FrameInfo &target_info) -> Status {
|
||||
auto ensure = ensure_encoder_backend();
|
||||
if (!ensure) {
|
||||
return ensure;
|
||||
}
|
||||
(*backend)->shutdown();
|
||||
auto init = (*backend)->init(config, target_info);
|
||||
if (!init) {
|
||||
return std::unexpected(init.error());
|
||||
}
|
||||
auto stream_info = (*backend)->stream_info();
|
||||
if (!stream_info) {
|
||||
return unexpected_error(
|
||||
stream_info.error().code,
|
||||
"pipeline encoder stream info unavailable: " + format_error(stream_info.error()));
|
||||
}
|
||||
using_encoded_input = false;
|
||||
return start_outputs_from_stream_info(*stream_info, target_info);
|
||||
};
|
||||
|
||||
const auto attempt_encoded_passthrough_start = [&](
|
||||
const ipc::CoherentSnapshot &snapshot) -> Status {
|
||||
auto stream_info = make_stream_info_from_snapshot(snapshot);
|
||||
if (!stream_info) {
|
||||
return unexpected_error(ERR_PROTOCOL, stream_info.error());
|
||||
}
|
||||
if (backend) {
|
||||
(*backend)->shutdown();
|
||||
}
|
||||
using_encoded_input = true;
|
||||
return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info);
|
||||
};
|
||||
|
||||
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
|
||||
auto last_event = std::chrono::steady_clock::now();
|
||||
|
||||
@@ -774,11 +894,16 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
case cvmmap::ModuleStatus::StreamReset:
|
||||
spdlog::info("pipeline status event status=stream_reset");
|
||||
stats.resets += 1;
|
||||
if (backend) {
|
||||
(*backend)->shutdown();
|
||||
}
|
||||
started = false;
|
||||
restart_pending = false;
|
||||
restart_target_info.reset();
|
||||
active_stream_info.reset();
|
||||
using_encoded_input = false;
|
||||
active_info.reset();
|
||||
rtp_publisher.reset();
|
||||
rtmp_output.reset();
|
||||
break;
|
||||
}
|
||||
@@ -808,11 +933,13 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
}
|
||||
}
|
||||
|
||||
if (backend && !using_encoded_input) {
|
||||
auto poll = (*backend)->poll();
|
||||
if (!poll) {
|
||||
const auto reason = format_error(poll.error());
|
||||
restart_backend(reason, active_info);
|
||||
}
|
||||
}
|
||||
|
||||
std::array<zmq::pollitem_t, 1> poll_items{{
|
||||
{subscriber.handle(), 0, ZMQ_POLLIN, 0},
|
||||
@@ -827,8 +954,8 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
|
||||
if (!frame_socket_ready) {
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
if (restart_pending && restart_target_info) {
|
||||
auto start_result = attempt_backend_start(*restart_target_info);
|
||||
if (restart_pending && restart_target_info && backend && !using_encoded_input) {
|
||||
auto start_result = attempt_raw_backend_start(*restart_target_info);
|
||||
if (!start_result) {
|
||||
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
|
||||
return exit_code(PipelineExitCode::RuntimeError);
|
||||
@@ -838,7 +965,7 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms);
|
||||
break;
|
||||
}
|
||||
if (!producer_offline && started) {
|
||||
if (!producer_offline && started && backend && !using_encoded_input) {
|
||||
auto drain = drain_encoder(
|
||||
config,
|
||||
*backend,
|
||||
@@ -894,14 +1021,45 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const bool has_encoded_access_unit = snapshot_has_encoded_access_unit(*snapshot);
|
||||
const bool want_encoded_input =
|
||||
config.input.video_source == InputVideoSource::Encoded ||
|
||||
(config.input.video_source == InputVideoSource::Auto && has_encoded_access_unit);
|
||||
if (config.input.video_source == InputVideoSource::Encoded && !has_encoded_access_unit) {
|
||||
spdlog::error("pipeline encoded input requested but SHM snapshot does not contain an encoded access unit");
|
||||
return exit_code(PipelineExitCode::InitializationError);
|
||||
}
|
||||
|
||||
if (active_info && !frame_info_equal(*active_info, snapshot->metadata.info)) {
|
||||
stats.format_rebuilds += 1;
|
||||
restart_backend("frame_info_change", snapshot->metadata.info);
|
||||
}
|
||||
|
||||
if (started && using_encoded_input != want_encoded_input) {
|
||||
stats.format_rebuilds += 1;
|
||||
restart_backend("input_video_source_switch", snapshot->metadata.info);
|
||||
}
|
||||
|
||||
if (want_encoded_input) {
|
||||
auto stream_info = make_stream_info_from_snapshot(*snapshot);
|
||||
if (!stream_info) {
|
||||
spdlog::error("pipeline encoded snapshot metadata invalid: {}", stream_info.error());
|
||||
return exit_code(PipelineExitCode::InitializationError);
|
||||
}
|
||||
if (active_stream_info && !stream_info_equal(*active_stream_info, *stream_info)) {
|
||||
stats.format_rebuilds += 1;
|
||||
restart_backend("encoded_stream_info_change", snapshot->metadata.info);
|
||||
}
|
||||
if (!started || restart_pending) {
|
||||
auto start_result = attempt_encoded_passthrough_start(*snapshot);
|
||||
if (!start_result) {
|
||||
spdlog::error("pipeline encoded passthrough init failed: {}", format_error(start_result.error()));
|
||||
return exit_code(PipelineExitCode::InitializationError);
|
||||
}
|
||||
}
|
||||
} else if (!started || restart_pending) {
|
||||
const auto target_info = restart_target_info.value_or(snapshot->metadata.info);
|
||||
auto start_result = attempt_backend_start(target_info);
|
||||
auto start_result = attempt_raw_backend_start(target_info);
|
||||
if (!start_result) {
|
||||
spdlog::error("pipeline backend init failed: {}", format_error(start_result.error()));
|
||||
return exit_code(PipelineExitCode::InitializationError);
|
||||
@@ -910,6 +1068,29 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
|
||||
latency_tracker.note_ingest();
|
||||
|
||||
if (want_encoded_input) {
|
||||
auto access_unit = make_access_unit_from_snapshot(*snapshot);
|
||||
if (!access_unit) {
|
||||
const auto reason = "pipeline encoded snapshot invalid: " + access_unit.error();
|
||||
restart_backend(reason, snapshot->metadata.info);
|
||||
continue;
|
||||
}
|
||||
std::vector<encode::EncodedAccessUnit> access_units{};
|
||||
access_units.push_back(std::move(*access_unit));
|
||||
auto publish = publish_access_units(
|
||||
config,
|
||||
std::move(access_units),
|
||||
stats,
|
||||
rtp_publisher ? &*rtp_publisher : nullptr,
|
||||
rtmp_output ? &*rtmp_output : nullptr,
|
||||
&mcap_recorder,
|
||||
latency_tracker);
|
||||
if (!publish) {
|
||||
const auto reason = format_error(publish.error());
|
||||
restart_backend(reason, active_info);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
auto push = (*backend)->push_frame(encode::RawVideoFrame{
|
||||
.info = snapshot->metadata.info,
|
||||
.source_timestamp_ns = snapshot->metadata.timestamp_ns,
|
||||
@@ -920,6 +1101,7 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
restart_backend(reason, active_info);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (!snapshot->depth.empty()) {
|
||||
if (snapshot->depth_unit == ipc::DepthUnit::Unknown) {
|
||||
@@ -946,6 +1128,7 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
}
|
||||
|
||||
stats.pushed_frames += 1;
|
||||
if (!want_encoded_input) {
|
||||
auto drain = drain_encoder(
|
||||
config,
|
||||
*backend,
|
||||
@@ -960,6 +1143,7 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
restart_backend(reason, active_info);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (config.latency.ingest_max_frames > 0 && stats.pushed_frames >= config.latency.ingest_max_frames) {
|
||||
spdlog::info("pipeline reached ingest_max_frames={}, stopping", config.latency.ingest_max_frames);
|
||||
@@ -973,7 +1157,7 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
|
||||
nats_client.Stop();
|
||||
|
||||
if (started) {
|
||||
if (started && backend && !using_encoded_input) {
|
||||
auto drain = drain_encoder(
|
||||
config,
|
||||
*backend,
|
||||
@@ -989,14 +1173,18 @@ int run_pipeline(const RuntimeConfig &config) {
|
||||
}
|
||||
}
|
||||
|
||||
if (backend) {
|
||||
(*backend)->shutdown();
|
||||
}
|
||||
std::ignore = stop_mcap_recording(mcap_recorder);
|
||||
recorder_service.Stop();
|
||||
|
||||
spdlog::info(
|
||||
"PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}",
|
||||
to_string(config.encoder.codec),
|
||||
(*backend)->backend_name(),
|
||||
active_stream_info ? to_string(active_stream_info->codec) : to_string(config.encoder.codec),
|
||||
using_encoded_input
|
||||
? std::string("encoded_passthrough")
|
||||
: (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")),
|
||||
stats.sync_messages,
|
||||
stats.status_messages,
|
||||
stats.torn_frames,
|
||||
|
||||
@@ -191,7 +191,7 @@ Result<std::vector<std::uint8_t>> decoder_config_to_annexb(
|
||||
CodecType codec,
|
||||
std::span<const std::uint8_t> decoder_config) {
|
||||
if (decoder_config.empty()) {
|
||||
return unexpected_error(ERR_PROTOCOL, "decoder config is required");
|
||||
return std::vector<std::uint8_t>{};
|
||||
}
|
||||
if (looks_like_annexb(decoder_config)) {
|
||||
return std::vector<std::uint8_t>(decoder_config.begin(), decoder_config.end());
|
||||
@@ -248,10 +248,6 @@ public:
|
||||
static Result<RtmpOutput> create(
|
||||
const RuntimeConfig &config,
|
||||
const encode::EncodedStreamInfo &stream_info) {
|
||||
if (stream_info.decoder_config.empty()) {
|
||||
return unexpected_error(ERR_PROTOCOL, "libavformat RTMP requires encoder decoder_config/extradata");
|
||||
}
|
||||
|
||||
avformat_network_init();
|
||||
|
||||
LibavformatRtmpOutput output{};
|
||||
@@ -442,9 +438,7 @@ public:
|
||||
};
|
||||
auto decoder_config_annexb = decoder_config_to_annexb(stream_info.codec, stream_info.decoder_config);
|
||||
if (!decoder_config_annexb) {
|
||||
return unexpected_error(
|
||||
ERR_PROTOCOL,
|
||||
"ffmpeg_process RTMP requires decoder config: " + format_error(decoder_config_annexb.error()));
|
||||
return std::unexpected(decoder_config_annexb.error());
|
||||
}
|
||||
output.decoder_config_annexb_ = std::move(*decoder_config_annexb);
|
||||
|
||||
|
||||
@@ -179,7 +179,7 @@ UdpRtpPublisher &UdpRtpPublisher::operator=(UdpRtpPublisher &&other) noexcept {
|
||||
return *this;
|
||||
}
|
||||
|
||||
std::expected<UdpRtpPublisher, std::string> UdpRtpPublisher::create(const RuntimeConfig &config) {
|
||||
std::expected<UdpRtpPublisher, std::string> UdpRtpPublisher::create(const RuntimeConfig &config, const CodecType codec) {
|
||||
if (!config.outputs.rtp.enabled) {
|
||||
return std::unexpected("invalid RTP publisher init: RTP output disabled");
|
||||
}
|
||||
@@ -191,7 +191,7 @@ std::expected<UdpRtpPublisher, std::string> UdpRtpPublisher::create(const Runtim
|
||||
publisher.destination_host_ = *config.outputs.rtp.host;
|
||||
publisher.destination_port_ = *config.outputs.rtp.port;
|
||||
publisher.payload_type_ = config.outputs.rtp.payload_type;
|
||||
publisher.codec_ = config.encoder.codec;
|
||||
publisher.codec_ = codec;
|
||||
publisher.sequence_ = compute_initial_sequence();
|
||||
publisher.ssrc_ = compute_ssrc(
|
||||
publisher.destination_host_,
|
||||
|
||||
Reference in New Issue
Block a user