Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c320bf01af | |||
| bb3ace43b7 |
+6
-2
@@ -58,7 +58,10 @@ set(CVMMAP_LOCAL_CORE_DIR "${CVMMAP_LOCAL_ROOT}/build/core" CACHE PATH "Path to
|
|||||||
set(CVMMAP_LOCAL_NATS_STATIC "${CVMMAP_LOCAL_ROOT}/build/lib/libnats_static.a" CACHE PATH "Path to local cnats static library")
|
set(CVMMAP_LOCAL_NATS_STATIC "${CVMMAP_LOCAL_ROOT}/build/lib/libnats_static.a" CACHE PATH "Path to local cnats static library")
|
||||||
if (CVMMAP_CNATS_PROVIDER STREQUAL "system")
|
if (CVMMAP_CNATS_PROVIDER STREQUAL "system")
|
||||||
find_package(cnats CONFIG REQUIRED)
|
find_package(cnats CONFIG REQUIRED)
|
||||||
find_package(cvmmap-core CONFIG QUIET)
|
find_package(cvmmap-core CONFIG QUIET PATHS "${CVMMAP_LOCAL_CORE_DIR}" NO_DEFAULT_PATH)
|
||||||
|
if (NOT TARGET cvmmap::client)
|
||||||
|
find_package(cvmmap-core CONFIG QUIET)
|
||||||
|
endif()
|
||||||
else()
|
else()
|
||||||
if (NOT EXISTS "${CVMMAP_LOCAL_NATS_STATIC}")
|
if (NOT EXISTS "${CVMMAP_LOCAL_NATS_STATIC}")
|
||||||
message(FATAL_ERROR
|
message(FATAL_ERROR
|
||||||
@@ -256,7 +259,8 @@ add_library(cvmmap_streamer_common STATIC
|
|||||||
target_include_directories(cvmmap_streamer_common
|
target_include_directories(cvmmap_streamer_common
|
||||||
PUBLIC
|
PUBLIC
|
||||||
"${CMAKE_CURRENT_LIST_DIR}/include"
|
"${CMAKE_CURRENT_LIST_DIR}/include"
|
||||||
"${CMAKE_CURRENT_BINARY_DIR}")
|
"${CMAKE_CURRENT_BINARY_DIR}"
|
||||||
|
"${CVMMAP_LOCAL_ROOT}/core/include")
|
||||||
|
|
||||||
set(CVMMAP_STREAMER_LINK_DEPS
|
set(CVMMAP_STREAMER_LINK_DEPS
|
||||||
Threads::Threads
|
Threads::Threads
|
||||||
|
|||||||
@@ -0,0 +1,89 @@
|
|||||||
|
# Encoded Passthrough From `cv-mmap` ABI v2.1
|
||||||
|
|
||||||
|
## Summary
|
||||||
|
|
||||||
|
`cvmmap-streamer` can now consume encoded access units directly from a `cv-mmap` shared-memory snapshot when the producer exposes the ABI v2.1 encoded plane.
|
||||||
|
|
||||||
|
This is intended for producers such as `cv-mmap` with `video.backend = "udp_rtp"`:
|
||||||
|
|
||||||
|
- `cv-mmap` receives RTP H.265
|
||||||
|
- `cv-mmap` parses once and publishes raw BGR plus encoded AU in the same snapshot
|
||||||
|
- `cvmmap-streamer` can forward or mux the encoded AU directly
|
||||||
|
|
||||||
|
The video no longer needs a decode -> encode round trip inside the streamer.
|
||||||
|
|
||||||
|
## Input selection
|
||||||
|
|
||||||
|
`input.video_source` controls how the pipeline uses the snapshot:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[input]
|
||||||
|
uri = "cvmmap://default"
|
||||||
|
nats_url = "nats://localhost:4222"
|
||||||
|
video_source = "auto"
|
||||||
|
```
|
||||||
|
|
||||||
|
Valid values:
|
||||||
|
|
||||||
|
- `auto`: prefer encoded passthrough when the encoded AU plane is present, otherwise use raw encoding
|
||||||
|
- `raw`: always use the raw left plane and the local encoder backend
|
||||||
|
- `encoded`: require the encoded AU plane; fail if it is missing
|
||||||
|
|
||||||
|
CLI equivalent:
|
||||||
|
|
||||||
|
```text
|
||||||
|
--input-video-source auto|raw|encoded
|
||||||
|
```
|
||||||
|
|
||||||
|
## Encoded passthrough behavior
|
||||||
|
|
||||||
|
When `video_source` resolves to encoded mode:
|
||||||
|
|
||||||
|
- no encoder backend is created
|
||||||
|
- the streamer reads codec, bitstream format, keyframe flag, frame rate, and stream PTS from the SHM metadata
|
||||||
|
- width and height still come from the raw left-plane frame metadata
|
||||||
|
- the encoded AU is forwarded as-is
|
||||||
|
|
||||||
|
Current assumptions:
|
||||||
|
|
||||||
|
- codec is H.265 or H.264
|
||||||
|
- bitstream format is Annex B
|
||||||
|
- one SHM encoded plane contains one AU
|
||||||
|
- time base is treated as `1/1e9`
|
||||||
|
|
||||||
|
## Outputs
|
||||||
|
|
||||||
|
### RTP
|
||||||
|
|
||||||
|
RTP output accepts passthrough access units directly.
|
||||||
|
|
||||||
|
- in encoded mode, SDP signaling and RTP packetization follow the codec from SHM metadata
|
||||||
|
- `config.encoder.codec` is ignored for RTP while passthrough is active
|
||||||
|
- raw mode still uses the configured local encoder and its codec
|
||||||
|
|
||||||
|
### RTMP
|
||||||
|
|
||||||
|
RTMP output accepts passthrough access units directly.
|
||||||
|
|
||||||
|
- If decoder config is present, it is used.
|
||||||
|
- If decoder config is absent, the output relies on in-band parameter sets from keyframes.
|
||||||
|
|
||||||
|
For `cv-mmap udp_rtp`, this works because the producer republishes keyframes with VPS/SPS/PPS in-band.
|
||||||
|
|
||||||
|
### MCAP
|
||||||
|
|
||||||
|
MCAP recording writes the compressed video directly.
|
||||||
|
|
||||||
|
- no local encoder is used
|
||||||
|
- keyframes are written exactly from the encoded AU payload
|
||||||
|
- depth and body-tracking recording continue to use the same raw/depth/NATS paths as before
|
||||||
|
|
||||||
|
## Fallback to raw mode
|
||||||
|
|
||||||
|
If the producer does not expose the encoded plane, `auto` mode falls back to the existing raw pipeline:
|
||||||
|
|
||||||
|
- read raw BGR from SHM
|
||||||
|
- push frames through the configured encoder backend
|
||||||
|
- publish or record the encoder output
|
||||||
|
|
||||||
|
This keeps existing producers compatible.
|
||||||
@@ -40,6 +40,12 @@ enum class EncoderDeviceType {
|
|||||||
Software,
|
Software,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum class InputVideoSource {
|
||||||
|
Auto,
|
||||||
|
Raw,
|
||||||
|
Encoded,
|
||||||
|
};
|
||||||
|
|
||||||
enum class McapCompression {
|
enum class McapCompression {
|
||||||
None,
|
None,
|
||||||
Lz4,
|
Lz4,
|
||||||
@@ -49,6 +55,7 @@ enum class McapCompression {
|
|||||||
struct InputConfig {
|
struct InputConfig {
|
||||||
std::string uri{"cvmmap://default"};
|
std::string uri{"cvmmap://default"};
|
||||||
std::string nats_url{"nats://localhost:4222"};
|
std::string nats_url{"nats://localhost:4222"};
|
||||||
|
InputVideoSource video_source{InputVideoSource::Auto};
|
||||||
};
|
};
|
||||||
|
|
||||||
struct EncoderConfig {
|
struct EncoderConfig {
|
||||||
@@ -125,6 +132,7 @@ std::string_view to_string(RtmpMode mode);
|
|||||||
std::string_view to_string(RtmpTransportType transport);
|
std::string_view to_string(RtmpTransportType transport);
|
||||||
std::string_view to_string(EncoderBackendType backend);
|
std::string_view to_string(EncoderBackendType backend);
|
||||||
std::string_view to_string(EncoderDeviceType device);
|
std::string_view to_string(EncoderDeviceType device);
|
||||||
|
std::string_view to_string(InputVideoSource source);
|
||||||
std::string_view to_string(McapCompression compression);
|
std::string_view to_string(McapCompression compression);
|
||||||
std::expected<McapCompression, std::string> parse_mcap_compression(std::string_view raw);
|
std::expected<McapCompression, std::string> parse_mcap_compression(std::string_view raw);
|
||||||
|
|
||||||
|
|||||||
@@ -51,6 +51,19 @@ enum class DepthUnit : std::uint8_t {
|
|||||||
Meter = 2,
|
Meter = 2,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum class EncodedCodec : std::uint8_t {
|
||||||
|
Unknown = 0,
|
||||||
|
H264 = 1,
|
||||||
|
H265 = 2,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum class EncodedBitstreamFormat : std::uint8_t {
|
||||||
|
Unknown = 0,
|
||||||
|
AnnexB = 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
constexpr std::uint16_t kEncodedFlagKeyframe = 0x0001u;
|
||||||
|
|
||||||
enum class ModuleStatus : std::int32_t {
|
enum class ModuleStatus : std::int32_t {
|
||||||
Online = 0xa1,
|
Online = 0xa1,
|
||||||
Offline = 0xa0,
|
Offline = 0xa0,
|
||||||
@@ -163,22 +176,36 @@ struct ControlResponseMessage {
|
|||||||
struct ValidatedShmView {
|
struct ValidatedShmView {
|
||||||
FrameMetadata metadata;
|
FrameMetadata metadata;
|
||||||
DepthUnit depth_unit{DepthUnit::Unknown};
|
DepthUnit depth_unit{DepthUnit::Unknown};
|
||||||
|
EncodedCodec encoded_codec{EncodedCodec::Unknown};
|
||||||
|
EncodedBitstreamFormat encoded_bitstream_format{EncodedBitstreamFormat::Unknown};
|
||||||
|
std::uint16_t encoded_flags{0};
|
||||||
|
std::uint16_t encoded_frame_rate_num{0};
|
||||||
|
std::uint16_t encoded_frame_rate_den{0};
|
||||||
|
std::uint64_t encoded_stream_pts_ns{0};
|
||||||
std::span<const std::uint8_t> payload;
|
std::span<const std::uint8_t> payload;
|
||||||
std::span<const std::uint8_t> left;
|
std::span<const std::uint8_t> left;
|
||||||
std::optional<FrameInfo> depth_info{};
|
std::optional<FrameInfo> depth_info{};
|
||||||
std::span<const std::uint8_t> depth{};
|
std::span<const std::uint8_t> depth{};
|
||||||
std::optional<FrameInfo> confidence_info{};
|
std::optional<FrameInfo> confidence_info{};
|
||||||
std::span<const std::uint8_t> confidence{};
|
std::span<const std::uint8_t> confidence{};
|
||||||
|
std::span<const std::uint8_t> encoded_access_unit{};
|
||||||
};
|
};
|
||||||
|
|
||||||
struct CoherentSnapshot {
|
struct CoherentSnapshot {
|
||||||
FrameMetadata metadata;
|
FrameMetadata metadata;
|
||||||
DepthUnit depth_unit{DepthUnit::Unknown};
|
DepthUnit depth_unit{DepthUnit::Unknown};
|
||||||
|
EncodedCodec encoded_codec{EncodedCodec::Unknown};
|
||||||
|
EncodedBitstreamFormat encoded_bitstream_format{EncodedBitstreamFormat::Unknown};
|
||||||
|
std::uint16_t encoded_flags{0};
|
||||||
|
std::uint16_t encoded_frame_rate_num{0};
|
||||||
|
std::uint16_t encoded_frame_rate_den{0};
|
||||||
|
std::uint64_t encoded_stream_pts_ns{0};
|
||||||
std::span<const std::uint8_t> left;
|
std::span<const std::uint8_t> left;
|
||||||
std::optional<FrameInfo> depth_info{};
|
std::optional<FrameInfo> depth_info{};
|
||||||
std::span<const std::uint8_t> depth{};
|
std::span<const std::uint8_t> depth{};
|
||||||
std::optional<FrameInfo> confidence_info{};
|
std::optional<FrameInfo> confidence_info{};
|
||||||
std::span<const std::uint8_t> confidence{};
|
std::span<const std::uint8_t> confidence{};
|
||||||
|
std::span<const std::uint8_t> encoded_access_unit{};
|
||||||
std::size_t bytes_copied;
|
std::size_t bytes_copied;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ public:
|
|||||||
UdpRtpPublisher &operator=(UdpRtpPublisher &&other) noexcept;
|
UdpRtpPublisher &operator=(UdpRtpPublisher &&other) noexcept;
|
||||||
|
|
||||||
[[nodiscard]]
|
[[nodiscard]]
|
||||||
static std::expected<UdpRtpPublisher, std::string> create(const RuntimeConfig &config);
|
static std::expected<UdpRtpPublisher, std::string> create(const RuntimeConfig &config, CodecType codec);
|
||||||
|
|
||||||
void publish_access_unit(std::span<const std::uint8_t> access_unit, std::uint64_t pts_ns);
|
void publish_access_unit(std::span<const std::uint8_t> access_unit, std::uint64_t pts_ns);
|
||||||
|
|
||||||
|
|||||||
@@ -155,6 +155,19 @@ std::expected<EncoderDeviceType, std::string> parse_encoder_device(std::string_v
|
|||||||
return std::unexpected("invalid encoder device: '" + std::string(raw) + "' (expected: auto|nvidia|software)");
|
return std::unexpected("invalid encoder device: '" + std::string(raw) + "' (expected: auto|nvidia|software)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::expected<InputVideoSource, std::string> parse_input_video_source(std::string_view raw) {
|
||||||
|
if (raw == "auto") {
|
||||||
|
return InputVideoSource::Auto;
|
||||||
|
}
|
||||||
|
if (raw == "raw") {
|
||||||
|
return InputVideoSource::Raw;
|
||||||
|
}
|
||||||
|
if (raw == "encoded") {
|
||||||
|
return InputVideoSource::Encoded;
|
||||||
|
}
|
||||||
|
return std::unexpected("invalid input video source: '" + std::string(raw) + "' (expected: auto|raw|encoded)");
|
||||||
|
}
|
||||||
|
|
||||||
std::expected<McapCompression, std::string> parse_mcap_compression_impl(std::string_view raw) {
|
std::expected<McapCompression, std::string> parse_mcap_compression_impl(std::string_view raw) {
|
||||||
if (raw == "none") {
|
if (raw == "none") {
|
||||||
return McapCompression::None;
|
return McapCompression::None;
|
||||||
@@ -263,6 +276,13 @@ std::expected<void, std::string> apply_toml_file(RuntimeConfig &config, const st
|
|||||||
if (auto value = toml_value<std::string>(table, "input.nats_url")) {
|
if (auto value = toml_value<std::string>(table, "input.nats_url")) {
|
||||||
config.input.nats_url = *value;
|
config.input.nats_url = *value;
|
||||||
}
|
}
|
||||||
|
if (auto value = toml_value<std::string>(table, "input.video_source")) {
|
||||||
|
auto parsed = parse_input_video_source(*value);
|
||||||
|
if (!parsed) {
|
||||||
|
return std::unexpected(parsed.error());
|
||||||
|
}
|
||||||
|
config.input.video_source = *parsed;
|
||||||
|
}
|
||||||
if (auto value = toml_value<std::string>(table, "run_mode")) {
|
if (auto value = toml_value<std::string>(table, "run_mode")) {
|
||||||
auto parsed = parse_run_mode(*value);
|
auto parsed = parse_run_mode(*value);
|
||||||
if (!parsed) {
|
if (!parsed) {
|
||||||
@@ -553,6 +573,18 @@ std::string_view to_string(EncoderDeviceType device) {
|
|||||||
return "unknown";
|
return "unknown";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string_view to_string(InputVideoSource source) {
|
||||||
|
switch (source) {
|
||||||
|
case InputVideoSource::Auto:
|
||||||
|
return "auto";
|
||||||
|
case InputVideoSource::Raw:
|
||||||
|
return "raw";
|
||||||
|
case InputVideoSource::Encoded:
|
||||||
|
return "encoded";
|
||||||
|
}
|
||||||
|
return "unknown";
|
||||||
|
}
|
||||||
|
|
||||||
std::string_view to_string(McapCompression compression) {
|
std::string_view to_string(McapCompression compression) {
|
||||||
switch (compression) {
|
switch (compression) {
|
||||||
case McapCompression::None:
|
case McapCompression::None:
|
||||||
@@ -571,6 +603,7 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
|
|||||||
std::string config_path_raw{};
|
std::string config_path_raw{};
|
||||||
std::string input_uri_raw{};
|
std::string input_uri_raw{};
|
||||||
std::string input_nats_url_raw{};
|
std::string input_nats_url_raw{};
|
||||||
|
std::string input_video_source_raw{};
|
||||||
std::string run_mode_raw{};
|
std::string run_mode_raw{};
|
||||||
std::string codec_raw{};
|
std::string codec_raw{};
|
||||||
std::string encoder_backend_raw{};
|
std::string encoder_backend_raw{};
|
||||||
@@ -612,6 +645,7 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
|
|||||||
app.add_option("--config", config_path_raw);
|
app.add_option("--config", config_path_raw);
|
||||||
app.add_option("--input-uri", input_uri_raw);
|
app.add_option("--input-uri", input_uri_raw);
|
||||||
app.add_option("--nats-url", input_nats_url_raw);
|
app.add_option("--nats-url", input_nats_url_raw);
|
||||||
|
app.add_option("--input-video-source", input_video_source_raw);
|
||||||
app.add_option("--run-mode", run_mode_raw);
|
app.add_option("--run-mode", run_mode_raw);
|
||||||
app.add_option("--codec", codec_raw);
|
app.add_option("--codec", codec_raw);
|
||||||
app.add_option("--encoder-backend", encoder_backend_raw);
|
app.add_option("--encoder-backend", encoder_backend_raw);
|
||||||
@@ -670,6 +704,13 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
|
|||||||
if (!input_nats_url_raw.empty()) {
|
if (!input_nats_url_raw.empty()) {
|
||||||
config.input.nats_url = input_nats_url_raw;
|
config.input.nats_url = input_nats_url_raw;
|
||||||
}
|
}
|
||||||
|
if (!input_video_source_raw.empty()) {
|
||||||
|
auto parsed = parse_input_video_source(input_video_source_raw);
|
||||||
|
if (!parsed) {
|
||||||
|
return std::unexpected(parsed.error());
|
||||||
|
}
|
||||||
|
config.input.video_source = *parsed;
|
||||||
|
}
|
||||||
if (!run_mode_raw.empty()) {
|
if (!run_mode_raw.empty()) {
|
||||||
auto parsed = parse_run_mode(run_mode_raw);
|
auto parsed = parse_run_mode(run_mode_raw);
|
||||||
if (!parsed) {
|
if (!parsed) {
|
||||||
@@ -938,6 +979,7 @@ std::string summarize_runtime_config(const RuntimeConfig &config) {
|
|||||||
std::ostringstream ss;
|
std::ostringstream ss;
|
||||||
ss << "input.uri=" << config.input.uri;
|
ss << "input.uri=" << config.input.uri;
|
||||||
ss << ", input.nats_url=" << config.input.nats_url;
|
ss << ", input.nats_url=" << config.input.nats_url;
|
||||||
|
ss << ", input.video_source=" << to_string(config.input.video_source);
|
||||||
ss << ", run_mode=" << to_string(config.run_mode);
|
ss << ", run_mode=" << to_string(config.run_mode);
|
||||||
ss << ", encoder.backend=" << to_string(config.encoder.backend);
|
ss << ", encoder.backend=" << to_string(config.encoder.backend);
|
||||||
ss << ", encoder.device=" << to_string(config.encoder.device);
|
ss << ", encoder.device=" << to_string(config.encoder.device);
|
||||||
|
|||||||
+42
-1
@@ -173,6 +173,31 @@ namespace {
|
|||||||
return converted;
|
return converted;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
EncodedCodec from_core_encoded_codec(const cvmmap::EncodedCodec codec) {
|
||||||
|
switch (codec) {
|
||||||
|
case cvmmap::EncodedCodec::H264:
|
||||||
|
return EncodedCodec::H264;
|
||||||
|
case cvmmap::EncodedCodec::H265:
|
||||||
|
return EncodedCodec::H265;
|
||||||
|
case cvmmap::EncodedCodec::Unknown:
|
||||||
|
default:
|
||||||
|
return EncodedCodec::Unknown;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
EncodedBitstreamFormat from_core_encoded_bitstream_format(
|
||||||
|
const cvmmap::EncodedBitstreamFormat format) {
|
||||||
|
switch (format) {
|
||||||
|
case cvmmap::EncodedBitstreamFormat::AnnexB:
|
||||||
|
return EncodedBitstreamFormat::AnnexB;
|
||||||
|
case cvmmap::EncodedBitstreamFormat::Unknown:
|
||||||
|
default:
|
||||||
|
return EncodedBitstreamFormat::Unknown;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
[[nodiscard]]
|
[[nodiscard]]
|
||||||
std::size_t span_offset(
|
std::size_t span_offset(
|
||||||
const std::span<const std::uint8_t> outer,
|
const std::span<const std::uint8_t> outer,
|
||||||
@@ -198,6 +223,7 @@ namespace {
|
|||||||
};
|
};
|
||||||
consider(metadata.depth_plane);
|
consider(metadata.depth_plane);
|
||||||
consider(metadata.confidence_plane);
|
consider(metadata.confidence_plane);
|
||||||
|
consider(metadata.encoded_access_unit);
|
||||||
return payload_bytes;
|
return payload_bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -420,6 +446,13 @@ std::expected<ValidatedShmView, ParseError> validate_shm_region(std::span<const
|
|||||||
return ValidatedShmView{
|
return ValidatedShmView{
|
||||||
.metadata = from_core_metadata(metadata_result->normalized_metadata),
|
.metadata = from_core_metadata(metadata_result->normalized_metadata),
|
||||||
.depth_unit = from_core_depth_unit(metadata_result->depth_unit),
|
.depth_unit = from_core_depth_unit(metadata_result->depth_unit),
|
||||||
|
.encoded_codec = from_core_encoded_codec(metadata_result->encoded_codec),
|
||||||
|
.encoded_bitstream_format =
|
||||||
|
from_core_encoded_bitstream_format(metadata_result->encoded_bitstream_format),
|
||||||
|
.encoded_flags = metadata_result->encoded_flags,
|
||||||
|
.encoded_frame_rate_num = metadata_result->encoded_frame_rate_num,
|
||||||
|
.encoded_frame_rate_den = metadata_result->encoded_frame_rate_den,
|
||||||
|
.encoded_stream_pts_ns = metadata_result->encoded_stream_pts_ns,
|
||||||
.payload = payload_region.first(payload_bytes),
|
.payload = payload_region.first(payload_bytes),
|
||||||
.left = metadata_result->left_plane,
|
.left = metadata_result->left_plane,
|
||||||
.depth_info = metadata_result->depth_info
|
.depth_info = metadata_result->depth_info
|
||||||
@@ -429,7 +462,8 @@ std::expected<ValidatedShmView, ParseError> validate_shm_region(std::span<const
|
|||||||
.confidence_info = metadata_result->confidence_info
|
.confidence_info = metadata_result->confidence_info
|
||||||
? std::optional<FrameInfo>(from_core_frame_info(*metadata_result->confidence_info))
|
? std::optional<FrameInfo>(from_core_frame_info(*metadata_result->confidence_info))
|
||||||
: std::nullopt,
|
: std::nullopt,
|
||||||
.confidence = metadata_result->confidence_plane};
|
.confidence = metadata_result->confidence_plane,
|
||||||
|
.encoded_access_unit = metadata_result->encoded_access_unit};
|
||||||
}
|
}
|
||||||
|
|
||||||
std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot(
|
std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot(
|
||||||
@@ -466,11 +500,18 @@ std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot(
|
|||||||
return CoherentSnapshot{
|
return CoherentSnapshot{
|
||||||
.metadata = first->metadata,
|
.metadata = first->metadata,
|
||||||
.depth_unit = first->depth_unit,
|
.depth_unit = first->depth_unit,
|
||||||
|
.encoded_codec = first->encoded_codec,
|
||||||
|
.encoded_bitstream_format = first->encoded_bitstream_format,
|
||||||
|
.encoded_flags = first->encoded_flags,
|
||||||
|
.encoded_frame_rate_num = first->encoded_frame_rate_num,
|
||||||
|
.encoded_frame_rate_den = first->encoded_frame_rate_den,
|
||||||
|
.encoded_stream_pts_ns = first->encoded_stream_pts_ns,
|
||||||
.left = translate_span(first->payload, copied_payload, first->left),
|
.left = translate_span(first->payload, copied_payload, first->left),
|
||||||
.depth_info = first->depth_info,
|
.depth_info = first->depth_info,
|
||||||
.depth = translate_span(first->payload, copied_payload, first->depth),
|
.depth = translate_span(first->payload, copied_payload, first->depth),
|
||||||
.confidence_info = first->confidence_info,
|
.confidence_info = first->confidence_info,
|
||||||
.confidence = translate_span(first->payload, copied_payload, first->confidence),
|
.confidence = translate_span(first->payload, copied_payload, first->confidence),
|
||||||
|
.encoded_access_unit = translate_span(first->payload, copied_payload, first->encoded_access_unit),
|
||||||
.bytes_copied = first->payload.size()};
|
.bytes_copied = first->payload.size()};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -243,6 +243,90 @@ std::expected<record::RawDepthMapView, std::string> make_depth_map_view(const ip
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
bool snapshot_has_encoded_access_unit(const ipc::CoherentSnapshot &snapshot) {
|
||||||
|
return !snapshot.encoded_access_unit.empty() &&
|
||||||
|
snapshot.encoded_codec != ipc::EncodedCodec::Unknown &&
|
||||||
|
snapshot.encoded_bitstream_format == ipc::EncodedBitstreamFormat::AnnexB;
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<CodecType, std::string> codec_type_from_snapshot(const ipc::CoherentSnapshot &snapshot) {
|
||||||
|
switch (snapshot.encoded_codec) {
|
||||||
|
case ipc::EncodedCodec::H264:
|
||||||
|
return CodecType::H264;
|
||||||
|
case ipc::EncodedCodec::H265:
|
||||||
|
return CodecType::H265;
|
||||||
|
case ipc::EncodedCodec::Unknown:
|
||||||
|
default:
|
||||||
|
return std::unexpected("encoded snapshot codec is unsupported");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<encode::EncodedStreamInfo, std::string> make_stream_info_from_snapshot(
|
||||||
|
const ipc::CoherentSnapshot &snapshot) {
|
||||||
|
auto codec = codec_type_from_snapshot(snapshot);
|
||||||
|
if (!codec) {
|
||||||
|
return std::unexpected(codec.error());
|
||||||
|
}
|
||||||
|
if (snapshot.encoded_bitstream_format != ipc::EncodedBitstreamFormat::AnnexB) {
|
||||||
|
return std::unexpected("encoded snapshot bitstream format is unsupported");
|
||||||
|
}
|
||||||
|
if (snapshot.metadata.info.width == 0 || snapshot.metadata.info.height == 0) {
|
||||||
|
return std::unexpected("encoded snapshot dimensions are invalid");
|
||||||
|
}
|
||||||
|
|
||||||
|
encode::EncodedStreamInfo stream_info{};
|
||||||
|
stream_info.codec = *codec;
|
||||||
|
stream_info.width = snapshot.metadata.info.width;
|
||||||
|
stream_info.height = snapshot.metadata.info.height;
|
||||||
|
stream_info.time_base_num = 1;
|
||||||
|
stream_info.time_base_den = 1'000'000'000u;
|
||||||
|
stream_info.frame_rate_num = snapshot.encoded_frame_rate_num == 0 ? 30u : snapshot.encoded_frame_rate_num;
|
||||||
|
stream_info.frame_rate_den = snapshot.encoded_frame_rate_den == 0 ? 1u : snapshot.encoded_frame_rate_den;
|
||||||
|
stream_info.bitstream_format = encode::EncodedBitstreamFormat::AnnexB;
|
||||||
|
stream_info.decoder_config.clear();
|
||||||
|
return stream_info;
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<encode::EncodedAccessUnit, std::string> make_access_unit_from_snapshot(
|
||||||
|
const ipc::CoherentSnapshot &snapshot) {
|
||||||
|
auto codec = codec_type_from_snapshot(snapshot);
|
||||||
|
if (!codec) {
|
||||||
|
return std::unexpected(codec.error());
|
||||||
|
}
|
||||||
|
if (snapshot.encoded_access_unit.empty()) {
|
||||||
|
return std::unexpected("encoded snapshot access unit is empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
encode::EncodedAccessUnit access_unit{};
|
||||||
|
access_unit.codec = *codec;
|
||||||
|
access_unit.source_timestamp_ns = snapshot.metadata.timestamp_ns;
|
||||||
|
access_unit.stream_pts_ns =
|
||||||
|
snapshot.encoded_stream_pts_ns == 0 ? snapshot.metadata.timestamp_ns : snapshot.encoded_stream_pts_ns;
|
||||||
|
access_unit.keyframe = (snapshot.encoded_flags & ipc::kEncodedFlagKeyframe) != 0;
|
||||||
|
access_unit.annexb_bytes.assign(
|
||||||
|
snapshot.encoded_access_unit.begin(),
|
||||||
|
snapshot.encoded_access_unit.end());
|
||||||
|
return access_unit;
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
bool stream_info_equal(
|
||||||
|
const encode::EncodedStreamInfo &lhs,
|
||||||
|
const encode::EncodedStreamInfo &rhs) {
|
||||||
|
return lhs.codec == rhs.codec &&
|
||||||
|
lhs.width == rhs.width &&
|
||||||
|
lhs.height == rhs.height &&
|
||||||
|
lhs.time_base_num == rhs.time_base_num &&
|
||||||
|
lhs.time_base_den == rhs.time_base_den &&
|
||||||
|
lhs.frame_rate_num == rhs.frame_rate_num &&
|
||||||
|
lhs.frame_rate_den == rhs.frame_rate_den &&
|
||||||
|
lhs.bitstream_format == rhs.bitstream_format;
|
||||||
|
}
|
||||||
|
|
||||||
[[nodiscard]]
|
[[nodiscard]]
|
||||||
std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) {
|
std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) {
|
||||||
if (frame.header.timestamp_ns != 0) {
|
if (frame.header.timestamp_ns != 0) {
|
||||||
@@ -559,11 +643,7 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
return exit_code(PipelineExitCode::InputError);
|
return exit_code(PipelineExitCode::InputError);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto backend = encode::make_encoder_backend(config);
|
std::optional<encode::EncoderBackend> backend{};
|
||||||
if (!backend) {
|
|
||||||
spdlog::error("pipeline encoder backend selection failed: {}", format_error(backend.error()));
|
|
||||||
return exit_code(PipelineExitCode::InitializationError);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name);
|
auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name);
|
||||||
if (!shm) {
|
if (!shm) {
|
||||||
@@ -655,20 +735,13 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
return exit_code(PipelineExitCode::SubscriberError);
|
return exit_code(PipelineExitCode::SubscriberError);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.outputs.rtp.enabled) {
|
|
||||||
auto created = protocol::UdpRtpPublisher::create(config);
|
|
||||||
if (!created) {
|
|
||||||
spdlog::error("pipeline RTP publisher init failed: {}", created.error());
|
|
||||||
return exit_code(PipelineExitCode::InitializationError);
|
|
||||||
}
|
|
||||||
rtp_publisher.emplace(std::move(*created));
|
|
||||||
}
|
|
||||||
|
|
||||||
PipelineStats stats{};
|
PipelineStats stats{};
|
||||||
metrics::IngestEmitLatencyTracker latency_tracker{};
|
metrics::IngestEmitLatencyTracker latency_tracker{};
|
||||||
bool producer_offline{false};
|
bool producer_offline{false};
|
||||||
bool started{false};
|
bool started{false};
|
||||||
|
bool using_encoded_input{false};
|
||||||
std::optional<ipc::FrameInfo> active_info{};
|
std::optional<ipc::FrameInfo> active_info{};
|
||||||
|
std::optional<encode::EncodedStreamInfo> active_stream_info{};
|
||||||
std::optional<ipc::FrameInfo> restart_target_info{};
|
std::optional<ipc::FrameInfo> restart_target_info{};
|
||||||
bool restart_pending{false};
|
bool restart_pending{false};
|
||||||
bool warned_unknown_depth_unit{false};
|
bool warned_unknown_depth_unit{false};
|
||||||
@@ -678,33 +751,52 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
stats.supervised_restarts += 1;
|
stats.supervised_restarts += 1;
|
||||||
}
|
}
|
||||||
spdlog::warn(
|
spdlog::warn(
|
||||||
"PIPELINE_RESTART backend={} reason='{}' restarts={}",
|
"PIPELINE_RESTART mode={} reason='{}' restarts={}",
|
||||||
(*backend)->backend_name(),
|
using_encoded_input
|
||||||
|
? "encoded_passthrough"
|
||||||
|
: (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")),
|
||||||
reason,
|
reason,
|
||||||
stats.supervised_restarts);
|
stats.supervised_restarts);
|
||||||
(*backend)->shutdown();
|
if (backend) {
|
||||||
|
(*backend)->shutdown();
|
||||||
|
}
|
||||||
started = false;
|
started = false;
|
||||||
restart_pending = true;
|
restart_pending = true;
|
||||||
restart_target_info = target_info;
|
restart_target_info = target_info;
|
||||||
warned_unknown_depth_unit = false;
|
warned_unknown_depth_unit = false;
|
||||||
|
using_encoded_input = false;
|
||||||
|
active_stream_info.reset();
|
||||||
|
rtp_publisher.reset();
|
||||||
rtmp_output.reset();
|
rtmp_output.reset();
|
||||||
};
|
};
|
||||||
|
|
||||||
const auto attempt_backend_start = [&](const ipc::FrameInfo &target_info) -> Status {
|
const auto ensure_encoder_backend = [&]() -> Status {
|
||||||
(*backend)->shutdown();
|
if (backend) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
auto created = encode::make_encoder_backend(config);
|
||||||
|
if (!created) {
|
||||||
|
return std::unexpected(created.error());
|
||||||
|
}
|
||||||
|
backend.emplace(std::move(*created));
|
||||||
|
return {};
|
||||||
|
};
|
||||||
|
|
||||||
|
const auto start_outputs_from_stream_info =
|
||||||
|
[&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status {
|
||||||
|
rtp_publisher.reset();
|
||||||
rtmp_output.reset();
|
rtmp_output.reset();
|
||||||
auto init = (*backend)->init(config, target_info);
|
if (config.outputs.rtp.enabled) {
|
||||||
if (!init) {
|
auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec);
|
||||||
return std::unexpected(init.error());
|
if (!created) {
|
||||||
|
return unexpected_error(
|
||||||
|
ERR_INTERNAL,
|
||||||
|
"pipeline RTP publisher init failed: " + created.error());
|
||||||
|
}
|
||||||
|
rtp_publisher.emplace(std::move(*created));
|
||||||
}
|
}
|
||||||
if (config.outputs.rtmp.enabled) {
|
if (config.outputs.rtmp.enabled) {
|
||||||
auto stream_info = (*backend)->stream_info();
|
auto created = protocol::make_rtmp_output(config, stream_info);
|
||||||
if (!stream_info) {
|
|
||||||
return unexpected_error(
|
|
||||||
stream_info.error().code,
|
|
||||||
"pipeline RTMP output stream info unavailable: " + format_error(stream_info.error()));
|
|
||||||
}
|
|
||||||
auto created = protocol::make_rtmp_output(config, *stream_info);
|
|
||||||
if (!created) {
|
if (!created) {
|
||||||
return unexpected_error(
|
return unexpected_error(
|
||||||
created.error().code,
|
created.error().code,
|
||||||
@@ -712,17 +804,11 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
}
|
}
|
||||||
rtmp_output.emplace(std::move(*created));
|
rtmp_output.emplace(std::move(*created));
|
||||||
}
|
}
|
||||||
auto stream_info = (*backend)->stream_info();
|
update_mcap_stream_info(mcap_recorder, stream_info);
|
||||||
if (!stream_info) {
|
|
||||||
return unexpected_error(
|
|
||||||
stream_info.error().code,
|
|
||||||
"pipeline encoder stream info unavailable: " + format_error(stream_info.error()));
|
|
||||||
}
|
|
||||||
update_mcap_stream_info(mcap_recorder, *stream_info);
|
|
||||||
if (config.record.mcap.enabled) {
|
if (config.record.mcap.enabled) {
|
||||||
std::lock_guard lock(mcap_recorder.mutex);
|
std::lock_guard lock(mcap_recorder.mutex);
|
||||||
if (!mcap_recorder.sink) {
|
if (!mcap_recorder.sink) {
|
||||||
auto created = record::McapRecordSink::create(config, *stream_info);
|
auto created = record::McapRecordSink::create(config, stream_info);
|
||||||
if (!created) {
|
if (!created) {
|
||||||
return unexpected_error(
|
return unexpected_error(
|
||||||
ERR_INTERNAL,
|
ERR_INTERNAL,
|
||||||
@@ -742,9 +828,43 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
restart_target_info.reset();
|
restart_target_info.reset();
|
||||||
warned_unknown_depth_unit = false;
|
warned_unknown_depth_unit = false;
|
||||||
active_info = target_info;
|
active_info = target_info;
|
||||||
|
active_stream_info = stream_info;
|
||||||
return {};
|
return {};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const auto attempt_raw_backend_start = [&](const ipc::FrameInfo &target_info) -> Status {
|
||||||
|
auto ensure = ensure_encoder_backend();
|
||||||
|
if (!ensure) {
|
||||||
|
return ensure;
|
||||||
|
}
|
||||||
|
(*backend)->shutdown();
|
||||||
|
auto init = (*backend)->init(config, target_info);
|
||||||
|
if (!init) {
|
||||||
|
return std::unexpected(init.error());
|
||||||
|
}
|
||||||
|
auto stream_info = (*backend)->stream_info();
|
||||||
|
if (!stream_info) {
|
||||||
|
return unexpected_error(
|
||||||
|
stream_info.error().code,
|
||||||
|
"pipeline encoder stream info unavailable: " + format_error(stream_info.error()));
|
||||||
|
}
|
||||||
|
using_encoded_input = false;
|
||||||
|
return start_outputs_from_stream_info(*stream_info, target_info);
|
||||||
|
};
|
||||||
|
|
||||||
|
const auto attempt_encoded_passthrough_start = [&](
|
||||||
|
const ipc::CoherentSnapshot &snapshot) -> Status {
|
||||||
|
auto stream_info = make_stream_info_from_snapshot(snapshot);
|
||||||
|
if (!stream_info) {
|
||||||
|
return unexpected_error(ERR_PROTOCOL, stream_info.error());
|
||||||
|
}
|
||||||
|
if (backend) {
|
||||||
|
(*backend)->shutdown();
|
||||||
|
}
|
||||||
|
using_encoded_input = true;
|
||||||
|
return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info);
|
||||||
|
};
|
||||||
|
|
||||||
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
|
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
|
||||||
auto last_event = std::chrono::steady_clock::now();
|
auto last_event = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
@@ -774,11 +894,16 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
case cvmmap::ModuleStatus::StreamReset:
|
case cvmmap::ModuleStatus::StreamReset:
|
||||||
spdlog::info("pipeline status event status=stream_reset");
|
spdlog::info("pipeline status event status=stream_reset");
|
||||||
stats.resets += 1;
|
stats.resets += 1;
|
||||||
(*backend)->shutdown();
|
if (backend) {
|
||||||
|
(*backend)->shutdown();
|
||||||
|
}
|
||||||
started = false;
|
started = false;
|
||||||
restart_pending = false;
|
restart_pending = false;
|
||||||
restart_target_info.reset();
|
restart_target_info.reset();
|
||||||
|
active_stream_info.reset();
|
||||||
|
using_encoded_input = false;
|
||||||
active_info.reset();
|
active_info.reset();
|
||||||
|
rtp_publisher.reset();
|
||||||
rtmp_output.reset();
|
rtmp_output.reset();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -808,10 +933,12 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto poll = (*backend)->poll();
|
if (backend && !using_encoded_input) {
|
||||||
if (!poll) {
|
auto poll = (*backend)->poll();
|
||||||
const auto reason = format_error(poll.error());
|
if (!poll) {
|
||||||
restart_backend(reason, active_info);
|
const auto reason = format_error(poll.error());
|
||||||
|
restart_backend(reason, active_info);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::array<zmq::pollitem_t, 1> poll_items{{
|
std::array<zmq::pollitem_t, 1> poll_items{{
|
||||||
@@ -827,34 +954,34 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
|
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
|
||||||
if (!frame_socket_ready) {
|
if (!frame_socket_ready) {
|
||||||
const auto now = std::chrono::steady_clock::now();
|
const auto now = std::chrono::steady_clock::now();
|
||||||
if (restart_pending && restart_target_info) {
|
if (restart_pending && restart_target_info && backend && !using_encoded_input) {
|
||||||
auto start_result = attempt_backend_start(*restart_target_info);
|
auto start_result = attempt_raw_backend_start(*restart_target_info);
|
||||||
if (!start_result) {
|
if (!start_result) {
|
||||||
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
|
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
|
||||||
return exit_code(PipelineExitCode::RuntimeError);
|
return exit_code(PipelineExitCode::RuntimeError);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
if (now - last_event >= idle_timeout) {
|
||||||
if (now - last_event >= idle_timeout) {
|
spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms);
|
||||||
spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms);
|
break;
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (!producer_offline && started) {
|
|
||||||
auto drain = drain_encoder(
|
|
||||||
config,
|
|
||||||
*backend,
|
|
||||||
false,
|
|
||||||
stats,
|
|
||||||
rtp_publisher ? &*rtp_publisher : nullptr,
|
|
||||||
rtmp_output ? &*rtmp_output : nullptr,
|
|
||||||
&mcap_recorder,
|
|
||||||
latency_tracker);
|
|
||||||
if (!drain) {
|
|
||||||
const auto reason = format_error(drain.error());
|
|
||||||
restart_backend(reason, active_info);
|
|
||||||
}
|
}
|
||||||
|
if (!producer_offline && started && backend && !using_encoded_input) {
|
||||||
|
auto drain = drain_encoder(
|
||||||
|
config,
|
||||||
|
*backend,
|
||||||
|
false,
|
||||||
|
stats,
|
||||||
|
rtp_publisher ? &*rtp_publisher : nullptr,
|
||||||
|
rtmp_output ? &*rtmp_output : nullptr,
|
||||||
|
&mcap_recorder,
|
||||||
|
latency_tracker);
|
||||||
|
if (!drain) {
|
||||||
|
const auto reason = format_error(drain.error());
|
||||||
|
restart_backend(reason, active_info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
zmq::message_t message;
|
zmq::message_t message;
|
||||||
const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait);
|
const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait);
|
||||||
@@ -894,14 +1021,45 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const bool has_encoded_access_unit = snapshot_has_encoded_access_unit(*snapshot);
|
||||||
|
const bool want_encoded_input =
|
||||||
|
config.input.video_source == InputVideoSource::Encoded ||
|
||||||
|
(config.input.video_source == InputVideoSource::Auto && has_encoded_access_unit);
|
||||||
|
if (config.input.video_source == InputVideoSource::Encoded && !has_encoded_access_unit) {
|
||||||
|
spdlog::error("pipeline encoded input requested but SHM snapshot does not contain an encoded access unit");
|
||||||
|
return exit_code(PipelineExitCode::InitializationError);
|
||||||
|
}
|
||||||
|
|
||||||
if (active_info && !frame_info_equal(*active_info, snapshot->metadata.info)) {
|
if (active_info && !frame_info_equal(*active_info, snapshot->metadata.info)) {
|
||||||
stats.format_rebuilds += 1;
|
stats.format_rebuilds += 1;
|
||||||
restart_backend("frame_info_change", snapshot->metadata.info);
|
restart_backend("frame_info_change", snapshot->metadata.info);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!started || restart_pending) {
|
if (started && using_encoded_input != want_encoded_input) {
|
||||||
|
stats.format_rebuilds += 1;
|
||||||
|
restart_backend("input_video_source_switch", snapshot->metadata.info);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (want_encoded_input) {
|
||||||
|
auto stream_info = make_stream_info_from_snapshot(*snapshot);
|
||||||
|
if (!stream_info) {
|
||||||
|
spdlog::error("pipeline encoded snapshot metadata invalid: {}", stream_info.error());
|
||||||
|
return exit_code(PipelineExitCode::InitializationError);
|
||||||
|
}
|
||||||
|
if (active_stream_info && !stream_info_equal(*active_stream_info, *stream_info)) {
|
||||||
|
stats.format_rebuilds += 1;
|
||||||
|
restart_backend("encoded_stream_info_change", snapshot->metadata.info);
|
||||||
|
}
|
||||||
|
if (!started || restart_pending) {
|
||||||
|
auto start_result = attempt_encoded_passthrough_start(*snapshot);
|
||||||
|
if (!start_result) {
|
||||||
|
spdlog::error("pipeline encoded passthrough init failed: {}", format_error(start_result.error()));
|
||||||
|
return exit_code(PipelineExitCode::InitializationError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (!started || restart_pending) {
|
||||||
const auto target_info = restart_target_info.value_or(snapshot->metadata.info);
|
const auto target_info = restart_target_info.value_or(snapshot->metadata.info);
|
||||||
auto start_result = attempt_backend_start(target_info);
|
auto start_result = attempt_raw_backend_start(target_info);
|
||||||
if (!start_result) {
|
if (!start_result) {
|
||||||
spdlog::error("pipeline backend init failed: {}", format_error(start_result.error()));
|
spdlog::error("pipeline backend init failed: {}", format_error(start_result.error()));
|
||||||
return exit_code(PipelineExitCode::InitializationError);
|
return exit_code(PipelineExitCode::InitializationError);
|
||||||
@@ -910,15 +1068,39 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
|
|
||||||
latency_tracker.note_ingest();
|
latency_tracker.note_ingest();
|
||||||
|
|
||||||
auto push = (*backend)->push_frame(encode::RawVideoFrame{
|
if (want_encoded_input) {
|
||||||
.info = snapshot->metadata.info,
|
auto access_unit = make_access_unit_from_snapshot(*snapshot);
|
||||||
.source_timestamp_ns = snapshot->metadata.timestamp_ns,
|
if (!access_unit) {
|
||||||
.bytes = std::span<const std::uint8_t>(snapshot_buffer.data(), snapshot->bytes_copied),
|
const auto reason = "pipeline encoded snapshot invalid: " + access_unit.error();
|
||||||
});
|
restart_backend(reason, snapshot->metadata.info);
|
||||||
if (!push) {
|
continue;
|
||||||
const auto reason = format_error(push.error());
|
}
|
||||||
restart_backend(reason, active_info);
|
std::vector<encode::EncodedAccessUnit> access_units{};
|
||||||
continue;
|
access_units.push_back(std::move(*access_unit));
|
||||||
|
auto publish = publish_access_units(
|
||||||
|
config,
|
||||||
|
std::move(access_units),
|
||||||
|
stats,
|
||||||
|
rtp_publisher ? &*rtp_publisher : nullptr,
|
||||||
|
rtmp_output ? &*rtmp_output : nullptr,
|
||||||
|
&mcap_recorder,
|
||||||
|
latency_tracker);
|
||||||
|
if (!publish) {
|
||||||
|
const auto reason = format_error(publish.error());
|
||||||
|
restart_backend(reason, active_info);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
auto push = (*backend)->push_frame(encode::RawVideoFrame{
|
||||||
|
.info = snapshot->metadata.info,
|
||||||
|
.source_timestamp_ns = snapshot->metadata.timestamp_ns,
|
||||||
|
.bytes = std::span<const std::uint8_t>(snapshot_buffer.data(), snapshot->bytes_copied),
|
||||||
|
});
|
||||||
|
if (!push) {
|
||||||
|
const auto reason = format_error(push.error());
|
||||||
|
restart_backend(reason, active_info);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!snapshot->depth.empty()) {
|
if (!snapshot->depth.empty()) {
|
||||||
@@ -946,19 +1128,21 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
stats.pushed_frames += 1;
|
stats.pushed_frames += 1;
|
||||||
auto drain = drain_encoder(
|
if (!want_encoded_input) {
|
||||||
config,
|
auto drain = drain_encoder(
|
||||||
*backend,
|
config,
|
||||||
false,
|
*backend,
|
||||||
stats,
|
false,
|
||||||
rtp_publisher ? &*rtp_publisher : nullptr,
|
stats,
|
||||||
rtmp_output ? &*rtmp_output : nullptr,
|
rtp_publisher ? &*rtp_publisher : nullptr,
|
||||||
&mcap_recorder,
|
rtmp_output ? &*rtmp_output : nullptr,
|
||||||
latency_tracker);
|
&mcap_recorder,
|
||||||
if (!drain) {
|
latency_tracker);
|
||||||
const auto reason = format_error(drain.error());
|
if (!drain) {
|
||||||
restart_backend(reason, active_info);
|
const auto reason = format_error(drain.error());
|
||||||
continue;
|
restart_backend(reason, active_info);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.latency.ingest_max_frames > 0 && stats.pushed_frames >= config.latency.ingest_max_frames) {
|
if (config.latency.ingest_max_frames > 0 && stats.pushed_frames >= config.latency.ingest_max_frames) {
|
||||||
@@ -973,7 +1157,7 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
|
|
||||||
nats_client.Stop();
|
nats_client.Stop();
|
||||||
|
|
||||||
if (started) {
|
if (started && backend && !using_encoded_input) {
|
||||||
auto drain = drain_encoder(
|
auto drain = drain_encoder(
|
||||||
config,
|
config,
|
||||||
*backend,
|
*backend,
|
||||||
@@ -989,14 +1173,18 @@ int run_pipeline(const RuntimeConfig &config) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(*backend)->shutdown();
|
if (backend) {
|
||||||
|
(*backend)->shutdown();
|
||||||
|
}
|
||||||
std::ignore = stop_mcap_recording(mcap_recorder);
|
std::ignore = stop_mcap_recording(mcap_recorder);
|
||||||
recorder_service.Stop();
|
recorder_service.Stop();
|
||||||
|
|
||||||
spdlog::info(
|
spdlog::info(
|
||||||
"PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}",
|
"PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}",
|
||||||
to_string(config.encoder.codec),
|
active_stream_info ? to_string(active_stream_info->codec) : to_string(config.encoder.codec),
|
||||||
(*backend)->backend_name(),
|
using_encoded_input
|
||||||
|
? std::string("encoded_passthrough")
|
||||||
|
: (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")),
|
||||||
stats.sync_messages,
|
stats.sync_messages,
|
||||||
stats.status_messages,
|
stats.status_messages,
|
||||||
stats.torn_frames,
|
stats.torn_frames,
|
||||||
|
|||||||
@@ -191,7 +191,7 @@ Result<std::vector<std::uint8_t>> decoder_config_to_annexb(
|
|||||||
CodecType codec,
|
CodecType codec,
|
||||||
std::span<const std::uint8_t> decoder_config) {
|
std::span<const std::uint8_t> decoder_config) {
|
||||||
if (decoder_config.empty()) {
|
if (decoder_config.empty()) {
|
||||||
return unexpected_error(ERR_PROTOCOL, "decoder config is required");
|
return std::vector<std::uint8_t>{};
|
||||||
}
|
}
|
||||||
if (looks_like_annexb(decoder_config)) {
|
if (looks_like_annexb(decoder_config)) {
|
||||||
return std::vector<std::uint8_t>(decoder_config.begin(), decoder_config.end());
|
return std::vector<std::uint8_t>(decoder_config.begin(), decoder_config.end());
|
||||||
@@ -248,10 +248,6 @@ public:
|
|||||||
static Result<RtmpOutput> create(
|
static Result<RtmpOutput> create(
|
||||||
const RuntimeConfig &config,
|
const RuntimeConfig &config,
|
||||||
const encode::EncodedStreamInfo &stream_info) {
|
const encode::EncodedStreamInfo &stream_info) {
|
||||||
if (stream_info.decoder_config.empty()) {
|
|
||||||
return unexpected_error(ERR_PROTOCOL, "libavformat RTMP requires encoder decoder_config/extradata");
|
|
||||||
}
|
|
||||||
|
|
||||||
avformat_network_init();
|
avformat_network_init();
|
||||||
|
|
||||||
LibavformatRtmpOutput output{};
|
LibavformatRtmpOutput output{};
|
||||||
@@ -442,9 +438,7 @@ public:
|
|||||||
};
|
};
|
||||||
auto decoder_config_annexb = decoder_config_to_annexb(stream_info.codec, stream_info.decoder_config);
|
auto decoder_config_annexb = decoder_config_to_annexb(stream_info.codec, stream_info.decoder_config);
|
||||||
if (!decoder_config_annexb) {
|
if (!decoder_config_annexb) {
|
||||||
return unexpected_error(
|
return std::unexpected(decoder_config_annexb.error());
|
||||||
ERR_PROTOCOL,
|
|
||||||
"ffmpeg_process RTMP requires decoder config: " + format_error(decoder_config_annexb.error()));
|
|
||||||
}
|
}
|
||||||
output.decoder_config_annexb_ = std::move(*decoder_config_annexb);
|
output.decoder_config_annexb_ = std::move(*decoder_config_annexb);
|
||||||
|
|
||||||
|
|||||||
@@ -179,7 +179,7 @@ UdpRtpPublisher &UdpRtpPublisher::operator=(UdpRtpPublisher &&other) noexcept {
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::expected<UdpRtpPublisher, std::string> UdpRtpPublisher::create(const RuntimeConfig &config) {
|
std::expected<UdpRtpPublisher, std::string> UdpRtpPublisher::create(const RuntimeConfig &config, const CodecType codec) {
|
||||||
if (!config.outputs.rtp.enabled) {
|
if (!config.outputs.rtp.enabled) {
|
||||||
return std::unexpected("invalid RTP publisher init: RTP output disabled");
|
return std::unexpected("invalid RTP publisher init: RTP output disabled");
|
||||||
}
|
}
|
||||||
@@ -191,7 +191,7 @@ std::expected<UdpRtpPublisher, std::string> UdpRtpPublisher::create(const Runtim
|
|||||||
publisher.destination_host_ = *config.outputs.rtp.host;
|
publisher.destination_host_ = *config.outputs.rtp.host;
|
||||||
publisher.destination_port_ = *config.outputs.rtp.port;
|
publisher.destination_port_ = *config.outputs.rtp.port;
|
||||||
publisher.payload_type_ = config.outputs.rtp.payload_type;
|
publisher.payload_type_ = config.outputs.rtp.payload_type;
|
||||||
publisher.codec_ = config.encoder.codec;
|
publisher.codec_ = codec;
|
||||||
publisher.sequence_ = compute_initial_sequence();
|
publisher.sequence_ = compute_initial_sequence();
|
||||||
publisher.ssrc_ = compute_ssrc(
|
publisher.ssrc_ = compute_ssrc(
|
||||||
publisher.destination_host_,
|
publisher.destination_host_,
|
||||||
|
|||||||
Reference in New Issue
Block a user