diff --git a/CMakeLists.txt b/CMakeLists.txt index fc8ee28..0730e81 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,10 +8,42 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) find_package(Threads REQUIRED) find_package(cppzmq QUIET) +if (DEFINED CVMMAP_STREAMER_USE_SYSTEM_CNATS) + message(FATAL_ERROR + "CVMMAP_STREAMER_USE_SYSTEM_CNATS was removed; use CVMMAP_CNATS_PROVIDER=system") +endif() +set( + CVMMAP_CNATS_PROVIDER + "system" + CACHE STRING + "How to resolve cnats: system or workspace") +set_property(CACHE CVMMAP_CNATS_PROVIDER PROPERTY STRINGS system workspace) +set(_CVMMAP_STREAMER_CNATS_PROVIDER_VALUES system workspace) +list(FIND _CVMMAP_STREAMER_CNATS_PROVIDER_VALUES "${CVMMAP_CNATS_PROVIDER}" _CVMMAP_STREAMER_CNATS_PROVIDER_INDEX) +if (_CVMMAP_STREAMER_CNATS_PROVIDER_INDEX EQUAL -1) + message(FATAL_ERROR + "Invalid CVMMAP_CNATS_PROVIDER='${CVMMAP_CNATS_PROVIDER}' (expected: system|workspace)") +endif() set(CVMMAP_LOCAL_ROOT "${CMAKE_CURRENT_LIST_DIR}/../cv-mmap" CACHE PATH "Path to a local cv-mmap checkout") set(CVMMAP_LOCAL_BUILD "${CVMMAP_LOCAL_ROOT}/build/core" CACHE PATH "Path to local cv-mmap build artifacts") -if (cvmmap-core_DIR) +set(CVMMAP_LOCAL_CORE_DIR "${CVMMAP_LOCAL_ROOT}/build/core" CACHE PATH "Path to local cvmmap-core package config") +set(CVMMAP_LOCAL_NATS_STATIC "${CVMMAP_LOCAL_ROOT}/build/lib/libnats_static.a" CACHE PATH "Path to local cnats static library") +if (CVMMAP_CNATS_PROVIDER STREQUAL "system") + find_package(cnats CONFIG REQUIRED) find_package(cvmmap-core CONFIG QUIET) +else() + if (NOT EXISTS "${CVMMAP_LOCAL_NATS_STATIC}") + message(FATAL_ERROR + "workspace cnats provider requires ${CVMMAP_LOCAL_NATS_STATIC}") + endif() + find_package(OpenSSL REQUIRED) + if (NOT TARGET cnats::nats_static) + add_library(cnats::nats_static STATIC IMPORTED GLOBAL) + set_target_properties(cnats::nats_static PROPERTIES + IMPORTED_LOCATION "${CVMMAP_LOCAL_NATS_STATIC}" + INTERFACE_INCLUDE_DIRECTORIES "${CVMMAP_LOCAL_ROOT}/third_party/nats.c/src" + INTERFACE_LINK_LIBRARIES "OpenSSL::SSL;OpenSSL::Crypto;Threads::Threads") + endif() endif() find_package(ZeroMQ QUIET) find_package(spdlog REQUIRED) @@ -33,12 +65,18 @@ if (NOT TARGET cvmmap::client) if ( EXISTS "${CVMMAP_LOCAL_ROOT}/core/include/cvmmap/client.hpp" AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_client.a" + AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_nats.a" + AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_proto.a" AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a" AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a") + add_library(cvmmap::nats INTERFACE IMPORTED) add_library(cvmmap::client INTERFACE IMPORTED) + set_target_properties(cvmmap::nats PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES "${CVMMAP_LOCAL_ROOT}/core/include" + INTERFACE_LINK_LIBRARIES "${CVMMAP_LOCAL_BUILD}/libcvmmap_nats.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_proto.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a;cnats::nats_static") set_target_properties(cvmmap::client PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${CVMMAP_LOCAL_ROOT}/core/include" - INTERFACE_LINK_LIBRARIES "${CVMMAP_LOCAL_BUILD}/libcvmmap_client.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a") + INTERFACE_LINK_LIBRARIES "${CVMMAP_LOCAL_BUILD}/libcvmmap_client.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_nats.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_proto.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a;cnats::nats_static") else() message(FATAL_ERROR "cvmmap::client target is unavailable and local cv-mmap build artifacts were not found") endif() @@ -169,6 +207,7 @@ set(CVMMAP_STREAMER_LINK_DEPS PkgConfig::ZSTD PkgConfig::LZ4 cvmmap::client + cvmmap::nats CLI11::CLI11 tomlplusplus::tomlplusplus mcap::mcap) diff --git a/README.md b/README.md index 3cc6691..41a1917 100644 --- a/README.md +++ b/README.md @@ -35,11 +35,24 @@ sudo pacman -S cmake gstreamer gst-plugins-base gst-plugins-good \ ### Build +`cvmmap-streamer` uses `CVMMAP_CNATS_PROVIDER` to decide how `cnats` is resolved: + +- `system` (default): use an installed `cnats` package, typically from a top-level `cv-mmap` install under a standard prefix like `/usr/local` +- `workspace`: use the local `cv-mmap` build-tree exports + ```bash cmake -B build -S . cmake --build build ``` +```bash +# Use a local cv-mmap build tree +cmake -B build -S . \ + -DCVMMAP_CNATS_PROVIDER=workspace \ + -DCVMMAP_LOCAL_ROOT=/path/to/cv-mmap +cmake --build build +``` + **Verify binaries exist:** ```bash ls -la build/{cvmmap_streamer,rtp_receiver_tester,rtmp_stub_tester} diff --git a/include/cvmmap_streamer/config/runtime_config.hpp b/include/cvmmap_streamer/config/runtime_config.hpp index 1e9320b..eedd3ae 100644 --- a/include/cvmmap_streamer/config/runtime_config.hpp +++ b/include/cvmmap_streamer/config/runtime_config.hpp @@ -125,6 +125,7 @@ std::string_view to_string(RtmpTransportType transport); std::string_view to_string(EncoderBackendType backend); std::string_view to_string(EncoderDeviceType device); std::string_view to_string(McapCompression compression); +std::expected parse_mcap_compression(std::string_view raw); std::expected parse_runtime_config(int argc, char **argv); std::expected validate_runtime_config(const RuntimeConfig &config); diff --git a/scripts/zed_recording_mcap_tool.py b/scripts/zed_recording_mcap_tool.py new file mode 100755 index 0000000..930887b --- /dev/null +++ b/scripts/zed_recording_mcap_tool.py @@ -0,0 +1,367 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import math +import os +import shlex +import subprocess +import sys +import tempfile +from collections import Counter +from pathlib import Path +from typing import Iterable + +import cv2 +import numpy as np + + +SCRIPT_PATH = Path(__file__).resolve() +REPO_ROOT = SCRIPT_PATH.parents[1] +WORKSPACE_ROOT = REPO_ROOT.parent +MCAP_PYTHON_ROOT = WORKSPACE_ROOT / "mcap" / "python" / "mcap" +if str(MCAP_PYTHON_ROOT) not in sys.path: + sys.path.insert(0, str(MCAP_PYTHON_ROOT)) + +from mcap.reader import make_reader # noqa: E402 + + +VIDEO_FORMATS = ("h264", "h265") + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Convert ZED SVO/SVO2 recordings to MCAP and generate a lightweight preview. " + "If the input is already an MCAP file, conversion is skipped." + ) + ) + parser.add_argument("input", help="Input .svo/.svo2 file, .mcap file, or a directory containing SVO files") + parser.add_argument("--output-dir", help="Directory for generated MCAP files and previews") + parser.add_argument( + "--preview-all", + action="store_true", + help="When the input is a directory, generate a preview for every converted MCAP instead of just the first one", + ) + parser.add_argument("--no-preview", action="store_true", help="Convert only, do not generate preview images") + parser.add_argument( + "--format", + choices=("auto", "h264", "h265"), + default="auto", + help="CompressedVideo format to extract from MCAP during preview", + ) + parser.add_argument("--codec", choices=VIDEO_FORMATS, default="h264", help="Video codec for SVO to MCAP conversion") + parser.add_argument( + "--encoder-device", + choices=("auto", "nvidia", "software"), + default="software", + help="Encoder device passed to zed_svo_to_mcap", + ) + parser.add_argument( + "--mcap-compression", + choices=("none", "lz4", "zstd"), + default="none", + help="MCAP chunk compression passed to zed_svo_to_mcap", + ) + parser.add_argument( + "--depth-mode", + choices=("neural", "quality", "performance", "ultra"), + default="neural", + help="Depth mode passed to zed_svo_to_mcap", + ) + parser.add_argument("--start-frame", type=int, default=0, help="First SVO frame to convert") + parser.add_argument("--end-frame", type=int, help="Last SVO frame to convert") + parser.add_argument( + "--sample-count", + type=int, + default=9, + help="Number of decoded frames to place in the preview contact sheet", + ) + parser.add_argument( + "--frame-step", + type=int, + default=15, + help="Decode every Nth frame for the contact sheet", + ) + parser.add_argument( + "--contact-sheet-width", + type=int, + default=480, + help="Width of each preview tile in pixels", + ) + parser.add_argument( + "--cuda-visible-devices", + help=( + "Optional CUDA_VISIBLE_DEVICES value to export while running zed_svo_to_mcap. " + "Useful when the ZED SDK must be pinned to a specific GPU UUID." + ), + ) + parser.add_argument("--zed-bin", help="Explicit path to zed_svo_to_mcap") + parser.add_argument("--reader-bin", help="Explicit path to mcap_reader_tester") + return parser.parse_args() + + +def locate_binary(name: str, override: str | None) -> Path: + if override: + path = Path(override).expanduser().resolve() + if not path.is_file(): + raise FileNotFoundError(f"binary not found: {path}") + return path + + candidates = ( + REPO_ROOT / "build" / "bin" / name, + REPO_ROOT / "build" / name, + ) + for candidate in candidates: + if candidate.is_file(): + return candidate + raise FileNotFoundError(f"could not find {name} under {REPO_ROOT / 'build'}") + + +def quote_command(args: Iterable[str]) -> str: + return " ".join(shlex.quote(arg) for arg in args) + + +def run(args: list[str], env: dict[str, str] | None = None) -> None: + print(f"$ {quote_command(args)}", flush=True) + subprocess.run(args, check=True, env=env) + + +def summarize_mcap(mcap_path: Path) -> list[tuple[str, str, str, int]]: + counts: Counter[tuple[str, str, str]] = Counter() + with mcap_path.open("rb") as stream: + reader = make_reader(stream) + for schema, channel, _message in reader.iter_messages(): + schema_name = schema.name if schema is not None else "" + counts[(channel.topic, channel.message_encoding, schema_name)] += 1 + summary_rows = [ + (topic, encoding, schema_name, count) + for (topic, encoding, schema_name), count in sorted(counts.items()) + ] + print(f"MCAP summary: {mcap_path}") + for topic, encoding, schema_name, count in summary_rows: + print(f" {count:6d} topic={topic} encoding={encoding} schema={schema_name}") + return summary_rows + + +def infer_video_format(reader_bin: Path, mcap_path: Path, requested: str) -> str: + if requested != "auto": + return requested + + for candidate in VIDEO_FORMATS: + result = subprocess.run( + [str(reader_bin), str(mcap_path), "--expect-format", candidate, "--min-messages", "1"], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + check=False, + ) + if result.returncode == 0: + return candidate + raise RuntimeError(f"could not infer video format from {mcap_path}") + + +def dump_annexb(reader_bin: Path, mcap_path: Path, video_format: str, output_path: Path) -> None: + run( + [ + str(reader_bin), + str(mcap_path), + "--expect-format", + video_format, + "--min-messages", + "1", + "--dump-annexb-output", + str(output_path), + ] + ) + + +def make_contact_sheet(stream_path: Path, image_path: Path, sample_count: int, frame_step: int, tile_width: int) -> int: + capture = cv2.VideoCapture(str(stream_path)) + if not capture.isOpened(): + raise RuntimeError(f"OpenCV could not open decoded stream {stream_path}") + + frames: list[np.ndarray] = [] + frame_index = 0 + while len(frames) < sample_count: + ok, frame = capture.read() + if not ok: + break + if frame_index % frame_step == 0: + annotated = frame.copy() + cv2.putText( + annotated, + f"frame {frame_index}", + (20, 40), + cv2.FONT_HERSHEY_SIMPLEX, + 1.0, + (0, 255, 0), + 2, + cv2.LINE_AA, + ) + frames.append(annotated) + frame_index += 1 + capture.release() + + if not frames: + raise RuntimeError(f"no frames decoded from {stream_path}") + + tile_width = max(64, tile_width) + resized: list[np.ndarray] = [] + for frame in frames: + scale = tile_width / frame.shape[1] + tile_height = max(1, int(round(frame.shape[0] * scale))) + resized.append(cv2.resize(frame, (tile_width, tile_height), interpolation=cv2.INTER_AREA)) + + max_height = max(frame.shape[0] for frame in resized) + padded: list[np.ndarray] = [] + for frame in resized: + if frame.shape[0] == max_height: + padded.append(frame) + continue + canvas = np.zeros((max_height, frame.shape[1], 3), dtype=np.uint8) + canvas[: frame.shape[0], :, :] = frame + padded.append(canvas) + + columns = max(1, math.ceil(math.sqrt(len(padded)))) + rows = math.ceil(len(padded) / columns) + blank = np.zeros_like(padded[0]) + + row_images: list[np.ndarray] = [] + for row_index in range(rows): + row_frames = padded[row_index * columns : (row_index + 1) * columns] + while len(row_frames) < columns: + row_frames.append(blank) + row_images.append(np.concatenate(row_frames, axis=1)) + + sheet = np.concatenate(row_images, axis=0) + image_path.parent.mkdir(parents=True, exist_ok=True) + if not cv2.imwrite(str(image_path), sheet): + raise RuntimeError(f"failed to write preview image {image_path}") + print(f"Preview contact sheet: {image_path}") + return len(frames) + + +def collect_svo_inputs(input_path: Path) -> list[Path]: + if input_path.is_file(): + if input_path.suffix.lower() in {".svo", ".svo2"}: + return [input_path] + if input_path.suffix.lower() == ".mcap": + return [] + raise ValueError(f"unsupported input file: {input_path}") + + if input_path.is_dir(): + return sorted( + path for path in input_path.rglob("*") if path.suffix.lower() in {".svo", ".svo2"} + ) + + raise FileNotFoundError(f"input not found: {input_path}") + + +def default_output_dir(input_path: Path) -> Path: + if input_path.is_dir(): + return input_path / "mcap_preview" + return input_path.parent / "mcap_preview" + + +def convert_svo( + zed_bin: Path, + svo_path: Path, + mcap_path: Path, + args: argparse.Namespace, +) -> None: + env = os.environ.copy() + if args.cuda_visible_devices: + env["CUDA_VISIBLE_DEVICES"] = args.cuda_visible_devices + + command = [ + str(zed_bin), + "--input", + str(svo_path), + "--output", + str(mcap_path), + "--codec", + args.codec, + "--encoder-device", + args.encoder_device, + "--mcap-compression", + args.mcap_compression, + "--depth-mode", + args.depth_mode, + "--start-frame", + str(args.start_frame), + ] + if args.end_frame is not None: + command.extend(["--end-frame", str(args.end_frame)]) + + mcap_path.parent.mkdir(parents=True, exist_ok=True) + run(command, env=env) + + +def preview_mcap(reader_bin: Path, mcap_path: Path, args: argparse.Namespace) -> None: + summarize_mcap(mcap_path) + video_format = infer_video_format(reader_bin, mcap_path, args.format) + print(f"Detected video format: {video_format}") + + stream_extension = ".h265" if video_format == "h265" else ".h264" + with tempfile.TemporaryDirectory(prefix="zed_mcap_preview_") as temp_dir: + temp_root = Path(temp_dir) + stream_path = temp_root / f"preview{stream_extension}" + dump_annexb(reader_bin, mcap_path, video_format, stream_path) + + preview_path = mcap_path.with_suffix(".preview.png") + decoded = make_contact_sheet( + stream_path, + preview_path, + sample_count=args.sample_count, + frame_step=args.frame_step, + tile_width=args.contact_sheet_width, + ) + print(f"Decoded {decoded} preview frame(s)") + + +def main() -> int: + args = parse_args() + + input_path = Path(args.input).expanduser().resolve() + output_dir = Path(args.output_dir).expanduser().resolve() if args.output_dir else default_output_dir(input_path) + output_dir.mkdir(parents=True, exist_ok=True) + + reader_bin = locate_binary("mcap_reader_tester", args.reader_bin) + zed_bin = locate_binary("zed_svo_to_mcap", args.zed_bin) if input_path.suffix.lower() != ".mcap" or input_path.is_dir() else None + + if input_path.is_file() and input_path.suffix.lower() == ".mcap": + if not args.no_preview: + preview_mcap(reader_bin, input_path, args) + return 0 + + svo_inputs = collect_svo_inputs(input_path) + if not svo_inputs: + raise RuntimeError(f"no .svo/.svo2 files found under {input_path}") + + converted_paths: list[Path] = [] + for svo_path in svo_inputs: + output_name = f"{svo_path.stem}.mcap" + mcap_path = output_dir / output_name + convert_svo(zed_bin, svo_path, mcap_path, args) + converted_paths.append(mcap_path) + + if args.no_preview: + return 0 + + preview_targets = converted_paths if args.preview_all else converted_paths[:1] + for mcap_path in preview_targets: + preview_mcap(reader_bin, mcap_path, args) + + print("Generated MCAP files:") + for mcap_path in converted_paths: + print(f" {mcap_path}") + return 0 + + +if __name__ == "__main__": + try: + raise SystemExit(main()) + except KeyboardInterrupt: + raise SystemExit(130) diff --git a/src/config/runtime_config.cpp b/src/config/runtime_config.cpp index 7937567..7603752 100644 --- a/src/config/runtime_config.cpp +++ b/src/config/runtime_config.cpp @@ -155,7 +155,7 @@ std::expected parse_encoder_device(std::string_v return std::unexpected("invalid encoder device: '" + std::string(raw) + "' (expected: auto|nvidia|software)"); } -std::expected parse_mcap_compression(std::string_view raw) { +std::expected parse_mcap_compression_impl(std::string_view raw) { if (raw == "none") { return McapCompression::None; } @@ -481,6 +481,10 @@ void finalize_rtp_endpoint(RuntimeConfig &config) { } +std::expected parse_mcap_compression(std::string_view raw) { + return parse_mcap_compression_impl(raw); +} + RuntimeConfig RuntimeConfig::defaults() { return RuntimeConfig{}; } diff --git a/src/pipeline/pipeline_runtime.cpp b/src/pipeline/pipeline_runtime.cpp index 437e748..02078ba 100644 --- a/src/pipeline/pipeline_runtime.cpp +++ b/src/pipeline/pipeline_runtime.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -250,6 +251,219 @@ std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &fr return frame.header.sdk_timestamp_ns; } +struct McapRecorderState { + mutable std::mutex mutex{}; + RuntimeConfig base_config{}; + std::optional active_record_config{}; + std::optional current_stream_info{}; + std::optional sink{}; + cvmmap::RecordingStatus status{ + .format = cvmmap::RecordingFormat::Mcap, + .can_record = true, + }; +}; + +[[nodiscard]] +cvmmap::ControlError make_recording_control_error( + const int32_t code, + std::string message) { + return cvmmap::ControlError{ + .code = code, + .message = std::move(message), + }; +} + +[[nodiscard]] +RuntimeConfig make_mcap_record_config( + const RuntimeConfig &base_config, + const cvmmap::RecordingRequest &request) { + auto record_config = base_config; + record_config.record.mcap.enabled = true; + record_config.record.mcap.path = request.output_path; + if (request.mcap_options) { + if (request.mcap_options->topic) { + record_config.record.mcap.topic = *request.mcap_options->topic; + } + if (request.mcap_options->depth_topic) { + record_config.record.mcap.depth_topic = *request.mcap_options->depth_topic; + } + if (request.mcap_options->body_topic) { + record_config.record.mcap.body_topic = *request.mcap_options->body_topic; + } + if (request.mcap_options->frame_id) { + record_config.record.mcap.frame_id = *request.mcap_options->frame_id; + } + } + return record_config; +} + +void reset_mcap_status_after_stop(cvmmap::RecordingStatus &status) { + status.is_recording = false; + status.is_paused = false; + status.active_path.clear(); +} + +[[nodiscard]] +std::expected start_mcap_recording( + McapRecorderState &recorder_state, + const cvmmap::RecordingRequest &request) { + std::lock_guard lock(recorder_state.mutex); + if (request.format != cvmmap::RecordingFormat::Mcap) { + return std::unexpected(make_recording_control_error( + cvmmap::CONTROL_RESPONSE_UNSUPPORTED, + "recording format is not supported by the streamer")); + } + if (!recorder_state.current_stream_info) { + return std::unexpected(make_recording_control_error( + cvmmap::CONTROL_RESPONSE_ERROR, + "MCAP recorder is not ready; stream info unavailable")); + } + if (recorder_state.sink) { + return std::unexpected(make_recording_control_error( + cvmmap::CONTROL_RESPONSE_ERROR, + "MCAP recording is already active")); + } + + auto record_config = make_mcap_record_config(recorder_state.base_config, request); + if (request.mcap_options && request.mcap_options->compression) { + auto parsed = parse_mcap_compression(*request.mcap_options->compression); + if (!parsed) { + return std::unexpected(make_recording_control_error( + cvmmap::CONTROL_RESPONSE_INVALID_PAYLOAD, + parsed.error())); + } + record_config.record.mcap.compression = *parsed; + } + + auto created = record::McapRecordSink::create(record_config, *recorder_state.current_stream_info); + if (!created) { + return std::unexpected(make_recording_control_error( + cvmmap::CONTROL_RESPONSE_ERROR, + "pipeline MCAP sink init failed: " + created.error())); + } + + recorder_state.active_record_config = record_config; + recorder_state.sink.emplace(std::move(*created)); + recorder_state.status.can_record = true; + recorder_state.status.is_recording = true; + recorder_state.status.is_paused = false; + recorder_state.status.last_frame_ok = true; + recorder_state.status.frames_ingested = 0; + recorder_state.status.frames_encoded = 0; + recorder_state.status.active_path = request.output_path; + return recorder_state.status; +} + +[[nodiscard]] +std::expected stop_mcap_recording( + McapRecorderState &recorder_state) { + std::lock_guard lock(recorder_state.mutex); + if (recorder_state.sink) { + recorder_state.sink->close(); + recorder_state.sink.reset(); + } + recorder_state.active_record_config.reset(); + reset_mcap_status_after_stop(recorder_state.status); + return recorder_state.status; +} + +[[nodiscard]] +std::expected get_mcap_recording_status( + McapRecorderState &recorder_state) { + std::lock_guard lock(recorder_state.mutex); + recorder_state.status.can_record = true; + return recorder_state.status; +} + +void update_mcap_stream_info( + McapRecorderState &recorder_state, + const encode::EncodedStreamInfo &stream_info) { + std::lock_guard lock(recorder_state.mutex); + recorder_state.current_stream_info = stream_info; + if (recorder_state.sink) { + auto updated = recorder_state.sink->update_stream_info(stream_info); + if (!updated) { + recorder_state.status.last_frame_ok = false; + recorder_state.status.is_recording = false; + recorder_state.status.active_path.clear(); + recorder_state.sink->close(); + recorder_state.sink.reset(); + recorder_state.active_record_config.reset(); + spdlog::error("pipeline MCAP stream update failed: {}", updated.error()); + } + } +} + +record::McapRecordSink *lock_mcap_sink( + McapRecorderState *recorder_state, + std::unique_lock &lock) { + if (recorder_state == nullptr) { + return nullptr; + } + lock = std::unique_lock(recorder_state->mutex); + if (!recorder_state->sink) { + return nullptr; + } + return &*recorder_state->sink; +} + +Status write_mcap_access_unit( + McapRecorderState *recorder_state, + const encode::EncodedAccessUnit &access_unit) { + std::unique_lock lock{}; + auto *sink = lock_mcap_sink(recorder_state, lock); + if (sink == nullptr) { + return {}; + } + + auto write = sink->write_access_unit(access_unit); + if (!write) { + recorder_state->status.last_frame_ok = false; + return unexpected_error(ERR_SERIALIZATION, write.error()); + } + recorder_state->status.last_frame_ok = true; + recorder_state->status.is_recording = true; + recorder_state->status.frames_ingested += 1; + recorder_state->status.frames_encoded += 1; + return {}; +} + +Status write_mcap_body_message( + McapRecorderState *recorder_state, + const record::RawBodyTrackingMessageView &body_message) { + std::unique_lock lock{}; + auto *sink = lock_mcap_sink(recorder_state, lock); + if (sink == nullptr) { + return {}; + } + + auto write = sink->write_body_tracking_message(body_message); + if (!write) { + recorder_state->status.last_frame_ok = false; + return unexpected_error(ERR_SERIALIZATION, write.error()); + } + recorder_state->status.last_frame_ok = true; + return {}; +} + +Status write_mcap_depth_map( + McapRecorderState *recorder_state, + const record::RawDepthMapView &depth_map) { + std::unique_lock lock{}; + auto *sink = lock_mcap_sink(recorder_state, lock); + if (sink == nullptr) { + return {}; + } + + auto write = sink->write_depth_map(depth_map); + if (!write) { + recorder_state->status.last_frame_ok = false; + return unexpected_error(ERR_SERIALIZATION, write.error()); + } + recorder_state->status.last_frame_ok = true; + return {}; +} + [[nodiscard]] Status publish_access_units( const RuntimeConfig &config, @@ -257,7 +471,7 @@ Status publish_access_units( PipelineStats &stats, protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, - record::McapRecordSink *mcap_sink, + McapRecorderState *mcap_recorder, metrics::IngestEmitLatencyTracker &latency_tracker) { for (auto &access_unit : access_units) { if (access_unit.annexb_bytes.empty()) { @@ -278,10 +492,10 @@ Status publish_access_units( return std::unexpected(publish.error()); } } - if (mcap_sink != nullptr) { - auto write = mcap_sink->write_access_unit(access_unit); + if (mcap_recorder != nullptr) { + auto write = write_mcap_access_unit(mcap_recorder, access_unit); if (!write) { - return unexpected_error(ERR_SERIALIZATION, write.error()); + return std::unexpected(write.error()); } } @@ -308,7 +522,7 @@ Status drain_encoder( PipelineStats &stats, protocol::UdpRtpPublisher *rtp_publisher, protocol::RtmpOutput *rtmp_output, - record::McapRecordSink *mcap_sink, + McapRecorderState *mcap_recorder, metrics::IngestEmitLatencyTracker &latency_tracker) { auto drained = flushing ? backend->flush() : backend->drain(); if (!drained) { @@ -320,7 +534,7 @@ Status drain_encoder( stats, rtp_publisher, rtmp_output, - mcap_sink, + mcap_recorder, latency_tracker); } @@ -377,10 +591,16 @@ int run_pipeline(const RuntimeConfig &config) { std::optional rtp_publisher{}; std::optional rtmp_output{}; - std::optional mcap_sink{}; + McapRecorderState mcap_recorder{}; + mcap_recorder.base_config = config; + mcap_recorder.status.can_record = true; cvmmap::NatsControlClient nats_client( input_endpoints->nats_target_key, config.input.nats_url); + cvmmap::NatsControlService recorder_service( + config.input.uri, + input_endpoints->nats_target_key, + config.input.nats_url); std::mutex nats_event_mutex{}; std::deque> pending_body_packets{}; std::deque pending_status_codes{}; @@ -389,18 +609,52 @@ int run_pipeline(const RuntimeConfig &config) { std::lock_guard lock(nats_event_mutex); pending_status_codes.push_back(status_code); }); - if (config.record.mcap.enabled) { - nats_client.SetBodyTrackingRawCallback( - [&nats_event_mutex, &pending_body_packets](std::span bytes) { - std::lock_guard lock(nats_event_mutex); - pending_body_packets.emplace_back(bytes.begin(), bytes.end()); - }); - } + nats_client.SetBodyTrackingRawCallback( + [&nats_event_mutex, &pending_body_packets](std::span bytes) { + std::lock_guard lock(nats_event_mutex); + pending_body_packets.emplace_back(bytes.begin(), bytes.end()); + }); if (!nats_client.Start()) { spdlog::error("pipeline NATS subscribe failed on '{}'", config.input.nats_url); return exit_code(PipelineExitCode::SubscriberError); } + cvmmap::NatsControlHandlers recorder_handlers{}; + recorder_handlers.on_recording_available = + [](const cvmmap::RecordingFormat format) { + return format == cvmmap::RecordingFormat::Mcap; + }; + recorder_handlers.on_start_recording = + [&mcap_recorder](const cvmmap::RecordingRequest &request) { + return start_mcap_recording(mcap_recorder, request); + }; + recorder_handlers.on_stop_recording = + [&mcap_recorder](const cvmmap::RecordingFormat format) + -> std::expected { + if (format != cvmmap::RecordingFormat::Mcap) { + return std::unexpected(make_recording_control_error( + cvmmap::CONTROL_RESPONSE_UNSUPPORTED, + "recording format is not supported by the streamer")); + } + return stop_mcap_recording(mcap_recorder); + }; + recorder_handlers.on_get_recording_status = + [&mcap_recorder](const cvmmap::RecordingFormat format) + -> std::expected { + if (format != cvmmap::RecordingFormat::Mcap) { + return std::unexpected(make_recording_control_error( + cvmmap::CONTROL_RESPONSE_UNSUPPORTED, + "recording format is not supported by the streamer")); + } + return get_mcap_recording_status(mcap_recorder); + }; + recorder_service.SetHandlers(std::move(recorder_handlers)); + if (!recorder_service.Start()) { + spdlog::error("pipeline recorder control service failed on '{}'", config.input.nats_url); + nats_client.Stop(); + return exit_code(PipelineExitCode::SubscriberError); + } + if (config.outputs.rtp.enabled) { auto created = protocol::UdpRtpPublisher::create(config); if (!created) { @@ -458,28 +712,29 @@ int run_pipeline(const RuntimeConfig &config) { } rtmp_output.emplace(std::move(*created)); } + auto stream_info = (*backend)->stream_info(); + if (!stream_info) { + return unexpected_error( + stream_info.error().code, + "pipeline encoder stream info unavailable: " + format_error(stream_info.error())); + } + update_mcap_stream_info(mcap_recorder, *stream_info); if (config.record.mcap.enabled) { - auto stream_info = (*backend)->stream_info(); - if (!stream_info) { - return unexpected_error( - stream_info.error().code, - "pipeline MCAP stream info unavailable: " + format_error(stream_info.error())); - } - if (!mcap_sink) { + std::lock_guard lock(mcap_recorder.mutex); + if (!mcap_recorder.sink) { auto created = record::McapRecordSink::create(config, *stream_info); if (!created) { return unexpected_error( ERR_INTERNAL, "pipeline MCAP sink init failed: " + created.error()); } - mcap_sink.emplace(std::move(*created)); - } else { - auto updated = mcap_sink->update_stream_info(*stream_info); - if (!updated) { - return unexpected_error( - ERR_INTERNAL, - "pipeline MCAP stream update failed: " + updated.error()); - } + mcap_recorder.active_record_config = config; + mcap_recorder.sink.emplace(std::move(*created)); + mcap_recorder.status.format = cvmmap::RecordingFormat::Mcap; + mcap_recorder.status.can_record = true; + mcap_recorder.status.is_recording = true; + mcap_recorder.status.last_frame_ok = true; + mcap_recorder.status.active_path = config.record.mcap.path; } } started = true; @@ -529,10 +784,6 @@ int run_pipeline(const RuntimeConfig &config) { } } for (const auto &body_bytes_vec : body_packets) { - if (!mcap_sink) { - continue; - } - const auto body_bytes = std::span( body_bytes_vec.data(), body_bytes_vec.size()); @@ -546,12 +797,12 @@ int run_pipeline(const RuntimeConfig &config) { continue; } - auto write_body = mcap_sink->write_body_tracking_message(record::RawBodyTrackingMessageView{ + auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{ .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), .bytes = body_bytes, }); if (!write_body) { - const auto reason = "pipeline body MCAP write failed: " + write_body.error(); + const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error()); restart_backend(reason, active_info); break; } @@ -595,7 +846,7 @@ int run_pipeline(const RuntimeConfig &config) { stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, - mcap_sink ? &*mcap_sink : nullptr, + &mcap_recorder, latency_tracker); if (!drain) { const auto reason = format_error(drain.error()); @@ -670,7 +921,7 @@ int run_pipeline(const RuntimeConfig &config) { continue; } - if (mcap_sink.has_value() && !snapshot->depth.empty()) { + if (!snapshot->depth.empty()) { if (snapshot->depth_unit == ipc::DepthUnit::Unknown) { if (!warned_unknown_depth_unit) { spdlog::warn( @@ -686,9 +937,9 @@ int run_pipeline(const RuntimeConfig &config) { continue; } - auto write_depth = mcap_sink->write_depth_map(*depth_map); + auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map); if (!write_depth) { - const auto reason = "pipeline depth MCAP write failed: " + write_depth.error(); + const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error()); restart_backend(reason, active_info); continue; } @@ -702,7 +953,7 @@ int run_pipeline(const RuntimeConfig &config) { stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, - mcap_sink ? &*mcap_sink : nullptr, + &mcap_recorder, latency_tracker); if (!drain) { const auto reason = format_error(drain.error()); @@ -730,7 +981,7 @@ int run_pipeline(const RuntimeConfig &config) { stats, rtp_publisher ? &*rtp_publisher : nullptr, rtmp_output ? &*rtmp_output : nullptr, - mcap_sink ? &*mcap_sink : nullptr, + &mcap_recorder, latency_tracker); if (!drain) { spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error())); @@ -739,9 +990,8 @@ int run_pipeline(const RuntimeConfig &config) { } (*backend)->shutdown(); - if (mcap_sink) { - mcap_sink->close(); - } + std::ignore = stop_mcap_recording(mcap_recorder); + recorder_service.Stop(); spdlog::info( "PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}",