From 8d9bd1b815897ebc858ad09dbc47b99ee09bbac6 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Fri, 20 Mar 2026 08:09:12 +0000 Subject: [PATCH] feat: bundle synced multi-camera ZED SVO streams into MCAP Extend zed_svo_to_mcap to export multiple SVO inputs or a kindergarten segment directory into a single MCAP with per-camera namespaced topics. Add a MultiMcapRecordSink so one writer can manage independent video, depth, calibration, pose, and body channels for multiple cameras while reusing the existing protobuf schemas and encoded access-unit flow. Implement strict timestamp synchronization for bundled exports by computing the common time window, seeking each SVO to the shared start, dropping unsynced frames, and only emitting groups that fall within the configured tolerance. Load per-camera positional tracking settings from a cv-mmap style TOML pose config, preserve single-camera behavior, and add graceful SIGINT or SIGTERM shutdown so interrupted exports flush encoders and finalize readable MCAP files. Add mcap_multi_record_tester coverage and switch zed_svo_to_mcap defaults to H.265 plus QUALITY depth mode. --- CMakeLists.txt | 1 + .../record/mcap_record_sink.hpp | 80 + src/record/mcap_record_sink.cpp | 480 ++++++ src/testers/mcap_multi_record_tester.cpp | 247 ++++ src/tools/zed_svo_to_mcap.cpp | 1299 ++++++++++++++++- 5 files changed, 2043 insertions(+), 64 deletions(-) create mode 100644 src/testers/mcap_multi_record_tester.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 4171dab..7d69823 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -287,6 +287,7 @@ add_cvmmap_binary(mcap_depth_record_tester src/testers/mcap_depth_record_tester. add_cvmmap_binary(mcap_body_record_tester src/testers/mcap_body_record_tester.cpp) add_cvmmap_binary(mcap_body_inspector src/testers/mcap_body_inspector.cpp) add_cvmmap_binary(mcap_pose_record_tester src/testers/mcap_pose_record_tester.cpp) +add_cvmmap_binary(mcap_multi_record_tester src/testers/mcap_multi_record_tester.cpp) add_executable(mcap_reader_tester src/testers/mcap_reader_tester.cpp) target_include_directories(mcap_reader_tester diff --git a/include/cvmmap_streamer/record/mcap_record_sink.hpp b/include/cvmmap_streamer/record/mcap_record_sink.hpp index 8ce2b4d..a8720f9 100644 --- a/include/cvmmap_streamer/record/mcap_record_sink.hpp +++ b/include/cvmmap_streamer/record/mcap_record_sink.hpp @@ -57,6 +57,15 @@ struct RawBodyTrackingMessageView { std::span bytes{}; }; +struct McapRecordStreamConfig { + std::string topic{"/camera/video"}; + std::string depth_topic{"/camera/depth"}; + std::string calibration_topic{"/camera/calibration"}; + std::string pose_topic{"/camera/pose"}; + std::string body_topic{"/camera/body"}; + std::string frame_id{"camera"}; +}; + class McapRecordSink { public: McapRecordSink() = default; @@ -107,4 +116,75 @@ private: State *state_{nullptr}; }; +class MultiMcapRecordSink { +public: + using StreamId = std::size_t; + struct State; + + MultiMcapRecordSink() = default; + ~MultiMcapRecordSink(); + + MultiMcapRecordSink(const MultiMcapRecordSink &) = delete; + MultiMcapRecordSink &operator=(const MultiMcapRecordSink &) = delete; + + MultiMcapRecordSink(MultiMcapRecordSink &&other) noexcept; + MultiMcapRecordSink &operator=(MultiMcapRecordSink &&other) noexcept; + + [[nodiscard]] + static std::expected create( + std::string path, + McapCompression compression); + + [[nodiscard]] + std::expected add_stream( + const McapRecordStreamConfig &config, + const encode::EncodedStreamInfo &stream_info); + + [[nodiscard]] + std::expected update_stream_info( + StreamId stream_id, + const encode::EncodedStreamInfo &stream_info); + + [[nodiscard]] + std::expected write_access_unit( + StreamId stream_id, + const encode::EncodedAccessUnit &access_unit); + + [[nodiscard]] + std::expected write_depth_map( + StreamId stream_id, + const RawDepthMapView &depth_map); + + [[nodiscard]] + std::expected write_depth_map_u16( + StreamId stream_id, + const RawDepthMapU16View &depth_map); + + [[nodiscard]] + std::expected write_camera_calibration( + StreamId stream_id, + const RawCameraCalibrationView &calibration); + + [[nodiscard]] + std::expected write_pose( + StreamId stream_id, + const RawPoseView &pose); + + [[nodiscard]] + std::expected write_body_tracking_message( + StreamId stream_id, + const RawBodyTrackingMessageView &body_message); + + [[nodiscard]] + bool is_open() const; + + [[nodiscard]] + std::string_view path() const; + + void close(); + +private: + State *state_{nullptr}; +}; + } diff --git a/src/record/mcap_record_sink.cpp b/src/record/mcap_record_sink.cpp index c53241b..de1ba07 100644 --- a/src/record/mcap_record_sink.cpp +++ b/src/record/mcap_record_sink.cpp @@ -772,4 +772,484 @@ void McapRecordSink::close() { state_ = nullptr; } +struct MultiMcapRecordSink::State { + struct StreamState { + McapRecordStreamConfig config{}; + mcap::ChannelId video_channel_id{0}; + mcap::ChannelId depth_channel_id{0}; + mcap::ChannelId calibration_channel_id{0}; + mcap::ChannelId pose_channel_id{0}; + mcap::ChannelId body_channel_id{0}; + std::uint32_t video_sequence{0}; + std::uint32_t depth_sequence{0}; + std::uint32_t calibration_sequence{0}; + std::uint32_t pose_sequence{0}; + std::uint32_t body_sequence{0}; + CodecType codec{CodecType::H264}; + std::vector keyframe_preamble{}; + }; + + mcap::McapWriter writer{}; + std::string path{}; + mcap::SchemaId video_schema_id{0}; + mcap::SchemaId depth_schema_id{0}; + mcap::SchemaId calibration_schema_id{0}; + mcap::SchemaId pose_schema_id{0}; + std::vector streams{}; +}; + +namespace { + +[[nodiscard]] +MultiMcapRecordSink::State::StreamState *stream_state( + MultiMcapRecordSink::State *state, + const MultiMcapRecordSink::StreamId stream_id, + std::string &error) { + if (state == nullptr) { + error = "MCAP sink is not open"; + return nullptr; + } + if (stream_id >= state->streams.size()) { + error = "invalid MCAP stream id"; + return nullptr; + } + return &state->streams[stream_id]; +} + +[[nodiscard]] +std::expected update_stream_info_impl( + MultiMcapRecordSink::State::StreamState &stream, + const encode::EncodedStreamInfo &stream_info) { + auto decoder_config_annexb = decoder_config_to_annexb(stream_info.codec, stream_info.decoder_config); + if (!decoder_config_annexb) { + return std::unexpected("failed to prepare MCAP keyframe decoder config: " + decoder_config_annexb.error()); + } + stream.codec = stream_info.codec; + stream.keyframe_preamble = std::move(*decoder_config_annexb); + return {}; +} + +[[nodiscard]] +std::expected validate_new_stream_config( + const MultiMcapRecordSink::State &state, + const McapRecordStreamConfig &config) { + if (config.topic.empty()) { + return std::unexpected("video topic is empty"); + } + if (config.depth_topic.empty()) { + return std::unexpected("depth topic is empty"); + } + if (config.frame_id.empty()) { + return std::unexpected("frame_id is empty"); + } + + const auto topic_in_use = [&state](const std::string &topic) { + if (topic.empty()) { + return false; + } + for (const auto &stream : state.streams) { + if (stream.config.topic == topic || + stream.config.depth_topic == topic || + stream.config.calibration_topic == topic || + stream.config.pose_topic == topic || + stream.config.body_topic == topic) { + return true; + } + } + return false; + }; + + for (const auto *topic : { + &config.topic, + &config.depth_topic, + &config.calibration_topic, + &config.pose_topic, + &config.body_topic, + }) { + if (topic_in_use(*topic)) { + return std::unexpected("duplicate MCAP topic: " + *topic); + } + } + return {}; +} + +} // namespace + +MultiMcapRecordSink::~MultiMcapRecordSink() { + close(); +} + +MultiMcapRecordSink::MultiMcapRecordSink(MultiMcapRecordSink &&other) noexcept + : state_(other.state_) { + other.state_ = nullptr; +} + +MultiMcapRecordSink &MultiMcapRecordSink::operator=(MultiMcapRecordSink &&other) noexcept { + if (this != &other) { + close(); + state_ = other.state_; + other.state_ = nullptr; + } + return *this; +} + +std::expected MultiMcapRecordSink::create( + std::string path, + McapCompression compression) { + MultiMcapRecordSink sink{}; + auto state = std::make_unique(); + state->path = std::move(path); + + mcap::McapWriterOptions options(""); + options.compression = to_mcap_compression(compression); + const auto open_status = state->writer.open(state->path, options); + if (!open_status.ok()) { + return std::unexpected("failed to open MCAP writer at '" + state->path + "': " + open_status.message); + } + + const auto video_descriptor_set = build_file_descriptor_set(foxglove::CompressedVideo::descriptor()); + std::string video_schema_bytes{}; + if (!video_descriptor_set.SerializeToString(&video_schema_bytes)) { + return std::unexpected("failed to serialize foxglove.CompressedVideo descriptor set"); + } + mcap::Schema video_schema("foxglove.CompressedVideo", "protobuf", video_schema_bytes); + state->writer.addSchema(video_schema); + state->video_schema_id = video_schema.id; + + const auto depth_descriptor_set = build_file_descriptor_set(cvmmap_streamer::DepthMap::descriptor()); + std::string depth_schema_bytes{}; + if (!depth_descriptor_set.SerializeToString(&depth_schema_bytes)) { + return std::unexpected("failed to serialize cvmmap_streamer.DepthMap descriptor set"); + } + mcap::Schema depth_schema("cvmmap_streamer.DepthMap", "protobuf", depth_schema_bytes); + state->writer.addSchema(depth_schema); + state->depth_schema_id = depth_schema.id; + + const auto calibration_descriptor_set = build_file_descriptor_set(foxglove::CameraCalibration::descriptor()); + std::string calibration_schema_bytes{}; + if (!calibration_descriptor_set.SerializeToString(&calibration_schema_bytes)) { + return std::unexpected("failed to serialize foxglove.CameraCalibration descriptor set"); + } + mcap::Schema calibration_schema("foxglove.CameraCalibration", "protobuf", calibration_schema_bytes); + state->writer.addSchema(calibration_schema); + state->calibration_schema_id = calibration_schema.id; + + const auto pose_descriptor_set = build_file_descriptor_set(foxglove::PoseInFrame::descriptor()); + std::string pose_schema_bytes{}; + if (!pose_descriptor_set.SerializeToString(&pose_schema_bytes)) { + return std::unexpected("failed to serialize foxglove.PoseInFrame descriptor set"); + } + mcap::Schema pose_schema("foxglove.PoseInFrame", "protobuf", pose_schema_bytes); + state->writer.addSchema(pose_schema); + state->pose_schema_id = pose_schema.id; + + sink.state_ = state.release(); + return sink; +} + +std::expected MultiMcapRecordSink::add_stream( + const McapRecordStreamConfig &config, + const encode::EncodedStreamInfo &stream_info) { + if (state_ == nullptr) { + return std::unexpected("MCAP sink is not open"); + } + if (auto valid = validate_new_stream_config(*state_, config); !valid) { + return std::unexpected(valid.error()); + } + + State::StreamState stream{}; + stream.config = config; + + mcap::Channel video_channel(config.topic, "protobuf", state_->video_schema_id); + state_->writer.addChannel(video_channel); + stream.video_channel_id = video_channel.id; + + mcap::Channel depth_channel(config.depth_topic, "protobuf", state_->depth_schema_id); + state_->writer.addChannel(depth_channel); + stream.depth_channel_id = depth_channel.id; + + if (auto updated = update_stream_info_impl(stream, stream_info); !updated) { + return std::unexpected(updated.error()); + } + + state_->streams.push_back(std::move(stream)); + return state_->streams.size() - 1; +} + +std::expected MultiMcapRecordSink::update_stream_info( + const StreamId stream_id, + const encode::EncodedStreamInfo &stream_info) { + std::string error{}; + auto *stream = stream_state(state_, stream_id, error); + if (stream == nullptr) { + return std::unexpected(error); + } + return update_stream_info_impl(*stream, stream_info); +} + +std::expected MultiMcapRecordSink::write_access_unit( + const StreamId stream_id, + const encode::EncodedAccessUnit &access_unit) { + std::string error{}; + auto *stream = stream_state(state_, stream_id, error); + if (stream == nullptr) { + return std::unexpected(error); + } + + foxglove::CompressedVideo message{}; + *message.mutable_timestamp() = to_proto_timestamp(access_unit.source_timestamp_ns); + message.set_frame_id(stream->config.frame_id); + message.set_format(codec_format(access_unit.codec)); + std::vector payload{}; + if (access_unit.keyframe && !stream->keyframe_preamble.empty()) { + payload.reserve(stream->keyframe_preamble.size() + access_unit.annexb_bytes.size()); + payload.insert(payload.end(), stream->keyframe_preamble.begin(), stream->keyframe_preamble.end()); + } + payload.insert(payload.end(), access_unit.annexb_bytes.begin(), access_unit.annexb_bytes.end()); + message.set_data( + reinterpret_cast(payload.data()), + static_cast(payload.size())); + + std::string serialized{}; + if (!message.SerializeToString(&serialized)) { + return std::unexpected("failed to serialize foxglove.CompressedVideo"); + } + + mcap::Message record{}; + record.channelId = stream->video_channel_id; + record.sequence = stream->video_sequence++; + record.logTime = access_unit.source_timestamp_ns; + record.publishTime = access_unit.source_timestamp_ns; + record.data = reinterpret_cast(serialized.data()); + record.dataSize = serialized.size(); + + const auto write_status = state_->writer.write(record); + if (!write_status.ok()) { + return std::unexpected("failed to write MCAP message: " + write_status.message); + } + return {}; +} + +std::expected MultiMcapRecordSink::write_depth_map( + const StreamId stream_id, + const RawDepthMapView &depth_map) { + std::string error{}; + auto *stream = stream_state(state_, stream_id, error); + if (stream == nullptr) { + return std::unexpected(error); + } + + const auto source_unit = normalize_depth_source_unit(depth_map.source_unit); + auto encoded = encode_depth_payload(depth_map); + if (!encoded) { + return std::unexpected(encoded.error()); + } + + return write_depth_message( + state_->writer, + depth_map.timestamp_ns, + stream->config.frame_id, + depth_map.width, + depth_map.height, + source_unit, + stream->depth_channel_id, + stream->depth_sequence, + *encoded); +} + +std::expected MultiMcapRecordSink::write_depth_map_u16( + const StreamId stream_id, + const RawDepthMapU16View &depth_map) { + std::string error{}; + auto *stream = stream_state(state_, stream_id, error); + if (stream == nullptr) { + return std::unexpected(error); + } + + auto encoded = encode_depth_payload(depth_map); + if (!encoded) { + return std::unexpected(encoded.error()); + } + + return write_depth_message( + state_->writer, + depth_map.timestamp_ns, + stream->config.frame_id, + depth_map.width, + depth_map.height, + ipc::DepthUnit::Millimeter, + stream->depth_channel_id, + stream->depth_sequence, + *encoded); +} + +std::expected MultiMcapRecordSink::write_camera_calibration( + const StreamId stream_id, + const RawCameraCalibrationView &calibration) { + std::string error{}; + auto *stream = stream_state(state_, stream_id, error); + if (stream == nullptr) { + return std::unexpected(error); + } + if (stream->config.calibration_topic.empty()) { + return std::unexpected("calibration topic is empty"); + } + + if (stream->calibration_channel_id == 0) { + mcap::Channel calibration_channel(stream->config.calibration_topic, "protobuf", state_->calibration_schema_id); + state_->writer.addChannel(calibration_channel); + stream->calibration_channel_id = calibration_channel.id; + } + + foxglove::CameraCalibration message{}; + *message.mutable_timestamp() = to_proto_timestamp(calibration.timestamp_ns); + message.set_frame_id(stream->config.frame_id); + message.set_width(calibration.width); + message.set_height(calibration.height); + message.set_distortion_model(std::string(calibration.distortion_model)); + for (const double value : calibration.distortion) { + message.add_d(value); + } + if (auto append = append_repeated_double(message.mutable_k(), calibration.intrinsic_matrix, 9, "K"); !append) { + return append; + } + if (auto append = append_repeated_double(message.mutable_r(), calibration.rectification_matrix, 9, "R"); !append) { + return append; + } + if (auto append = append_repeated_double(message.mutable_p(), calibration.projection_matrix, 12, "P"); !append) { + return append; + } + + std::string serialized{}; + if (!message.SerializeToString(&serialized)) { + return std::unexpected("failed to serialize foxglove.CameraCalibration"); + } + + mcap::Message record{}; + record.channelId = stream->calibration_channel_id; + record.sequence = stream->calibration_sequence++; + record.logTime = calibration.timestamp_ns; + record.publishTime = calibration.timestamp_ns; + record.data = reinterpret_cast(serialized.data()); + record.dataSize = serialized.size(); + + const auto write_status = state_->writer.write(record); + if (!write_status.ok()) { + return std::unexpected("failed to write MCAP calibration message: " + write_status.message); + } + return {}; +} + +std::expected MultiMcapRecordSink::write_pose( + const StreamId stream_id, + const RawPoseView &pose) { + std::string error{}; + auto *stream = stream_state(state_, stream_id, error); + if (stream == nullptr) { + return std::unexpected(error); + } + if (stream->config.pose_topic.empty()) { + return std::unexpected("pose topic is empty"); + } + + if (stream->pose_channel_id == 0) { + mcap::Channel pose_channel(stream->config.pose_topic, "protobuf", state_->pose_schema_id); + state_->writer.addChannel(pose_channel); + stream->pose_channel_id = pose_channel.id; + } + + foxglove::PoseInFrame message{}; + *message.mutable_timestamp() = to_proto_timestamp(pose.timestamp_ns); + message.set_frame_id(std::string(pose.reference_frame_id)); + auto *pose_message = message.mutable_pose(); + auto *position = pose_message->mutable_position(); + position->set_x(pose.position[0]); + position->set_y(pose.position[1]); + position->set_z(pose.position[2]); + auto *orientation = pose_message->mutable_orientation(); + orientation->set_x(pose.orientation[0]); + orientation->set_y(pose.orientation[1]); + orientation->set_z(pose.orientation[2]); + orientation->set_w(pose.orientation[3]); + + std::string serialized{}; + if (!message.SerializeToString(&serialized)) { + return std::unexpected("failed to serialize foxglove.PoseInFrame"); + } + + mcap::Message record{}; + record.channelId = stream->pose_channel_id; + record.sequence = stream->pose_sequence++; + record.logTime = pose.timestamp_ns; + record.publishTime = pose.timestamp_ns; + record.data = reinterpret_cast(serialized.data()); + record.dataSize = serialized.size(); + + const auto write_status = state_->writer.write(record); + if (!write_status.ok()) { + return std::unexpected("failed to write MCAP pose message: " + write_status.message); + } + return {}; +} + +std::expected MultiMcapRecordSink::write_body_tracking_message( + const StreamId stream_id, + const RawBodyTrackingMessageView &body_message) { + std::string error{}; + auto *stream = stream_state(state_, stream_id, error); + if (stream == nullptr) { + return std::unexpected(error); + } + if (body_message.bytes.empty()) { + return std::unexpected("body tracking payload is empty"); + } + + if (stream->config.body_topic.empty()) { + return std::unexpected("body topic is empty"); + } + if (stream->body_channel_id == 0) { + mcap::Channel body_channel( + stream->config.body_topic, + kBodyTrackingMessageEncoding, + 0, + body_channel_metadata()); + state_->writer.addChannel(body_channel); + stream->body_channel_id = body_channel.id; + } + + mcap::Message record{}; + record.channelId = stream->body_channel_id; + record.sequence = stream->body_sequence++; + record.logTime = body_message.timestamp_ns; + record.publishTime = body_message.timestamp_ns; + record.data = reinterpret_cast(body_message.bytes.data()); + record.dataSize = body_message.bytes.size(); + + const auto write_status = state_->writer.write(record); + if (!write_status.ok()) { + return std::unexpected("failed to write MCAP body message: " + write_status.message); + } + return {}; +} + +bool MultiMcapRecordSink::is_open() const { + return state_ != nullptr; +} + +std::string_view MultiMcapRecordSink::path() const { + if (state_ == nullptr) { + return {}; + } + return state_->path; +} + +void MultiMcapRecordSink::close() { + if (state_ == nullptr) { + return; + } + state_->writer.close(); + delete state_; + state_ = nullptr; +} + } diff --git a/src/testers/mcap_multi_record_tester.cpp b/src/testers/mcap_multi_record_tester.cpp new file mode 100644 index 0000000..2a0a175 --- /dev/null +++ b/src/testers/mcap_multi_record_tester.cpp @@ -0,0 +1,247 @@ +#include + +#include "proto/cvmmap_streamer/DepthMap.pb.h" +#include "cvmmap_streamer/common.h" +#include "cvmmap_streamer/record/mcap_record_sink.hpp" +#include "proto/foxglove/CameraCalibration.pb.h" +#include "proto/foxglove/CompressedVideo.pb.h" +#include "proto/foxglove/PoseInFrame.pb.h" + +#include +#include + +#include +#include +#include +#include +#include + +namespace { + +enum class TesterExitCode : int { + Success = 0, + CreateError = 2, + WriteError = 3, + OpenError = 4, + VerificationError = 5, +}; + +[[nodiscard]] +constexpr int exit_code(const TesterExitCode code) { + return static_cast(code); +} + +} + +int main(int argc, char **argv) { + if (cvmmap_streamer::has_help_flag(argc, argv)) { + cvmmap_streamer::print_help("mcap_multi_record_tester"); + return exit_code(TesterExitCode::Success); + } + + const std::filesystem::path output_path = + argc > 1 + ? std::filesystem::path(argv[1]) + : std::filesystem::temp_directory_path() / "cvmmap_streamer_multi_record_test.mcap"; + if (output_path.has_parent_path()) { + std::filesystem::create_directories(output_path.parent_path()); + } + + auto sink = cvmmap_streamer::record::MultiMcapRecordSink::create( + output_path.string(), + cvmmap_streamer::McapCompression::None); + if (!sink) { + spdlog::error("failed to create MCAP sink: {}", sink.error()); + return exit_code(TesterExitCode::CreateError); + } + + cvmmap_streamer::encode::EncodedStreamInfo stream_info{}; + stream_info.codec = cvmmap_streamer::CodecType::H264; + + auto zed1 = sink->add_stream(cvmmap_streamer::record::McapRecordStreamConfig{ + .topic = "/zed1/video", + .depth_topic = "/zed1/depth", + .calibration_topic = "/zed1/calibration", + .pose_topic = "/zed1/pose", + .body_topic = "/zed1/body", + .frame_id = "zed1", + }, stream_info); + auto zed2 = sink->add_stream(cvmmap_streamer::record::McapRecordStreamConfig{ + .topic = "/zed2/video", + .depth_topic = "/zed2/depth", + .calibration_topic = "/zed2/calibration", + .pose_topic = "/zed2/pose", + .body_topic = "/zed2/body", + .frame_id = "zed2", + }, stream_info); + if (!zed1 || !zed2) { + spdlog::error("failed to add streams: {} {}", zed1 ? "" : zed1.error(), zed2 ? "" : zed2.error()); + return exit_code(TesterExitCode::CreateError); + } + + const std::vector depth_pixels{ + 1000, 2000, + 1500, 2500, + }; + const std::vector distortion{0.0, 0.0, 0.0, 0.0, 0.0}; + const std::vector intrinsic_matrix{ + 500.0, 0.0, 320.0, + 0.0, 500.0, 240.0, + 0.0, 0.0, 1.0, + }; + const std::vector rectification_matrix{ + 1.0, 0.0, 0.0, + 0.0, 1.0, 0.0, + 0.0, 0.0, 1.0, + }; + const std::vector projection_matrix{ + 500.0, 0.0, 320.0, 0.0, + 0.0, 500.0, 240.0, 0.0, + 0.0, 0.0, 1.0, 0.0, + }; + + for (const auto [stream_id, label, pose_x] : { + std::tuple{*zed1, std::string("zed1"), 1.0}, + std::tuple{*zed2, std::string("zed2"), 2.0}, + }) { + cvmmap_streamer::encode::EncodedAccessUnit access_unit{}; + access_unit.codec = cvmmap_streamer::CodecType::H264; + access_unit.source_timestamp_ns = 100; + access_unit.stream_pts_ns = 100; + access_unit.keyframe = false; + access_unit.annexb_bytes = {0x00, 0x00, 0x00, 0x01, 0x09, 0x10}; + if (auto write = sink->write_access_unit(stream_id, access_unit); !write) { + spdlog::error("failed to write video access unit: {}", write.error()); + return exit_code(TesterExitCode::WriteError); + } + + if (auto write = sink->write_depth_map_u16(stream_id, cvmmap_streamer::record::RawDepthMapU16View{ + .timestamp_ns = 100, + .width = 2, + .height = 2, + .pixels = depth_pixels, + }); !write) { + spdlog::error("failed to write depth map: {}", write.error()); + return exit_code(TesterExitCode::WriteError); + } + + if (auto write = sink->write_camera_calibration(stream_id, cvmmap_streamer::record::RawCameraCalibrationView{ + .timestamp_ns = 100, + .width = 640, + .height = 480, + .distortion_model = "plumb_bob", + .distortion = distortion, + .intrinsic_matrix = intrinsic_matrix, + .rectification_matrix = rectification_matrix, + .projection_matrix = projection_matrix, + }); !write) { + spdlog::error("failed to write calibration: {}", write.error()); + return exit_code(TesterExitCode::WriteError); + } + + if (auto write = sink->write_pose(stream_id, cvmmap_streamer::record::RawPoseView{ + .timestamp_ns = 100, + .reference_frame_id = label + "/world", + .position = {pose_x, 2.0, 3.0}, + .orientation = {0.1, 0.2, 0.3, 0.9}, + }); !write) { + spdlog::error("failed to write pose: {}", write.error()); + return exit_code(TesterExitCode::WriteError); + } + } + + sink->close(); + + mcap::McapReader reader{}; + const auto open_status = reader.open(output_path.string()); + if (!open_status.ok()) { + spdlog::error("failed to open MCAP file '{}': {}", output_path.string(), open_status.message); + return exit_code(TesterExitCode::OpenError); + } + + std::map topic_counts{}; + + auto messages = reader.readMessages(); + for (auto it = messages.begin(); it != messages.end(); ++it) { + if (it->schema == nullptr || it->channel == nullptr) { + spdlog::error("MCAP message missing schema or channel"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + topic_counts[it->channel->topic] += 1; + + if (it->schema->name == "foxglove.CompressedVideo") { + foxglove::CompressedVideo video{}; + if (!video.ParseFromArray(it->message.data, static_cast(it->message.dataSize))) { + spdlog::error("failed to parse foxglove.CompressedVideo"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + if (video.data().empty()) { + spdlog::error("video payload is empty"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + continue; + } + + if (it->schema->name == "cvmmap_streamer.DepthMap") { + cvmmap_streamer::DepthMap depth{}; + if (!depth.ParseFromArray(it->message.data, static_cast(it->message.dataSize))) { + spdlog::error("failed to parse cvmmap_streamer.DepthMap"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + const auto decoded = rvl::decompress_image(std::span( + reinterpret_cast(depth.data().data()), + depth.data().size())); + if (decoded.pixels.size() != depth_pixels.size()) { + spdlog::error("decoded depth pixel count mismatch"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + continue; + } + + if (it->schema->name == "foxglove.CameraCalibration") { + foxglove::CameraCalibration calibration{}; + if (!calibration.ParseFromArray(it->message.data, static_cast(it->message.dataSize))) { + spdlog::error("failed to parse foxglove.CameraCalibration"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + continue; + } + + if (it->schema->name == "foxglove.PoseInFrame") { + foxglove::PoseInFrame pose{}; + if (!pose.ParseFromArray(it->message.data, static_cast(it->message.dataSize))) { + spdlog::error("failed to parse foxglove.PoseInFrame"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + continue; + } + } + + reader.close(); + + for (const auto &topic : { + "/zed1/video", + "/zed1/depth", + "/zed1/calibration", + "/zed1/pose", + "/zed2/video", + "/zed2/depth", + "/zed2/calibration", + "/zed2/pose", + }) { + if (topic_counts[topic] != 1) { + spdlog::error("expected exactly one message on topic '{}', got {}", topic, topic_counts[topic]); + return exit_code(TesterExitCode::VerificationError); + } + } + + spdlog::info("validated multi-stream MCAP recording at '{}'", output_path.string()); + return exit_code(TesterExitCode::Success); +} diff --git a/src/tools/zed_svo_to_mcap.cpp b/src/tools/zed_svo_to_mcap.cpp index 3ea2d4f..f59fe8d 100644 --- a/src/tools/zed_svo_to_mcap.cpp +++ b/src/tools/zed_svo_to_mcap.cpp @@ -1,5 +1,6 @@ #include #include +#include #include @@ -9,19 +10,31 @@ #include "cvmmap_streamer/ipc/contracts.hpp" #include "cvmmap_streamer/record/mcap_record_sink.hpp" +#include #include +#include +#include #include #include #include +#include +#include #include +#include +#include #include +#include #include #include #include +#include #include namespace { +volatile std::sig_atomic_t g_signal_count = 0; +volatile std::sig_atomic_t g_last_signal = 0; + enum class ToolExitCode : int { Success = 0, UsageError = 2, @@ -29,12 +42,13 @@ enum class ToolExitCode : int { }; struct CliOptions { - std::string input_path{}; + std::vector input_paths{}; + std::string segment_dir{}; std::string output_path{}; - std::string codec{"h264"}; + std::string codec{"h265"}; std::string encoder_device{"auto"}; std::string mcap_compression{"zstd"}; - std::string depth_mode{"neural"}; + std::string depth_mode{"quality"}; bool with_pose{false}; std::uint32_t start_frame{0}; std::uint32_t end_frame{0}; @@ -44,6 +58,80 @@ struct CliOptions { std::string depth_topic{"/camera/depth"}; std::string calibration_topic{"/camera/calibration"}; std::string pose_topic{"/camera/pose"}; + std::string world_frame_id{"world"}; + std::string pose_config_path{}; + double sync_tolerance_ms{0.0}; + bool has_sync_tolerance{false}; +}; + +struct PoseTrackingOptions { + sl::COORDINATE_SYSTEM coordinate_system{sl::COORDINATE_SYSTEM::IMAGE}; + sl::REFERENCE_FRAME reference_frame{sl::REFERENCE_FRAME::WORLD}; + bool set_floor_as_origin{false}; + std::string reference_frame_name{"world"}; +}; + +struct SourceSpec { + std::filesystem::path path{}; + std::string label{}; +}; + +struct TrackingSample { + sl::POSITIONAL_TRACKING_STATE state{sl::POSITIONAL_TRACKING_STATE::OFF}; + bool has_pose{false}; + std::array position{}; + std::array orientation{}; +}; + +struct CameraStream { + SourceSpec source{}; + std::unique_ptr camera{}; + sl::RuntimeParameters runtime{}; + sl::Mat current_left_frame{}; + sl::Mat current_depth_frame{}; + sl::Mat next_left_frame{}; + sl::Mat next_depth_frame{}; + TrackingSample current_tracking{}; + TrackingSample next_tracking{}; + std::uint64_t current_timestamp_ns{0}; + std::uint64_t next_timestamp_ns{0}; + std::uint64_t first_timestamp_ns{0}; + std::uint64_t last_timestamp_ns{0}; + std::uint64_t total_frames{0}; + std::uint64_t nominal_frame_period_ns{0}; + std::uint64_t dropped_frames{0}; + float fps{0.0f}; + std::uint32_t width{0}; + std::uint32_t height{0}; + int sync_position{-1}; + bool has_next{false}; + bool tracking_enabled{false}; + bool calibration_written{false}; + std::optional last_tracking_state{}; + int serial_number{0}; + PoseTrackingOptions pose_tracking{}; + cvmmap_streamer::ipc::FrameInfo frame_info{}; + std::optional backend{}; + cvmmap_streamer::encode::EncodedStreamInfo stream_info{}; + cvmmap_streamer::record::MultiMcapRecordSink::StreamId mcap_stream_id{0}; + std::array distortion{ + 0.0, 0.0, 0.0, 0.0, 0.0, + }; + std::array intrinsic_matrix{ + 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, + 0.0, 0.0, 1.0, + }; + std::array rectification_matrix{ + 1.0, 0.0, 0.0, + 0.0, 1.0, 0.0, + 0.0, 0.0, 1.0, + }; + std::array projection_matrix{ + 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 1.0, 0.0, + }; }; [[nodiscard]] @@ -51,6 +139,55 @@ constexpr int exit_code(const ToolExitCode code) { return static_cast(code); } +void termination_signal_handler(const int signal_number) { + g_last_signal = signal_number; + if (g_signal_count > 0) { + std::_Exit(128 + signal_number); + } + g_signal_count = 1; +} + +void install_signal_handlers() { + std::signal(SIGINT, termination_signal_handler); + std::signal(SIGTERM, termination_signal_handler); +} + +[[nodiscard]] +bool shutdown_requested() { + return g_signal_count != 0; +} + +[[nodiscard]] +int interrupted_exit_code() { + if (g_last_signal == SIGTERM) { + return 128 + SIGTERM; + } + return 128 + SIGINT; +} + +[[nodiscard]] +std::string interrupted_signal_name() { + if (g_last_signal == SIGTERM) { + return "SIGTERM"; + } + return "SIGINT"; +} + +[[nodiscard]] +bool log_shutdown_request(bool &logged, const std::string_view context) { + if (!shutdown_requested()) { + return false; + } + if (!logged) { + spdlog::warn( + "{}: received {}; attempting graceful shutdown. send the signal again to force exit", + context, + interrupted_signal_name()); + logged = true; + } + return true; +} + [[nodiscard]] std::string zed_string(const sl::String &value) { return std::string(value.c_str() == nullptr ? "" : value.c_str()); @@ -177,6 +314,541 @@ std::vector copy_compact_u16_plane(const sl::Mat &mat) { return compact; } +[[nodiscard]] +std::string lowercase(std::string value) { + std::transform( + value.begin(), + value.end(), + value.begin(), + [](unsigned char ch) { + return static_cast(std::tolower(ch)); + }); + return value; +} + +[[nodiscard]] +std::string normalize_config_token(std::string value) { + value = lowercase(std::move(value)); + std::replace(value.begin(), value.end(), '-', '_'); + std::replace(value.begin(), value.end(), ' ', '_'); + return value; +} + +[[nodiscard]] +std::expected parse_coordinate_system(const std::string_view raw) { + const auto normalized = normalize_config_token(std::string(raw)); + if (normalized == "image") { + return sl::COORDINATE_SYSTEM::IMAGE; + } + if (normalized == "right_handed_y_up" || normalized == "y_up") { + return sl::COORDINATE_SYSTEM::RIGHT_HANDED_Y_UP; + } + return std::unexpected( + "invalid zed.coordinate_system: '" + std::string(raw) + "' (allowed: IMAGE, RIGHT_HANDED_Y_UP)"); +} + +[[nodiscard]] +std::expected load_pose_tracking_options(const CliOptions &options) { + PoseTrackingOptions config{}; + config.reference_frame_name = options.world_frame_id; + if (options.pose_config_path.empty()) { + return config; + } + + toml::table table{}; + try { + table = toml::parse_file(options.pose_config_path); + } catch (const toml::parse_error &error) { + return std::unexpected("failed to parse pose config '" + options.pose_config_path + "': " + std::string(error.description())); + } + + if (const auto value = table["zed"]["coordinate_system"].value(); value) { + auto parsed = parse_coordinate_system(*value); + if (!parsed) { + return std::unexpected(parsed.error()); + } + config.coordinate_system = *parsed; + } + + if (const auto value = table["zed"]["body_tracking"]["reference_frame"].value(); value) { + const auto normalized = normalize_config_token(*value); + if (normalized == "camera") { + config.reference_frame = sl::REFERENCE_FRAME::CAMERA; + config.reference_frame_name = "camera"; + } else if (normalized == "world") { + config.reference_frame = sl::REFERENCE_FRAME::WORLD; + config.reference_frame_name = "world"; + } else { + return std::unexpected( + "invalid zed.body_tracking.reference_frame: '" + *value + "' (allowed: CAMERA, WORLD)"); + } + } + + if (const auto value = table["zed"]["body_tracking"]["set_floor_as_origin"].value(); value) { + config.set_floor_as_origin = *value; + } + + return config; +} + +[[nodiscard]] +std::string extract_camera_label(const std::filesystem::path &path, const std::size_t fallback_index = 0) { + static const std::regex pattern{R"(.*_(zed[0-9]+)\.svo2?$)", std::regex::icase}; + std::smatch match{}; + const auto filename = path.filename().string(); + if (std::regex_match(filename, match, pattern)) { + return lowercase(match[1].str()); + } + if (fallback_index > 0) { + return "cam" + std::to_string(fallback_index); + } + return lowercase(path.stem().string()); +} + +[[nodiscard]] +std::string namespace_topic(const std::string_view label, const std::string_view topic) { + if (topic.empty()) { + return {}; + } + std::string suffix{topic}; + while (!suffix.empty() && suffix.front() == '/') { + suffix.erase(suffix.begin()); + } + if (suffix.rfind("camera/", 0) == 0) { + suffix.erase(0, std::string_view{"camera/"}.size()); + } + return "/" + std::string(label) + "/" + suffix; +} + +[[nodiscard]] +std::string multi_frame_id(const CliOptions &options, const std::string_view label) { + if (options.frame_id.empty() || options.frame_id == "camera") { + return std::string(label); + } + return options.frame_id + "/" + std::string(label); +} + +[[nodiscard]] +std::string pose_reference_frame_id(const PoseTrackingOptions &options, const std::string_view label) { + return std::string(label) + "/" + options.reference_frame_name; +} + +[[nodiscard]] +std::expected, std::string> discover_segment_inputs(const std::filesystem::path &segment_dir) { + if (!std::filesystem::is_directory(segment_dir)) { + return std::unexpected("segment directory does not exist: " + segment_dir.string()); + } + + static const std::regex pattern{R"(.*_zed([0-9]+)\.svo2?$)", std::regex::icase}; + std::vector> ordered_paths{}; + for (const auto &entry : std::filesystem::directory_iterator{segment_dir}) { + if (!entry.is_regular_file()) { + continue; + } + std::smatch match{}; + const auto filename = entry.path().filename().string(); + if (!std::regex_match(filename, match, pattern)) { + continue; + } + ordered_paths.emplace_back(std::stoi(match[1].str()), entry.path()); + } + + std::sort( + ordered_paths.begin(), + ordered_paths.end(), + [](const auto &left, const auto &right) { + return left.first < right.first; + }); + + if (ordered_paths.size() < 2) { + return std::unexpected( + "expected at least 2 SVO inputs under '" + segment_dir.string() + "', found " + std::to_string(ordered_paths.size())); + } + + std::vector sources{}; + sources.reserve(ordered_paths.size()); + for (const auto &[camera_index, path] : ordered_paths) { + sources.push_back(SourceSpec{ + .path = path, + .label = "zed" + std::to_string(camera_index), + }); + } + return sources; +} + +[[nodiscard]] +std::expected, std::string> resolve_sources(const CliOptions &options) { + if (!options.segment_dir.empty()) { + if (!options.input_paths.empty()) { + return std::unexpected("provide either --segment-dir or --input, not both"); + } + return discover_segment_inputs(std::filesystem::path{options.segment_dir}); + } + + if (options.input_paths.empty()) { + return std::unexpected("provide at least one --input or use --segment-dir"); + } + + std::vector sources{}; + sources.reserve(options.input_paths.size()); + for (std::size_t index = 0; index < options.input_paths.size(); ++index) { + const auto path = std::filesystem::path{options.input_paths[index]}; + if (!std::filesystem::is_regular_file(path)) { + return std::unexpected("input file does not exist: " + path.string()); + } + sources.push_back(SourceSpec{ + .path = path, + .label = extract_camera_label(path, index + 1), + }); + } + + std::sort( + sources.begin(), + sources.end(), + [](const auto &left, const auto &right) { + return left.label < right.label; + }); + for (std::size_t index = 1; index < sources.size(); ++index) { + if (sources[index - 1].label == sources[index].label) { + return std::unexpected("duplicate camera label resolved from inputs: " + sources[index].label); + } + } + return sources; +} + +[[nodiscard]] +std::filesystem::path derive_output_path(const CliOptions &options, const std::vector &sources) { + if (!options.output_path.empty()) { + return std::filesystem::path{options.output_path}; + } + if (!options.segment_dir.empty()) { + const auto segment_dir = std::filesystem::path{options.segment_dir}; + return segment_dir / (segment_dir.filename().string() + ".mcap"); + } + auto output_path = sources.front().path; + output_path.replace_extension(".mcap"); + return output_path; +} + +[[nodiscard]] +std::expected read_image_timestamp_ns( + sl::Camera &camera, + const std::optional fallback_timestamp_ns, + const std::uint64_t nominal_frame_period_ns) { + auto timestamp_ns = camera.getTimestamp(sl::TIME_REFERENCE::IMAGE).getNanoseconds(); + if (timestamp_ns == 0) { + if (!fallback_timestamp_ns) { + return std::unexpected("ZED SDK returned a zero image timestamp for the first frame"); + } + timestamp_ns = *fallback_timestamp_ns + nominal_frame_period_ns; + } + return timestamp_ns; +} + +void maybe_log_tracking_state(CameraStream &stream, const sl::POSITIONAL_TRACKING_STATE tracking_state) { + if (!stream.tracking_enabled) { + return; + } + if (stream.last_tracking_state && *stream.last_tracking_state == tracking_state) { + return; + } + stream.last_tracking_state = tracking_state; + if (tracking_state != sl::POSITIONAL_TRACKING_STATE::OK) { + spdlog::warn( + "pose tracking state for {} changed to {}", + stream.source.label, + zed_tracking_state_string(tracking_state)); + } +} + +[[nodiscard]] +std::expected read_frame_data( + CameraStream &stream, + sl::Mat &left_frame, + sl::Mat &depth_frame, + TrackingSample &tracking_sample, + std::optional fallback_timestamp_ns, + std::uint64_t ×tamp_ns_out) { + const auto grab_status = stream.camera->grab(stream.runtime); + if (grab_status == sl::ERROR_CODE::END_OF_SVOFILE_REACHED) { + return std::unexpected("end-of-svo"); + } + if (grab_status != sl::ERROR_CODE::SUCCESS) { + return std::unexpected( + "failed to grab frame for " + stream.source.label + ": " + zed_status_string(grab_status)); + } + + const auto image_status = stream.camera->retrieveImage(left_frame, sl::VIEW::LEFT_BGR, sl::MEM::CPU); + if (image_status != sl::ERROR_CODE::SUCCESS) { + return std::unexpected( + "failed to retrieve left image for " + stream.source.label + ": " + zed_status_string(image_status)); + } + if (auto valid = validate_u8c3_mat(left_frame, stream.source.label + " left image"); !valid) { + return std::unexpected(valid.error()); + } + + const auto depth_status = stream.camera->retrieveMeasure(depth_frame, sl::MEASURE::DEPTH_U16_MM, sl::MEM::CPU); + if (depth_status != sl::ERROR_CODE::SUCCESS) { + return std::unexpected( + "failed to retrieve depth map for " + stream.source.label + ": " + zed_status_string(depth_status)); + } + if (auto valid = validate_u16c1_mat(depth_frame, stream.source.label + " depth map"); !valid) { + return std::unexpected(valid.error()); + } + + auto timestamp_ns = read_image_timestamp_ns(*stream.camera, fallback_timestamp_ns, stream.nominal_frame_period_ns); + if (!timestamp_ns) { + return std::unexpected(timestamp_ns.error()); + } + timestamp_ns_out = *timestamp_ns; + + tracking_sample = {}; + if (stream.tracking_enabled) { + sl::Pose pose{}; + tracking_sample.state = stream.camera->getPosition(pose, stream.pose_tracking.reference_frame); + maybe_log_tracking_state(stream, tracking_sample.state); + if (tracking_sample.state == sl::POSITIONAL_TRACKING_STATE::OK) { + tracking_sample.has_pose = true; + const auto translation = pose.getTranslation(); + const auto orientation = pose.getOrientation(); + tracking_sample.position = { + static_cast(translation.x), + static_cast(translation.y), + static_cast(translation.z), + }; + tracking_sample.orientation = { + static_cast(orientation.x), + static_cast(orientation.y), + static_cast(orientation.z), + static_cast(orientation.w), + }; + } + } + return {}; +} + +[[nodiscard]] +std::expected fill_next_frame(CameraStream &stream) { + auto next = read_frame_data( + stream, + stream.next_left_frame, + stream.next_depth_frame, + stream.next_tracking, + stream.current_timestamp_ns, + stream.next_timestamp_ns); + if (!next) { + if (next.error() == "end-of-svo") { + stream.has_next = false; + return {}; + } + return std::unexpected(next.error()); + } + stream.has_next = true; + return {}; +} + +[[nodiscard]] +std::expected promote_next_frame(CameraStream &stream) { + if (!stream.has_next) { + return std::unexpected("no buffered next frame is available for " + stream.source.label); + } + + std::swap(stream.current_left_frame, stream.next_left_frame); + std::swap(stream.current_depth_frame, stream.next_depth_frame); + std::swap(stream.current_tracking, stream.next_tracking); + std::swap(stream.current_timestamp_ns, stream.next_timestamp_ns); + stream.has_next = false; + return fill_next_frame(stream); +} + +[[nodiscard]] +std::expected read_last_readable_timestamp(CameraStream &stream) { + const auto last_candidate = static_cast(stream.total_frames - 1); + std::string last_error{}; + + for (int position = last_candidate; position >= 0; --position) { + stream.camera->setSVOPosition(position); + std::uint64_t timestamp_ns = 0; + TrackingSample ignored_tracking{}; + auto frame = read_frame_data( + stream, + stream.current_left_frame, + stream.current_depth_frame, + ignored_tracking, + std::nullopt, + timestamp_ns); + if (frame) { + const auto skipped_tail_frames = static_cast(last_candidate - position); + if (skipped_tail_frames > 0) { + spdlog::warn( + "skipping {} unreadable tail frame(s) for {} last_error={}", + skipped_tail_frames, + stream.source.path.string(), + last_error); + } + return timestamp_ns; + } + last_error = frame.error(); + } + + return std::unexpected( + "failed to read any trailing frame for " + stream.source.path.string() + ": " + last_error); +} + +[[nodiscard]] +std::expected make_encoder_backend( + const CliOptions &options, + const cvmmap_streamer::CodecType codec, + const cvmmap_streamer::EncoderDeviceType encoder_device, + const cvmmap_streamer::ipc::FrameInfo &frame_info) { + cvmmap_streamer::RuntimeConfig config = cvmmap_streamer::RuntimeConfig::defaults(); + config.encoder.backend = cvmmap_streamer::EncoderBackendType::FFmpeg; + config.encoder.device = encoder_device; + config.encoder.codec = codec; + config.encoder.gop = 30; + config.encoder.b_frames = 0; + + auto backend_result = cvmmap_streamer::encode::make_encoder_backend(config); + if (!backend_result) { + return std::unexpected( + "failed to create encoder backend: " + cvmmap_streamer::format_error(backend_result.error())); + } + auto backend = std::move(*backend_result); + if (auto init_status = backend->init(config, frame_info); !init_status) { + return std::unexpected( + "failed to initialize encoder backend: " + cvmmap_streamer::format_error(init_status.error())); + } + return std::move(backend); +} + +[[nodiscard]] +std::expected open_camera_stream( + const SourceSpec &source, + const CliOptions &options, + const cvmmap_streamer::CodecType codec, + const cvmmap_streamer::EncoderDeviceType encoder_device, + const sl::DEPTH_MODE depth_mode, + const PoseTrackingOptions &pose_tracking) { + CameraStream stream{}; + stream.source = source; + stream.camera = std::make_unique(); + stream.pose_tracking = pose_tracking; + + sl::InitParameters init{}; + init.input.setFromSVOFile(source.path.c_str()); + init.svo_real_time_mode = false; + init.coordinate_system = pose_tracking.coordinate_system; + init.coordinate_units = sl::UNIT::METER; + init.depth_mode = depth_mode; + init.sdk_verbose = false; + + const auto open_status = stream.camera->open(init); + if (open_status != sl::ERROR_CODE::SUCCESS) { + return std::unexpected("failed to open SVO '" + source.path.string() + "': " + zed_status_string(open_status)); + } + + if (options.with_pose) { + sl::PositionalTrackingParameters tracking_parameters{}; + tracking_parameters.set_floor_as_origin = pose_tracking.set_floor_as_origin; + const auto tracking_status = stream.camera->enablePositionalTracking(tracking_parameters); + if (tracking_status == sl::ERROR_CODE::SUCCESS) { + stream.tracking_enabled = true; + } else { + spdlog::warn( + "positional tracking unavailable for '{}': {}. continuing without pose output", + source.path.string(), + zed_status_string(tracking_status)); + } + } + + const auto total_frames = stream.camera->getSVONumberOfFrames(); + if (total_frames <= 0) { + return std::unexpected("input SVO has no frames: " + source.path.string()); + } + stream.total_frames = static_cast(total_frames); + + const auto camera_info = stream.camera->getCameraInformation(); + const auto &camera_config = camera_info.camera_configuration; + const auto &left_camera = camera_config.calibration_parameters.left_cam; + stream.serial_number = camera_info.serial_number; + stream.width = static_cast(camera_config.resolution.width); + stream.height = static_cast(camera_config.resolution.height); + stream.fps = camera_config.fps; + stream.nominal_frame_period_ns = frame_period_ns(camera_config.fps); + if (stream.width == 0 || stream.height == 0) { + return std::unexpected("camera resolution reported by the ZED SDK is invalid for " + source.path.string()); + } + + stream.intrinsic_matrix = { + static_cast(left_camera.fx), 0.0, static_cast(left_camera.cx), + 0.0, static_cast(left_camera.fy), static_cast(left_camera.cy), + 0.0, 0.0, 1.0, + }; + stream.projection_matrix = { + static_cast(left_camera.fx), 0.0, static_cast(left_camera.cx), 0.0, + 0.0, static_cast(left_camera.fy), static_cast(left_camera.cy), 0.0, + 0.0, 0.0, 1.0, 0.0, + }; + + stream.frame_info = cvmmap_streamer::ipc::FrameInfo{ + .width = static_cast(stream.width), + .height = static_cast(stream.height), + .channels = 3, + .depth = cvmmap_streamer::ipc::Depth::U8, + .pixel_format = cvmmap_streamer::ipc::PixelFormat::BGR, + .buffer_size = stream.width * stream.height * 3u, + }; + + auto backend = make_encoder_backend(options, codec, encoder_device, stream.frame_info); + if (!backend) { + return std::unexpected(backend.error()); + } + stream.backend.emplace(std::move(*backend)); + + auto stream_info = (*stream.backend)->stream_info(); + if (!stream_info) { + return std::unexpected( + "encoder backend did not provide stream info: " + cvmmap_streamer::format_error(stream_info.error())); + } + stream.stream_info = *stream_info; + + std::uint64_t first_timestamp_ns = 0; + auto first_frame = read_frame_data( + stream, + stream.current_left_frame, + stream.current_depth_frame, + stream.current_tracking, + std::nullopt, + first_timestamp_ns); + if (!first_frame) { + return std::unexpected(first_frame.error()); + } + stream.first_timestamp_ns = first_timestamp_ns; + + auto last_timestamp_ns = read_last_readable_timestamp(stream); + if (!last_timestamp_ns) { + return std::unexpected(last_timestamp_ns.error()); + } + stream.last_timestamp_ns = *last_timestamp_ns; + + return stream; +} + +void close_camera_streams(std::vector &streams) { + for (auto &stream : streams) { + if (stream.backend) { + (*stream.backend)->shutdown(); + } + if (stream.tracking_enabled && stream.camera != nullptr && stream.camera->isOpened()) { + stream.camera->disablePositionalTracking(); + stream.tracking_enabled = false; + } + if (stream.camera != nullptr && stream.camera->isOpened()) { + stream.camera->close(); + } + } +} + [[nodiscard]] std::expected write_access_units( cvmmap_streamer::record::McapRecordSink &sink, @@ -189,6 +861,19 @@ std::expected write_access_units( return {}; } +[[nodiscard]] +std::expected write_access_units( + cvmmap_streamer::record::MultiMcapRecordSink &sink, + const cvmmap_streamer::record::MultiMcapRecordSink::StreamId stream_id, + const std::vector &access_units) { + for (const auto &access_unit : access_units) { + if (auto write = sink.write_access_unit(stream_id, access_unit); !write) { + return std::unexpected(write.error()); + } + } + return {}; +} + [[nodiscard]] std::expected flush_and_write( cvmmap_streamer::record::McapRecordSink &sink, @@ -200,65 +885,282 @@ std::expected flush_and_write( return write_access_units(sink, *flushed); } +[[nodiscard]] +std::expected flush_and_write( + cvmmap_streamer::record::MultiMcapRecordSink &sink, + const cvmmap_streamer::record::MultiMcapRecordSink::StreamId stream_id, + cvmmap_streamer::encode::EncoderBackend &backend) { + auto flushed = backend->flush(); + if (!flushed) { + return std::unexpected(cvmmap_streamer::format_error(flushed.error())); + } + return write_access_units(sink, stream_id, *flushed); } -int main(int argc, char **argv) { - CliOptions options{}; +[[nodiscard]] +std::expected register_mcap_streams( + cvmmap_streamer::record::MultiMcapRecordSink &sink, + std::vector &streams, + const CliOptions &options) { + for (auto &stream : streams) { + cvmmap_streamer::record::McapRecordStreamConfig config{}; + config.topic = namespace_topic(stream.source.label, options.video_topic); + config.depth_topic = namespace_topic(stream.source.label, options.depth_topic); + config.calibration_topic = namespace_topic(stream.source.label, options.calibration_topic); + config.pose_topic = options.with_pose ? namespace_topic(stream.source.label, options.pose_topic) : ""; + config.body_topic = namespace_topic(stream.source.label, "/camera/body"); + config.frame_id = multi_frame_id(options, stream.source.label); + auto stream_id = sink.add_stream(config, stream.stream_info); + if (!stream_id) { + return std::unexpected(stream_id.error()); + } + stream.mcap_stream_id = *stream_id; + } + return {}; +} - CLI::App app{"zed_svo_to_mcap - convert ZED SVO/SVO2 playback to MCAP"}; - app.add_option("--input", options.input_path, "Input SVO/SVO2 file")->required(); - app.add_option("--output", options.output_path, "Output MCAP file")->required(); - app.add_option("--codec", options.codec, "Video codec (h264|h265)") - ->check(CLI::IsMember({"h264", "h265"})); - app.add_option("--encoder-device", options.encoder_device, "Encoder device (auto|nvidia|software)") - ->check(CLI::IsMember({"auto", "nvidia", "software"})); - app.add_flag("--with-pose", options.with_pose, "Emit foxglove.PoseInFrame when tracking is available"); - app.add_option("--mcap-compression", options.mcap_compression, "MCAP chunk compression (none|lz4|zstd)") - ->check(CLI::IsMember({"none", "lz4", "zstd"})); - app.add_option("--depth-mode", options.depth_mode, "ZED depth mode (neural|quality|performance|ultra)") - ->check(CLI::IsMember({"neural", "quality", "performance", "ultra"})); - app.add_option("--start-frame", options.start_frame, "First SVO frame to export (inclusive)") - ->check(CLI::NonNegativeNumber); - auto *end_frame_option = app.add_option("--end-frame", options.end_frame, "Last SVO frame to export (inclusive)") - ->check(CLI::NonNegativeNumber); - app.add_option("--frame-id", options.frame_id, "Frame id for image and depth topics"); - app.add_option("--video-topic", options.video_topic, "MCAP topic for foxglove.CompressedVideo"); - app.add_option("--depth-topic", options.depth_topic, "MCAP topic for cvmmap_streamer.DepthMap"); - app.add_option("--calibration-topic", options.calibration_topic, "MCAP topic for foxglove.CameraCalibration"); - app.add_option("--pose-topic", options.pose_topic, "MCAP topic for foxglove.PoseInFrame"); +[[nodiscard]] +std::expected sync_streams_to_timestamp( + std::vector &streams, + const std::uint64_t effective_start_ts) { + bool shutdown_logged{false}; + for (auto &stream : streams) { + if (log_shutdown_request(shutdown_logged, "multi-camera sync")) { + return std::unexpected("interrupted"); + } + stream.sync_position = stream.camera->getSVOPositionAtTimestamp(sl::Timestamp{effective_start_ts}); + if (stream.sync_position < 0) { + return std::unexpected( + "failed to compute synced start frame for " + stream.source.path.string() + " at timestamp " + + std::to_string(effective_start_ts)); + } - try { - app.parse(argc, argv); - } catch (const CLI::ParseError &error) { - return app.exit(error); - } - options.has_end_frame = end_frame_option->count() > 0; + stream.camera->setSVOPosition(stream.sync_position); + auto current = read_frame_data( + stream, + stream.current_left_frame, + stream.current_depth_frame, + stream.current_tracking, + std::nullopt, + stream.current_timestamp_ns); + if (!current) { + return std::unexpected(current.error()); + } + auto next = fill_next_frame(stream); + if (!next) { + return std::unexpected(next.error()); + } - auto codec = parse_codec(options.codec); - if (!codec) { - spdlog::error("{}", codec.error()); - return exit_code(ToolExitCode::UsageError); + while (stream.current_timestamp_ns < effective_start_ts && stream.has_next) { + if (log_shutdown_request(shutdown_logged, "multi-camera sync")) { + return std::unexpected("interrupted"); + } + stream.dropped_frames += 1; + auto promote = promote_next_frame(stream); + if (!promote) { + return std::unexpected(promote.error()); + } + } + + spdlog::info( + "ZED_SVO_MCAP_SYNC input={} label={} sync_position={} first_timestamp_ns={} current_timestamp_ns={} next_timestamp_ns={}", + stream.source.path.string(), + stream.source.label, + stream.sync_position, + stream.first_timestamp_ns, + stream.current_timestamp_ns, + stream.has_next ? stream.next_timestamp_ns : 0); } - auto encoder_device = parse_encoder_device(options.encoder_device); - if (!encoder_device) { - spdlog::error("{}", encoder_device.error()); - return exit_code(ToolExitCode::UsageError); + return {}; +} + +[[nodiscard]] +bool have_sync_window(const std::vector &streams, const std::uint64_t common_end_ts) { + for (const auto &stream : streams) { + if (stream.current_timestamp_ns > common_end_ts) { + return false; + } } - auto compression = parse_mcap_compression(options.mcap_compression); - if (!compression) { - spdlog::error("{}", compression.error()); - return exit_code(ToolExitCode::UsageError); + return true; +} + +[[nodiscard]] +std::expected, std::string> next_synced_group_timestamp( + std::vector &streams, + const std::uint64_t tolerance_ns, + const std::uint64_t common_end_ts) { + bool shutdown_logged{false}; + while (have_sync_window(streams, common_end_ts)) { + if (log_shutdown_request(shutdown_logged, "multi-camera sync")) { + return std::unexpected("interrupted"); + } + const auto candidate_it = std::max_element( + streams.begin(), + streams.end(), + [](const auto &left, const auto &right) { + return left.current_timestamp_ns < right.current_timestamp_ns; + }); + const auto candidate_ts = candidate_it->current_timestamp_ns; + + bool advanced = false; + for (auto &stream : streams) { + while (stream.current_timestamp_ns + tolerance_ns < candidate_ts) { + if (log_shutdown_request(shutdown_logged, "multi-camera sync")) { + return std::unexpected("interrupted"); + } + if (!stream.has_next) { + return std::optional{}; + } + stream.dropped_frames += 1; + auto promote = promote_next_frame(stream); + if (!promote) { + return std::unexpected(promote.error()); + } + advanced = true; + if (stream.current_timestamp_ns > common_end_ts) { + return std::optional{}; + } + } + } + if (advanced) { + continue; + } + + const auto [min_it, max_it] = std::minmax_element( + streams.begin(), + streams.end(), + [](const auto &left, const auto &right) { + return left.current_timestamp_ns < right.current_timestamp_ns; + }); + if (max_it->current_timestamp_ns - min_it->current_timestamp_ns <= tolerance_ns) { + return max_it->current_timestamp_ns; + } } - auto depth_mode = parse_depth_mode(options.depth_mode); - if (!depth_mode) { - spdlog::error("{}", depth_mode.error()); + return std::optional{}; +} + +[[nodiscard]] +std::expected encode_and_write_group( + cvmmap_streamer::record::MultiMcapRecordSink &sink, + std::vector &streams, + const CliOptions &options, + const std::uint64_t group_timestamp_ns) { + for (auto &stream : streams) { + const auto video_step_bytes = stream.current_left_frame.getStepBytes(sl::MEM::CPU); + const auto video_bytes = std::span( + stream.current_left_frame.getPtr(sl::MEM::CPU), + video_step_bytes * stream.current_left_frame.getHeight()); + cvmmap_streamer::encode::RawVideoFrame raw_video{ + .info = stream.frame_info, + .source_timestamp_ns = group_timestamp_ns, + .row_stride_bytes = video_step_bytes, + .bytes = video_bytes, + }; + if (auto push = (*stream.backend)->push_frame(raw_video); !push) { + return std::unexpected( + "failed to encode frame for " + stream.source.label + ": " + cvmmap_streamer::format_error(push.error())); + } + + auto drained = (*stream.backend)->drain(); + if (!drained) { + return std::unexpected( + "failed to drain encoded access units for " + stream.source.label + ": " + + cvmmap_streamer::format_error(drained.error())); + } + if (auto write = write_access_units(sink, stream.mcap_stream_id, *drained); !write) { + return std::unexpected("failed to write video access unit for " + stream.source.label + ": " + write.error()); + } + + if (!stream.calibration_written) { + cvmmap_streamer::record::RawCameraCalibrationView calibration{ + .timestamp_ns = group_timestamp_ns, + .width = stream.width, + .height = stream.height, + .distortion_model = "plumb_bob", + .distortion = stream.distortion, + .intrinsic_matrix = stream.intrinsic_matrix, + .rectification_matrix = stream.rectification_matrix, + .projection_matrix = stream.projection_matrix, + }; + if (auto write = sink.write_camera_calibration(stream.mcap_stream_id, calibration); !write) { + return std::unexpected("failed to write calibration for " + stream.source.label + ": " + write.error()); + } + stream.calibration_written = true; + } + + const auto depth_step_bytes = stream.current_depth_frame.getStepBytes(sl::MEM::CPU); + const auto packed_depth_bytes = static_cast(stream.width) * sizeof(std::uint16_t); + if (depth_step_bytes < packed_depth_bytes) { + return std::unexpected( + "depth stride " + std::to_string(depth_step_bytes) + " is smaller than packed row size " + + std::to_string(packed_depth_bytes) + " for " + stream.source.label); + } + + std::optional> compact_depth{}; + std::span depth_pixels{}; + if (depth_step_bytes == packed_depth_bytes) { + depth_pixels = std::span( + stream.current_depth_frame.getPtr(sl::MEM::CPU), + static_cast(stream.width) * static_cast(stream.height)); + } else { + compact_depth = copy_compact_u16_plane(stream.current_depth_frame); + depth_pixels = *compact_depth; + } + + cvmmap_streamer::record::RawDepthMapU16View depth_map{ + .timestamp_ns = group_timestamp_ns, + .width = stream.width, + .height = stream.height, + .pixels = depth_pixels, + }; + if (auto write = sink.write_depth_map_u16(stream.mcap_stream_id, depth_map); !write) { + return std::unexpected("failed to write depth map for " + stream.source.label + ": " + write.error()); + } + + if (options.with_pose && stream.current_tracking.has_pose) { + cvmmap_streamer::record::RawPoseView pose_view{ + .timestamp_ns = group_timestamp_ns, + .reference_frame_id = pose_reference_frame_id(stream.pose_tracking, stream.source.label), + .position = stream.current_tracking.position, + .orientation = stream.current_tracking.orientation, + }; + if (auto write = sink.write_pose(stream.mcap_stream_id, pose_view); !write) { + return std::unexpected("failed to write pose for " + stream.source.label + ": " + write.error()); + } + } + } + return {}; +} + +[[nodiscard]] +std::expected advance_after_emit(std::vector &streams) { + for (auto &stream : streams) { + if (!stream.has_next) { + return std::unexpected("end-of-svo"); + } + auto promote = promote_next_frame(stream); + if (!promote) { + return std::unexpected(promote.error()); + } + } + return {}; +} + +[[nodiscard]] +int run_single_source( + const CliOptions &options, + const std::filesystem::path &output_path, + const cvmmap_streamer::CodecType codec, + const cvmmap_streamer::EncoderDeviceType encoder_device, + const cvmmap_streamer::McapCompression compression, + const sl::DEPTH_MODE depth_mode, + const PoseTrackingOptions &pose_tracking) { + const auto input_path = std::filesystem::path{options.input_paths.front()}; + if (!std::filesystem::is_regular_file(input_path)) { + spdlog::error("input file does not exist: {}", input_path.string()); return exit_code(ToolExitCode::UsageError); } - if (options.output_path.empty()) { - spdlog::error("output path must not be empty"); - return exit_code(ToolExitCode::UsageError); - } if (options.has_end_frame && options.end_frame < options.start_frame) { spdlog::error( "invalid frame range: start-frame={} end-frame={}", @@ -266,8 +1168,6 @@ int main(int argc, char **argv) { options.end_frame); return exit_code(ToolExitCode::UsageError); } - - const std::filesystem::path output_path{options.output_path}; if (output_path.has_parent_path()) { std::filesystem::create_directories(output_path.parent_path()); } @@ -286,18 +1186,18 @@ int main(int argc, char **argv) { }; sl::InitParameters init{}; - init.input.setFromSVOFile(options.input_path.c_str()); + init.input.setFromSVOFile(input_path.c_str()); init.svo_real_time_mode = false; - init.coordinate_system = sl::COORDINATE_SYSTEM::IMAGE; + init.coordinate_system = pose_tracking.coordinate_system; init.coordinate_units = sl::UNIT::METER; - init.depth_mode = *depth_mode; + init.depth_mode = depth_mode; init.sdk_verbose = false; const auto open_status = camera.open(init); if (open_status != sl::ERROR_CODE::SUCCESS) { spdlog::error( "failed to open SVO '{}': {}", - options.input_path, + input_path.string(), zed_status_string(open_status)); return exit_code(ToolExitCode::RuntimeError); } @@ -329,13 +1229,14 @@ int main(int argc, char **argv) { if (options.with_pose) { sl::PositionalTrackingParameters tracking_parameters{}; + tracking_parameters.set_floor_as_origin = pose_tracking.set_floor_as_origin; const auto tracking_status = camera.enablePositionalTracking(tracking_parameters); if (tracking_status == sl::ERROR_CODE::SUCCESS) { tracking_enabled = true; } else { spdlog::warn( "positional tracking unavailable for '{}': {}. continuing without pose output", - options.input_path, + input_path.string(), zed_status_string(tracking_status)); } } @@ -353,8 +1254,8 @@ int main(int argc, char **argv) { cvmmap_streamer::RuntimeConfig config = cvmmap_streamer::RuntimeConfig::defaults(); config.encoder.backend = cvmmap_streamer::EncoderBackendType::FFmpeg; - config.encoder.device = *encoder_device; - config.encoder.codec = *codec; + config.encoder.device = encoder_device; + config.encoder.codec = codec; config.encoder.gop = 30; config.encoder.b_frames = 0; config.record.mcap.enabled = true; @@ -364,7 +1265,7 @@ int main(int argc, char **argv) { config.record.mcap.calibration_topic = options.calibration_topic; config.record.mcap.pose_topic = options.pose_topic; config.record.mcap.frame_id = options.frame_id; - config.record.mcap.compression = *compression; + config.record.mcap.compression = compression; cvmmap_streamer::ipc::FrameInfo frame_info{ .width = static_cast(width), @@ -432,12 +1333,18 @@ int main(int argc, char **argv) { std::uint64_t emitted_frames{0}; std::optional last_timestamp_ns{}; std::optional last_tracking_state{}; + bool interrupted{false}; + bool shutdown_logged{false}; const auto last_frame = options.has_end_frame ? options.end_frame : static_cast(total_frames - 1); const auto nominal_frame_period_ns = frame_period_ns(camera_config.fps); while (options.start_frame + emitted_frames <= last_frame) { + if (log_shutdown_request(shutdown_logged, "single-camera export")) { + interrupted = true; + break; + } const auto grab_status = camera.grab(runtime_parameters); if (grab_status == sl::ERROR_CODE::END_OF_SVOFILE_REACHED) { break; @@ -583,7 +1490,7 @@ int main(int argc, char **argv) { } if (tracking_enabled) { - const auto tracking_state = camera.getPosition(pose, sl::REFERENCE_FRAME::WORLD); + const auto tracking_state = camera.getPosition(pose, pose_tracking.reference_frame); if (!last_tracking_state || *last_tracking_state != tracking_state) { last_tracking_state = tracking_state; if (tracking_state != sl::POSITIONAL_TRACKING_STATE::OK) { @@ -598,7 +1505,7 @@ int main(int argc, char **argv) { const auto orientation = pose.getOrientation(); cvmmap_streamer::record::RawPoseView pose_view{ .timestamp_ns = timestamp_ns, - .reference_frame_id = "world", + .reference_frame_id = pose_tracking.reference_frame_name, .position = { static_cast(translation.x), static_cast(translation.y), @@ -636,10 +1543,274 @@ int main(int argc, char **argv) { backend->shutdown(); close_camera(); + if (interrupted) { + spdlog::warn( + "gracefully stopped after writing {} frame(s) from '{}' to '{}'", + emitted_frames, + input_path.string(), + output_path.string()); + return interrupted_exit_code(); + } + spdlog::info( "wrote {} frame(s) from '{}' to '{}'", emitted_frames, - options.input_path, + input_path.string(), output_path.string()); return exit_code(ToolExitCode::Success); } + +[[nodiscard]] +int run_multi_source( + const CliOptions &options, + const std::vector &sources, + const std::filesystem::path &output_path, + const cvmmap_streamer::CodecType codec, + const cvmmap_streamer::EncoderDeviceType encoder_device, + const cvmmap_streamer::McapCompression compression, + const sl::DEPTH_MODE depth_mode, + const PoseTrackingOptions &pose_tracking) { + if (options.start_frame != 0 || options.has_end_frame) { + spdlog::error("multi-camera mode does not support --start-frame or --end-frame"); + return exit_code(ToolExitCode::UsageError); + } + bool interrupted{false}; + bool shutdown_logged{false}; + if (output_path.has_parent_path()) { + std::filesystem::create_directories(output_path.parent_path()); + } + + std::vector streams{}; + streams.reserve(sources.size()); + for (const auto &source : sources) { + if (log_shutdown_request(shutdown_logged, "multi-camera export")) { + close_camera_streams(streams); + return interrupted_exit_code(); + } + auto stream = open_camera_stream(source, options, codec, encoder_device, depth_mode, pose_tracking); + if (!stream) { + close_camera_streams(streams); + spdlog::error("{}", stream.error()); + return exit_code(ToolExitCode::RuntimeError); + } + streams.push_back(std::move(*stream)); + } + + const auto common_start_ts = std::max_element( + streams.begin(), + streams.end(), + [](const auto &left, const auto &right) { + return left.first_timestamp_ns < right.first_timestamp_ns; + })->first_timestamp_ns; + const auto common_end_ts = std::min_element( + streams.begin(), + streams.end(), + [](const auto &left, const auto &right) { + return left.last_timestamp_ns < right.last_timestamp_ns; + })->last_timestamp_ns; + if (common_start_ts > common_end_ts) { + close_camera_streams(streams); + spdlog::error("synced time window is empty: start_ts={} end_ts={}", common_start_ts, common_end_ts); + return exit_code(ToolExitCode::UsageError); + } + + const auto slowest_period_ns = std::max_element( + streams.begin(), + streams.end(), + [](const auto &left, const auto &right) { + return left.nominal_frame_period_ns < right.nominal_frame_period_ns; + })->nominal_frame_period_ns; + const auto tolerance_ns = options.has_sync_tolerance + ? static_cast(std::llround(options.sync_tolerance_ms * 1'000'000.0)) + : std::max(1, slowest_period_ns / 4); + spdlog::info( + "multi-camera sync window start_ts={} end_ts={} tolerance_ns={}", + common_start_ts, + common_end_ts, + tolerance_ns); + + auto sink = cvmmap_streamer::record::MultiMcapRecordSink::create(output_path.string(), compression); + if (!sink) { + close_camera_streams(streams); + spdlog::error("failed to create MCAP sink: {}", sink.error()); + return exit_code(ToolExitCode::RuntimeError); + } + if (auto registered = register_mcap_streams(*sink, streams, options); !registered) { + sink->close(); + close_camera_streams(streams); + spdlog::error("failed to register MCAP streams: {}", registered.error()); + return exit_code(ToolExitCode::RuntimeError); + } + if (auto synced = sync_streams_to_timestamp(streams, common_start_ts); !synced) { + sink->close(); + close_camera_streams(streams); + if (synced.error() == "interrupted") { + return interrupted_exit_code(); + } + spdlog::error("{}", synced.error()); + return exit_code(ToolExitCode::RuntimeError); + } + + std::uint64_t emitted_groups{0}; + while (true) { + if (log_shutdown_request(shutdown_logged, "multi-camera export")) { + interrupted = true; + break; + } + auto group_timestamp = next_synced_group_timestamp(streams, tolerance_ns, common_end_ts); + if (!group_timestamp) { + sink->close(); + close_camera_streams(streams); + if (group_timestamp.error() == "interrupted") { + return interrupted_exit_code(); + } + spdlog::error("sync failed: {}", group_timestamp.error()); + return exit_code(ToolExitCode::RuntimeError); + } + if (!*group_timestamp) { + break; + } + + if (auto write = encode_and_write_group(*sink, streams, options, **group_timestamp); !write) { + sink->close(); + close_camera_streams(streams); + spdlog::error("{}", write.error()); + return exit_code(ToolExitCode::RuntimeError); + } + emitted_groups += 1; + + auto advance = advance_after_emit(streams); + if (!advance) { + if (advance.error() == "end-of-svo") { + break; + } + sink->close(); + close_camera_streams(streams); + spdlog::error("{}", advance.error()); + return exit_code(ToolExitCode::RuntimeError); + } + } + + for (auto &stream : streams) { + if (auto flushed = flush_and_write(*sink, stream.mcap_stream_id, *stream.backend); !flushed) { + sink->close(); + close_camera_streams(streams); + spdlog::error("failed to finalize encoded video for {}: {}", stream.source.label, flushed.error()); + return exit_code(ToolExitCode::RuntimeError); + } + } + + sink->close(); + for (const auto &stream : streams) { + spdlog::info( + "bundled {} dropped_frame(s) for {}", + stream.dropped_frames, + stream.source.label); + } + close_camera_streams(streams); + + if (interrupted) { + spdlog::warn( + "gracefully stopped after writing {} synced frame group(s) across {} camera(s) to '{}'", + emitted_groups, + sources.size(), + output_path.string()); + return interrupted_exit_code(); + } + + spdlog::info( + "wrote {} synced frame group(s) across {} camera(s) to '{}'", + emitted_groups, + sources.size(), + output_path.string()); + return exit_code(ToolExitCode::Success); +} + +} // namespace + +int main(int argc, char **argv) { + CliOptions options{}; + install_signal_handlers(); + + CLI::App app{"zed_svo_to_mcap - convert ZED SVO/SVO2 playback to MCAP"}; + app.add_option("--input", options.input_paths, "Input SVO/SVO2 file (repeat for multi-camera mode)"); + app.add_option("--segment-dir", options.segment_dir, "Segment directory containing *_zed*.svo or *_zed*.svo2 files"); + app.add_option("--output", options.output_path, "Output MCAP file"); + app.add_option("--codec", options.codec, "Video codec (h264|h265)") + ->check(CLI::IsMember({"h264", "h265"})); + app.add_option("--encoder-device", options.encoder_device, "Encoder device (auto|nvidia|software)") + ->check(CLI::IsMember({"auto", "nvidia", "software"})); + app.add_flag("--with-pose", options.with_pose, "Emit foxglove.PoseInFrame when tracking is available"); + app.add_option("--mcap-compression", options.mcap_compression, "MCAP chunk compression (none|lz4|zstd)") + ->check(CLI::IsMember({"none", "lz4", "zstd"})); + app.add_option("--depth-mode", options.depth_mode, "ZED depth mode (neural|quality|performance|ultra)") + ->check(CLI::IsMember({"neural", "quality", "performance", "ultra"})); + app.add_option("--start-frame", options.start_frame, "First SVO frame to export (inclusive)") + ->check(CLI::NonNegativeNumber); + auto *end_frame_option = app.add_option("--end-frame", options.end_frame, "Last SVO frame to export (inclusive)") + ->check(CLI::NonNegativeNumber); + app.add_option("--frame-id", options.frame_id, "Frame id for image and depth topics"); + app.add_option("--video-topic", options.video_topic, "MCAP topic for foxglove.CompressedVideo"); + app.add_option("--depth-topic", options.depth_topic, "MCAP topic for cvmmap_streamer.DepthMap"); + app.add_option("--calibration-topic", options.calibration_topic, "MCAP topic for foxglove.CameraCalibration"); + app.add_option("--pose-topic", options.pose_topic, "MCAP topic for foxglove.PoseInFrame"); + app.add_option( + "--pose-config", + options.pose_config_path, + "TOML config file; reads zed.coordinate_system and zed.body_tracking.{reference_frame,set_floor_as_origin}"); + app.add_option("--world-frame-id", options.world_frame_id, "Reference frame id for pose output") + ->default_val("world"); + auto *sync_tolerance_option = app.add_option( + "--sync-tolerance-ms", + options.sync_tolerance_ms, + "Maximum allowed timestamp delta between cameras in multi-camera mode"); + sync_tolerance_option->check(CLI::PositiveNumber); + + try { + app.parse(argc, argv); + } catch (const CLI::ParseError &error) { + return app.exit(error); + } + options.has_end_frame = end_frame_option->count() > 0; + options.has_sync_tolerance = sync_tolerance_option->count() > 0; + + auto codec = parse_codec(options.codec); + if (!codec) { + spdlog::error("{}", codec.error()); + return exit_code(ToolExitCode::UsageError); + } + auto encoder_device = parse_encoder_device(options.encoder_device); + if (!encoder_device) { + spdlog::error("{}", encoder_device.error()); + return exit_code(ToolExitCode::UsageError); + } + auto compression = parse_mcap_compression(options.mcap_compression); + if (!compression) { + spdlog::error("{}", compression.error()); + return exit_code(ToolExitCode::UsageError); + } + auto depth_mode = parse_depth_mode(options.depth_mode); + if (!depth_mode) { + spdlog::error("{}", depth_mode.error()); + return exit_code(ToolExitCode::UsageError); + } + auto pose_tracking = load_pose_tracking_options(options); + if (!pose_tracking) { + spdlog::error("{}", pose_tracking.error()); + return exit_code(ToolExitCode::UsageError); + } + + auto sources = resolve_sources(options); + if (!sources) { + spdlog::error("{}", sources.error()); + return exit_code(ToolExitCode::UsageError); + } + + const auto output_path = derive_output_path(options, *sources); + if (sources->size() == 1) { + options.input_paths = {sources->front().path.string()}; + return run_single_source(options, output_path, *codec, *encoder_device, *compression, *depth_mode, *pose_tracking); + } + + return run_multi_source(options, *sources, output_path, *codec, *encoder_device, *compression, *depth_mode, *pose_tracking); +}