#include "cvmmap_streamer/config/runtime_config.hpp" #include "cvmmap_streamer/core/frame_source.hpp" #include "cvmmap_streamer/encode/encoder_backend.hpp" #include "cvmmap_streamer/ipc/contracts.hpp" #include "cvmmap_streamer/metrics/latency_tracker.hpp" #include "cvmmap_streamer/protocol/rtmp_output.hpp" #include "cvmmap_streamer/protocol/rtp_publisher.hpp" #include "cvmmap_streamer/record/mcap_record_sink.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace cvmmap_streamer { void pipeline_tick() {} } namespace cvmmap_streamer::core { namespace { namespace ipc = cvmmap_streamer::ipc; enum class PipelineExitCode : int { Success = 0, InputError = 2, SharedMemoryError = 3, SubscriberError = 4, InitializationError = 5, RuntimeError = 6, }; [[nodiscard]] constexpr int exit_code(PipelineExitCode code) { return static_cast(code); } struct ResolvedInputEndpoints { std::string shm_name; std::string zmq_endpoint; std::string nats_target_key; }; [[nodiscard]] std::expected resolve_input_endpoints(const RuntimeConfig &config) { try { auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.uri); spdlog::info( "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'", config.input.uri, target.shm_name, target.zmq_addr, target.nats_target_key); return ResolvedInputEndpoints{ .shm_name = target.shm_name, .zmq_endpoint = target.zmq_addr, .nats_target_key = target.nats_target_key, }; } catch (const std::exception &e) { return std::unexpected(std::string("invalid cvmmap input URI: ") + e.what()); } } struct SharedMemoryView { SharedMemoryView() = default; int fd{-1}; std::uint8_t *ptr{nullptr}; std::size_t bytes{0}; ~SharedMemoryView() { if (ptr != nullptr && ptr != MAP_FAILED && bytes > 0) { munmap(ptr, bytes); } if (fd >= 0) { close(fd); } } SharedMemoryView(const SharedMemoryView &) = delete; SharedMemoryView &operator=(const SharedMemoryView &) = delete; SharedMemoryView(SharedMemoryView &&other) noexcept { fd = std::exchange(other.fd, -1); ptr = std::exchange(other.ptr, nullptr); bytes = std::exchange(other.bytes, 0); } SharedMemoryView &operator=(SharedMemoryView &&other) noexcept { if (this == &other) { return *this; } if (ptr != nullptr && ptr != MAP_FAILED && bytes > 0) { munmap(ptr, bytes); } if (fd >= 0) { close(fd); } fd = std::exchange(other.fd, -1); ptr = std::exchange(other.ptr, nullptr); bytes = std::exchange(other.bytes, 0); return *this; } [[nodiscard]] std::span region() const { return std::span(ptr, bytes); } [[nodiscard]] static std::expected open_readonly(const std::string &raw_name) { const auto shm_name = raw_name.starts_with('/') ? raw_name : "/" + raw_name; const int fd = shm_open(shm_name.c_str(), O_RDONLY, 0); if (fd < 0) { return std::unexpected("shm_open failed for '" + shm_name + "'"); } struct stat statbuf{}; if (fstat(fd, &statbuf) != 0) { close(fd); return std::unexpected("fstat failed for '" + shm_name + "'"); } if (statbuf.st_size <= 0) { close(fd); return std::unexpected("shared memory size is zero for '" + shm_name + "'"); } const auto bytes = static_cast(statbuf.st_size); auto *mapped = static_cast(mmap(nullptr, bytes, PROT_READ, MAP_SHARED, fd, 0)); if (mapped == MAP_FAILED) { close(fd); return std::unexpected("mmap failed for '" + shm_name + "'"); } SharedMemoryView view; view.fd = fd; view.ptr = mapped; view.bytes = bytes; return view; } }; struct PipelineStats { std::uint64_t sync_messages{0}; std::uint64_t status_messages{0}; std::uint64_t torn_frames{0}; std::uint64_t pushed_frames{0}; std::uint64_t encoded_access_units{0}; std::uint64_t resets{0}; std::uint64_t format_rebuilds{0}; std::uint64_t supervised_restarts{0}; }; [[nodiscard]] std::string_view status_to_string(ipc::ModuleStatus status) { switch (status) { case ipc::ModuleStatus::Online: return "online"; case ipc::ModuleStatus::Offline: return "offline"; case ipc::ModuleStatus::StreamReset: return "stream_reset"; } return "unknown"; } [[nodiscard]] bool frame_info_equal(const ipc::FrameInfo &lhs, const ipc::FrameInfo &rhs) { return lhs.width == rhs.width && lhs.height == rhs.height && lhs.channels == rhs.channels && lhs.depth == rhs.depth && lhs.pixel_format == rhs.pixel_format && lhs.buffer_size == rhs.buffer_size; } [[nodiscard]] ipc::DepthUnit normalize_depth_unit_for_recording(ipc::DepthUnit unit) { if (unit == ipc::DepthUnit::Unknown) { return ipc::DepthUnit::Millimeter; } return unit; } [[nodiscard]] std::expected make_depth_map_view(const ipc::CoherentSnapshot &snapshot) { if (!snapshot.depth_info) { return std::unexpected("depth plane metadata is missing"); } if (snapshot.depth.empty()) { return std::unexpected("depth plane bytes are missing"); } const auto &depth_info = *snapshot.depth_info; if (depth_info.depth != ipc::Depth::F32 || depth_info.pixel_format != ipc::PixelFormat::GRAY) { return std::unexpected("depth plane must be GRAY/F32"); } const auto pixel_count = static_cast(depth_info.width) * static_cast(depth_info.height); const auto expected_bytes = pixel_count * sizeof(float); if (snapshot.depth.size() != expected_bytes) { return std::unexpected( "depth plane byte size does not match width*height*sizeof(float)"); } if ((reinterpret_cast(snapshot.depth.data()) % alignof(float)) != 0) { return std::unexpected("depth plane is not aligned for float access"); } return record::RawDepthMapView{ .timestamp_ns = snapshot.metadata.timestamp_ns, .width = depth_info.width, .height = depth_info.height, .source_unit = normalize_depth_unit_for_recording(snapshot.depth_unit), .pixels = std::span( reinterpret_cast(snapshot.depth.data()), pixel_count), }; } [[nodiscard]] bool snapshot_has_encoded_access_unit(const ipc::CoherentSnapshot &snapshot) { return !snapshot.encoded_access_unit.empty() && snapshot.encoded_codec != ipc::EncodedCodec::Unknown && snapshot.encoded_bitstream_format == ipc::EncodedBitstreamFormat::AnnexB; } [[nodiscard]] std::expected codec_type_from_snapshot(const ipc::CoherentSnapshot &snapshot) { switch (snapshot.encoded_codec) { case ipc::EncodedCodec::H264: return CodecType::H264; case ipc::EncodedCodec::H265: return CodecType::H265; case ipc::EncodedCodec::Unknown: default: return std::unexpected("encoded snapshot codec is unsupported"); } } [[nodiscard]] std::expected make_stream_info_from_snapshot( const ipc::CoherentSnapshot &snapshot) { auto codec = codec_type_from_snapshot(snapshot); if (!codec) { return std::unexpected(codec.error()); } if (snapshot.encoded_bitstream_format != ipc::EncodedBitstreamFormat::AnnexB) { return std::unexpected("encoded snapshot bitstream format is unsupported"); } if (snapshot.metadata.info.width == 0 || snapshot.metadata.info.height == 0) { return std::unexpected("encoded snapshot dimensions are invalid"); } encode::EncodedStreamInfo stream_info{}; stream_info.codec = *codec; stream_info.width = snapshot.metadata.info.width; stream_info.height = snapshot.metadata.info.height; stream_info.time_base_num = 1; stream_info.time_base_den = 1'000'000'000u; stream_info.frame_rate_num = snapshot.encoded_frame_rate_num == 0 ? 30u : snapshot.encoded_frame_rate_num; stream_info.frame_rate_den = snapshot.encoded_frame_rate_den == 0 ? 1u : snapshot.encoded_frame_rate_den; stream_info.bitstream_format = encode::EncodedBitstreamFormat::AnnexB; stream_info.decoder_config.clear(); return stream_info; } [[nodiscard]] std::expected make_access_unit_from_snapshot( const ipc::CoherentSnapshot &snapshot) { auto codec = codec_type_from_snapshot(snapshot); if (!codec) { return std::unexpected(codec.error()); } if (snapshot.encoded_access_unit.empty()) { return std::unexpected("encoded snapshot access unit is empty"); } encode::EncodedAccessUnit access_unit{}; access_unit.codec = *codec; access_unit.source_timestamp_ns = snapshot.metadata.timestamp_ns; access_unit.stream_pts_ns = snapshot.encoded_stream_pts_ns == 0 ? snapshot.metadata.timestamp_ns : snapshot.encoded_stream_pts_ns; access_unit.keyframe = (snapshot.encoded_flags & ipc::kEncodedFlagKeyframe) != 0; access_unit.annexb_bytes.assign( snapshot.encoded_access_unit.begin(), snapshot.encoded_access_unit.end()); return access_unit; } [[nodiscard]] bool stream_info_equal( const encode::EncodedStreamInfo &lhs, const encode::EncodedStreamInfo &rhs) { return lhs.codec == rhs.codec && lhs.width == rhs.width && lhs.height == rhs.height && lhs.time_base_num == rhs.time_base_num && lhs.time_base_den == rhs.time_base_den && lhs.frame_rate_num == rhs.frame_rate_num && lhs.frame_rate_den == rhs.frame_rate_den && lhs.bitstream_format == rhs.bitstream_format; } [[nodiscard]] std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) { if (frame.header.timestamp_ns != 0) { return frame.header.timestamp_ns; } return frame.header.sdk_timestamp_ns; } struct McapRecorderState { mutable std::mutex mutex{}; RuntimeConfig base_config{}; std::optional active_record_config{}; std::optional current_stream_info{}; std::optional sink{}; cvmmap::RecordingStatus status{ .format = cvmmap::RecordingFormat::Mcap, .can_record = true, }; }; [[nodiscard]] cvmmap::ControlError make_recording_control_error( const int32_t code, std::string message) { return cvmmap::ControlError{ .code = code, .message = std::move(message), }; } [[nodiscard]] RuntimeConfig make_mcap_record_config( const RuntimeConfig &base_config, const cvmmap::RecordingRequest &request) { auto record_config = base_config; record_config.record.mcap.enabled = true; record_config.record.mcap.path = request.output_path; if (request.mcap_options) { if (request.mcap_options->topic) { record_config.record.mcap.topic = *request.mcap_options->topic; } if (request.mcap_options->depth_topic) { record_config.record.mcap.depth_topic = *request.mcap_options->depth_topic; } if (request.mcap_options->body_topic) { record_config.record.mcap.body_topic = *request.mcap_options->body_topic; } if (request.mcap_options->frame_id) { record_config.record.mcap.frame_id = *request.mcap_options->frame_id; } } return record_config; } void reset_mcap_status_after_stop(cvmmap::RecordingStatus &status) { status.is_recording = false; status.is_paused = false; status.active_path.clear(); } [[nodiscard]] std::expected start_mcap_recording( McapRecorderState &recorder_state, const cvmmap::RecordingRequest &request) { std::lock_guard lock(recorder_state.mutex); if (request.format != cvmmap::RecordingFormat::Mcap) { return std::unexpected(make_recording_control_error( cvmmap::CONTROL_RESPONSE_UNSUPPORTED, "recording format is not supported by the streamer")); } if (!recorder_state.current_stream_info) { return std::unexpected(make_recording_control_error( cvmmap::CONTROL_RESPONSE_ERROR, "MCAP recorder is not ready; stream info unavailable")); } if (recorder_state.sink) { return std::unexpected(make_recording_control_error( cvmmap::CONTROL_RESPONSE_ERROR, "MCAP recording is already active")); } auto record_config = make_mcap_record_config(recorder_state.base_config, request); if (request.mcap_options && request.mcap_options->compression) { auto parsed = parse_mcap_compression(*request.mcap_options->compression); if (!parsed) { return std::unexpected(make_recording_control_error( cvmmap::CONTROL_RESPONSE_INVALID_PAYLOAD, parsed.error())); } record_config.record.mcap.compression = *parsed; } auto created = record::McapRecordSink::create(record_config, *recorder_state.current_stream_info); if (!created) { return std::unexpected(make_recording_control_error( cvmmap::CONTROL_RESPONSE_ERROR, "pipeline MCAP sink init failed: " + created.error())); } recorder_state.active_record_config = record_config; recorder_state.sink.emplace(std::move(*created)); recorder_state.status.can_record = true; recorder_state.status.is_recording = true; recorder_state.status.is_paused = false; recorder_state.status.last_frame_ok = true; recorder_state.status.frames_ingested = 0; recorder_state.status.frames_encoded = 0; recorder_state.status.active_path = request.output_path; return recorder_state.status; } [[nodiscard]] std::expected stop_mcap_recording( McapRecorderState &recorder_state) { std::lock_guard lock(recorder_state.mutex); if (recorder_state.sink) { recorder_state.sink->close(); recorder_state.sink.reset(); } recorder_state.active_record_config.reset(); reset_mcap_status_after_stop(recorder_state.status); return recorder_state.status; } [[nodiscard]] std::expected get_mcap_recording_status( McapRecorderState &recorder_state) { std::lock_guard lock(recorder_state.mutex); recorder_state.status.can_record = true; return recorder_state.status; } void update_mcap_stream_info( McapRecorderState &recorder_state, const encode::EncodedStreamInfo &stream_info) { std::lock_guard lock(recorder_state.mutex); recorder_state.current_stream_info = stream_info; if (recorder_state.sink) { auto updated = recorder_state.sink->update_stream_info(stream_info); if (!updated) { recorder_state.status.last_frame_ok = false; recorder_state.status.is_recording = false; recorder_state.status.active_path.clear(); recorder_state.sink->close(); recorder_state.sink.reset(); recorder_state.active_record_config.reset(); spdlog::error("pipeline MCAP stream update failed: {}", updated.error()); } } } record::McapRecordSink *lock_mcap_sink( McapRecorderState *recorder_state, std::unique_lock &lock) { if (recorder_state == nullptr) { return nullptr; } lock = std::unique_lock(recorder_state->mutex); if (!recorder_state->sink) { return nullptr; } return &*recorder_state->sink; } Status write_mcap_access_unit( McapRecorderState *recorder_state, const encode::EncodedAccessUnit &access_unit) { std::unique_lock lock{}; auto *sink = lock_mcap_sink(recorder_state, lock); if (sink == nullptr) { return {}; } auto write = sink->write_access_unit(access_unit); if (!write) { recorder_state->status.last_frame_ok = false; return unexpected_error(ERR_SERIALIZATION, write.error()); } recorder_state->status.last_frame_ok = true; recorder_state->status.is_recording = true; recorder_state->status.frames_ingested += 1; recorder_state->status.frames_encoded += 1; return {}; } Status write_mcap_body_message( McapRecorderState *recorder_state, const record::RawBodyTrackingMessageView &body_message) { std::unique_lock lock{}; auto *sink = lock_mcap_sink(recorder_state, lock); if (sink == nullptr) { return {}; } auto write = sink->write_body_tracking_message(body_message); if (!write) { recorder_state->status.last_frame_ok = false; return unexpected_error(ERR_SERIALIZATION, write.error()); } recorder_state->status.last_frame_ok = true; return {}; } Status write_mcap_depth_map( McapRecorderState *recorder_state, const record::RawDepthMapView &depth_map) { std::unique_lock lock{}; auto *sink = lock_mcap_sink(recorder_state, lock); if (sink == nullptr) { return {}; } auto write = sink->write_depth_map(depth_map); if (!write) { recorder_state->status.last_frame_ok = false; return unexpected_error(ERR_SERIALIZATION, write.error()); } recorder_state->status.last_frame_ok = true; return {}; } [[nodiscard]] Status publish_access_units( const RuntimeConfig &config, std::vector &&access_units, PipelineStats &stats, protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, McapRecorderState *mcap_recorder, metrics::IngestEmitLatencyTracker &latency_tracker) { for (auto &access_unit : access_units) { if (access_unit.annexb_bytes.empty()) { continue; } if (config.latency.emit_stall_ms > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(config.latency.emit_stall_ms)); latency_tracker.note_emit_stall(); } if (rtp_publisher != nullptr) { rtp_publisher->publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns); } if (rtmp_output != nullptr) { auto publish = (*rtmp_output)->publish_access_unit(access_unit); if (!publish) { return std::unexpected(publish.error()); } } if (mcap_recorder != nullptr) { auto write = write_mcap_access_unit(mcap_recorder, access_unit); if (!write) { return std::unexpected(write.error()); } } latency_tracker.note_emit(); stats.encoded_access_units += 1; if (stats.encoded_access_units <= 10 || (stats.encoded_access_units % 30) == 0) { spdlog::info( "ENCODED_AU codec={} count={} bytes={} keyframe={}", to_string(access_unit.codec), stats.encoded_access_units, access_unit.annexb_bytes.size(), access_unit.keyframe ? "true" : "false"); } } return {}; } [[nodiscard]] Status drain_encoder( const RuntimeConfig &config, encode::EncoderBackend &backend, bool flushing, PipelineStats &stats, protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, McapRecorderState *mcap_recorder, metrics::IngestEmitLatencyTracker &latency_tracker) { auto drained = flushing ? backend->flush() : backend->drain(); if (!drained) { return std::unexpected(drained.error()); } return publish_access_units( config, std::move(*drained), stats, rtp_publisher, rtmp_output, mcap_recorder, latency_tracker); } } int run_pipeline(const RuntimeConfig &config) { auto input_endpoints = resolve_input_endpoints(config); if (!input_endpoints) { spdlog::error("{}", input_endpoints.error()); return exit_code(PipelineExitCode::InputError); } auto source = make_frame_source(config); if (!source) { spdlog::error("pipeline input source selection failed: {}", source.error()); return exit_code(PipelineExitCode::InputError); } auto source_prepare = (*source)->prepare_runtime(); if (!source_prepare) { spdlog::error("pipeline source backend '{}' setup failed: {}", (*source)->backend_name(), source_prepare.error()); return exit_code(PipelineExitCode::InputError); } std::optional backend{}; auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name); if (!shm) { spdlog::error("pipeline open shared memory failed: {}", shm.error()); return exit_code(PipelineExitCode::SharedMemoryError); } if (shm->bytes <= ipc::kShmPayloadOffset) { spdlog::error("pipeline invalid shared memory size: {}", shm->bytes); return exit_code(PipelineExitCode::SharedMemoryError); } std::vector snapshot_buffer( shm->bytes - ipc::kShmPayloadOffset, static_cast(0)); zmq::context_t zmq_ctx{1}; zmq::socket_t subscriber(zmq_ctx, zmq::socket_type::sub); try { subscriber.set(zmq::sockopt::subscribe, ""); subscriber.connect(input_endpoints->zmq_endpoint); } catch (const zmq::error_t &e) { spdlog::error("pipeline subscribe failed on '{}': {}", input_endpoints->zmq_endpoint, e.what()); return exit_code(PipelineExitCode::SubscriberError); } std::optional rtp_publisher{}; std::optional rtmp_output{}; McapRecorderState mcap_recorder{}; mcap_recorder.base_config = config; mcap_recorder.status.can_record = true; cvmmap::NatsControlClient nats_client( input_endpoints->nats_target_key, config.input.nats_url); cvmmap::NatsControlService recorder_service( config.input.uri, input_endpoints->nats_target_key, config.input.nats_url); std::mutex nats_event_mutex{}; std::deque> pending_body_packets{}; std::deque pending_status_codes{}; nats_client.SetModuleStatusCallback([&nats_event_mutex, &pending_status_codes](int32_t status_code) { std::lock_guard lock(nats_event_mutex); pending_status_codes.push_back(status_code); }); nats_client.SetBodyTrackingRawCallback( [&nats_event_mutex, &pending_body_packets](std::span bytes) { std::lock_guard lock(nats_event_mutex); pending_body_packets.emplace_back(bytes.begin(), bytes.end()); }); if (!nats_client.Start()) { spdlog::error("pipeline NATS subscribe failed on '{}'", config.input.nats_url); return exit_code(PipelineExitCode::SubscriberError); } cvmmap::NatsControlHandlers recorder_handlers{}; recorder_handlers.on_recording_available = [](const cvmmap::RecordingFormat format) { return format == cvmmap::RecordingFormat::Mcap; }; recorder_handlers.on_start_recording = [&mcap_recorder](const cvmmap::RecordingRequest &request) { return start_mcap_recording(mcap_recorder, request); }; recorder_handlers.on_stop_recording = [&mcap_recorder](const cvmmap::RecordingFormat format) -> std::expected { if (format != cvmmap::RecordingFormat::Mcap) { return std::unexpected(make_recording_control_error( cvmmap::CONTROL_RESPONSE_UNSUPPORTED, "recording format is not supported by the streamer")); } return stop_mcap_recording(mcap_recorder); }; recorder_handlers.on_get_recording_status = [&mcap_recorder](const cvmmap::RecordingFormat format) -> std::expected { if (format != cvmmap::RecordingFormat::Mcap) { return std::unexpected(make_recording_control_error( cvmmap::CONTROL_RESPONSE_UNSUPPORTED, "recording format is not supported by the streamer")); } return get_mcap_recording_status(mcap_recorder); }; recorder_service.SetHandlers(std::move(recorder_handlers)); if (!recorder_service.Start()) { spdlog::error("pipeline recorder control service failed on '{}'", config.input.nats_url); nats_client.Stop(); return exit_code(PipelineExitCode::SubscriberError); } PipelineStats stats{}; metrics::IngestEmitLatencyTracker latency_tracker{}; bool producer_offline{false}; bool started{false}; bool using_encoded_input{false}; std::optional active_info{}; std::optional active_stream_info{}; std::optional restart_target_info{}; bool restart_pending{false}; bool warned_unknown_depth_unit{false}; const auto restart_backend = [&](std::string_view reason, std::optional target_info) { if (started) { stats.supervised_restarts += 1; } spdlog::warn( "PIPELINE_RESTART mode={} reason='{}' restarts={}", using_encoded_input ? "encoded_passthrough" : (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")), reason, stats.supervised_restarts); if (backend) { (*backend)->shutdown(); } started = false; restart_pending = true; restart_target_info = target_info; warned_unknown_depth_unit = false; using_encoded_input = false; active_stream_info.reset(); rtp_publisher.reset(); rtmp_output.reset(); }; const auto ensure_encoder_backend = [&]() -> Status { if (backend) { return {}; } auto created = encode::make_encoder_backend(config); if (!created) { return std::unexpected(created.error()); } backend.emplace(std::move(*created)); return {}; }; const auto start_outputs_from_stream_info = [&](const encode::EncodedStreamInfo &stream_info, const ipc::FrameInfo &target_info) -> Status { rtp_publisher.reset(); rtmp_output.reset(); if (config.outputs.rtp.enabled) { auto created = protocol::UdpRtpPublisher::create(config, stream_info.codec); if (!created) { return unexpected_error( ERR_INTERNAL, "pipeline RTP publisher init failed: " + created.error()); } rtp_publisher.emplace(std::move(*created)); } if (config.outputs.rtmp.enabled) { auto created = protocol::make_rtmp_output(config, stream_info); if (!created) { return unexpected_error( created.error().code, "pipeline RTMP output init failed: " + format_error(created.error())); } rtmp_output.emplace(std::move(*created)); } update_mcap_stream_info(mcap_recorder, stream_info); if (config.record.mcap.enabled) { std::lock_guard lock(mcap_recorder.mutex); if (!mcap_recorder.sink) { auto created = record::McapRecordSink::create(config, stream_info); if (!created) { return unexpected_error( ERR_INTERNAL, "pipeline MCAP sink init failed: " + created.error()); } mcap_recorder.active_record_config = config; mcap_recorder.sink.emplace(std::move(*created)); mcap_recorder.status.format = cvmmap::RecordingFormat::Mcap; mcap_recorder.status.can_record = true; mcap_recorder.status.is_recording = true; mcap_recorder.status.last_frame_ok = true; mcap_recorder.status.active_path = config.record.mcap.path; } } started = true; restart_pending = false; restart_target_info.reset(); warned_unknown_depth_unit = false; active_info = target_info; active_stream_info = stream_info; return {}; }; const auto attempt_raw_backend_start = [&](const ipc::FrameInfo &target_info) -> Status { auto ensure = ensure_encoder_backend(); if (!ensure) { return ensure; } (*backend)->shutdown(); auto init = (*backend)->init(config, target_info); if (!init) { return std::unexpected(init.error()); } auto stream_info = (*backend)->stream_info(); if (!stream_info) { return unexpected_error( stream_info.error().code, "pipeline encoder stream info unavailable: " + format_error(stream_info.error())); } using_encoded_input = false; return start_outputs_from_stream_info(*stream_info, target_info); }; const auto attempt_encoded_passthrough_start = [&]( const ipc::CoherentSnapshot &snapshot) -> Status { auto stream_info = make_stream_info_from_snapshot(snapshot); if (!stream_info) { return unexpected_error(ERR_PROTOCOL, stream_info.error()); } if (backend) { (*backend)->shutdown(); } using_encoded_input = true; return start_outputs_from_stream_info(*stream_info, snapshot.metadata.info); }; const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); auto last_event = std::chrono::steady_clock::now(); while (true) { std::deque> body_packets; std::deque status_codes; { std::lock_guard lock(nats_event_mutex); body_packets.swap(pending_body_packets); status_codes.swap(pending_status_codes); } const bool had_nats_events = !body_packets.empty() || !status_codes.empty(); if (had_nats_events) { last_event = std::chrono::steady_clock::now(); } for (const auto status_code : status_codes) { stats.status_messages += 1; switch (static_cast(status_code)) { case cvmmap::ModuleStatus::Online: spdlog::info("pipeline status event status=online"); producer_offline = false; break; case cvmmap::ModuleStatus::Offline: spdlog::info("pipeline status event status=offline"); producer_offline = true; break; case cvmmap::ModuleStatus::StreamReset: spdlog::info("pipeline status event status=stream_reset"); stats.resets += 1; if (backend) { (*backend)->shutdown(); } started = false; restart_pending = false; restart_target_info.reset(); active_stream_info.reset(); using_encoded_input = false; active_info.reset(); rtp_publisher.reset(); rtmp_output.reset(); break; } } for (const auto &body_bytes_vec : body_packets) { const auto body_bytes = std::span( body_bytes_vec.data(), body_bytes_vec.size()); if (body_bytes.empty()) { continue; } auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes); if (!parsed_body) { spdlog::warn("pipeline body packet parse error: {}", parsed_body.error()); continue; } auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{ .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), .bytes = body_bytes, }); if (!write_body) { const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error()); restart_backend(reason, active_info); break; } } if (backend && !using_encoded_input) { auto poll = (*backend)->poll(); if (!poll) { const auto reason = format_error(poll.error()); restart_backend(reason, active_info); } } std::array poll_items{{ {subscriber.handle(), 0, ZMQ_POLLIN, 0}, }}; try { zmq::poll(poll_items, std::chrono::milliseconds{20}); } catch (const zmq::error_t &e) { spdlog::error("pipeline poll failed: {}", e.what()); return exit_code(PipelineExitCode::SubscriberError); } const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; if (!frame_socket_ready) { const auto now = std::chrono::steady_clock::now(); if (restart_pending && restart_target_info && backend && !using_encoded_input) { auto start_result = attempt_raw_backend_start(*restart_target_info); if (!start_result) { spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); return exit_code(PipelineExitCode::RuntimeError); } } if (now - last_event >= idle_timeout) { spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); break; } if (!producer_offline && started && backend && !using_encoded_input) { auto drain = drain_encoder( config, *backend, false, stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, &mcap_recorder, latency_tracker); if (!drain) { const auto reason = format_error(drain.error()); restart_backend(reason, active_info); } } continue; } zmq::message_t message; const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); if (!recv_result) { continue; } last_event = std::chrono::steady_clock::now(); auto bytes = std::span( static_cast(message.data()), message.size()); if (bytes.empty()) { continue; } if (bytes[0] == ipc::kFrameTopicMagic) { stats.sync_messages += 1; auto sync = ipc::parse_sync_message(bytes); if (!sync) { spdlog::warn("pipeline sync parse error: {}", ipc::to_string(sync.error())); continue; } auto snapshot = ipc::read_coherent_snapshot( shm->region(), snapshot_buffer, [&]() { if (config.latency.snapshot_copy_delay_us > 0) { std::this_thread::sleep_for(std::chrono::microseconds(config.latency.snapshot_copy_delay_us)); } }); if (!snapshot) { if (snapshot.error() == ipc::SnapshotError::TornRead) { stats.torn_frames += 1; } spdlog::warn("pipeline snapshot rejected: {}", ipc::to_string(snapshot.error())); continue; } const bool has_encoded_access_unit = snapshot_has_encoded_access_unit(*snapshot); const bool want_encoded_input = config.input.video_source == InputVideoSource::Encoded || (config.input.video_source == InputVideoSource::Auto && has_encoded_access_unit); if (config.input.video_source == InputVideoSource::Encoded && !has_encoded_access_unit) { spdlog::error("pipeline encoded input requested but SHM snapshot does not contain an encoded access unit"); return exit_code(PipelineExitCode::InitializationError); } if (active_info && !frame_info_equal(*active_info, snapshot->metadata.info)) { stats.format_rebuilds += 1; restart_backend("frame_info_change", snapshot->metadata.info); } if (started && using_encoded_input != want_encoded_input) { stats.format_rebuilds += 1; restart_backend("input_video_source_switch", snapshot->metadata.info); } if (want_encoded_input) { auto stream_info = make_stream_info_from_snapshot(*snapshot); if (!stream_info) { spdlog::error("pipeline encoded snapshot metadata invalid: {}", stream_info.error()); return exit_code(PipelineExitCode::InitializationError); } if (active_stream_info && !stream_info_equal(*active_stream_info, *stream_info)) { stats.format_rebuilds += 1; restart_backend("encoded_stream_info_change", snapshot->metadata.info); } if (!started || restart_pending) { auto start_result = attempt_encoded_passthrough_start(*snapshot); if (!start_result) { spdlog::error("pipeline encoded passthrough init failed: {}", format_error(start_result.error())); return exit_code(PipelineExitCode::InitializationError); } } } else if (!started || restart_pending) { const auto target_info = restart_target_info.value_or(snapshot->metadata.info); auto start_result = attempt_raw_backend_start(target_info); if (!start_result) { spdlog::error("pipeline backend init failed: {}", format_error(start_result.error())); return exit_code(PipelineExitCode::InitializationError); } } latency_tracker.note_ingest(); if (want_encoded_input) { auto access_unit = make_access_unit_from_snapshot(*snapshot); if (!access_unit) { const auto reason = "pipeline encoded snapshot invalid: " + access_unit.error(); restart_backend(reason, snapshot->metadata.info); continue; } std::vector access_units{}; access_units.push_back(std::move(*access_unit)); auto publish = publish_access_units( config, std::move(access_units), stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, &mcap_recorder, latency_tracker); if (!publish) { const auto reason = format_error(publish.error()); restart_backend(reason, active_info); continue; } } else { auto push = (*backend)->push_frame(encode::RawVideoFrame{ .info = snapshot->metadata.info, .source_timestamp_ns = snapshot->metadata.timestamp_ns, .bytes = std::span(snapshot_buffer.data(), snapshot->bytes_copied), }); if (!push) { const auto reason = format_error(push.error()); restart_backend(reason, active_info); continue; } } if (!snapshot->depth.empty()) { if (snapshot->depth_unit == ipc::DepthUnit::Unknown) { if (!warned_unknown_depth_unit) { spdlog::warn( "pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata"); warned_unknown_depth_unit = true; } } auto depth_map = make_depth_map_view(*snapshot); if (!depth_map) { const auto reason = "pipeline depth snapshot invalid: " + depth_map.error(); restart_backend(reason, active_info); continue; } auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map); if (!write_depth) { const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error()); restart_backend(reason, active_info); continue; } } stats.pushed_frames += 1; if (!want_encoded_input) { auto drain = drain_encoder( config, *backend, false, stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, &mcap_recorder, latency_tracker); if (!drain) { const auto reason = format_error(drain.error()); restart_backend(reason, active_info); continue; } } if (config.latency.ingest_max_frames > 0 && stats.pushed_frames >= config.latency.ingest_max_frames) { spdlog::info("pipeline reached ingest_max_frames={}, stopping", config.latency.ingest_max_frames); break; } continue; } spdlog::warn("pipeline unknown message type: magic=0x{:02x} size={}", bytes[0], bytes.size()); } nats_client.Stop(); if (started && backend && !using_encoded_input) { auto drain = drain_encoder( config, *backend, true, stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, &mcap_recorder, latency_tracker); if (!drain) { spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error())); return exit_code(PipelineExitCode::RuntimeError); } } if (backend) { (*backend)->shutdown(); } std::ignore = stop_mcap_recording(mcap_recorder); recorder_service.Stop(); spdlog::info( "PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}", active_stream_info ? to_string(active_stream_info->codec) : to_string(config.encoder.codec), using_encoded_input ? std::string("encoded_passthrough") : (backend ? std::string((*backend)->backend_name()) : std::string("encoder_uninitialized")), stats.sync_messages, stats.status_messages, stats.torn_frames, stats.pushed_frames, stats.encoded_access_units, stats.resets, stats.format_rebuilds, stats.supervised_restarts); const auto latency_summary = latency_tracker.summarize(); const auto pending_frames = latency_tracker.pending_frames(); const auto ingest_drop_frames = stats.sync_messages >= stats.pushed_frames ? stats.sync_messages - stats.pushed_frames : 0ull; const auto total_drop_frames = ingest_drop_frames + stats.torn_frames + pending_frames; const auto drop_ratio_ppm = stats.sync_messages > 0 ? (total_drop_frames * 1'000'000ull) / stats.sync_messages : 0ull; spdlog::info( "LATENCY_METRICS ingest_to_emit_samples={} p50_us={} p95_us={} p99_us={} min_us={} max_us={} avg_us={} pending_frames={} ingest_drop_frames={} total_drop_frames={} drop_ratio_ppm={} sink_stall_events={}", latency_summary.samples, latency_summary.p50_us, latency_summary.p95_us, latency_summary.p99_us, latency_summary.min_us, latency_summary.max_us, latency_summary.avg_us, pending_frames, ingest_drop_frames, total_drop_frames, drop_ratio_ppm, latency_tracker.emit_stall_events()); if (rtp_publisher) { rtp_publisher->log_metrics(); } if (rtmp_output) { (*rtmp_output)->log_metrics(); } return exit_code(PipelineExitCode::Success); } }