feat: add mcap recorder control and cnats providers

Register an MCAP recorder service on the streamer control subjects, reuse the shared recording request and status model, and expose the zed recording preview/conversion helper.

This also replaces the temporary cnats boolean with the explicit CVMMAP_CNATS_PROVIDER modes and documents the supported system and workspace build paths.
This commit is contained in:
2026-03-18 03:02:30 +00:00
parent 0fef0595fb
commit ae19b881b0
6 changed files with 721 additions and 47 deletions
+41 -2
View File
@@ -8,10 +8,42 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
find_package(Threads REQUIRED) find_package(Threads REQUIRED)
find_package(cppzmq QUIET) find_package(cppzmq QUIET)
# The old boolean switch was replaced by CVMMAP_CNATS_PROVIDER; fail loudly
# so stale cache entries and scripts get updated instead of being silently ignored.
if (DEFINED CVMMAP_STREAMER_USE_SYSTEM_CNATS)
message(FATAL_ERROR
"CVMMAP_STREAMER_USE_SYSTEM_CNATS was removed; use CVMMAP_CNATS_PROVIDER=system")
endif()
# How cnats is resolved: "system" uses an installed cnats package,
# "workspace" uses the sibling cv-mmap build tree.
set(
CVMMAP_CNATS_PROVIDER
"system"
CACHE STRING
"How to resolve cnats: system or workspace")
# Advertise the valid values to cmake-gui/ccmake drop-downs.
set_property(CACHE CVMMAP_CNATS_PROVIDER PROPERTY STRINGS system workspace)
# Validate the cache value against the allowed provider list.
set(_CVMMAP_STREAMER_CNATS_PROVIDER_VALUES system workspace)
list(FIND _CVMMAP_STREAMER_CNATS_PROVIDER_VALUES "${CVMMAP_CNATS_PROVIDER}" _CVMMAP_STREAMER_CNATS_PROVIDER_INDEX)
if (_CVMMAP_STREAMER_CNATS_PROVIDER_INDEX EQUAL -1)
message(FATAL_ERROR
"Invalid CVMMAP_CNATS_PROVIDER='${CVMMAP_CNATS_PROVIDER}' (expected: system|workspace)")
endif()
set(CVMMAP_LOCAL_ROOT "${CMAKE_CURRENT_LIST_DIR}/../cv-mmap" CACHE PATH "Path to a local cv-mmap checkout") set(CVMMAP_LOCAL_ROOT "${CMAKE_CURRENT_LIST_DIR}/../cv-mmap" CACHE PATH "Path to a local cv-mmap checkout")
set(CVMMAP_LOCAL_BUILD "${CVMMAP_LOCAL_ROOT}/build/core" CACHE PATH "Path to local cv-mmap build artifacts") set(CVMMAP_LOCAL_BUILD "${CVMMAP_LOCAL_ROOT}/build/core" CACHE PATH "Path to local cv-mmap build artifacts")
if (cvmmap-core_DIR) set(CVMMAP_LOCAL_CORE_DIR "${CVMMAP_LOCAL_ROOT}/build/core" CACHE PATH "Path to local cvmmap-core package config")
set(CVMMAP_LOCAL_NATS_STATIC "${CVMMAP_LOCAL_ROOT}/build/lib/libnats_static.a" CACHE PATH "Path to local cnats static library")
if (CVMMAP_CNATS_PROVIDER STREQUAL "system")
find_package(cnats CONFIG REQUIRED)
find_package(cvmmap-core CONFIG QUIET) find_package(cvmmap-core CONFIG QUIET)
else()
if (NOT EXISTS "${CVMMAP_LOCAL_NATS_STATIC}")
message(FATAL_ERROR
"workspace cnats provider requires ${CVMMAP_LOCAL_NATS_STATIC}")
endif()
find_package(OpenSSL REQUIRED)
if (NOT TARGET cnats::nats_static)
add_library(cnats::nats_static STATIC IMPORTED GLOBAL)
set_target_properties(cnats::nats_static PROPERTIES
IMPORTED_LOCATION "${CVMMAP_LOCAL_NATS_STATIC}"
INTERFACE_INCLUDE_DIRECTORIES "${CVMMAP_LOCAL_ROOT}/third_party/nats.c/src"
INTERFACE_LINK_LIBRARIES "OpenSSL::SSL;OpenSSL::Crypto;Threads::Threads")
endif()
endif() endif()
find_package(ZeroMQ QUIET) find_package(ZeroMQ QUIET)
find_package(spdlog REQUIRED) find_package(spdlog REQUIRED)
@@ -33,12 +65,18 @@ if (NOT TARGET cvmmap::client)
if ( if (
EXISTS "${CVMMAP_LOCAL_ROOT}/core/include/cvmmap/client.hpp" EXISTS "${CVMMAP_LOCAL_ROOT}/core/include/cvmmap/client.hpp"
AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_client.a" AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_client.a"
AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_nats.a"
AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_proto.a"
AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a" AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a"
AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a") AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a")
add_library(cvmmap::nats INTERFACE IMPORTED)
add_library(cvmmap::client INTERFACE IMPORTED) add_library(cvmmap::client INTERFACE IMPORTED)
set_target_properties(cvmmap::nats PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${CVMMAP_LOCAL_ROOT}/core/include"
INTERFACE_LINK_LIBRARIES "${CVMMAP_LOCAL_BUILD}/libcvmmap_nats.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_proto.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a;cnats::nats_static")
set_target_properties(cvmmap::client PROPERTIES set_target_properties(cvmmap::client PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${CVMMAP_LOCAL_ROOT}/core/include" INTERFACE_INCLUDE_DIRECTORIES "${CVMMAP_LOCAL_ROOT}/core/include"
INTERFACE_LINK_LIBRARIES "${CVMMAP_LOCAL_BUILD}/libcvmmap_client.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a") INTERFACE_LINK_LIBRARIES "${CVMMAP_LOCAL_BUILD}/libcvmmap_client.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_nats.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_proto.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a;cnats::nats_static")
else() else()
message(FATAL_ERROR "cvmmap::client target is unavailable and local cv-mmap build artifacts were not found") message(FATAL_ERROR "cvmmap::client target is unavailable and local cv-mmap build artifacts were not found")
endif() endif()
@@ -169,6 +207,7 @@ set(CVMMAP_STREAMER_LINK_DEPS
PkgConfig::ZSTD PkgConfig::ZSTD
PkgConfig::LZ4 PkgConfig::LZ4
cvmmap::client cvmmap::client
cvmmap::nats
CLI11::CLI11 CLI11::CLI11
tomlplusplus::tomlplusplus tomlplusplus::tomlplusplus
mcap::mcap) mcap::mcap)
+13
View File
@@ -35,11 +35,24 @@ sudo pacman -S cmake gstreamer gst-plugins-base gst-plugins-good \
### Build ### Build
`cvmmap-streamer` uses `CVMMAP_CNATS_PROVIDER` to decide how `cnats` is resolved:
- `system` (default): use an installed `cnats` package, typically from a top-level `cv-mmap` install under a standard prefix like `/usr/local`
- `workspace`: use the local `cv-mmap` build-tree exports
```bash ```bash
cmake -B build -S . cmake -B build -S .
cmake --build build cmake --build build
``` ```
```bash
# Use a local cv-mmap build tree
cmake -B build -S . \
-DCVMMAP_CNATS_PROVIDER=workspace \
-DCVMMAP_LOCAL_ROOT=/path/to/cv-mmap
cmake --build build
```
**Verify binaries exist:** **Verify binaries exist:**
```bash ```bash
ls -la build/{cvmmap_streamer,rtp_receiver_tester,rtmp_stub_tester} ls -la build/{cvmmap_streamer,rtp_receiver_tester,rtmp_stub_tester}
@@ -125,6 +125,7 @@ std::string_view to_string(RtmpTransportType transport);
std::string_view to_string(EncoderBackendType backend); std::string_view to_string(EncoderBackendType backend);
std::string_view to_string(EncoderDeviceType device); std::string_view to_string(EncoderDeviceType device);
std::string_view to_string(McapCompression compression); std::string_view to_string(McapCompression compression);
std::expected<McapCompression, std::string> parse_mcap_compression(std::string_view raw);
std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **argv); std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **argv);
std::expected<void, std::string> validate_runtime_config(const RuntimeConfig &config); std::expected<void, std::string> validate_runtime_config(const RuntimeConfig &config);
+367
View File
@@ -0,0 +1,367 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import math
import os
import shlex
import subprocess
import sys
import tempfile
from collections import Counter
from pathlib import Path
from typing import Iterable
import cv2
import numpy as np
# Resolve repository-relative paths from this script's own location.
# Layout assumed: <workspace>/<repo>/scripts/<this file> — TODO confirm.
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
WORKSPACE_ROOT = REPO_ROOT.parent
# Prefer the sibling mcap checkout's Python reader over any installed copy.
MCAP_PYTHON_ROOT = WORKSPACE_ROOT / "mcap" / "python" / "mcap"
if str(MCAP_PYTHON_ROOT) not in sys.path:
    sys.path.insert(0, str(MCAP_PYTHON_ROOT))
from mcap.reader import make_reader  # noqa: E402
# CompressedVideo formats this tool knows how to probe/extract.
VIDEO_FORMATS = ("h264", "h265")
def parse_args() -> argparse.Namespace:
    """Define and evaluate the command-line interface for this tool.

    Returns the parsed namespace; argparse exits the process on invalid input.
    """
    cli = argparse.ArgumentParser(
        description=(
            "Convert ZED SVO/SVO2 recordings to MCAP and generate a lightweight preview. "
            "If the input is already an MCAP file, conversion is skipped."
        )
    )
    # Input/output locations.
    cli.add_argument("input", help="Input .svo/.svo2 file, .mcap file, or a directory containing SVO files")
    cli.add_argument("--output-dir", help="Directory for generated MCAP files and previews")
    # Preview behavior.
    cli.add_argument(
        "--preview-all",
        action="store_true",
        help="When the input is a directory, generate a preview for every converted MCAP instead of just the first one",
    )
    cli.add_argument("--no-preview", action="store_true", help="Convert only, do not generate preview images")
    cli.add_argument(
        "--format",
        choices=("auto", "h264", "h265"),
        default="auto",
        help="CompressedVideo format to extract from MCAP during preview",
    )
    # Conversion options forwarded to zed_svo_to_mcap.
    cli.add_argument("--codec", choices=VIDEO_FORMATS, default="h264", help="Video codec for SVO to MCAP conversion")
    cli.add_argument(
        "--encoder-device",
        choices=("auto", "nvidia", "software"),
        default="software",
        help="Encoder device passed to zed_svo_to_mcap",
    )
    cli.add_argument(
        "--mcap-compression",
        choices=("none", "lz4", "zstd"),
        default="none",
        help="MCAP chunk compression passed to zed_svo_to_mcap",
    )
    cli.add_argument(
        "--depth-mode",
        choices=("neural", "quality", "performance", "ultra"),
        default="neural",
        help="Depth mode passed to zed_svo_to_mcap",
    )
    cli.add_argument("--start-frame", type=int, default=0, help="First SVO frame to convert")
    cli.add_argument("--end-frame", type=int, help="Last SVO frame to convert")
    # Contact-sheet tuning.
    cli.add_argument(
        "--sample-count",
        type=int,
        default=9,
        help="Number of decoded frames to place in the preview contact sheet",
    )
    cli.add_argument(
        "--frame-step",
        type=int,
        default=15,
        help="Decode every Nth frame for the contact sheet",
    )
    cli.add_argument(
        "--contact-sheet-width",
        type=int,
        default=480,
        help="Width of each preview tile in pixels",
    )
    # Environment and binary overrides.
    cli.add_argument(
        "--cuda-visible-devices",
        help=(
            "Optional CUDA_VISIBLE_DEVICES value to export while running zed_svo_to_mcap. "
            "Useful when the ZED SDK must be pinned to a specific GPU UUID."
        ),
    )
    cli.add_argument("--zed-bin", help="Explicit path to zed_svo_to_mcap")
    cli.add_argument("--reader-bin", help="Explicit path to mcap_reader_tester")
    return cli.parse_args()
def locate_binary(name: str, override: str | None) -> Path:
if override:
path = Path(override).expanduser().resolve()
if not path.is_file():
raise FileNotFoundError(f"binary not found: {path}")
return path
candidates = (
REPO_ROOT / "build" / "bin" / name,
REPO_ROOT / "build" / name,
)
for candidate in candidates:
if candidate.is_file():
return candidate
raise FileNotFoundError(f"could not find {name} under {REPO_ROOT / 'build'}")
def quote_command(args: Iterable[str]) -> str:
    """Render *args* as a copy-pasteable, shell-quoted command string."""
    quoted = [shlex.quote(token) for token in args]
    return " ".join(quoted)
def run(args: list[str], env: dict[str, str] | None = None) -> None:
    """Echo the command shell-quoted, then execute it; raises on non-zero exit."""
    command_line = quote_command(args)
    print(f"$ {command_line}", flush=True)
    subprocess.run(args, check=True, env=env)
def summarize_mcap(mcap_path: Path) -> list[tuple[str, str, str, int]]:
    """Print and return per-channel message counts for an MCAP file.

    Each row is (topic, message_encoding, schema_name, count), sorted by
    topic/encoding/schema. A missing schema is reported as "<none>".
    """
    tally: Counter[tuple[str, str, str]] = Counter()
    with mcap_path.open("rb") as stream:
        for schema, channel, _message in make_reader(stream).iter_messages():
            name = "<none>" if schema is None else schema.name
            tally[(channel.topic, channel.message_encoding, name)] += 1
    rows = [(t, e, s, n) for (t, e, s), n in sorted(tally.items())]
    print(f"MCAP summary: {mcap_path}")
    for topic, encoding, schema_name, count in rows:
        print(f" {count:6d} topic={topic} encoding={encoding} schema={schema_name}")
    return rows
def infer_video_format(reader_bin: Path, mcap_path: Path, requested: str) -> str:
    """Return the requested format, or probe the MCAP for h264/h265 when 'auto'.

    Probing runs the reader tester once per candidate format and accepts the
    first candidate for which it exits successfully.
    """
    if requested != "auto":
        return requested
    for candidate in VIDEO_FORMATS:
        probe = subprocess.run(
            [str(reader_bin), str(mcap_path), "--expect-format", candidate, "--min-messages", "1"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            check=False,
        )
        if probe.returncode == 0:
            return candidate
    raise RuntimeError(f"could not infer video format from {mcap_path}")
def dump_annexb(reader_bin: Path, mcap_path: Path, video_format: str, output_path: Path) -> None:
    """Extract the compressed video track from *mcap_path* as an Annex-B stream."""
    command = [
        str(reader_bin),
        str(mcap_path),
        "--expect-format",
        video_format,
        "--min-messages",
        "1",
        "--dump-annexb-output",
        str(output_path),
    ]
    run(command)
def make_contact_sheet(stream_path: Path, image_path: Path, sample_count: int, frame_step: int, tile_width: int) -> int:
    """Decode frames from an elementary video stream and tile them into one PNG.

    Every `frame_step`-th decoded frame (up to `sample_count` of them) is
    annotated with its frame index, scaled to `tile_width`, and arranged in a
    roughly square grid written to `image_path`. Returns the number of frames
    placed on the sheet. Raises RuntimeError when the stream cannot be opened,
    yields no frames, or the image cannot be written.
    """
    capture = cv2.VideoCapture(str(stream_path))
    if not capture.isOpened():
        raise RuntimeError(f"OpenCV could not open decoded stream {stream_path}")
    frames: list[np.ndarray] = []
    frame_index = 0
    while len(frames) < sample_count:
        ok, frame = capture.read()
        if not ok:
            break
        if frame_index % frame_step == 0:
            # Stamp the frame number so each tile can be traced to the stream.
            annotated = frame.copy()
            cv2.putText(
                annotated,
                f"frame {frame_index}",
                (20, 40),
                cv2.FONT_HERSHEY_SIMPLEX,
                1.0,
                (0, 255, 0),
                2,
                cv2.LINE_AA,
            )
            frames.append(annotated)
        frame_index += 1
    capture.release()
    if not frames:
        raise RuntimeError(f"no frames decoded from {stream_path}")
    # Scale each sampled frame to a fixed tile width, preserving aspect ratio.
    tile_width = max(64, tile_width)
    resized: list[np.ndarray] = []
    for frame in frames:
        scale = tile_width / frame.shape[1]
        tile_height = max(1, int(round(frame.shape[0] * scale)))
        resized.append(cv2.resize(frame, (tile_width, tile_height), interpolation=cv2.INTER_AREA))
    # Pad shorter tiles with black rows so every tile has the same height.
    max_height = max(frame.shape[0] for frame in resized)
    padded: list[np.ndarray] = []
    for frame in resized:
        if frame.shape[0] == max_height:
            padded.append(frame)
            continue
        canvas = np.zeros((max_height, frame.shape[1], 3), dtype=np.uint8)
        canvas[: frame.shape[0], :, :] = frame
        padded.append(canvas)
    # Lay the tiles out in a near-square grid, filling gaps with black tiles.
    columns = max(1, math.ceil(math.sqrt(len(padded))))
    rows = math.ceil(len(padded) / columns)
    blank = np.zeros_like(padded[0])
    row_images: list[np.ndarray] = []
    for row_index in range(rows):
        row_frames = padded[row_index * columns : (row_index + 1) * columns]
        while len(row_frames) < columns:
            row_frames.append(blank)
        row_images.append(np.concatenate(row_frames, axis=1))
    sheet = np.concatenate(row_images, axis=0)
    image_path.parent.mkdir(parents=True, exist_ok=True)
    if not cv2.imwrite(str(image_path), sheet):
        raise RuntimeError(f"failed to write preview image {image_path}")
    print(f"Preview contact sheet: {image_path}")
    return len(frames)
def collect_svo_inputs(input_path: Path) -> list[Path]:
    """List the SVO recordings to convert.

    A single .svo/.svo2 file yields itself, an .mcap file yields nothing
    (conversion unnecessary), and a directory is searched recursively.
    Raises ValueError for unsupported files, FileNotFoundError when the
    path does not exist.
    """
    svo_suffixes = {".svo", ".svo2"}
    if input_path.is_file():
        suffix = input_path.suffix.lower()
        if suffix in svo_suffixes:
            return [input_path]
        if suffix == ".mcap":
            return []
        raise ValueError(f"unsupported input file: {input_path}")
    if input_path.is_dir():
        matches = [p for p in input_path.rglob("*") if p.suffix.lower() in svo_suffixes]
        return sorted(matches)
    raise FileNotFoundError(f"input not found: {input_path}")
def default_output_dir(input_path: Path) -> Path:
    """Pick the output directory: an 'mcap_preview' folder inside a directory
    input, or next to a file input."""
    base = input_path if input_path.is_dir() else input_path.parent
    return base / "mcap_preview"
def convert_svo(
    zed_bin: Path,
    svo_path: Path,
    mcap_path: Path,
    args: argparse.Namespace,
) -> None:
    """Run zed_svo_to_mcap to convert one SVO recording into *mcap_path*."""
    env = os.environ.copy()
    if args.cuda_visible_devices:
        # Pin the ZED SDK to a specific GPU when requested.
        env["CUDA_VISIBLE_DEVICES"] = args.cuda_visible_devices
    command = [
        str(zed_bin),
        "--input", str(svo_path),
        "--output", str(mcap_path),
        "--codec", args.codec,
        "--encoder-device", args.encoder_device,
        "--mcap-compression", args.mcap_compression,
        "--depth-mode", args.depth_mode,
        "--start-frame", str(args.start_frame),
    ]
    if args.end_frame is not None:
        command += ["--end-frame", str(args.end_frame)]
    mcap_path.parent.mkdir(parents=True, exist_ok=True)
    run(command, env=env)
def preview_mcap(reader_bin: Path, mcap_path: Path, args: argparse.Namespace) -> None:
    """Summarize *mcap_path*, extract its video track, and write a contact sheet."""
    summarize_mcap(mcap_path)
    video_format = infer_video_format(reader_bin, mcap_path, args.format)
    print(f"Detected video format: {video_format}")
    extension = ".h265" if video_format == "h265" else ".h264"
    with tempfile.TemporaryDirectory(prefix="zed_mcap_preview_") as temp_dir:
        stream_path = Path(temp_dir) / f"preview{extension}"
        dump_annexb(reader_bin, mcap_path, video_format, stream_path)
        preview_path = mcap_path.with_suffix(".preview.png")
        decoded = make_contact_sheet(
            stream_path,
            preview_path,
            sample_count=args.sample_count,
            frame_step=args.frame_step,
            tile_width=args.contact_sheet_width,
        )
        print(f"Decoded {decoded} preview frame(s)")
def main() -> int:
    """Entry point: convert SVO inputs to MCAP and/or generate previews.

    Returns 0 on success; argparse or the raised exceptions handle failures.
    """
    args = parse_args()
    input_path = Path(args.input).expanduser().resolve()
    output_dir = Path(args.output_dir).expanduser().resolve() if args.output_dir else default_output_dir(input_path)
    output_dir.mkdir(parents=True, exist_ok=True)
    # NOTE(review): the reader binary is required even with --no-preview;
    # confirm whether convert-only runs should tolerate its absence.
    reader_bin = locate_binary("mcap_reader_tester", args.reader_bin)
    # The converter binary is unnecessary only when the input is an existing
    # .mcap *file* (a directory whose name ends in .mcap still needs it).
    zed_bin = locate_binary("zed_svo_to_mcap", args.zed_bin) if input_path.suffix.lower() != ".mcap" or input_path.is_dir() else None
    if input_path.is_file() and input_path.suffix.lower() == ".mcap":
        # Already MCAP: preview (unless suppressed) and stop.
        if not args.no_preview:
            preview_mcap(reader_bin, input_path, args)
        return 0
    svo_inputs = collect_svo_inputs(input_path)
    if not svo_inputs:
        raise RuntimeError(f"no .svo/.svo2 files found under {input_path}")
    converted_paths: list[Path] = []
    for svo_path in svo_inputs:
        output_name = f"{svo_path.stem}.mcap"
        mcap_path = output_dir / output_name
        convert_svo(zed_bin, svo_path, mcap_path, args)
        converted_paths.append(mcap_path)
    if args.no_preview:
        return 0
    # Preview only the first conversion unless --preview-all was given.
    preview_targets = converted_paths if args.preview_all else converted_paths[:1]
    for mcap_path in preview_targets:
        preview_mcap(reader_bin, mcap_path, args)
    print("Generated MCAP files:")
    for mcap_path in converted_paths:
        print(f" {mcap_path}")
    return 0
if __name__ == "__main__":
    # Exit 130 (SIGINT convention) on Ctrl-C instead of a traceback.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        exit_code = 130
    raise SystemExit(exit_code)
+5 -1
View File
@@ -155,7 +155,7 @@ std::expected<EncoderDeviceType, std::string> parse_encoder_device(std::string_v
return std::unexpected("invalid encoder device: '" + std::string(raw) + "' (expected: auto|nvidia|software)"); return std::unexpected("invalid encoder device: '" + std::string(raw) + "' (expected: auto|nvidia|software)");
} }
std::expected<McapCompression, std::string> parse_mcap_compression(std::string_view raw) { std::expected<McapCompression, std::string> parse_mcap_compression_impl(std::string_view raw) {
if (raw == "none") { if (raw == "none") {
return McapCompression::None; return McapCompression::None;
} }
@@ -481,6 +481,10 @@ void finalize_rtp_endpoint(RuntimeConfig &config) {
} }
std::expected<McapCompression, std::string> parse_mcap_compression(std::string_view raw) {
return parse_mcap_compression_impl(raw);
}
RuntimeConfig RuntimeConfig::defaults() { RuntimeConfig RuntimeConfig::defaults() {
return RuntimeConfig{}; return RuntimeConfig{};
} }
+294 -44
View File
@@ -9,6 +9,7 @@
#include <cvmmap/client.hpp> #include <cvmmap/client.hpp>
#include <cvmmap/nats_client.hpp> #include <cvmmap/nats_client.hpp>
#include <cvmmap/nats_service.hpp>
#include <cvmmap/parser.hpp> #include <cvmmap/parser.hpp>
#include <chrono> #include <chrono>
@@ -250,6 +251,219 @@ std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &fr
return frame.header.sdk_timestamp_ns; return frame.header.sdk_timestamp_ns;
} }
// Shared state for the NATS-controlled MCAP recorder.
// All other fields are guarded by `mutex`; callers must hold it while
// reading or writing anything in the struct.
struct McapRecorderState {
  mutable std::mutex mutex{};
  // Pipeline configuration used as the template for on-demand recordings.
  RuntimeConfig base_config{};
  // Configuration of the recording currently in progress, if any.
  std::optional<RuntimeConfig> active_record_config{};
  // Most recent encoder stream parameters; a sink cannot start without them.
  std::optional<encode::EncodedStreamInfo> current_stream_info{};
  // Open MCAP sink while a recording is active.
  std::optional<record::McapRecordSink> sink{};
  // Status reported over the control service.
  cvmmap::RecordingStatus status{
      .format = cvmmap::RecordingFormat::Mcap,
      .can_record = true,
  };
};
// Build a ControlError response carrying the given code and message.
[[nodiscard]]
cvmmap::ControlError make_recording_control_error(
    const int32_t code,
    std::string message) {
  cvmmap::ControlError error{};
  error.code = code;
  error.message = std::move(message);
  return error;
}
// Derive the RuntimeConfig for an on-demand MCAP recording: start from the
// pipeline's base configuration, enable the MCAP sink, point it at the
// requested output path, and overlay any per-request topic/frame overrides.
[[nodiscard]]
RuntimeConfig make_mcap_record_config(
    const RuntimeConfig &base_config,
    const cvmmap::RecordingRequest &request) {
  RuntimeConfig config = base_config;
  auto &mcap = config.record.mcap;
  mcap.enabled = true;
  mcap.path = request.output_path;
  if (const auto &options = request.mcap_options; options) {
    if (options->topic) {
      mcap.topic = *options->topic;
    }
    if (options->depth_topic) {
      mcap.depth_topic = *options->depth_topic;
    }
    if (options->body_topic) {
      mcap.body_topic = *options->body_topic;
    }
    if (options->frame_id) {
      mcap.frame_id = *options->frame_id;
    }
  }
  return config;
}
// Clear the per-recording fields of *status* once a recording has ended.
void reset_mcap_status_after_stop(cvmmap::RecordingStatus &status) {
  status.is_paused = false;
  status.is_recording = false;
  status.active_path.clear();
}
// Begin an on-demand MCAP recording in response to a control request.
// Fails when the requested format is not MCAP, when the encoder's stream
// info has not been seen yet, or when a recording is already running.
// On success the shared status is updated and a snapshot is returned.
[[nodiscard]]
std::expected<cvmmap::RecordingStatus, cvmmap::ControlError> start_mcap_recording(
    McapRecorderState &recorder_state,
    const cvmmap::RecordingRequest &request) {
  std::lock_guard lock(recorder_state.mutex);
  if (request.format != cvmmap::RecordingFormat::Mcap) {
    return std::unexpected(make_recording_control_error(
        cvmmap::CONTROL_RESPONSE_UNSUPPORTED,
        "recording format is not supported by the streamer"));
  }
  // A sink cannot be created before the encoder reports stream parameters.
  if (!recorder_state.current_stream_info) {
    return std::unexpected(make_recording_control_error(
        cvmmap::CONTROL_RESPONSE_ERROR,
        "MCAP recorder is not ready; stream info unavailable"));
  }
  if (recorder_state.sink) {
    return std::unexpected(make_recording_control_error(
        cvmmap::CONTROL_RESPONSE_ERROR,
        "MCAP recording is already active"));
  }
  auto record_config = make_mcap_record_config(recorder_state.base_config, request);
  // Optional per-request chunk-compression override; rejected if unparsable.
  if (request.mcap_options && request.mcap_options->compression) {
    auto parsed = parse_mcap_compression(*request.mcap_options->compression);
    if (!parsed) {
      return std::unexpected(make_recording_control_error(
          cvmmap::CONTROL_RESPONSE_INVALID_PAYLOAD,
          parsed.error()));
    }
    record_config.record.mcap.compression = *parsed;
  }
  auto created = record::McapRecordSink::create(record_config, *recorder_state.current_stream_info);
  if (!created) {
    return std::unexpected(make_recording_control_error(
        cvmmap::CONTROL_RESPONSE_ERROR,
        "pipeline MCAP sink init failed: " + created.error()));
  }
  recorder_state.active_record_config = record_config;
  recorder_state.sink.emplace(std::move(*created));
  // Reset per-recording counters and mark the recording live.
  recorder_state.status.can_record = true;
  recorder_state.status.is_recording = true;
  recorder_state.status.is_paused = false;
  recorder_state.status.last_frame_ok = true;
  recorder_state.status.frames_ingested = 0;
  recorder_state.status.frames_encoded = 0;
  recorder_state.status.active_path = request.output_path;
  return recorder_state.status;
}
// Close the active MCAP sink (if any) and report the post-stop status.
// Stopping when no recording is active is a harmless no-op.
[[nodiscard]]
std::expected<cvmmap::RecordingStatus, cvmmap::ControlError> stop_mcap_recording(
    McapRecorderState &recorder_state) {
  std::lock_guard lock(recorder_state.mutex);
  if (recorder_state.sink.has_value()) {
    recorder_state.sink->close();
    recorder_state.sink.reset();
  }
  recorder_state.active_record_config.reset();
  reset_mcap_status_after_stop(recorder_state.status);
  return recorder_state.status;
}
// Snapshot the recorder status for a control-plane query.
[[nodiscard]]
std::expected<cvmmap::RecordingStatus, cvmmap::ControlError> get_mcap_recording_status(
    McapRecorderState &recorder_state) {
  std::lock_guard lock(recorder_state.mutex);
  // The streamer can always accept a new MCAP recording request.
  recorder_state.status.can_record = true;
  return recorder_state.status;
}
// Record the encoder's latest stream parameters and propagate them to an
// active sink. If the sink rejects the update the recording is aborted;
// the status is reset via the same helper as an explicit stop so that
// is_paused and active_path are cleared consistently (previously is_paused
// could remain stale after an auto-abort).
void update_mcap_stream_info(
    McapRecorderState &recorder_state,
    const encode::EncodedStreamInfo &stream_info) {
  std::lock_guard lock(recorder_state.mutex);
  recorder_state.current_stream_info = stream_info;
  if (!recorder_state.sink) {
    return;
  }
  auto updated = recorder_state.sink->update_stream_info(stream_info);
  if (!updated) {
    recorder_state.status.last_frame_ok = false;
    // Abort the recording with the shared stop-path reset.
    reset_mcap_status_after_stop(recorder_state.status);
    recorder_state.sink->close();
    recorder_state.sink.reset();
    recorder_state.active_record_config.reset();
    spdlog::error("pipeline MCAP stream update failed: {}", updated.error());
  }
}
// Acquire the recorder mutex (through *lock*) and return the active sink,
// or nullptr when there is no recorder or no recording in progress.
// On a non-null return, *lock* holds recorder_state->mutex, so the caller
// may safely touch recorder_state->status until the lock is released.
record::McapRecordSink *lock_mcap_sink(
    McapRecorderState *recorder_state,
    std::unique_lock<std::mutex> &lock) {
  if (recorder_state == nullptr) {
    return nullptr;
  }
  lock = std::unique_lock<std::mutex>(recorder_state->mutex);
  if (!recorder_state->sink) {
    return nullptr;
  }
  return &*recorder_state->sink;
}
// Write one encoded access unit into the active recording, if any.
// A missing recorder or idle sink is treated as success (nothing to record).
// On a write failure last_frame_ok is cleared and a serialization error
// is returned; the caller decides whether to restart the pipeline.
Status write_mcap_access_unit(
    McapRecorderState *recorder_state,
    const encode::EncodedAccessUnit &access_unit) {
  std::unique_lock<std::mutex> lock{};
  auto *sink = lock_mcap_sink(recorder_state, lock);
  if (sink == nullptr) {
    return {};
  }
  auto write = sink->write_access_unit(access_unit);
  if (!write) {
    recorder_state->status.last_frame_ok = false;
    return unexpected_error(ERR_SERIALIZATION, write.error());
  }
  recorder_state->status.last_frame_ok = true;
  recorder_state->status.is_recording = true;
  // NOTE(review): ingested and encoded counters advance in lockstep here;
  // confirm they are meant to remain distinct metrics.
  recorder_state->status.frames_ingested += 1;
  recorder_state->status.frames_encoded += 1;
  return {};
}
// Append one raw body-tracking message to the active recording, if any.
// A missing recorder or idle sink is treated as success (nothing to do).
Status write_mcap_body_message(
    McapRecorderState *recorder_state,
    const record::RawBodyTrackingMessageView &body_message) {
  std::unique_lock<std::mutex> lock{};
  auto *sink = lock_mcap_sink(recorder_state, lock);
  if (sink == nullptr) {
    return {};
  }
  auto result = sink->write_body_tracking_message(body_message);
  if (!result) {
    recorder_state->status.last_frame_ok = false;
    return unexpected_error(ERR_SERIALIZATION, result.error());
  }
  recorder_state->status.last_frame_ok = true;
  return {};
}
// Append one raw depth map to the active recording, if any. A missing
// recorder or idle sink counts as success; a failed write clears
// last_frame_ok and reports a serialization error.
Status write_mcap_depth_map(
    McapRecorderState *recorder_state,
    const record::RawDepthMapView &depth_map) {
  std::unique_lock<std::mutex> lock{};
  auto *sink = lock_mcap_sink(recorder_state, lock);
  if (sink == nullptr) {
    return {};
  }
  auto result = sink->write_depth_map(depth_map);
  if (!result) {
    recorder_state->status.last_frame_ok = false;
    return unexpected_error(ERR_SERIALIZATION, result.error());
  }
  recorder_state->status.last_frame_ok = true;
  return {};
}
[[nodiscard]] [[nodiscard]]
Status publish_access_units( Status publish_access_units(
const RuntimeConfig &config, const RuntimeConfig &config,
@@ -257,7 +471,7 @@ Status publish_access_units(
PipelineStats &stats, PipelineStats &stats,
protocol::UdpRtpPublisher *rtp_publisher, protocol::UdpRtpPublisher *rtp_publisher,
protocol::RtmpOutput *rtmp_output, protocol::RtmpOutput *rtmp_output,
record::McapRecordSink *mcap_sink, McapRecorderState *mcap_recorder,
metrics::IngestEmitLatencyTracker &latency_tracker) { metrics::IngestEmitLatencyTracker &latency_tracker) {
for (auto &access_unit : access_units) { for (auto &access_unit : access_units) {
if (access_unit.annexb_bytes.empty()) { if (access_unit.annexb_bytes.empty()) {
@@ -278,10 +492,10 @@ Status publish_access_units(
return std::unexpected(publish.error()); return std::unexpected(publish.error());
} }
} }
if (mcap_sink != nullptr) { if (mcap_recorder != nullptr) {
auto write = mcap_sink->write_access_unit(access_unit); auto write = write_mcap_access_unit(mcap_recorder, access_unit);
if (!write) { if (!write) {
return unexpected_error(ERR_SERIALIZATION, write.error()); return std::unexpected(write.error());
} }
} }
@@ -308,7 +522,7 @@ Status drain_encoder(
PipelineStats &stats, PipelineStats &stats,
protocol::UdpRtpPublisher *rtp_publisher, protocol::UdpRtpPublisher *rtp_publisher,
protocol::RtmpOutput *rtmp_output, protocol::RtmpOutput *rtmp_output,
record::McapRecordSink *mcap_sink, McapRecorderState *mcap_recorder,
metrics::IngestEmitLatencyTracker &latency_tracker) { metrics::IngestEmitLatencyTracker &latency_tracker) {
auto drained = flushing ? backend->flush() : backend->drain(); auto drained = flushing ? backend->flush() : backend->drain();
if (!drained) { if (!drained) {
@@ -320,7 +534,7 @@ Status drain_encoder(
stats, stats,
rtp_publisher, rtp_publisher,
rtmp_output, rtmp_output,
mcap_sink, mcap_recorder,
latency_tracker); latency_tracker);
} }
@@ -377,10 +591,16 @@ int run_pipeline(const RuntimeConfig &config) {
std::optional<protocol::UdpRtpPublisher> rtp_publisher{}; std::optional<protocol::UdpRtpPublisher> rtp_publisher{};
std::optional<protocol::RtmpOutput> rtmp_output{}; std::optional<protocol::RtmpOutput> rtmp_output{};
std::optional<record::McapRecordSink> mcap_sink{}; McapRecorderState mcap_recorder{};
mcap_recorder.base_config = config;
mcap_recorder.status.can_record = true;
cvmmap::NatsControlClient nats_client( cvmmap::NatsControlClient nats_client(
input_endpoints->nats_target_key, input_endpoints->nats_target_key,
config.input.nats_url); config.input.nats_url);
cvmmap::NatsControlService recorder_service(
config.input.uri,
input_endpoints->nats_target_key,
config.input.nats_url);
std::mutex nats_event_mutex{}; std::mutex nats_event_mutex{};
std::deque<std::vector<std::uint8_t>> pending_body_packets{}; std::deque<std::vector<std::uint8_t>> pending_body_packets{};
std::deque<int32_t> pending_status_codes{}; std::deque<int32_t> pending_status_codes{};
@@ -389,18 +609,52 @@ int run_pipeline(const RuntimeConfig &config) {
std::lock_guard lock(nats_event_mutex); std::lock_guard lock(nats_event_mutex);
pending_status_codes.push_back(status_code); pending_status_codes.push_back(status_code);
}); });
if (config.record.mcap.enabled) { nats_client.SetBodyTrackingRawCallback(
nats_client.SetBodyTrackingRawCallback( [&nats_event_mutex, &pending_body_packets](std::span<const std::uint8_t> bytes) {
[&nats_event_mutex, &pending_body_packets](std::span<const std::uint8_t> bytes) { std::lock_guard lock(nats_event_mutex);
std::lock_guard lock(nats_event_mutex); pending_body_packets.emplace_back(bytes.begin(), bytes.end());
pending_body_packets.emplace_back(bytes.begin(), bytes.end()); });
});
}
if (!nats_client.Start()) { if (!nats_client.Start()) {
spdlog::error("pipeline NATS subscribe failed on '{}'", config.input.nats_url); spdlog::error("pipeline NATS subscribe failed on '{}'", config.input.nats_url);
return exit_code(PipelineExitCode::SubscriberError); return exit_code(PipelineExitCode::SubscriberError);
} }
cvmmap::NatsControlHandlers recorder_handlers{};
recorder_handlers.on_recording_available =
[](const cvmmap::RecordingFormat format) {
return format == cvmmap::RecordingFormat::Mcap;
};
recorder_handlers.on_start_recording =
[&mcap_recorder](const cvmmap::RecordingRequest &request) {
return start_mcap_recording(mcap_recorder, request);
};
recorder_handlers.on_stop_recording =
[&mcap_recorder](const cvmmap::RecordingFormat format)
-> std::expected<cvmmap::RecordingStatus, cvmmap::ControlError> {
if (format != cvmmap::RecordingFormat::Mcap) {
return std::unexpected(make_recording_control_error(
cvmmap::CONTROL_RESPONSE_UNSUPPORTED,
"recording format is not supported by the streamer"));
}
return stop_mcap_recording(mcap_recorder);
};
recorder_handlers.on_get_recording_status =
[&mcap_recorder](const cvmmap::RecordingFormat format)
-> std::expected<cvmmap::RecordingStatus, cvmmap::ControlError> {
if (format != cvmmap::RecordingFormat::Mcap) {
return std::unexpected(make_recording_control_error(
cvmmap::CONTROL_RESPONSE_UNSUPPORTED,
"recording format is not supported by the streamer"));
}
return get_mcap_recording_status(mcap_recorder);
};
recorder_service.SetHandlers(std::move(recorder_handlers));
if (!recorder_service.Start()) {
spdlog::error("pipeline recorder control service failed on '{}'", config.input.nats_url);
nats_client.Stop();
return exit_code(PipelineExitCode::SubscriberError);
}
if (config.outputs.rtp.enabled) { if (config.outputs.rtp.enabled) {
auto created = protocol::UdpRtpPublisher::create(config); auto created = protocol::UdpRtpPublisher::create(config);
if (!created) { if (!created) {
@@ -458,28 +712,29 @@ int run_pipeline(const RuntimeConfig &config) {
} }
rtmp_output.emplace(std::move(*created)); rtmp_output.emplace(std::move(*created));
} }
auto stream_info = (*backend)->stream_info();
if (!stream_info) {
return unexpected_error(
stream_info.error().code,
"pipeline encoder stream info unavailable: " + format_error(stream_info.error()));
}
update_mcap_stream_info(mcap_recorder, *stream_info);
if (config.record.mcap.enabled) { if (config.record.mcap.enabled) {
auto stream_info = (*backend)->stream_info(); std::lock_guard lock(mcap_recorder.mutex);
if (!stream_info) { if (!mcap_recorder.sink) {
return unexpected_error(
stream_info.error().code,
"pipeline MCAP stream info unavailable: " + format_error(stream_info.error()));
}
if (!mcap_sink) {
auto created = record::McapRecordSink::create(config, *stream_info); auto created = record::McapRecordSink::create(config, *stream_info);
if (!created) { if (!created) {
return unexpected_error( return unexpected_error(
ERR_INTERNAL, ERR_INTERNAL,
"pipeline MCAP sink init failed: " + created.error()); "pipeline MCAP sink init failed: " + created.error());
} }
mcap_sink.emplace(std::move(*created)); mcap_recorder.active_record_config = config;
} else { mcap_recorder.sink.emplace(std::move(*created));
auto updated = mcap_sink->update_stream_info(*stream_info); mcap_recorder.status.format = cvmmap::RecordingFormat::Mcap;
if (!updated) { mcap_recorder.status.can_record = true;
return unexpected_error( mcap_recorder.status.is_recording = true;
ERR_INTERNAL, mcap_recorder.status.last_frame_ok = true;
"pipeline MCAP stream update failed: " + updated.error()); mcap_recorder.status.active_path = config.record.mcap.path;
}
} }
} }
started = true; started = true;
@@ -529,10 +784,6 @@ int run_pipeline(const RuntimeConfig &config) {
} }
} }
for (const auto &body_bytes_vec : body_packets) { for (const auto &body_bytes_vec : body_packets) {
if (!mcap_sink) {
continue;
}
const auto body_bytes = std::span<const std::uint8_t>( const auto body_bytes = std::span<const std::uint8_t>(
body_bytes_vec.data(), body_bytes_vec.data(),
body_bytes_vec.size()); body_bytes_vec.size());
@@ -546,12 +797,12 @@ int run_pipeline(const RuntimeConfig &config) {
continue; continue;
} }
auto write_body = mcap_sink->write_body_tracking_message(record::RawBodyTrackingMessageView{ auto write_body = write_mcap_body_message(&mcap_recorder, record::RawBodyTrackingMessageView{
.timestamp_ns = body_tracking_timestamp_ns(*parsed_body), .timestamp_ns = body_tracking_timestamp_ns(*parsed_body),
.bytes = body_bytes, .bytes = body_bytes,
}); });
if (!write_body) { if (!write_body) {
const auto reason = "pipeline body MCAP write failed: " + write_body.error(); const auto reason = "pipeline body MCAP write failed: " + format_error(write_body.error());
restart_backend(reason, active_info); restart_backend(reason, active_info);
break; break;
} }
@@ -595,7 +846,7 @@ int run_pipeline(const RuntimeConfig &config) {
stats, stats,
rtp_publisher ? &*rtp_publisher : nullptr, rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr, rtmp_output ? &*rtmp_output : nullptr,
mcap_sink ? &*mcap_sink : nullptr, &mcap_recorder,
latency_tracker); latency_tracker);
if (!drain) { if (!drain) {
const auto reason = format_error(drain.error()); const auto reason = format_error(drain.error());
@@ -670,7 +921,7 @@ int run_pipeline(const RuntimeConfig &config) {
continue; continue;
} }
if (mcap_sink.has_value() && !snapshot->depth.empty()) { if (!snapshot->depth.empty()) {
if (snapshot->depth_unit == ipc::DepthUnit::Unknown) { if (snapshot->depth_unit == ipc::DepthUnit::Unknown) {
if (!warned_unknown_depth_unit) { if (!warned_unknown_depth_unit) {
spdlog::warn( spdlog::warn(
@@ -686,9 +937,9 @@ int run_pipeline(const RuntimeConfig &config) {
continue; continue;
} }
auto write_depth = mcap_sink->write_depth_map(*depth_map); auto write_depth = write_mcap_depth_map(&mcap_recorder, *depth_map);
if (!write_depth) { if (!write_depth) {
const auto reason = "pipeline depth MCAP write failed: " + write_depth.error(); const auto reason = "pipeline depth MCAP write failed: " + format_error(write_depth.error());
restart_backend(reason, active_info); restart_backend(reason, active_info);
continue; continue;
} }
@@ -702,7 +953,7 @@ int run_pipeline(const RuntimeConfig &config) {
stats, stats,
rtp_publisher ? &*rtp_publisher : nullptr, rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr, rtmp_output ? &*rtmp_output : nullptr,
mcap_sink ? &*mcap_sink : nullptr, &mcap_recorder,
latency_tracker); latency_tracker);
if (!drain) { if (!drain) {
const auto reason = format_error(drain.error()); const auto reason = format_error(drain.error());
@@ -730,7 +981,7 @@ int run_pipeline(const RuntimeConfig &config) {
stats, stats,
rtp_publisher ? &*rtp_publisher : nullptr, rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr, rtmp_output ? &*rtmp_output : nullptr,
mcap_sink ? &*mcap_sink : nullptr, &mcap_recorder,
latency_tracker); latency_tracker);
if (!drain) { if (!drain) {
spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error())); spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error()));
@@ -739,9 +990,8 @@ int run_pipeline(const RuntimeConfig &config) {
} }
(*backend)->shutdown(); (*backend)->shutdown();
if (mcap_sink) { std::ignore = stop_mcap_recording(mcap_recorder);
mcap_sink->close(); recorder_service.Stop();
}
spdlog::info( spdlog::info(
"PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}", "PIPELINE_METRICS codec={} backend={} sync_messages={} status_messages={} torn_frames={} pushed_frames={} encoded_access_units={} resets={} format_rebuilds={} supervised_restarts={}",