#include "cvmmap_streamer/config/runtime_config.hpp" #include "cvmmap_streamer/core/frame_source.hpp" #include "cvmmap_streamer/encode/encoder_backend.hpp" #include "cvmmap_streamer/ipc/contracts.hpp" #include "cvmmap_streamer/metrics/latency_tracker.hpp" #include "cvmmap_streamer/protocol/rtmp_output.hpp" #include "cvmmap_streamer/protocol/rtp_publisher.hpp" #include "cvmmap_streamer/record/mcap_record_sink.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace cvmmap_streamer { void pipeline_tick() {} } namespace cvmmap_streamer::core { namespace { namespace ipc = cvmmap_streamer::ipc; enum class PipelineExitCode : int { Success = 0, InputError = 2, SharedMemoryError = 3, SubscriberError = 4, InitializationError = 5, RuntimeError = 6, }; [[nodiscard]] constexpr int exit_code(PipelineExitCode code) { return static_cast(code); } struct ResolvedInputEndpoints { std::string shm_name; std::string zmq_endpoint; std::string body_zmq_endpoint; }; [[nodiscard]] std::expected resolve_input_endpoints(const RuntimeConfig &config) { try { auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.uri); spdlog::info( "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}'", config.input.uri, target.shm_name, target.zmq_addr); return ResolvedInputEndpoints{ .shm_name = target.shm_name, .zmq_endpoint = target.zmq_addr, .body_zmq_endpoint = target.zmq_body_addr, }; } catch (const std::exception &e) { return std::unexpected(std::string("invalid cvmmap input URI: ") + e.what()); } } struct SharedMemoryView { SharedMemoryView() = default; int fd{-1}; std::uint8_t *ptr{nullptr}; std::size_t bytes{0}; ~SharedMemoryView() { if (ptr != nullptr && ptr != MAP_FAILED && bytes > 0) { munmap(ptr, bytes); } if (fd >= 0) { close(fd); } } SharedMemoryView(const SharedMemoryView &) = delete; SharedMemoryView &operator=(const SharedMemoryView &) = delete; SharedMemoryView(SharedMemoryView &&other) noexcept { fd = std::exchange(other.fd, -1); ptr = std::exchange(other.ptr, nullptr); bytes = std::exchange(other.bytes, 0); } SharedMemoryView &operator=(SharedMemoryView &&other) noexcept { if (this == &other) { return *this; } if (ptr != nullptr && ptr != MAP_FAILED && bytes > 0) { munmap(ptr, bytes); } if (fd >= 0) { close(fd); } fd = std::exchange(other.fd, -1); ptr = std::exchange(other.ptr, nullptr); bytes = std::exchange(other.bytes, 0); return *this; } [[nodiscard]] std::span region() const { return std::span(ptr, bytes); } [[nodiscard]] static std::expected open_readonly(const std::string &raw_name) { const auto shm_name = raw_name.starts_with('/') ? raw_name : "/" + raw_name; const int fd = shm_open(shm_name.c_str(), O_RDONLY, 0); if (fd < 0) { return std::unexpected("shm_open failed for '" + shm_name + "'"); } struct stat statbuf{}; if (fstat(fd, &statbuf) != 0) { close(fd); return std::unexpected("fstat failed for '" + shm_name + "'"); } if (statbuf.st_size <= 0) { close(fd); return std::unexpected("shared memory size is zero for '" + shm_name + "'"); } const auto bytes = static_cast(statbuf.st_size); auto *mapped = static_cast(mmap(nullptr, bytes, PROT_READ, MAP_SHARED, fd, 0)); if (mapped == MAP_FAILED) { close(fd); return std::unexpected("mmap failed for '" + shm_name + "'"); } SharedMemoryView view; view.fd = fd; view.ptr = mapped; view.bytes = bytes; return view; } }; struct PipelineStats { std::uint64_t sync_messages{0}; std::uint64_t status_messages{0}; std::uint64_t torn_frames{0}; std::uint64_t pushed_frames{0}; std::uint64_t encoded_access_units{0}; std::uint64_t resets{0}; std::uint64_t format_rebuilds{0}; std::uint64_t supervised_restarts{0}; }; [[nodiscard]] std::string_view status_to_string(ipc::ModuleStatus status) { switch (status) { case ipc::ModuleStatus::Online: return "online"; case ipc::ModuleStatus::Offline: return "offline"; case ipc::ModuleStatus::StreamReset: return "stream_reset"; } return "unknown"; } [[nodiscard]] bool frame_info_equal(const ipc::FrameInfo &lhs, const ipc::FrameInfo &rhs) { return lhs.width == rhs.width && lhs.height == rhs.height && lhs.channels == rhs.channels && lhs.depth == rhs.depth && lhs.pixel_format == rhs.pixel_format && lhs.buffer_size == rhs.buffer_size; } [[nodiscard]] ipc::DepthUnit normalize_depth_unit_for_recording(ipc::DepthUnit unit) { if (unit == ipc::DepthUnit::Unknown) { return ipc::DepthUnit::Millimeter; } return unit; } [[nodiscard]] std::expected make_depth_map_view(const ipc::CoherentSnapshot &snapshot) { if (!snapshot.depth_info) { return std::unexpected("depth plane metadata is missing"); } if (snapshot.depth.empty()) { return std::unexpected("depth plane bytes are missing"); } const auto &depth_info = *snapshot.depth_info; if (depth_info.depth != ipc::Depth::F32 || depth_info.pixel_format != ipc::PixelFormat::GRAY) { return std::unexpected("depth plane must be GRAY/F32"); } const auto pixel_count = static_cast(depth_info.width) * static_cast(depth_info.height); const auto expected_bytes = pixel_count * sizeof(float); if (snapshot.depth.size() != expected_bytes) { return std::unexpected( "depth plane byte size does not match width*height*sizeof(float)"); } if ((reinterpret_cast(snapshot.depth.data()) % alignof(float)) != 0) { return std::unexpected("depth plane is not aligned for float access"); } return record::RawDepthMapView{ .timestamp_ns = snapshot.metadata.timestamp_ns, .width = depth_info.width, .height = depth_info.height, .source_unit = normalize_depth_unit_for_recording(snapshot.depth_unit), .pixels = std::span( reinterpret_cast(snapshot.depth.data()), pixel_count), }; } [[nodiscard]] std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) { if (frame.header.timestamp_ns != 0) { return frame.header.timestamp_ns; } return frame.header.sdk_timestamp_ns; } [[nodiscard]] Status publish_access_units( const RuntimeConfig &config, std::vector &&access_units, PipelineStats &stats, protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, record::McapRecordSink *mcap_sink, metrics::IngestEmitLatencyTracker &latency_tracker) { for (auto &access_unit : access_units) { if (access_unit.annexb_bytes.empty()) { continue; } if (config.latency.emit_stall_ms > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(config.latency.emit_stall_ms)); latency_tracker.note_emit_stall(); } if (rtp_publisher != nullptr) { rtp_publisher->publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns); } if (rtmp_output != nullptr) { auto publish = (*rtmp_output)->publish_access_unit(access_unit); if (!publish) { return std::unexpected(publish.error()); } } if (mcap_sink != nullptr) { auto write = mcap_sink->write_access_unit(access_unit); if (!write) { return unexpected_error(ERR_SERIALIZATION, write.error()); } } latency_tracker.note_emit(); stats.encoded_access_units += 1; if (stats.encoded_access_units <= 10 || (stats.encoded_access_units % 30) == 0) { spdlog::info( "ENCODED_AU codec={} count={} bytes={} keyframe={}", to_string(access_unit.codec), stats.encoded_access_units, access_unit.annexb_bytes.size(), access_unit.keyframe ? "true" : "false"); } } return {}; } [[nodiscard]] Status drain_encoder( const RuntimeConfig &config, encode::EncoderBackend &backend, bool flushing, PipelineStats &stats, protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, record::McapRecordSink *mcap_sink, metrics::IngestEmitLatencyTracker &latency_tracker) { auto drained = flushing ? backend->flush() : backend->drain(); if (!drained) { return std::unexpected(drained.error()); } return publish_access_units( config, std::move(*drained), stats, rtp_publisher, rtmp_output, mcap_sink, latency_tracker); } } int run_pipeline(const RuntimeConfig &config) { auto input_endpoints = resolve_input_endpoints(config); if (!input_endpoints) { spdlog::error("{}", input_endpoints.error()); return exit_code(PipelineExitCode::InputError); } auto source = make_frame_source(config); if (!source) { spdlog::error("pipeline input source selection failed: {}", source.error()); return exit_code(PipelineExitCode::InputError); } auto source_prepare = (*source)->prepare_runtime(); if (!source_prepare) { spdlog::error("pipeline source backend '{}' setup failed: {}", (*source)->backend_name(), source_prepare.error()); return exit_code(PipelineExitCode::InputError); } auto backend = encode::make_encoder_backend(config); if (!backend) { spdlog::error("pipeline encoder backend selection failed: {}", format_error(backend.error())); return exit_code(PipelineExitCode::InitializationError); } auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name); if (!shm) { spdlog::error("pipeline open shared memory failed: {}", shm.error()); return exit_code(PipelineExitCode::SharedMemoryError); } if (shm->bytes <= ipc::kShmPayloadOffset) { spdlog::error("pipeline invalid shared memory size: {}", shm->bytes); return exit_code(PipelineExitCode::SharedMemoryError); } std::vector snapshot_buffer( shm->bytes - ipc::kShmPayloadOffset, static_cast(0)); zmq::context_t zmq_ctx{1}; zmq::socket_t subscriber(zmq_ctx, zmq::socket_type::sub); try { subscriber.set(zmq::sockopt::subscribe, ""); subscriber.connect(input_endpoints->zmq_endpoint); } catch (const zmq::error_t &e) { spdlog::error("pipeline subscribe failed on '{}': {}", input_endpoints->zmq_endpoint, e.what()); return exit_code(PipelineExitCode::SubscriberError); } std::optional body_subscriber{}; if (config.record.mcap.enabled) { try { body_subscriber.emplace(zmq_ctx, zmq::socket_type::sub); body_subscriber->set(zmq::sockopt::subscribe, ""); body_subscriber->connect(input_endpoints->body_zmq_endpoint); } catch (const zmq::error_t &e) { spdlog::error("pipeline body subscribe failed on '{}': {}", input_endpoints->body_zmq_endpoint, e.what()); return exit_code(PipelineExitCode::SubscriberError); } } std::optional rtp_publisher{}; std::optional rtmp_output{}; std::optional mcap_sink{}; if (config.outputs.rtp.enabled) { auto created = protocol::UdpRtpPublisher::create(config); if (!created) { spdlog::error("pipeline RTP publisher init failed: {}", created.error()); return exit_code(PipelineExitCode::InitializationError); } rtp_publisher.emplace(std::move(*created)); } PipelineStats stats{}; metrics::IngestEmitLatencyTracker latency_tracker{}; bool producer_offline{false}; bool started{false}; std::optional active_info{}; std::optional restart_target_info{}; bool restart_pending{false}; bool warned_unknown_depth_unit{false}; const auto restart_backend = [&](std::string_view reason, std::optional target_info) { if (started) { stats.supervised_restarts += 1; } spdlog::warn( "PIPELINE_RESTART backend={} reason='{}' restarts={}", (*backend)->backend_name(), reason, stats.supervised_restarts); (*backend)->shutdown(); started = false; restart_pending = true; restart_target_info = target_info; warned_unknown_depth_unit = false; rtmp_output.reset(); }; const auto attempt_backend_start = [&](const ipc::FrameInfo &target_info) -> Status { (*backend)->shutdown(); rtmp_output.reset(); auto init = (*backend)->init(config, target_info); if (!init) { return std::unexpected(init.error()); } if (config.outputs.rtmp.enabled) { auto stream_info = (*backend)->stream_info(); if (!stream_info) { return unexpected_error( stream_info.error().code, "pipeline RTMP output stream info unavailable: " + format_error(stream_info.error())); } auto created = protocol::make_rtmp_output(config, *stream_info); if (!created) { return unexpected_error( created.error().code, "pipeline RTMP output init failed: " + format_error(created.error())); } rtmp_output.emplace(std::move(*created)); } if (config.record.mcap.enabled) { auto stream_info = (*backend)->stream_info(); if (!stream_info) { return unexpected_error( stream_info.error().code, "pipeline MCAP stream info unavailable: " + format_error(stream_info.error())); } if (!mcap_sink) { auto created = record::McapRecordSink::create(config, *stream_info); if (!created) { return unexpected_error( ERR_INTERNAL, "pipeline MCAP sink init failed: " + created.error()); } mcap_sink.emplace(std::move(*created)); } else { auto updated = mcap_sink->update_stream_info(*stream_info); if (!updated) { return unexpected_error( ERR_INTERNAL, "pipeline MCAP stream update failed: " + updated.error()); } } } started = true; restart_pending = false; restart_target_info.reset(); warned_unknown_depth_unit = false; active_info = target_info; return {}; }; const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); auto last_event = std::chrono::steady_clock::now(); while (true) { auto poll = (*backend)->poll(); if (!poll) { const auto reason = format_error(poll.error()); restart_backend(reason, active_info); } std::array poll_items{{ {subscriber.handle(), 0, ZMQ_POLLIN, 0}, {body_subscriber ? body_subscriber->handle() : nullptr, 0, ZMQ_POLLIN, 0}, }}; try { zmq::poll(poll_items, std::chrono::milliseconds{20}); } catch (const zmq::error_t &e) { spdlog::error("pipeline poll failed: {}", e.what()); return exit_code(PipelineExitCode::SubscriberError); } const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; const bool body_socket_ready = body_subscriber.has_value() && (poll_items[1].revents & ZMQ_POLLIN) != 0; if (!frame_socket_ready && !body_socket_ready) { const auto now = std::chrono::steady_clock::now(); if (restart_pending && restart_target_info) { auto start_result = attempt_backend_start(*restart_target_info); if (!start_result) { spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); return exit_code(PipelineExitCode::RuntimeError); } } if (now - last_event >= idle_timeout) { spdlog::info("pipeline idle timeout reached ({} ms), stopping", config.latency.ingest_idle_timeout_ms); break; } if (!producer_offline && started) { auto drain = drain_encoder( config, *backend, false, stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, mcap_sink ? &*mcap_sink : nullptr, latency_tracker); if (!drain) { const auto reason = format_error(drain.error()); restart_backend(reason, active_info); } } continue; } if (body_socket_ready && body_subscriber) { while (true) { zmq::message_t body_message; const auto recv_body = body_subscriber->recv(body_message, zmq::recv_flags::dontwait); if (!recv_body) { break; } last_event = std::chrono::steady_clock::now(); auto body_bytes = std::span( static_cast(body_message.data()), body_message.size()); if (body_bytes.empty()) { continue; } if (!mcap_sink) { continue; } auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes); if (!parsed_body) { spdlog::warn("pipeline body packet parse error: {}", parsed_body.error()); continue; } auto write_body = mcap_sink->write_body_tracking_message(record::RawBodyTrackingMessageView{ .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), .bytes = body_bytes, }); if (!write_body) { const auto reason = "pipeline body MCAP write failed: " + write_body.error(); restart_backend(reason, active_info); break; } } } if (!frame_socket_ready) { continue; } zmq::message_t message; const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); if (!recv_result) { continue; } last_event = std::chrono::steady_clock::now(); auto bytes = std::span( static_cast(message.data()), message.size()); if (bytes.empty()) { continue; } if (bytes[0] == ipc::kFrameTopicMagic) { stats.sync_messages += 1; auto sync = ipc::parse_sync_message(bytes); if (!sync) { spdlog::warn("pipeline sync parse error: {}", ipc::to_string(sync.error())); continue; } auto snapshot = ipc::read_coherent_snapshot( shm->region(), snapshot_buffer, [&]() { if (config.latency.snapshot_copy_delay_us > 0) { std::this_thread::sleep_for(std::chrono::microseconds(config.latency.snapshot_copy_delay_us)); } }); if (!snapshot) { if (snapshot.error() == ipc::SnapshotError::TornRead) { stats.torn_frames += 1; } spdlog::warn("pipeline snapshot rejected: {}", ipc::to_string(snapshot.error())); continue; } if (active_info && !frame_info_equal(*active_info, snapshot->metadata.info)) { stats.format_rebuilds += 1; restart_backend("frame_info_change", snapshot->metadata.info); } if (!started || restart_pending) { const auto target_info = restart_target_info.value_or(snapshot->metadata.info); auto start_result = attempt_backend_start(target_info); if (!start_result) { spdlog::error("pipeline backend init failed: {}", format_error(start_result.error())); return exit_code(PipelineExitCode::InitializationError); } } latency_tracker.note_ingest(); auto push = (*backend)->push_frame(encode::RawVideoFrame{ .info = snapshot->metadata.info, .source_timestamp_ns = snapshot->metadata.timestamp_ns, .bytes = std::span(snapshot_buffer.data(), snapshot->bytes_copied), }); if (!push) { const auto reason = format_error(push.error()); restart_backend(reason, active_info); continue; } if (mcap_sink.has_value() && !snapshot->depth.empty()) { if (snapshot->depth_unit == ipc::DepthUnit::Unknown) { if (!warned_unknown_depth_unit) { spdlog::warn( "pipeline depth plane present but depth_unit is unknown; assuming millimeters for MCAP recording, producer should upgrade to the ABI with explicit depth_unit metadata"); warned_unknown_depth_unit = true; } } auto depth_map = make_depth_map_view(*snapshot); if (!depth_map) { const auto reason = "pipeline depth snapshot invalid: " + depth_map.error(); restart_backend(reason, active_info); continue; } auto write_depth = mcap_sink->write_depth_map(*depth_map); if (!write_depth) { const auto reason = "pipeline depth MCAP write failed: " + write_depth.error(); restart_backend(reason, active_info); continue; } } stats.pushed_frames += 1; auto drain = drain_encoder( config, *backend, false, stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, mcap_sink ? &*mcap_sink : nullptr, latency_tracker); if (!drain) { const auto reason = format_error(drain.error()); restart_backend(reason, active_info); continue; } if (config.latency.ingest_max_frames > 0 && stats.pushed_frames >= config.latency.ingest_max_frames) { spdlog::info("pipeline reached ingest_max_frames={}, stopping", config.latency.ingest_max_frames); break; } continue; } if (bytes[0] == ipc::kModuleStatusMagic) { stats.status_messages += 1; auto status = ipc::parse_module_status_message(bytes); if (!status) { spdlog::warn("pipeline status parse error: {}", ipc::to_string(status.error())); continue; } spdlog::info("pipeline status event label='{}' status={}", status->label(), status_to_string(status->module_status)); if (status->module_status == ipc::ModuleStatus::Online) { producer_offline = false; } if (status->module_status == ipc::ModuleStatus::Offline) { producer_offline = true; } if (status->module_status == ipc::ModuleStatus::StreamReset) { stats.resets += 1; (*backend)->shutdown(); started = false; restart_pending = false; restart_target_info.reset(); active_info.reset(); rtmp_output.reset(); } continue; } spdlog::warn("pipeline unknown message type: magic=0x{:02x} size={}", bytes[0], bytes.size()); } if (started) { auto drain = drain_encoder( config, *backend, true, stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, mcap_sink ? &*mcap_sink : nullptr, latency_tracker); if (!drain) { spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error())); return exit_code(PipelineExitCode::RuntimeError); } } (*backend)->shutdown(); if (mcap_sink) { mcap_sink->close(); } spdlog::info( "PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}", to_string(config.encoder.codec), (*backend)->backend_name(), stats.sync_messages, stats.status_messages, stats.torn_frames, stats.pushed_frames, stats.encoded_access_units, stats.resets, stats.format_rebuilds, stats.supervised_restarts); const auto latency_summary = latency_tracker.summarize(); const auto pending_frames = latency_tracker.pending_frames(); const auto ingest_drop_frames = stats.sync_messages >= stats.pushed_frames ? stats.sync_messages - stats.pushed_frames : 0ull; const auto total_drop_frames = ingest_drop_frames + stats.torn_frames + pending_frames; const auto drop_ratio_ppm = stats.sync_messages > 0 ? (total_drop_frames * 1'000'000ull) / stats.sync_messages : 0ull; spdlog::info( "LATENCY_METRICS ingest_to_emit_samples={} p50_us={} p95_us={} p99_us={} min_us={} max_us={} avg_us={} pending_frames={} ingest_drop_frames={} total_drop_frames={} drop_ratio_ppm={} sink_stall_events={}", latency_summary.samples, latency_summary.p50_us, latency_summary.p95_us, latency_summary.p99_us, latency_summary.min_us, latency_summary.max_us, latency_summary.avg_us, pending_frames, ingest_drop_frames, total_drop_frames, drop_ratio_ppm, latency_tracker.emit_stall_events()); if (rtp_publisher) { rtp_publisher->log_metrics(); } if (rtmp_output) { (*rtmp_output)->log_metrics(); } return exit_code(PipelineExitCode::Success); } }