#include "cvmmap_streamer/config/runtime_config.hpp" #include "cvmmap_streamer/core/frame_source.hpp" #include "cvmmap_streamer/core/status.hpp" #include "cvmmap_streamer/encode/encoder_backend.hpp" #include "cvmmap_streamer/ipc/contracts.hpp" #include "cvmmap_streamer/metrics/latency_tracker.hpp" #include "cvmmap_streamer/protocol/nats_request_reply_server.hpp" #include "cvmmap_streamer/protocol/rtmp_output.hpp" #include "cvmmap_streamer/protocol/rtp_publisher.hpp" #include "cvmmap_streamer/protocol/streamer_subjects.hpp" #include "cvmmap_streamer/record/mcap_record_sink.hpp" #include "cvmmap_streamer/record/mp4_record_writer.hpp" #include "proto/cvmmap_streamer/recorder_control.pb.h" #ifndef CVMMAP_STREAMER_HAS_MCAP #define CVMMAP_STREAMER_HAS_MCAP 0 #endif #ifndef CVMMAP_STREAMER_HAS_MCAP_DEPTH #define CVMMAP_STREAMER_HAS_MCAP_DEPTH 0 #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace cvmmap_streamer { void pipeline_tick() {} } namespace cvmmap_streamer::core { namespace { namespace ipc = cvmmap_streamer::ipc; namespace recorder_pb = cvmmap_streamer::proto; enum class PipelineExitCode : int { Success = 0, InputError = 2, SharedMemoryError = 3, SubscriberError = 4, InitializationError = 5, RuntimeError = 6, }; [[nodiscard]] constexpr int exit_code(PipelineExitCode code) { return static_cast(code); } struct ResolvedInputEndpoints { std::string instance_name; std::string namespace_name; std::string ipc_prefix; std::string base_name; std::string shm_name; std::string zmq_endpoint; std::string nats_target_key; }; [[nodiscard]] std::expected resolve_input_endpoints(const RuntimeConfig &config) { try { auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.uri); spdlog::info( "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'", config.input.uri, target.shm_name, target.zmq_addr, target.nats_target_key); return ResolvedInputEndpoints{ .instance_name = target.instance, .namespace_name = target.namespace_name, .ipc_prefix = target.prefix, .base_name = target.base_name, .shm_name = target.shm_name, .zmq_endpoint = target.zmq_addr, .nats_target_key = target.nats_target_key, }; } catch (const std::exception &e) { return std::unexpected(std::string("invalid cvmmap input URI: ") + e.what()); } } struct SharedMemoryView { SharedMemoryView() = default; int fd{-1}; std::uint8_t *ptr{nullptr}; std::size_t bytes{0}; ~SharedMemoryView() { if (ptr != nullptr && ptr != MAP_FAILED && bytes > 0) { munmap(ptr, bytes); } if (fd >= 0) { close(fd); } } SharedMemoryView(const SharedMemoryView &) = delete; SharedMemoryView &operator=(const SharedMemoryView &) = delete; SharedMemoryView(SharedMemoryView &&other) noexcept { fd = std::exchange(other.fd, -1); ptr = std::exchange(other.ptr, nullptr); bytes = std::exchange(other.bytes, 0); } SharedMemoryView &operator=(SharedMemoryView &&other) noexcept { if (this == &other) { return *this; } if (ptr != nullptr && ptr != MAP_FAILED && bytes > 0) { munmap(ptr, bytes); } if (fd >= 0) { close(fd); } fd = std::exchange(other.fd, -1); ptr = std::exchange(other.ptr, nullptr); bytes = std::exchange(other.bytes, 0); return *this; } [[nodiscard]] std::span region() const { return std::span(ptr, bytes); } [[nodiscard]] static std::expected open_readonly(const std::string &raw_name) { const auto shm_name = raw_name.starts_with('/') ? raw_name : "/" + raw_name; const int fd = shm_open(shm_name.c_str(), O_RDONLY, 0); if (fd < 0) { return std::unexpected("shm_open failed for '" + shm_name + "'"); } struct stat statbuf{}; if (fstat(fd, &statbuf) != 0) { close(fd); return std::unexpected("fstat failed for '" + shm_name + "'"); } if (statbuf.st_size <= 0) { close(fd); return std::unexpected("shared memory size is zero for '" + shm_name + "'"); } const auto bytes = static_cast(statbuf.st_size); auto *mapped = static_cast(mmap(nullptr, bytes, PROT_READ, MAP_SHARED, fd, 0)); if (mapped == MAP_FAILED) { close(fd); return std::unexpected("mmap failed for '" + shm_name + "'"); } SharedMemoryView view; view.fd = fd; view.ptr = mapped; view.bytes = bytes; return view; } }; struct PipelineStats { std::uint64_t sync_messages{0}; std::uint64_t status_messages{0}; std::uint64_t torn_frames{0}; std::uint64_t pushed_frames{0}; std::uint64_t encoded_access_units{0}; std::uint64_t resets{0}; std::uint64_t format_rebuilds{0}; std::uint64_t supervised_restarts{0}; }; [[nodiscard]] std::string_view status_to_string(ipc::ModuleStatus status) { switch (status) { case ipc::ModuleStatus::Online: return "online"; case ipc::ModuleStatus::Offline: return "offline"; case ipc::ModuleStatus::StreamReset: return "stream_reset"; } return "unknown"; } [[nodiscard]] bool frame_info_equal(const ipc::FrameInfo &lhs, const ipc::FrameInfo &rhs) { return lhs.width == rhs.width && lhs.height == rhs.height && lhs.channels == rhs.channels && lhs.depth == rhs.depth && lhs.pixel_format == rhs.pixel_format && lhs.buffer_size == rhs.buffer_size; } [[nodiscard]] ipc::DepthUnit normalize_depth_unit_for_recording(ipc::DepthUnit unit) { if (unit == ipc::DepthUnit::Unknown) { return ipc::DepthUnit::Millimeter; } return unit; } [[nodiscard]] std::expected make_depth_map_view(const ipc::CoherentSnapshot &snapshot) { if (!snapshot.depth_info) { return std::unexpected("depth plane metadata is missing"); } if (snapshot.depth.empty()) { return std::unexpected("depth plane bytes are missing"); } const auto &depth_info = *snapshot.depth_info; if (depth_info.depth != ipc::Depth::F32 || depth_info.pixel_format != ipc::PixelFormat::GRAY) { return std::unexpected("depth plane must be GRAY/F32"); } const auto pixel_count = static_cast(depth_info.width) * static_cast(depth_info.height); const auto expected_bytes = pixel_count * sizeof(float); if (snapshot.depth.size() != expected_bytes) { return std::unexpected( "depth plane byte size does not match width*height*sizeof(float)"); } if ((reinterpret_cast(snapshot.depth.data()) % alignof(float)) != 0) { return std::unexpected("depth plane is not aligned for float access"); } return record::RawDepthMapView{ .timestamp_ns = snapshot.metadata.timestamp_ns, .width = depth_info.width, .height = depth_info.height, .source_unit = normalize_depth_unit_for_recording(snapshot.depth_unit), .pixels = std::span( reinterpret_cast(snapshot.depth.data()), pixel_count), }; } [[nodiscard]] bool snapshot_has_encoded_access_unit(const ipc::CoherentSnapshot &snapshot) { return !snapshot.encoded_access_unit.empty() && snapshot.encoded_codec != ipc::EncodedCodec::Unknown && snapshot.encoded_bitstream_format == ipc::EncodedBitstreamFormat::AnnexB; } [[nodiscard]] std::expected codec_type_from_snapshot(const ipc::CoherentSnapshot &snapshot) { switch (snapshot.encoded_codec) { case ipc::EncodedCodec::H264: return CodecType::H264; case ipc::EncodedCodec::H265: return CodecType::H265; case ipc::EncodedCodec::Unknown: default: return std::unexpected("encoded snapshot codec is unsupported"); } } [[nodiscard]] std::expected make_stream_info_from_snapshot( const ipc::CoherentSnapshot &snapshot) { auto codec = codec_type_from_snapshot(snapshot); if (!codec) { return std::unexpected(codec.error()); } if (snapshot.encoded_bitstream_format != ipc::EncodedBitstreamFormat::AnnexB) { return std::unexpected("encoded snapshot bitstream format is unsupported"); } if (snapshot.metadata.info.width == 0 || snapshot.metadata.info.height == 0) { return std::unexpected("encoded snapshot dimensions are invalid"); } encode::EncodedStreamInfo stream_info{}; stream_info.codec = *codec; stream_info.width = snapshot.metadata.info.width; stream_info.height = snapshot.metadata.info.height; stream_info.time_base_num = 1; stream_info.time_base_den = 1'000'000'000u; stream_info.frame_rate_num = snapshot.encoded_frame_rate_num == 0 ? 30u : snapshot.encoded_frame_rate_num; stream_info.frame_rate_den = snapshot.encoded_frame_rate_den == 0 ? 1u : snapshot.encoded_frame_rate_den; stream_info.bitstream_format = encode::EncodedBitstreamFormat::AnnexB; stream_info.decoder_config.clear(); return stream_info; } [[nodiscard]] std::expected make_access_unit_from_snapshot( const ipc::CoherentSnapshot &snapshot) { auto codec = codec_type_from_snapshot(snapshot); if (!codec) { return std::unexpected(codec.error()); } if (snapshot.encoded_access_unit.empty()) { return std::unexpected("encoded snapshot access unit is empty"); } encode::EncodedAccessUnit access_unit{}; access_unit.codec = *codec; access_unit.source_timestamp_ns = snapshot.metadata.timestamp_ns; access_unit.stream_pts_ns = snapshot.encoded_stream_pts_ns == 0 ? snapshot.metadata.timestamp_ns : snapshot.encoded_stream_pts_ns; access_unit.keyframe = (snapshot.encoded_flags & ipc::kEncodedFlagKeyframe) != 0; access_unit.annexb_bytes.assign( snapshot.encoded_access_unit.begin(), snapshot.encoded_access_unit.end()); return access_unit; } [[nodiscard]] bool stream_info_equal( const encode::EncodedStreamInfo &lhs, const encode::EncodedStreamInfo &rhs) { return lhs.codec == rhs.codec && lhs.width == rhs.width && lhs.height == rhs.height && lhs.time_base_num == rhs.time_base_num && lhs.time_base_den == rhs.time_base_den && lhs.frame_rate_num == rhs.frame_rate_num && lhs.frame_rate_den == rhs.frame_rate_den && lhs.bitstream_format == rhs.bitstream_format; } struct LiveOutputContinuityState { bool boundary_pending{false}; bool encoded_keyframe_required{false}; std::optional last_output_pts_ns{}; std::optional segment_input_start_pts_ns{}; std::uint64_t segment_output_start_pts_ns{0}; std::uint64_t nominal_frame_interval_ns{33'333'333ull}; void reset() { boundary_pending = false; encoded_keyframe_required = false; last_output_pts_ns.reset(); segment_input_start_pts_ns.reset(); segment_output_start_pts_ns = 0; nominal_frame_interval_ns = 33'333'333ull; } void note_reset_boundary() { boundary_pending = true; encoded_keyframe_required = true; segment_input_start_pts_ns.reset(); } void note_new_session(const encode::EncodedStreamInfo &stream_info) { reset(); update_nominal_frame_interval(stream_info); } void update_nominal_frame_interval(const encode::EncodedStreamInfo &stream_info) { if (stream_info.frame_rate_num == 0 || stream_info.frame_rate_den == 0) { nominal_frame_interval_ns = 33'333'333ull; return; } const auto numerator = static_cast(stream_info.frame_rate_den) * 1'000'000'000ull; const auto denominator = static_cast(stream_info.frame_rate_num); if (denominator == 0) { nominal_frame_interval_ns = 33'333'333ull; return; } const auto interval = numerator / denominator; nominal_frame_interval_ns = interval == 0 ? 1ull : interval; } [[nodiscard]] std::uint64_t remap_pts(const std::uint64_t input_pts_ns) { if (!last_output_pts_ns) { last_output_pts_ns = input_pts_ns; segment_input_start_pts_ns = input_pts_ns; segment_output_start_pts_ns = input_pts_ns; boundary_pending = false; return input_pts_ns; } if (boundary_pending || !segment_input_start_pts_ns) { segment_input_start_pts_ns = input_pts_ns; segment_output_start_pts_ns = *last_output_pts_ns + nominal_frame_interval_ns; boundary_pending = false; } const auto input_delta = input_pts_ns >= *segment_input_start_pts_ns ? input_pts_ns - *segment_input_start_pts_ns : 0ull; auto remapped_pts = segment_output_start_pts_ns + input_delta; if (remapped_pts <= *last_output_pts_ns) { remapped_pts = *last_output_pts_ns + nominal_frame_interval_ns; } last_output_pts_ns = remapped_pts; return remapped_pts; } }; [[nodiscard]] std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) { if (frame.header.timestamp_ns != 0) { return frame.header.timestamp_ns; } return frame.header.sdk_timestamp_ns; } [[nodiscard]] float stream_fps(const encode::EncodedStreamInfo &stream_info) { if (stream_info.frame_rate_num == 0 || stream_info.frame_rate_den == 0) { return 30.0f; } return static_cast(stream_info.frame_rate_num) / static_cast(stream_info.frame_rate_den); } template [[nodiscard]] bool runtime_mcap_depth_enabled(const Config &config) { if constexpr (requires { config.record.mcap.depth_enabled; }) { return config.record.mcap.depth_enabled; } else { return true; } } [[nodiscard]] constexpr bool has_mcap_support() { return CVMMAP_STREAMER_HAS_MCAP != 0; } [[nodiscard]] constexpr bool has_mcap_depth_support() { return CVMMAP_STREAMER_HAS_MCAP_DEPTH != 0; } [[nodiscard]] std::string mcap_disabled_message() { return "MCAP recording support is not compiled into this build"; } [[nodiscard]] std::string mcap_depth_disabled_message() { return "MCAP depth recording support is not compiled into this build"; } #if CVMMAP_STREAMER_HAS_MCAP struct McapRecorderState { mutable std::mutex mutex{}; RuntimeConfig base_config{}; std::optional active_record_config{}; std::optional current_stream_info{}; std::optional sink{}; struct Status { bool can_record{true}; bool is_recording{false}; bool last_frame_ok{false}; std::uint32_t frames_ingested{0}; std::uint32_t frames_encoded{0}; std::string active_path{}; std::string error_message{}; } status{}; }; [[nodiscard]] protocol::RpcError make_recorder_rpc_error( const protocol::RpcErrorCode code, std::string message) { return protocol::RpcError{ .code = code, .message = std::move(message), }; } [[nodiscard]] RuntimeConfig make_mcap_record_config( const RuntimeConfig &base_config, const recorder_pb::McapStartRequest &request) { auto record_config = base_config; record_config.record.mcap.enabled = true; record_config.record.mcap.path = request.output_path(); if (request.has_topic()) { record_config.record.mcap.topic = request.topic(); } if (request.has_depth_topic()) { record_config.record.mcap.depth_topic = request.depth_topic(); } if (request.has_body_topic()) { record_config.record.mcap.body_topic = request.body_topic(); } if (request.has_frame_id()) { record_config.record.mcap.frame_id = request.frame_id(); } return record_config; } void reset_mcap_status_after_stop(McapRecorderState::Status &status) { status.is_recording = false; status.active_path.clear(); } [[nodiscard]] recorder_pb::McapRecorderState to_proto_mcap_state( const McapRecorderState::Status &status) { recorder_pb::McapRecorderState wire_status; wire_status.set_can_record(status.can_record); wire_status.set_is_recording(status.is_recording); wire_status.set_last_frame_ok(status.last_frame_ok); wire_status.set_frames_ingested(status.frames_ingested); wire_status.set_frames_encoded(status.frames_encoded); wire_status.set_active_path(status.active_path); wire_status.set_error_message(status.error_message); return wire_status; } template [[nodiscard]] Response make_ok_mcap_response(const McapRecorderState::Status &status) { Response response; response.set_code(recorder_pb::RPC_CODE_OK); *response.mutable_state() = to_proto_mcap_state(status); return response; } [[nodiscard]] std::expected start_mcap_recording( McapRecorderState &recorder_state, const recorder_pb::McapStartRequest &request) { std::lock_guard lock(recorder_state.mutex); if (!recorder_state.current_stream_info) { return std::unexpected(make_recorder_rpc_error( protocol::RpcErrorCode::Internal, "MCAP recorder is not ready; stream info unavailable")); } if (recorder_state.sink) { return std::unexpected(make_recorder_rpc_error( protocol::RpcErrorCode::Busy, "MCAP recording is already active")); } if (request.output_path().empty()) { return std::unexpected(make_recorder_rpc_error( protocol::RpcErrorCode::InvalidRequest, "output_path must not be empty")); } const auto output_path = std::filesystem::path(request.output_path()); std::error_code mkdir_error{}; if (output_path.has_parent_path()) { std::filesystem::create_directories(output_path.parent_path(), mkdir_error); if (mkdir_error) { return std::unexpected(make_recorder_rpc_error( protocol::RpcErrorCode::Internal, "failed to create MCAP output directory: " + mkdir_error.message())); } } auto record_config = make_mcap_record_config(recorder_state.base_config, request); if (request.has_compression()) { auto parsed = parse_mcap_compression(request.compression()); if (!parsed) { return std::unexpected(make_recorder_rpc_error( protocol::RpcErrorCode::InvalidRequest, parsed.error())); } record_config.record.mcap.compression = *parsed; } auto created = record::McapRecordSink::create(record_config, *recorder_state.current_stream_info); if (!created) { return std::unexpected(make_recorder_rpc_error( protocol::RpcErrorCode::Internal, "pipeline MCAP sink init failed: " + created.error())); } recorder_state.active_record_config = record_config; recorder_state.sink.emplace(std::move(*created)); recorder_state.status.can_record = true; recorder_state.status.is_recording = true; recorder_state.status.last_frame_ok = true; recorder_state.status.frames_ingested = 0; recorder_state.status.frames_encoded = 0; recorder_state.status.active_path = request.output_path(); recorder_state.status.error_message.clear(); return recorder_state.status; } [[nodiscard]] std::expected stop_mcap_recording( McapRecorderState &recorder_state, const recorder_pb::McapStopRequest &) { std::lock_guard lock(recorder_state.mutex); if (recorder_state.sink) { recorder_state.sink->close(); recorder_state.sink.reset(); } recorder_state.active_record_config.reset(); recorder_state.status.last_frame_ok = true; recorder_state.status.error_message.clear(); reset_mcap_status_after_stop(recorder_state.status); return recorder_state.status; } [[nodiscard]] std::expected get_mcap_recording_status( McapRecorderState &recorder_state, const recorder_pb::McapStatusRequest &) { std::lock_guard lock(recorder_state.mutex); recorder_state.status.can_record = true; return recorder_state.status; } #else struct McapRecorderState { struct Status {}; }; [[nodiscard]] protocol::RpcError make_mcap_unsupported_rpc_error() { return protocol::RpcError{ .code = protocol::RpcErrorCode::Unsupported, .message = mcap_disabled_message(), }; } [[nodiscard]] std::expected start_mcap_recording( McapRecorderState &, const recorder_pb::McapStartRequest &) { return std::unexpected(make_mcap_unsupported_rpc_error()); } [[nodiscard]] std::expected stop_mcap_recording( McapRecorderState &, const recorder_pb::McapStopRequest &) { return std::unexpected(make_mcap_unsupported_rpc_error()); } [[nodiscard]] std::expected get_mcap_recording_status( McapRecorderState &, const recorder_pb::McapStatusRequest &) { return std::unexpected(make_mcap_unsupported_rpc_error()); } void update_mcap_stream_info( McapRecorderState &, const encode::EncodedStreamInfo &) {} Status write_mcap_access_unit( McapRecorderState *, const encode::EncodedAccessUnit &) { return unexpected_error(ERR_UNSUPPORTED, mcap_disabled_message()); } Status write_mcap_body_message( McapRecorderState *, const record::RawBodyTrackingMessageView &) { return unexpected_error(ERR_UNSUPPORTED, mcap_disabled_message()); } Status write_mcap_depth_map( McapRecorderState *, const record::RawDepthMapView &) { return unexpected_error(ERR_UNSUPPORTED, mcap_depth_disabled_message()); } #endif struct Mp4RecorderStatus { bool can_record{false}; bool is_recording{false}; bool last_frame_ok{false}; std::uint32_t frames_ingested{0}; std::uint32_t frames_encoded{0}; std::string active_path{}; std::string error_message{}; }; struct Mp4RecorderState { mutable std::mutex mutex{}; RuntimeConfig base_config{}; std::optional current_frame_info{}; std::optional current_input_pixel_format{}; float current_fps{30.0f}; std::optional writer{}; std::optional first_frame_timestamp_ns{}; Mp4RecorderStatus status{}; }; [[nodiscard]] record::Mp4EncodeTuning make_mp4_encode_tuning(const RuntimeConfig &base_config) { return record::Mp4EncodeTuning{ .quality = record::kDefaultMp4Quality, .gop = base_config.encoder.gop, .b_frames = base_config.encoder.b_frames, }; } [[nodiscard]] std::expected mp4_input_pixel_format( const ipc::FrameInfo &frame_info) { if (frame_info.depth != ipc::Depth::U8) { return std::unexpected("MP4 recorder requires 8-bit color frames"); } switch (frame_info.pixel_format) { case ipc::PixelFormat::BGR: return record::Mp4InputPixelFormat::Bgr24; case ipc::PixelFormat::RGB: return record::Mp4InputPixelFormat::Rgb24; case ipc::PixelFormat::BGRA: return record::Mp4InputPixelFormat::Bgra32; case ipc::PixelFormat::RGBA: return record::Mp4InputPixelFormat::Rgba32; case ipc::PixelFormat::GRAY: return record::Mp4InputPixelFormat::Gray8; case ipc::PixelFormat::YUV: case ipc::PixelFormat::YUYV: return std::unexpected("MP4 recorder does not support packed YUV snapshot frames"); } return std::unexpected("MP4 recorder does not support the snapshot pixel format"); } void reset_mp4_status_after_stop(Mp4RecorderStatus &status) { status.is_recording = false; status.active_path.clear(); } void close_mp4_writer_with_error( Mp4RecorderState &recorder_state, const std::string &message) { spdlog::error("pipeline MP4 recorder stopping after error: {}", message); if (recorder_state.writer) { auto flush = recorder_state.writer->flush(); if (!flush) { spdlog::warn("pipeline MP4 flush failed while closing recorder: {}", flush.error()); } recorder_state.writer.reset(); } recorder_state.first_frame_timestamp_ns.reset(); recorder_state.status.last_frame_ok = false; recorder_state.status.error_message = message; reset_mp4_status_after_stop(recorder_state.status); } [[nodiscard]] protocol::RpcError make_mp4_rpc_error( const protocol::RpcErrorCode code, std::string message) { return protocol::RpcError{ .code = code, .message = std::move(message), }; } [[nodiscard]] recorder_pb::Mp4RecorderState to_proto_mp4_state(const Mp4RecorderStatus &status) { recorder_pb::Mp4RecorderState wire_status; wire_status.set_can_record(status.can_record); wire_status.set_is_recording(status.is_recording); wire_status.set_last_frame_ok(status.last_frame_ok); wire_status.set_frames_ingested(status.frames_ingested); wire_status.set_frames_encoded(status.frames_encoded); wire_status.set_active_path(status.active_path); wire_status.set_error_message(status.error_message); return wire_status; } template [[nodiscard]] Response make_ok_mp4_response(const Mp4RecorderStatus &status) { Response response; response.set_code(recorder_pb::RPC_CODE_OK); *response.mutable_state() = to_proto_mp4_state(status); return response; } [[nodiscard]] std::expected start_mp4_recording( Mp4RecorderState &recorder_state, const recorder_pb::Mp4StartRequest &request) { std::lock_guard lock(recorder_state.mutex); if (!recorder_state.current_frame_info) { return std::unexpected(make_mp4_rpc_error( protocol::RpcErrorCode::Internal, "MP4 recorder is not ready; snapshot frame info unavailable")); } if (!recorder_state.current_input_pixel_format) { return std::unexpected(make_mp4_rpc_error( protocol::RpcErrorCode::Unsupported, recorder_state.status.error_message.empty() ? "MP4 recorder does not support the current snapshot format" : recorder_state.status.error_message)); } if (recorder_state.writer) { return std::unexpected(make_mp4_rpc_error( protocol::RpcErrorCode::Busy, "MP4 recording is already active")); } const auto output_path = std::filesystem::path(request.output_path()); if (output_path.empty()) { return std::unexpected(make_mp4_rpc_error( protocol::RpcErrorCode::InvalidRequest, "output_path must not be empty")); } std::error_code mkdir_error{}; if (output_path.has_parent_path()) { std::filesystem::create_directories(output_path.parent_path(), mkdir_error); if (mkdir_error) { return std::unexpected(make_mp4_rpc_error( protocol::RpcErrorCode::Internal, "failed to create MP4 output directory: " + mkdir_error.message())); } } record::Mp4RecordWriter writer{}; auto open = writer.open( output_path, recorder_state.base_config.encoder.codec, recorder_state.base_config.encoder.device, recorder_state.current_frame_info->width, recorder_state.current_frame_info->height, recorder_state.current_fps, make_mp4_encode_tuning(recorder_state.base_config), *recorder_state.current_input_pixel_format); if (!open) { return std::unexpected(make_mp4_rpc_error( protocol::RpcErrorCode::Internal, "pipeline MP4 writer init failed: " + open.error())); } recorder_state.writer.emplace(std::move(writer)); recorder_state.first_frame_timestamp_ns.reset(); recorder_state.status.can_record = true; recorder_state.status.is_recording = true; recorder_state.status.last_frame_ok = true; recorder_state.status.frames_ingested = 0; recorder_state.status.frames_encoded = 0; recorder_state.status.active_path = request.output_path(); recorder_state.status.error_message.clear(); return recorder_state.status; } [[nodiscard]] std::expected stop_mp4_recording( Mp4RecorderState &recorder_state, const recorder_pb::Mp4StopRequest &) { std::lock_guard lock(recorder_state.mutex); if (recorder_state.writer) { auto flush = recorder_state.writer->flush(); recorder_state.writer.reset(); recorder_state.first_frame_timestamp_ns.reset(); if (!flush) { recorder_state.status.last_frame_ok = false; recorder_state.status.error_message = "MP4 recording flush failed: " + flush.error(); reset_mp4_status_after_stop(recorder_state.status); return std::unexpected(make_mp4_rpc_error( protocol::RpcErrorCode::Internal, recorder_state.status.error_message)); } } recorder_state.status.last_frame_ok = true; recorder_state.status.error_message.clear(); reset_mp4_status_after_stop(recorder_state.status); return recorder_state.status; } [[nodiscard]] std::expected get_mp4_recording_status( Mp4RecorderState &recorder_state, const recorder_pb::Mp4StatusRequest &) { std::lock_guard lock(recorder_state.mutex); recorder_state.status.can_record = recorder_state.current_frame_info.has_value() && recorder_state.current_input_pixel_format.has_value(); return recorder_state.status; } void update_mp4_source_info( Mp4RecorderState &recorder_state, const ipc::FrameInfo &frame_info, const float fps) { std::lock_guard lock(recorder_state.mutex); const auto previous_frame_info = recorder_state.current_frame_info; recorder_state.current_frame_info = frame_info; if (fps > 0.0f) { recorder_state.current_fps = fps; } auto input_pixel_format = mp4_input_pixel_format(frame_info); if (!input_pixel_format) { recorder_state.current_input_pixel_format.reset(); recorder_state.status.can_record = false; recorder_state.status.error_message = input_pixel_format.error(); if (recorder_state.writer) { close_mp4_writer_with_error(recorder_state, input_pixel_format.error()); } return; } const bool source_changed = recorder_state.writer.has_value() && previous_frame_info.has_value() && !frame_info_equal(*previous_frame_info, frame_info); recorder_state.current_input_pixel_format = *input_pixel_format; recorder_state.status.can_record = true; recorder_state.status.error_message.clear(); if (source_changed) { close_mp4_writer_with_error( recorder_state, "MP4 recording stopped after snapshot frame format changed"); return; } } void write_mp4_frame( Mp4RecorderState *recorder_state, const ipc::CoherentSnapshot &snapshot) { if (recorder_state == nullptr) { return; } std::lock_guard lock(recorder_state->mutex); if (!recorder_state->writer) { return; } if (snapshot.left.empty()) { close_mp4_writer_with_error(*recorder_state, "MP4 recorder received an empty left frame"); return; } if (snapshot.metadata.info.height == 0) { close_mp4_writer_with_error(*recorder_state, "MP4 recorder received an invalid frame height"); return; } const auto row_stride_bytes = static_cast(snapshot.metadata.info.buffer_size) / static_cast(snapshot.metadata.info.height); if (row_stride_bytes == 0) { close_mp4_writer_with_error(*recorder_state, "MP4 recorder computed a zero row stride"); return; } if (!recorder_state->first_frame_timestamp_ns) { recorder_state->first_frame_timestamp_ns = snapshot.metadata.timestamp_ns; } const auto relative_timestamp_ns = snapshot.metadata.timestamp_ns >= *recorder_state->first_frame_timestamp_ns ? snapshot.metadata.timestamp_ns - *recorder_state->first_frame_timestamp_ns : 0ull; auto write = recorder_state->writer->write_frame( snapshot.left.data(), row_stride_bytes, relative_timestamp_ns); if (!write) { close_mp4_writer_with_error(*recorder_state, "MP4 frame write failed: " + write.error()); spdlog::error("pipeline MP4 frame write failed: {}", write.error()); return; } recorder_state->status.last_frame_ok = true; recorder_state->status.is_recording = true; recorder_state->status.frames_ingested += 1; recorder_state->status.frames_encoded += 1; } #if CVMMAP_STREAMER_HAS_MCAP void update_mcap_stream_info( McapRecorderState &recorder_state, const encode::EncodedStreamInfo &stream_info) { std::lock_guard lock(recorder_state.mutex); recorder_state.current_stream_info = stream_info; if (recorder_state.sink) { auto updated = recorder_state.sink->update_stream_info(stream_info); if (!updated) { recorder_state.status.last_frame_ok = false; recorder_state.status.is_recording = false; recorder_state.status.active_path.clear(); recorder_state.sink->close(); recorder_state.sink.reset(); recorder_state.active_record_config.reset(); spdlog::error("pipeline MCAP stream update failed: {}", updated.error()); } } } record::McapRecordSink *lock_mcap_sink( McapRecorderState *recorder_state, std::unique_lock &lock) { if (recorder_state == nullptr) { return nullptr; } lock = std::unique_lock(recorder_state->mutex); if (!recorder_state->sink) { return nullptr; } return &*recorder_state->sink; } Status write_mcap_access_unit( McapRecorderState *recorder_state, const encode::EncodedAccessUnit &access_unit) { std::unique_lock lock{}; auto *sink = lock_mcap_sink(recorder_state, lock); if (sink == nullptr) { return {}; } auto write = sink->write_access_unit(access_unit); if (!write) { recorder_state->status.last_frame_ok = false; return unexpected_error(ERR_SERIALIZATION, write.error()); } recorder_state->status.last_frame_ok = true; recorder_state->status.is_recording = true; recorder_state->status.frames_ingested += 1; recorder_state->status.frames_encoded += 1; return {}; } Status write_mcap_body_message( McapRecorderState *recorder_state, const record::RawBodyTrackingMessageView &body_message) { std::unique_lock lock{}; auto *sink = lock_mcap_sink(recorder_state, lock); if (sink == nullptr) { return {}; } auto write = sink->write_body_tracking_message(body_message); if (!write) { recorder_state->status.last_frame_ok = false; return unexpected_error(ERR_SERIALIZATION, write.error()); } recorder_state->status.last_frame_ok = true; return {}; } Status write_mcap_depth_map( McapRecorderState *recorder_state, const record::RawDepthMapView &depth_map) { std::unique_lock lock{}; auto *sink = lock_mcap_sink(recorder_state, lock); if (sink == nullptr) { return {}; } auto write = sink->write_depth_map(depth_map); if (!write) { recorder_state->status.last_frame_ok = false; return unexpected_error(ERR_SERIALIZATION, write.error()); } recorder_state->status.last_frame_ok = true; return {}; } #endif [[nodiscard]] Status publish_access_units( const RuntimeConfig &config, std::vector &&access_units, PipelineStats &stats, protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, McapRecorderState *mcap_recorder, LiveOutputContinuityState *live_output_continuity, metrics::IngestEmitLatencyTracker &latency_tracker) { for (auto &access_unit : access_units) { if (access_unit.annexb_bytes.empty()) { continue; } if (config.latency.emit_stall_ms > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(config.latency.emit_stall_ms)); latency_tracker.note_emit_stall(); } const auto should_rewrite_live_timestamps = live_output_continuity != nullptr && (rtp_publisher != nullptr || rtmp_output != nullptr); auto live_access_unit = access_unit; const bool live_boundary_packet = should_rewrite_live_timestamps && live_output_continuity->boundary_pending; if (should_rewrite_live_timestamps) { live_access_unit.stream_pts_ns = live_output_continuity->remap_pts(access_unit.stream_pts_ns); } if (live_boundary_packet) { spdlog::info( "PIPELINE_RESET_CONTINUITY first live packet keyframe={} source_pts_ns={} remapped_pts_ns={}", access_unit.keyframe ? "true" : "false", access_unit.stream_pts_ns, live_access_unit.stream_pts_ns); } if (rtp_publisher != nullptr) { rtp_publisher->publish_access_unit(live_access_unit.annexb_bytes, live_access_unit.stream_pts_ns); } if (rtmp_output != nullptr) { auto publish = (*rtmp_output)->publish_access_unit(live_access_unit); if (!publish) { return std::unexpected(publish.error()); } } if (mcap_recorder != nullptr) { auto write = write_mcap_access_unit(mcap_recorder, access_unit); if (!write) { return std::unexpected(write.error()); } } latency_tracker.note_emit(); stats.encoded_access_units += 1; if (stats.encoded_access_units <= 10 || (stats.encoded_access_units % 30) == 0) { spdlog::info( "ENCODED_AU codec={} count={} bytes={} keyframe={}", to_string(access_unit.codec), stats.encoded_access_units, access_unit.annexb_bytes.size(), access_unit.keyframe ? "true" : "false"); } } return {}; } [[nodiscard]] Status drain_encoder( const RuntimeConfig &config, encode::EncoderBackend &backend, bool flushing, PipelineStats &stats, protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, McapRecorderState *mcap_recorder, LiveOutputContinuityState *live_output_continuity, metrics::IngestEmitLatencyTracker &latency_tracker) { auto drained = flushing ? backend->flush() : backend->drain(); if (!drained) { return std::unexpected(drained.error()); } return publish_access_units( config, std::move(*drained), stats, rtp_publisher, rtmp_output, mcap_recorder, live_output_continuity, latency_tracker); } } int run_pipeline(const RuntimeConfig &config) { auto input_endpoints = resolve_input_endpoints(config); if (!input_endpoints) { spdlog::error("{}", input_endpoints.error()); return exit_code(PipelineExitCode::InputError); } auto source = make_frame_source(config); if (!source) { spdlog::error("pipeline input source selection failed: {}", source.error()); return exit_code(PipelineExitCode::InputError); } auto source_prepare = (*source)->prepare_runtime(); if (!source_prepare) { spdlog::error("pipeline source backend '{}' setup failed: {}", (*source)->backend_name(), source_prepare.error()); return exit_code(PipelineExitCode::InputError); } std::optional backend{}; auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name); if (!shm) { spdlog::error("pipeline open shared memory failed: {}", shm.error()); return exit_code(PipelineExitCode::SharedMemoryError); } if (shm->bytes <= ipc::kShmPayloadOffset) { spdlog::error("pipeline invalid shared memory size: {}", shm->bytes); return exit_code(PipelineExitCode::SharedMemoryError); } std::vector snapshot_buffer( shm->bytes - ipc::kShmPayloadOffset, static_cast(0)); zmq::context_t zmq_ctx{1}; zmq::socket_t subscriber(zmq_ctx, zmq::socket_type::sub); try { subscriber.set(zmq::sockopt::subscribe, ""); subscriber.connect(input_endpoints->zmq_endpoint); } catch (const zmq::error_t &e) { spdlog::error("pipeline subscribe failed on '{}': {}", input_endpoints->zmq_endpoint, e.what()); return exit_code(PipelineExitCode::SubscriberError); } std::optional rtp_publisher{}; std::optional rtmp_output{}; #if CVMMAP_STREAMER_HAS_MCAP McapRecorderState mcap_recorder{}; mcap_recorder.base_config = config; mcap_recorder.status.can_record = true; #endif Mp4RecorderState mp4_recorder{}; mp4_recorder.base_config = config; cvmmap::NatsControlClient nats_client( input_endpoints->nats_target_key, config.input.nats_url); protocol::NatsRequestReplyServer recorder_rpc_server({ .nats_url = config.input.nats_url, .instance_name = input_endpoints->instance_name, .namespace_name = input_endpoints->namespace_name, .ipc_prefix = input_endpoints->ipc_prefix, .base_name = input_endpoints->base_name, .nats_target_key = input_endpoints->nats_target_key, .recording_formats = has_mcap_support() ? "mp4,mcap" : "mp4", }); std::mutex nats_event_mutex{}; std::deque> pending_body_packets{}; std::deque pending_status_codes{}; nats_client.SetModuleStatusCallback([&nats_event_mutex, &pending_status_codes](cvmmap::ModuleStatus status_code) { std::lock_guard lock(nats_event_mutex); pending_status_codes.push_back(static_cast(status_code)); }); nats_client.SetBodyTrackingRawCallback( [&nats_event_mutex, &pending_body_packets](std::span bytes) { std::lock_guard lock(nats_event_mutex); pending_body_packets.emplace_back(bytes.begin(), bytes.end()); }); if (!nats_client.Start()) { spdlog::error("pipeline NATS subscribe failed on '{}'", config.input.nats_url); return exit_code(PipelineExitCode::SubscriberError); } recorder_rpc_server.register_proto_endpoint< recorder_pb::Mp4StartRequest, recorder_pb::Mp4StartResponse>( "recorder_mp4_start", protocol::subject_recorder_mp4_start(input_endpoints->nats_target_key), [&mp4_recorder](const recorder_pb::Mp4StartRequest &request) -> std::expected { auto status = start_mp4_recording(mp4_recorder, request); if (!status) { return std::unexpected(status.error()); } return make_ok_mp4_response(*status); }); recorder_rpc_server.register_proto_endpoint< recorder_pb::Mp4StopRequest, recorder_pb::Mp4StopResponse>( "recorder_mp4_stop", protocol::subject_recorder_mp4_stop(input_endpoints->nats_target_key), [&mp4_recorder](const recorder_pb::Mp4StopRequest &request) -> std::expected { auto status = stop_mp4_recording(mp4_recorder, request); if (!status) { return std::unexpected(status.error()); } return make_ok_mp4_response(*status); }); recorder_rpc_server.register_proto_endpoint< recorder_pb::Mp4StatusRequest, recorder_pb::Mp4StatusResponse>( "recorder_mp4_status", protocol::subject_recorder_mp4_status(input_endpoints->nats_target_key), [&mp4_recorder](const recorder_pb::Mp4StatusRequest &request) -> std::expected { auto status = get_mp4_recording_status(mp4_recorder, request); if (!status) { return std::unexpected(status.error()); } return make_ok_mp4_response(*status); }); #if CVMMAP_STREAMER_HAS_MCAP recorder_rpc_server.register_proto_endpoint< recorder_pb::McapStartRequest, recorder_pb::McapStartResponse>( "recorder_mcap_start", protocol::subject_recorder_mcap_start(input_endpoints->nats_target_key), [&mcap_recorder](const recorder_pb::McapStartRequest &request) -> std::expected { auto status = start_mcap_recording(mcap_recorder, request); if (!status) { return std::unexpected(status.error()); } return make_ok_mcap_response(*status); }); recorder_rpc_server.register_proto_endpoint< recorder_pb::McapStopRequest, recorder_pb::McapStopResponse>( "recorder_mcap_stop", protocol::subject_recorder_mcap_stop(input_endpoints->nats_target_key), [&mcap_recorder](const recorder_pb::McapStopRequest &request) -> std::expected { auto status = stop_mcap_recording(mcap_recorder, request); if (!status) { return std::unexpected(status.error()); } return make_ok_mcap_response(*status); }); recorder_rpc_server.register_proto_endpoint< recorder_pb::McapStatusRequest, recorder_pb::McapStatusResponse>( "recorder_mcap_status", protocol::subject_recorder_mcap_status(input_endpoints->nats_target_key), [&mcap_recorder](const recorder_pb::McapStatusRequest &request) -> std::expected { auto status = get_mcap_recording_status(mcap_recorder, request); if (!status) { return std::unexpected(status.error()); } return make_ok_mcap_response(*status); }); #endif if (!recorder_rpc_server.start()) { spdlog::error("pipeline streamer recorder service failed on '{}'", config.input.nats_url); nats_client.Stop(); return exit_code(PipelineExitCode::SubscriberError); } PipelineStats stats{}; metrics::IngestEmitLatencyTracker latency_tracker{}; LiveOutputContinuityState live_output_continuity{}; const bool keep_live_outputs_on_reset = config.latency.keep_stream_on_reset && (config.outputs.rtp.enabled || config.outputs.rtmp.enabled); bool producer_offline{false}; bool started{false}; bool using_encoded_input{false}; std::optional active_info{}; std::optional active_stream_info{}; std::optional restart_target_info{}; bool restart_pending{false}; bool warned_unknown_depth_unit{false}; const auto restart_backend = [&](std::string_view reason, std::optional target_info) { if (started) { stats.supervised_restarts += 1; } spdlog::warn( "PIPELINE_RESTART mode={} reason='{}' restarts={}", using_encoded_input ? "encoded_passthrough" : (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")), reason, stats.supervised_restarts); if (backend) { (*backend)->shutdown(); } started = false; restart_pending = true; restart_target_info = target_info; warned_unknown_depth_unit = false; using_encoded_input = false; active_stream_info.reset(); live_output_continuity.reset(); rtp_publisher.reset(); rtmp_output.reset(); }; const auto ensure_encoder_backend = [&]() -> Status { if (backend) { return {}; } auto created = encode::make_encoder_backend(config); if (!created) { return std::unexpected(created.error()); } backend.emplace(std::move(*created)); return {}; }; const auto start_outputs_from_stream_info = [&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status { const bool preserving_live_outputs = keep_live_outputs_on_reset && live_output_continuity.boundary_pending && active_stream_info.has_value() && stream_info_equal(*active_stream_info, stream_info) && ((config.outputs.rtp.enabled && rtp_publisher.has_value()) || (config.outputs.rtmp.enabled && rtmp_output.has_value())); if (preserving_live_outputs) { spdlog::info( "PIPELINE_RESET_CONTINUITY preserving live outputs codec={} width={} height={}", to_string(stream_info.codec), stream_info.width, stream_info.height); live_output_continuity.update_nominal_frame_interval(stream_info); } else { if (keep_live_outputs_on_reset && live_output_continuity.boundary_pending && active_stream_info.has_value() && !stream_info_equal(*active_stream_info, stream_info)) { spdlog::warn( "PIPELINE_RESET_CONTINUITY fallback to output rebuild reason='stream_info_change' old_codec={} new_codec={} old={}x{} new={}x{}", to_string(active_stream_info->codec), to_string(stream_info.codec), active_stream_info->width, active_stream_info->height, stream_info.width, stream_info.height); } rtp_publisher.reset(); rtmp_output.reset(); if (config.outputs.rtp.enabled) { auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec); if (!created) { return unexpected_error( ERR_INTERNAL, "pipeline RTP publisher init failed: " + created.error()); } rtp_publisher.emplace(std::move(*created)); } if (config.outputs.rtmp.enabled) { auto created = protocol::make_rtmp_output(config, stream_info); if (!created) { return unexpected_error( created.error().code, "pipeline RTMP output init failed: " + format_error(created.error())); } rtmp_output.emplace(std::move(*created)); } live_output_continuity.note_new_session(stream_info); } #if CVMMAP_STREAMER_HAS_MCAP update_mcap_stream_info(mcap_recorder, stream_info); if (config.record.mcap.enabled) { std::lock_guard lock(mcap_recorder.mutex); if (!mcap_recorder.sink) { auto created = record::McapRecordSink::create(config, stream_info); if (!created) { return unexpected_error( ERR_INTERNAL, "pipeline MCAP sink init failed: " + created.error()); } mcap_recorder.active_record_config = config; mcap_recorder.sink.emplace(std::move(*created)); mcap_recorder.status.can_record = true; mcap_recorder.status.is_recording = true; mcap_recorder.status.last_frame_ok = true; mcap_recorder.status.active_path = config.record.mcap.path; } } #else if (config.record.mcap.enabled) { return unexpected_error(ERR_UNSUPPORTED, mcap_disabled_message()); } #endif update_mp4_source_info(mp4_recorder, target_info, stream_fps(stream_info)); started = true; restart_pending = false; restart_target_info.reset(); warned_unknown_depth_unit = false; active_info = target_info; active_stream_info = stream_info; return {}; }; const auto attempt_raw_backend_start = [&](const ipc::FrameInfo &target_info) -> Status { auto ensure = ensure_encoder_backend(); if (!ensure) { return ensure; } (*backend)->shutdown(); auto init = (*backend)->init(config, target_info); if (!init) { return std::unexpected(init.error()); } auto stream_info = (*backend)->stream_info(); if (!stream_info) { return unexpected_error( stream_info.error().code, "pipeline encoder stream info unavailable: " + format_error(stream_info.error())); } using_encoded_input = false; return start_outputs_from_stream_info(*stream_info, target_info); }; const auto attempt_encoded_passthrough_start = [&]( const ipc::CoherentSnapshot &snapshot) -> Status { auto stream_info = make_stream_info_from_snapshot(snapshot); if (!stream_info) { return unexpected_error(ERR_PROTOCOL, stream_info.error()); } if (backend) { (*backend)->shutdown(); } using_encoded_input = true; return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info); }; const auto idle_timeout_enabled = config.latency.ingest_idle_timeout_ms > 0; const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); auto last_event = std::chrono::steady_clock::now(); while (true) { std::deque> body_packets; std::deque status_codes; { std::lock_guard lock(nats_event_mutex); body_packets.swap(pending_body_packets); status_codes.swap(pending_status_codes); } const bool had_nats_events = !body_packets.empty() || !status_codes.empty(); if (had_nats_events) { last_event = std::chrono::steady_clock::now(); } for (const auto status_code : status_codes) { stats.status_messages += 1; switch (static_cast(status_code)) { case cvmmap::ModuleStatus::Online: spdlog::info("pipeline status event status=online"); producer_offline = false; break; case cvmmap::ModuleStatus::Offline: spdlog::info("pipeline status event status=offline"); producer_offline = true; break; case cvmmap::ModuleStatus::StreamReset: spdlog::info("pipeline status event status=stream_reset"); stats.resets += 1; if (keep_live_outputs_on_reset) { live_output_continuity.note_reset_boundary(); if (!using_encoded_input && backend) { (*backend)->shutdown(); started = false; } restart_pending = false; restart_target_info.reset(); spdlog::info( "PIPELINE_RESET_CONTINUITY armed outputs_active={} encoded_input={}", (rtp_publisher.has_value() || rtmp_output.has_value()) ? "true" : "false", using_encoded_input ? "true" : "false"); } else { if (backend) { (*backend)->shutdown(); } started = false; restart_pending = false; restart_target_info.reset(); active_stream_info.reset(); using_encoded_input = false; active_info.reset(); rtp_publisher.reset(); rtmp_output.reset(); } break; } } for (const auto &body_bytes_vec : body_packets) { const auto body_bytes = std::span( body_bytes_vec.data(), body_bytes_vec.size()); if (body_bytes.empty()) { continue; } auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes); if (!parsed_body) { spdlog::warn("pipeline body packet parse error: {}", parsed_body.error()); continue; } #if CVMMAP_STREAMER_HAS_MCAP auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{ .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), .bytes = body_bytes, }); if (!write_body) { const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error()); restart_backend(reason, active_info); break; } #endif } if (backend && !using_encoded_input) { auto poll = (*backend)->poll(); if (!poll) { const auto reason = format_error(poll.error()); restart_backend(reason, active_info); } } std::array poll_items{{ {subscriber.handle(), 0, ZMQ_POLLIN, 0}, }}; try { zmq::poll(poll_items, std::chrono::milliseconds{20}); } catch (const zmq::error_t &e) { spdlog::error("pipeline poll failed: {}", e.what()); return exit_code(PipelineExitCode::SubscriberError); } const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; if (!frame_socket_ready) { const auto now = std::chrono::steady_clock::now(); if (restart_pending && restart_target_info && backend && !using_encoded_input) { auto start_result = attempt_raw_backend_start(*restart_target_info); if (!start_result) { spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); return exit_code(PipelineExitCode::RuntimeError); } } if (idle_timeout_enabled && now - last_event >= idle_timeout) { spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); break; } if (!producer_offline && started && backend && !using_encoded_input) { auto drain = drain_encoder( config, *backend, false, stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, #if CVMMAP_STREAMER_HAS_MCAP &mcap_recorder, #else nullptr, #endif keep_live_outputs_on_reset ? &live_output_continuity : nullptr, latency_tracker); if (!drain) { const auto reason = format_error(drain.error()); restart_backend(reason, active_info); } } continue; } zmq::message_t message; const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); if (!recv_result) { continue; } last_event = std::chrono::steady_clock::now(); auto bytes = std::span( static_cast(message.data()), message.size()); if (bytes.empty()) { continue; } if (bytes[0] == ipc::kFrameTopicMagic) { stats.sync_messages += 1; auto sync = ipc::parse_sync_message(bytes); if (!sync) { spdlog::warn("pipeline sync parse error: {}", ipc::to_string(sync.error())); continue; } auto snapshot = ipc::read_coherent_snapshot( shm->region(), snapshot_buffer, [&]() { if (config.latency.snapshot_copy_delay_us > 0) { std::this_thread::sleep_for(std::chrono::microseconds(config.latency.snapshot_copy_delay_us)); } }); if (!snapshot) { if (snapshot.error() == ipc::SnapshotError::TornRead) { stats.torn_frames += 1; } spdlog::warn("pipeline snapshot rejected: {}", ipc::to_string(snapshot.error())); continue; } const bool has_encoded_access_unit = snapshot_has_encoded_access_unit(*snapshot); const bool want_encoded_input = config.input.video_source == InputVideoSource::Encoded || (config.input.video_source == InputVideoSource::Auto && has_encoded_access_unit); update_mp4_source_info( mp4_recorder, snapshot->metadata.info, active_stream_info ? stream_fps(*active_stream_info) : 30.0f); if (config.input.video_source == InputVideoSource::Encoded && !has_encoded_access_unit) { spdlog::error("pipeline encoded input requested but SHM snapshot does not contain an encoded access unit"); return exit_code(PipelineExitCode::InitializationError); } if (active_info && !frame_info_equal(*active_info, snapshot->metadata.info)) { stats.format_rebuilds += 1; restart_backend("frame_info_change", snapshot->metadata.info); } if (started && using_encoded_input != want_encoded_input) { stats.format_rebuilds += 1; restart_backend("input_video_source_switch", snapshot->metadata.info); } if (want_encoded_input) { auto stream_info = make_stream_info_from_snapshot(*snapshot); if (!stream_info) { spdlog::error("pipeline encoded snapshot metadata invalid: {}", stream_info.error()); return exit_code(PipelineExitCode::InitializationError); } if (active_stream_info && !stream_info_equal(*active_stream_info, *stream_info)) { stats.format_rebuilds += 1; restart_backend("encoded_stream_info_change", snapshot->metadata.info); } if (!started || restart_pending) { auto start_result = attempt_encoded_passthrough_start(*snapshot); if (!start_result) { spdlog::error("pipeline encoded passthrough init failed: {}", format_error(start_result.error())); return exit_code(PipelineExitCode::InitializationError); } } } else if (!started || restart_pending) { const auto target_info = restart_target_info.value_or(snapshot->metadata.info); auto start_result = attempt_raw_backend_start(target_info); if (!start_result) { spdlog::error("pipeline backend init failed: {}", format_error(start_result.error())); return exit_code(PipelineExitCode::InitializationError); } } latency_tracker.note_ingest(); write_mp4_frame(&mp4_recorder, *snapshot); if (want_encoded_input) { auto access_unit = make_access_unit_from_snapshot(*snapshot); if (!access_unit) { const auto reason = "pipeline encoded snapshot invalid: " + access_unit.error(); restart_backend(reason, snapshot->metadata.info); continue; } if (keep_live_outputs_on_reset && live_output_continuity.encoded_keyframe_required && !access_unit->keyframe) { spdlog::info( "PIPELINE_RESET_CONTINUITY dropping encoded access unit until keyframe pts_ns={} source_timestamp_ns={}", access_unit->stream_pts_ns, access_unit->source_timestamp_ns); continue; } live_output_continuity.encoded_keyframe_required = false; std::vector access_units{}; access_units.push_back(std::move(*access_unit)); auto publish = publish_access_units( config, std::move(access_units), stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, #if CVMMAP_STREAMER_HAS_MCAP &mcap_recorder, #else nullptr, #endif keep_live_outputs_on_reset ? &live_output_continuity : nullptr, latency_tracker); if (!publish) { const auto reason = format_error(publish.error()); restart_backend(reason, active_info); continue; } } else { auto push = (*backend)->push_frame(encode::RawVideoFrame{ .info = snapshot->metadata.info, .source_timestamp_ns = snapshot->metadata.timestamp_ns, .force_keyframe = keep_live_outputs_on_reset && live_output_continuity.boundary_pending, .bytes = std::span(snapshot_buffer.data(), snapshot->bytes_copied), }); if (!push) { const auto reason = format_error(push.error()); restart_backend(reason, active_info); continue; } } #if CVMMAP_STREAMER_HAS_MCAP if (!snapshot->depth.empty() && config.record.mcap.enabled && runtime_mcap_depth_enabled(config)) { #if !CVMMAP_STREAMER_HAS_MCAP_DEPTH const auto reason = "pipeline depth MCAP write failed: " + mcap_depth_disabled_message(); restart_backend(reason, active_info); continue; #else if (snapshot->depth_unit == ipc::DepthUnit::Unknown) { if (!warned_unknown_depth_unit) { spdlog::warn( "pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata"); warned_unknown_depth_unit = true; } } auto depth_map = make_depth_map_view(*snapshot); if (!depth_map) { const auto reason = "pipeline depth snapshot invalid: " + depth_map.error(); restart_backend(reason, active_info); continue; } auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map); if (!write_depth) { const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error()); restart_backend(reason, active_info); continue; } #endif } #endif stats.pushed_frames += 1; if (!want_encoded_input) { auto drain = drain_encoder( config, *backend, false, stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, #if CVMMAP_STREAMER_HAS_MCAP &mcap_recorder, #else nullptr, #endif keep_live_outputs_on_reset ? &live_output_continuity : nullptr, latency_tracker); if (!drain) { const auto reason = format_error(drain.error()); restart_backend(reason, active_info); continue; } } if (config.latency.ingest_max_frames > 0 && stats.pushed_frames >= config.latency.ingest_max_frames) { spdlog::info("pipeline reached ingest_max_frames={}, stopping", config.latency.ingest_max_frames); break; } continue; } spdlog::warn("pipeline unknown message type: magic=0x{:02x} size={}", bytes[0], bytes.size()); } nats_client.Stop(); if (started && backend && !using_encoded_input) { auto drain = drain_encoder( config, *backend, true, stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, #if CVMMAP_STREAMER_HAS_MCAP &mcap_recorder, #else nullptr, #endif keep_live_outputs_on_reset ? &live_output_continuity : nullptr, latency_tracker); if (!drain) { spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error())); return exit_code(PipelineExitCode::RuntimeError); } } if (backend) { (*backend)->shutdown(); } auto stop_mp4 = stop_mp4_recording(mp4_recorder, recorder_pb::Mp4StopRequest{}); if (!stop_mp4) { spdlog::warn("pipeline MP4 recorder stop during shutdown failed: {}", stop_mp4.error().message); } #if CVMMAP_STREAMER_HAS_MCAP auto stop_mcap = stop_mcap_recording(mcap_recorder, recorder_pb::McapStopRequest{}); if (!stop_mcap) { spdlog::warn("pipeline MCAP recorder stop during shutdown failed: {}", stop_mcap.error().message); } #endif recorder_rpc_server.stop(); spdlog::info( "PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}", active_stream_info ? to_string(active_stream_info->codec) : to_string(config.encoder.codec), using_encoded_input ? std::string("encoded_passthrough") : (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")), stats.sync_messages, stats.status_messages, stats.torn_frames, stats.pushed_frames, stats.encoded_access_units, stats.resets, stats.format_rebuilds, stats.supervised_restarts); const auto latency_summary = latency_tracker.summarize(); const auto pending_frames = latency_tracker.pending_frames(); const auto ingest_drop_frames = stats.sync_messages >= stats.pushed_frames ? stats.sync_messages - stats.pushed_frames : 0ull; const auto total_drop_frames = ingest_drop_frames + stats.torn_frames + pending_frames; const auto drop_ratio_ppm = stats.sync_messages > 0 ? (total_drop_frames * 1'000'000ull) / stats.sync_messages : 0ull; spdlog::info( "LATENCY_METRICS ingest_to_emit_samples={} p50_us={} p95_us={} p99_us={} min_us={} max_us={} avg_us={} pending_frames={} ingest_drop_frames={} total_drop_frames={} drop_ratio_ppm={} sink_stall_events={}", latency_summary.samples, latency_summary.p50_us, latency_summary.p95_us, latency_summary.p99_us, latency_summary.min_us, latency_summary.max_us, latency_summary.avg_us, pending_frames, ingest_drop_frames, total_drop_frames, drop_ratio_ppm, latency_tracker.emit_stall_events()); if (rtp_publisher) { rtp_publisher->log_metrics(); } if (rtmp_output) { (*rtmp_output)->log_metrics(); } return exit_code(PipelineExitCode::Success); } }