From 172df3022590471b7ea5f824982b09c659a7c923 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Fri, 13 Mar 2026 17:30:57 +0800 Subject: [PATCH] feat(record): add raw ZED body MCAP capture --- CMakeLists.txt | 2 + docs/caveats.md | 4 + docs/compat_matrix.md | 2 + docs/mcap_body_tracking.md | 44 ++++ .../cvmmap_streamer/config/runtime_config.hpp | 1 + .../record/mcap_record_sink.hpp | 8 + src/config/runtime_config.cpp | 14 ++ src/ipc/help.cpp | 5 +- src/pipeline/pipeline_runtime.cpp | 108 +++++++- src/record/mcap_record_sink.cpp | 47 ++++ src/testers/mcap_body_inspector.cpp | 213 ++++++++++++++++ src/testers/mcap_body_record_tester.cpp | 238 ++++++++++++++++++ src/testers/mcap_reader_tester.cpp | 7 +- src/testers/mcap_replay_tester.cpp | 7 +- 14 files changed, 681 insertions(+), 19 deletions(-) create mode 100644 docs/mcap_body_tracking.md create mode 100644 src/testers/mcap_body_inspector.cpp create mode 100644 src/testers/mcap_body_record_tester.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index dae3f84..2104afe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -206,6 +206,8 @@ add_cvmmap_binary(rtmp_stub_tester src/testers/rtmp_stub_tester.cpp) add_cvmmap_binary(rtmp_output_tester src/testers/rtmp_output_tester.cpp) add_cvmmap_binary(ipc_snapshot_tester src/testers/ipc_snapshot_tester.cpp) add_cvmmap_binary(mcap_depth_record_tester src/testers/mcap_depth_record_tester.cpp) +add_cvmmap_binary(mcap_body_record_tester src/testers/mcap_body_record_tester.cpp) +add_cvmmap_binary(mcap_body_inspector src/testers/mcap_body_inspector.cpp) add_executable(mcap_reader_tester src/testers/mcap_reader_tester.cpp) target_include_directories(mcap_reader_tester diff --git a/docs/caveats.md b/docs/caveats.md index d5f9a97..c94edcd 100644 --- a/docs/caveats.md +++ b/docs/caveats.md @@ -86,6 +86,10 @@ If you need audio transport, muxing, or A/V sync, that is outside the current re MCAP recording stores one `foxglove.CompressedVideo` message per encoded access unit. Replay depends on encoded keyframes carrying decoder configuration; this is handled by the current writer on keyframes. +### MCAP Body Recording Is Raw Packet Capture + +When the producer exposes ZED body tracking, MCAP recording stores one raw `cvmmap.body_tracking.v1` message per body PUB packet on `/camera/body` by default. These payload bytes are not normalized by `cvmmap-streamer`; downstream consumers should parse them with the cv-mmap body-tracking v1 contract. + ## External Server Caveats ### Local SRS Defaults Can Hit `ulimit` diff --git a/docs/compat_matrix.md b/docs/compat_matrix.md index 5089164..2b611ca 100644 --- a/docs/compat_matrix.md +++ b/docs/compat_matrix.md @@ -45,6 +45,7 @@ MCAP support is validated separately with: - `./build/mcap_reader_tester` - `./build/mcap_replay_tester` +- `./build/mcap_body_record_tester` - `./scripts/replay_mcap.sh` Current recording scope: @@ -53,6 +54,7 @@ Current recording scope: |--------|-------|-------| | MCAP `foxglove.CompressedVideo` | H.264 | Stored as Annex B access units | | MCAP `foxglove.CompressedVideo` | H.265 | Stored as Annex B access units | +| MCAP raw body packets | `cvmmap.body_tracking.v1` | Payload bytes are identical to the cv-mmap ZED body PUB packet | ## Current Defaults diff --git a/docs/mcap_body_tracking.md b/docs/mcap_body_tracking.md new file mode 100644 index 0000000..fe72306 --- /dev/null +++ b/docs/mcap_body_tracking.md @@ -0,0 +1,44 @@ +# MCAP ZED Body Tracking Contract + +`cvmmap-streamer` records native cv-mmap ZED body-tracking packets into MCAP as a raw binary channel. + +## Channel + +- default topic: `/camera/body` +- `messageEncoding`: `cvmmap.body_tracking.v1` +- schema: none (`schemaId = 0`) +- channel metadata: + - `packet_layout = cvmmap_body_tracking_v1` + - `payload_format = raw` + - `source_transport = cv-mmap-body-pub` + +The channel is created lazily on the first valid body packet, so non-body inputs do not produce an empty `/camera/body` channel. + +## Payload + +Each MCAP message payload is byte-for-byte identical to the cv-mmap ZED body PUB packet: + +- 64-byte `body_tracking_message_header_t` +- followed by `body_count` packed `body_tracking_body_t` records + +The intended parser contract is the existing cv-mmap body-tracking v1 layout: + +- `cvmmap::parse_body_tracking_message(...)` +- `cv-mmap/docs/cvmmap_body_tracking_v1.ksy` + +This recording path does not remap joints, filter fields, or convert the packet into protobuf/JSON. + +## Timing And Alignment + +- `logTime` and `publishTime` use `header.timestamp_ns` +- if `header.timestamp_ns == 0`, they fall back to `header.sdk_timestamp_ns` +- cross-topic alignment should use body `frame_count` and timestamps, not MCAP file order + +## Configuration + +MCAP body capture follows the normal MCAP recording switch: + +- enable MCAP with `--mcap` or `record.mcap.enabled = true` +- override the body topic with `--mcap-body-topic` or `record.mcap.body_topic` + +There is no separate body-enable flag. If the input never emits body packets, no body channel is written. diff --git a/include/cvmmap_streamer/config/runtime_config.hpp b/include/cvmmap_streamer/config/runtime_config.hpp index 6004206..9bcd94a 100644 --- a/include/cvmmap_streamer/config/runtime_config.hpp +++ b/include/cvmmap_streamer/config/runtime_config.hpp @@ -84,6 +84,7 @@ struct McapRecordConfig { std::string path{"capture.mcap"}; std::string topic{"/camera/video"}; std::string depth_topic{"/camera/depth"}; + std::string body_topic{"/camera/body"}; std::string frame_id{"camera"}; McapCompression compression{McapCompression::Zstd}; }; diff --git a/include/cvmmap_streamer/record/mcap_record_sink.hpp b/include/cvmmap_streamer/record/mcap_record_sink.hpp index 9fd29e8..e89e582 100644 --- a/include/cvmmap_streamer/record/mcap_record_sink.hpp +++ b/include/cvmmap_streamer/record/mcap_record_sink.hpp @@ -26,6 +26,11 @@ struct RawDepthMapView { std::span pixels{}; }; +struct RawBodyTrackingMessageView { + std::uint64_t timestamp_ns{0}; + std::span bytes{}; +}; + class McapRecordSink { public: McapRecordSink() = default; @@ -51,6 +56,9 @@ public: [[nodiscard]] std::expected write_depth_map(const RawDepthMapView &depth_map); + [[nodiscard]] + std::expected write_body_tracking_message(const RawBodyTrackingMessageView &body_message); + [[nodiscard]] bool is_open() const; diff --git a/src/config/runtime_config.cpp b/src/config/runtime_config.cpp index 98e9c6e..64f327a 100644 --- a/src/config/runtime_config.cpp +++ b/src/config/runtime_config.cpp @@ -362,6 +362,10 @@ std::expected apply_toml_file(RuntimeConfig &config, const st config.record.mcap.enabled = true; config.record.mcap.depth_topic = *value; } + if (auto value = toml_value(table, "record.mcap.body_topic")) { + config.record.mcap.enabled = true; + config.record.mcap.body_topic = *value; + } if (auto value = toml_value(table, "record.mcap.frame_id")) { config.record.mcap.enabled = true; config.record.mcap.frame_id = *value; @@ -560,6 +564,7 @@ std::expected parse_runtime_config(int argc, char ** std::string mcap_path_raw{}; std::string mcap_topic_raw{}; std::string mcap_depth_topic_raw{}; + std::string mcap_body_topic_raw{}; std::string mcap_frame_id_raw{}; std::string mcap_compression_raw{}; std::string queue_size_raw{}; @@ -600,6 +605,7 @@ std::expected parse_runtime_config(int argc, char ** app.add_option("--mcap-path", mcap_path_raw); app.add_option("--mcap-topic", mcap_topic_raw); app.add_option("--mcap-depth-topic", mcap_depth_topic_raw); + app.add_option("--mcap-body-topic", mcap_body_topic_raw); app.add_option("--mcap-frame-id", mcap_frame_id_raw); app.add_option("--mcap-compression", mcap_compression_raw); app.add_option("--queue-size", queue_size_raw); @@ -712,6 +718,10 @@ std::expected parse_runtime_config(int argc, char ** config.record.mcap.enabled = true; config.record.mcap.depth_topic = mcap_depth_topic_raw; } + if (!mcap_body_topic_raw.empty()) { + config.record.mcap.enabled = true; + config.record.mcap.body_topic = mcap_body_topic_raw; + } if (!mcap_frame_id_raw.empty()) { config.record.mcap.enabled = true; config.record.mcap.frame_id = mcap_frame_id_raw; @@ -846,6 +856,9 @@ std::expected validate_runtime_config(const RuntimeConfig &co if (config.record.mcap.depth_topic.empty()) { return std::unexpected("invalid MCAP config: depth_topic must not be empty"); } + if (config.record.mcap.body_topic.empty()) { + return std::unexpected("invalid MCAP config: body_topic must not be empty"); + } if (config.record.mcap.frame_id.empty()) { return std::unexpected("invalid MCAP config: frame_id must not be empty"); } @@ -885,6 +898,7 @@ std::string summarize_runtime_config(const RuntimeConfig &config) { ss << ", mcap.path=" << config.record.mcap.path; ss << ", mcap.topic=" << config.record.mcap.topic; ss << ", mcap.depth_topic=" << config.record.mcap.depth_topic; + ss << ", mcap.body_topic=" << config.record.mcap.body_topic; ss << ", mcap.frame_id=" << config.record.mcap.frame_id; ss << ", mcap.compression=" << to_string(config.record.mcap.compression); ss << ", latency.queue_size=" << config.latency.queue_size; diff --git a/src/ipc/help.cpp b/src/ipc/help.cpp index 9f5eece..bcd9753 100644 --- a/src/ipc/help.cpp +++ b/src/ipc/help.cpp @@ -9,7 +9,7 @@ namespace cvmmap_streamer { namespace { -constexpr std::array kHelpLines{ +constexpr std::array kHelpLines{ "Usage:", " --help, -h\tshow this message", "", @@ -31,10 +31,11 @@ constexpr std::array kHelpLines{ " --rtmp-url \tadd RTMP destination (repeatable)", " --rtmp-transport \tlibavformat|ffmpeg_process", " --rtmp-ffmpeg \tffmpeg binary for ffmpeg_process transport", - " --mcap\t\tenable MCAP recording", + " --mcap\t\tenable MCAP recording", " --mcap-path \tMCAP output file", " --mcap-topic \tMCAP topic name", " --mcap-depth-topic \tMCAP depth topic name", + " --mcap-body-topic \tMCAP body topic name", " --mcap-frame-id \tFoxglove CompressedVideo frame_id", " --mcap-compression \tnone|lz4|zstd", "", diff --git a/src/pipeline/pipeline_runtime.cpp b/src/pipeline/pipeline_runtime.cpp index 46469e4..abd6d9d 100644 --- a/src/pipeline/pipeline_runtime.cpp +++ b/src/pipeline/pipeline_runtime.cpp @@ -8,6 +8,7 @@ #include "cvmmap_streamer/record/mcap_record_sink.hpp" #include +#include #include #include @@ -59,21 +60,23 @@ constexpr int exit_code(PipelineExitCode code) { struct ResolvedInputEndpoints { std::string shm_name; std::string zmq_endpoint; + std::string body_zmq_endpoint; }; [[nodiscard]] std::expected resolve_input_endpoints(const RuntimeConfig &config) { try { auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.uri); - spdlog::info( - "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}'", - config.input.uri, - target.shm_name, - target.zmq_addr); - return ResolvedInputEndpoints{ - .shm_name = target.shm_name, - .zmq_endpoint = target.zmq_addr, - }; + spdlog::info( + "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}'", + config.input.uri, + target.shm_name, + target.zmq_addr); + return ResolvedInputEndpoints{ + .shm_name = target.shm_name, + .zmq_endpoint = target.zmq_addr, + .body_zmq_endpoint = target.zmq_body_addr, + }; } catch (const std::exception &e) { return std::unexpected(std::string("invalid cvmmap input URI: ") + e.what()); } @@ -235,6 +238,14 @@ std::expected make_depth_map_view(const ip }; } +[[nodiscard]] +std::uint64_t body_tracking_timestamp_ns(const cvmmap::body_tracking_frame_t &frame) { + if (frame.header.timestamp_ns != 0) { + return frame.header.timestamp_ns; + } + return frame.header.sdk_timestamp_ns; +} + [[nodiscard]] Status publish_access_units( const RuntimeConfig &config, @@ -354,13 +365,24 @@ int run_pipeline(const RuntimeConfig &config) { zmq::socket_t subscriber(zmq_ctx, zmq::socket_type::sub); try { subscriber.set(zmq::sockopt::subscribe, ""); - subscriber.set(zmq::sockopt::rcvtimeo, 20); subscriber.connect(input_endpoints->zmq_endpoint); } catch (const zmq::error_t &e) { spdlog::error("pipeline subscribe failed on '{}': {}", input_endpoints->zmq_endpoint, e.what()); return exit_code(PipelineExitCode::SubscriberError); } + std::optional body_subscriber{}; + if (config.record.mcap.enabled) { + try { + body_subscriber.emplace(zmq_ctx, zmq::socket_type::sub); + body_subscriber->set(zmq::sockopt::subscribe, ""); + body_subscriber->connect(input_endpoints->body_zmq_endpoint); + } catch (const zmq::error_t &e) { + spdlog::error("pipeline body subscribe failed on '{}': {}", input_endpoints->body_zmq_endpoint, e.what()); + return exit_code(PipelineExitCode::SubscriberError); + } + } + std::optional rtp_publisher{}; std::optional rtmp_output{}; std::optional mcap_sink{}; @@ -464,9 +486,22 @@ int run_pipeline(const RuntimeConfig &config) { restart_backend(reason, active_info); } - zmq::message_t message; - const auto recv_result = subscriber.recv(message, zmq::recv_flags::none); - if (!recv_result) { + std::array poll_items{{ + {subscriber.handle(), 0, ZMQ_POLLIN, 0}, + {body_subscriber ? body_subscriber->handle() : nullptr, 0, ZMQ_POLLIN, 0}, + }}; + try { + zmq::poll(poll_items, std::chrono::milliseconds{20}); + } catch (const zmq::error_t &e) { + spdlog::error("pipeline poll failed: {}", e.what()); + return exit_code(PipelineExitCode::SubscriberError); + } + + const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; + const bool body_socket_ready = + body_subscriber.has_value() && + (poll_items[1].revents & ZMQ_POLLIN) != 0; + if (!frame_socket_ready && !body_socket_ready) { const auto now = std::chrono::steady_clock::now(); if (restart_pending && restart_target_info) { auto start_result = attempt_backend_start(*restart_target_info); @@ -497,6 +532,53 @@ int run_pipeline(const RuntimeConfig &config) { continue; } + if (body_socket_ready && body_subscriber) { + while (true) { + zmq::message_t body_message; + const auto recv_body = body_subscriber->recv(body_message, zmq::recv_flags::dontwait); + if (!recv_body) { + break; + } + + last_event = std::chrono::steady_clock::now(); + auto body_bytes = std::span( + static_cast(body_message.data()), + body_message.size()); + if (body_bytes.empty()) { + continue; + } + if (!mcap_sink) { + continue; + } + + auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes); + if (!parsed_body) { + spdlog::warn("pipeline body packet parse error: {}", parsed_body.error()); + continue; + } + + auto write_body = mcap_sink->write_body_tracking_message(record::RawBodyTrackingMessageView{ + .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), + .bytes = body_bytes, + }); + if (!write_body) { + const auto reason = "pipeline body MCAP write failed: " + write_body.error(); + restart_backend(reason, active_info); + break; + } + } + } + + if (!frame_socket_ready) { + continue; + } + + zmq::message_t message; + const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); + if (!recv_result) { + continue; + } + last_event = std::chrono::steady_clock::now(); auto bytes = std::span( static_cast(message.data()), diff --git a/src/record/mcap_record_sink.cpp b/src/record/mcap_record_sink.cpp index 2779ee4..0dfea5f 100644 --- a/src/record/mcap_record_sink.cpp +++ b/src/record/mcap_record_sink.cpp @@ -27,6 +27,7 @@ namespace { constexpr float kRvlDepthQuantization = 200.0f; constexpr float kMinDepthMaxMeters = 20.0f; +constexpr std::string_view kBodyTrackingMessageEncoding = "cvmmap.body_tracking.v1"; [[nodiscard]] std::string codec_format(CodecType codec) { @@ -60,6 +61,15 @@ mcap::Compression to_mcap_compression(McapCompression compression) { } } +[[nodiscard]] +mcap::KeyValueMap body_channel_metadata() { + return mcap::KeyValueMap{ + {"packet_layout", "cvmmap_body_tracking_v1"}, + {"payload_format", "raw"}, + {"source_transport", "cv-mmap-body-pub"}, + }; +} + [[nodiscard]] google::protobuf::Timestamp to_proto_timestamp(std::uint64_t timestamp_ns) { google::protobuf::Timestamp timestamp{}; @@ -317,10 +327,13 @@ struct McapRecordSink::State { mcap::McapWriter writer{}; std::string path{}; std::string frame_id{}; + std::string body_topic{}; mcap::ChannelId video_channel_id{0}; mcap::ChannelId depth_channel_id{0}; + mcap::ChannelId body_channel_id{0}; std::uint32_t video_sequence{0}; std::uint32_t depth_sequence{0}; + std::uint32_t body_sequence{0}; CodecType codec{CodecType::H264}; std::vector keyframe_preamble{}; }; @@ -350,6 +363,7 @@ std::expected McapRecordSink::create( auto state = std::make_unique(); state->path = config.record.mcap.path; state->frame_id = config.record.mcap.frame_id; + state->body_topic = config.record.mcap.body_topic; mcap::McapWriterOptions options(""); options.compression = to_mcap_compression(config.record.mcap.compression); @@ -488,6 +502,39 @@ std::expected McapRecordSink::write_depth_map(const RawDepthM return {}; } +std::expected McapRecordSink::write_body_tracking_message(const RawBodyTrackingMessageView &body_message) { + if (state_ == nullptr) { + return std::unexpected("MCAP sink is not open"); + } + if (body_message.bytes.empty()) { + return std::unexpected("body tracking payload is empty"); + } + + if (state_->body_channel_id == 0) { + mcap::Channel body_channel( + state_->body_topic, + kBodyTrackingMessageEncoding, + 0, + body_channel_metadata()); + state_->writer.addChannel(body_channel); + state_->body_channel_id = body_channel.id; + } + + mcap::Message record{}; + record.channelId = state_->body_channel_id; + record.sequence = state_->body_sequence++; + record.logTime = body_message.timestamp_ns; + record.publishTime = body_message.timestamp_ns; + record.data = reinterpret_cast(body_message.bytes.data()); + record.dataSize = body_message.bytes.size(); + + const auto write_status = state_->writer.write(record); + if (!write_status.ok()) { + return std::unexpected("failed to write MCAP body message: " + write_status.message); + } + return {}; +} + bool McapRecordSink::is_open() const { return state_ != nullptr; } diff --git a/src/testers/mcap_body_inspector.cpp b/src/testers/mcap_body_inspector.cpp new file mode 100644 index 0000000..2a64e6f --- /dev/null +++ b/src/testers/mcap_body_inspector.cpp @@ -0,0 +1,213 @@ +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +enum class InspectorExitCode : int { + Success = 0, + OpenError = 2, + SchemaError = 3, + ParseError = 4, + Empty = 5, +}; + +[[nodiscard]] +constexpr int exit_code(InspectorExitCode code) { + return static_cast(code); +} + +struct Config { + std::string input_path{}; + std::string topic{"/camera/body"}; + std::uint32_t sample_limit{3}; +}; + +struct Sample { + std::uint32_t frame_count{0}; + std::uint64_t timestamp_ns{0}; + std::uint16_t body_count{0}; +}; + +struct Summary { + std::uint64_t message_count{0}; + std::uint64_t parsed_count{0}; + std::uint64_t total_bodies{0}; + std::uint64_t nonzero_messages{0}; + std::uint64_t parse_errors{0}; + std::uint64_t min_timestamp_ns{std::numeric_limits::max()}; + std::uint64_t max_timestamp_ns{0}; + std::uint32_t min_frame_count{std::numeric_limits::max()}; + std::uint32_t max_frame_count{0}; + std::optional message_encoding{}; + std::optional schema_name{}; + std::map body_count_histogram{}; + std::vector samples{}; +}; + +[[nodiscard]] +std::expected parse_args(int argc, char **argv) { + Config config{}; + CLI::App app{"mcap_body_inspector - summarize raw cvmmap body packets stored in MCAP"}; + app.add_option("input", config.input_path, "Input MCAP path")->required(); + app.add_option("--topic", config.topic, "Body topic to inspect"); + app.add_option("--sample-limit", config.sample_limit, "Number of non-zero body packets to sample") + ->check(CLI::NonNegativeNumber); + + try { + app.parse(argc, argv); + } catch (const CLI::ParseError &e) { + return std::unexpected(app.exit(e)); + } + return config; +} + +[[nodiscard]] +std::string_view schema_name_or_none(const std::optional &schema_name) { + return schema_name ? std::string_view(*schema_name) : std::string_view("none"); +} + +std::expected inspect_file(const Config &config) { + mcap::McapReader reader{}; + const auto open_status = reader.open(config.input_path); + if (!open_status.ok()) { + spdlog::error("failed to open MCAP file '{}': {}", config.input_path, open_status.message); + return std::unexpected(InspectorExitCode::OpenError); + } + + Summary summary{}; + auto messages = reader.readMessages(); + for (auto it = messages.begin(); it != messages.end(); ++it) { + if (it->channel == nullptr) { + spdlog::error("MCAP body message missing channel metadata"); + reader.close(); + return std::unexpected(InspectorExitCode::SchemaError); + } + if (it->channel->topic != config.topic) { + continue; + } + + summary.message_count += 1; + if (!summary.message_encoding) { + summary.message_encoding = it->channel->messageEncoding; + } + if (!summary.schema_name) { + summary.schema_name = it->schema ? std::optional(it->schema->name) : std::nullopt; + } + + auto bytes = std::span( + reinterpret_cast(it->message.data), + it->message.dataSize); + auto parsed = cvmmap::parse_body_tracking_message(bytes); + if (!parsed) { + summary.parse_errors += 1; + spdlog::warn("body packet parse error at logTime={}: {}", it->message.logTime, parsed.error()); + continue; + } + + summary.parsed_count += 1; + summary.total_bodies += parsed->bodies.size(); + const auto body_count = parsed->header.body_count; + summary.body_count_histogram[body_count] += 1; + if (body_count > 0) { + summary.nonzero_messages += 1; + if (summary.samples.size() < config.sample_limit) { + summary.samples.push_back(Sample{ + .frame_count = parsed->header.frame_count, + .timestamp_ns = parsed->header.timestamp_ns, + .body_count = body_count, + }); + } + } + + summary.min_timestamp_ns = std::min(summary.min_timestamp_ns, parsed->header.timestamp_ns); + summary.max_timestamp_ns = std::max(summary.max_timestamp_ns, parsed->header.timestamp_ns); + summary.min_frame_count = std::min(summary.min_frame_count, parsed->header.frame_count); + summary.max_frame_count = std::max(summary.max_frame_count, parsed->header.frame_count); + } + + reader.close(); + if (summary.message_count == 0) { + spdlog::error("no body messages found on topic '{}'", config.topic); + return std::unexpected(InspectorExitCode::Empty); + } + return summary; +} + +void print_summary(const Config &config, const Summary &summary) { + spdlog::info("body topic: {}", config.topic); + spdlog::info( + "messages={} parsed={} parse_errors={} encoding={} schema={}", + summary.message_count, + summary.parsed_count, + summary.parse_errors, + summary.message_encoding.value_or(""), + schema_name_or_none(summary.schema_name)); + spdlog::info( + "nonzero_messages={} total_bodies={} avg_bodies_per_message={:.3f}", + summary.nonzero_messages, + summary.total_bodies, + summary.parsed_count > 0 + ? static_cast(summary.total_bodies) / static_cast(summary.parsed_count) + : 0.0); + if (summary.parsed_count > 0) { + spdlog::info( + "frame_count_range=[{}, {}] timestamp_range_ns=[{}, {}]", + summary.min_frame_count, + summary.max_frame_count, + summary.min_timestamp_ns, + summary.max_timestamp_ns); + } + + std::string histogram{}; + for (const auto &[body_count, message_count] : summary.body_count_histogram) { + if (!histogram.empty()) { + histogram += ", "; + } + histogram += std::to_string(body_count); + histogram += "=>"; + histogram += std::to_string(message_count); + } + spdlog::info("body_count_histogram: {}", histogram.empty() ? "" : histogram); + + for (std::size_t index = 0; index < summary.samples.size(); ++index) { + const auto &sample = summary.samples[index]; + spdlog::info( + "sample[{}]: frame_count={} timestamp_ns={} body_count={}", + index, + sample.frame_count, + sample.timestamp_ns, + sample.body_count); + } +} + +} + +int main(int argc, char **argv) { + auto config = parse_args(argc, argv); + if (!config) { + return config.error(); + } + + auto summary = inspect_file(*config); + if (!summary) { + return exit_code(summary.error()); + } + + print_summary(*config, *summary); + return exit_code(InspectorExitCode::Success); +} diff --git a/src/testers/mcap_body_record_tester.cpp b/src/testers/mcap_body_record_tester.cpp new file mode 100644 index 0000000..16f363c --- /dev/null +++ b/src/testers/mcap_body_record_tester.cpp @@ -0,0 +1,238 @@ +#include + +#include +#include + +#include "cvmmap_streamer/common.h" +#include "cvmmap_streamer/record/mcap_record_sink.hpp" +#include "foxglove/CompressedVideo.pb.h" + +#include + +#include +#include +#include +#include +#include + +namespace { + +constexpr std::string_view kBodyTopic = "/camera/body"; +constexpr std::string_view kBodyEncoding = "cvmmap.body_tracking.v1"; + +enum class TesterExitCode : int { + Success = 0, + CreateError = 2, + WriteError = 3, + OpenError = 4, + VerificationError = 5, +}; + +[[nodiscard]] +constexpr int exit_code(TesterExitCode code) { + return static_cast(code); +} + +[[nodiscard]] +std::vector serialize_body_tracking_frame(const cvmmap::body_tracking_frame_t &frame) { + auto header = frame.header; + header._magic = cvmmap::BODY_TRACKING_MAGIC; + header.versions_major = cvmmap::VERSION_MAJOR; + header.versions_minor = cvmmap::VERSION_MINOR; + header.body_count = static_cast(frame.bodies.size()); + header.body_record_size = sizeof(cvmmap::body_tracking_body_t); + header.payload_size_bytes = static_cast( + frame.bodies.size() * sizeof(cvmmap::body_tracking_body_t)); + + std::vector bytes( + sizeof(cvmmap::body_tracking_message_header_t) + header.payload_size_bytes, + static_cast(0)); + std::memcpy(bytes.data(), &header, sizeof(header)); + if (!frame.bodies.empty()) { + std::memcpy( + bytes.data() + sizeof(header), + frame.bodies.data(), + header.payload_size_bytes); + } + return bytes; +} + +} + +int main(int argc, char **argv) { + if (cvmmap_streamer::has_help_flag(argc, argv)) { + cvmmap_streamer::print_help("mcap_body_record_tester"); + return exit_code(TesterExitCode::Success); + } + + const std::filesystem::path output_path = + argc > 1 + ? std::filesystem::path(argv[1]) + : std::filesystem::temp_directory_path() / "cvmmap_streamer_body_record_test.mcap"; + if (output_path.has_parent_path()) { + std::filesystem::create_directories(output_path.parent_path()); + } + + cvmmap_streamer::RuntimeConfig config = cvmmap_streamer::RuntimeConfig::defaults(); + config.record.mcap.enabled = true; + config.record.mcap.path = output_path.string(); + config.record.mcap.topic = "/camera/video"; + config.record.mcap.depth_topic = "/camera/depth"; + config.record.mcap.body_topic = std::string(kBodyTopic); + config.record.mcap.frame_id = "camera"; + config.record.mcap.compression = cvmmap_streamer::McapCompression::None; + + cvmmap_streamer::encode::EncodedStreamInfo stream_info{}; + stream_info.codec = cvmmap_streamer::CodecType::H264; + + auto sink = cvmmap_streamer::record::McapRecordSink::create(config, stream_info); + if (!sink) { + spdlog::error("failed to create MCAP sink: {}", sink.error()); + return exit_code(TesterExitCode::CreateError); + } + + cvmmap_streamer::encode::EncodedAccessUnit access_unit{}; + access_unit.codec = cvmmap_streamer::CodecType::H264; + access_unit.source_timestamp_ns = 10; + access_unit.stream_pts_ns = 10; + access_unit.keyframe = false; + access_unit.annexb_bytes = {0x00, 0x00, 0x00, 0x01, 0x09, 0x10}; + if (auto write = sink->write_access_unit(access_unit); !write) { + spdlog::error("failed to write video access unit: {}", write.error()); + return exit_code(TesterExitCode::WriteError); + } + + cvmmap::body_tracking_frame_t body_frame{}; + body_frame.header.frame_count = 17; + body_frame.header.timestamp_ns = 123456789ull; + body_frame.header.sdk_timestamp_ns = 123456700ull; + body_frame.header.body_format = cvmmap::BodyFormat::Body38; + body_frame.header.body_selection = cvmmap::BodyKeypointSelection::Full; + body_frame.header.detection_model = cvmmap::BodyTrackingModel::HumanBodyAccurate; + body_frame.header.inference_precision = cvmmap::InferencePrecision::FP32; + body_frame.header.set_coordinate_system(cvmmap::BodyCoordinateSystem::RightHandedYUp); + body_frame.header.set_reference_frame(cvmmap::BodyReferenceFrame::World); + body_frame.header.set_floor_as_origin(true); + + cvmmap::body_tracking_body_t body{}; + body.id = 42; + body.confidence = 88.0f; + body.position = {1.0f, 2.0f, 3.0f}; + body.keypoint_count = cvmmap::BODY_KEYPOINT_CAPACITY; + body.keypoint_3d[0] = {0.1f, 0.2f, 0.3f}; + body.keypoint_confidence[0] = 95.0f; + body_frame.bodies.push_back(body); + + const auto body_bytes = serialize_body_tracking_frame(body_frame); + if (auto write = sink->write_body_tracking_message(cvmmap_streamer::record::RawBodyTrackingMessageView{ + .timestamp_ns = body_frame.header.timestamp_ns, + .bytes = body_bytes, + }); !write) { + spdlog::error("failed to write body tracking packet: {}", write.error()); + return exit_code(TesterExitCode::WriteError); + } + + sink->close(); + + mcap::McapReader reader{}; + const auto open_status = reader.open(output_path.string()); + if (!open_status.ok()) { + spdlog::error("failed to open MCAP file '{}': {}", output_path.string(), open_status.message); + return exit_code(TesterExitCode::OpenError); + } + + std::uint64_t video_messages{0}; + std::uint64_t body_messages{0}; + + auto messages = reader.readMessages(); + for (auto it = messages.begin(); it != messages.end(); ++it) { + if (it->channel == nullptr) { + spdlog::error("MCAP message missing channel metadata"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + + if (it->schema != nullptr && it->schema->name == "foxglove.CompressedVideo") { + foxglove::CompressedVideo video{}; + if (!video.ParseFromArray(it->message.data, static_cast(it->message.dataSize))) { + spdlog::error("failed to parse foxglove.CompressedVideo payload"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + if (it->channel->topic != "/camera/video" || video.frame_id() != "camera" || video.data().empty()) { + spdlog::error("video MCAP payload verification failed"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + video_messages += 1; + continue; + } + + if (it->channel->topic != kBodyTopic) { + continue; + } + if (it->schema != nullptr) { + spdlog::error("body MCAP channel unexpectedly has a schema"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + if (it->channel->messageEncoding != kBodyEncoding) { + spdlog::error("unexpected body MCAP message encoding: {}", it->channel->messageEncoding); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + if (it->channel->metadata.at("packet_layout") != "cvmmap_body_tracking_v1" || + it->channel->metadata.at("payload_format") != "raw") { + spdlog::error("unexpected body MCAP channel metadata"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + + auto recorded_body = std::span( + reinterpret_cast(it->message.data), + it->message.dataSize); + if (!std::equal(recorded_body.begin(), recorded_body.end(), body_bytes.begin(), body_bytes.end())) { + spdlog::error("body MCAP payload bytes differ from the original packet"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + if (it->message.logTime != body_frame.header.timestamp_ns || + it->message.publishTime != body_frame.header.timestamp_ns) { + spdlog::error("body MCAP timestamp verification failed"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + + auto parsed_body = cvmmap::parse_body_tracking_message(recorded_body); + if (!parsed_body) { + spdlog::error("recorded body MCAP payload could not be parsed: {}", parsed_body.error()); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + if (parsed_body->header.frame_count != body_frame.header.frame_count || + parsed_body->header.timestamp_ns != body_frame.header.timestamp_ns || + !parsed_body->header.floor_as_origin() || + parsed_body->bodies.size() != 1 || + parsed_body->bodies.front().id != body.id) { + spdlog::error("parsed body MCAP payload verification failed"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + body_messages += 1; + } + + reader.close(); + + if (video_messages != 1 || body_messages != 1) { + spdlog::error( + "unexpected message counts: video={} body={}", + video_messages, + body_messages); + return exit_code(TesterExitCode::VerificationError); + } + + spdlog::info( + "validated same-file MCAP video+body recording at '{}'", + output_path.string()); + return exit_code(TesterExitCode::Success); +} diff --git a/src/testers/mcap_reader_tester.cpp b/src/testers/mcap_reader_tester.cpp index 6af95f6..3b477ad 100644 --- a/src/testers/mcap_reader_tester.cpp +++ b/src/testers/mcap_reader_tester.cpp @@ -89,11 +89,14 @@ int main(int argc, char **argv) { auto message_view = reader.readMessages(); for (auto it = message_view.begin(); it != message_view.end(); ++it) { - if (it->schema == nullptr || it->channel == nullptr) { - spdlog::error("MCAP message missing schema or channel metadata"); + if (it->channel == nullptr) { + spdlog::error("MCAP message missing channel metadata"); reader.close(); return exit_code(TesterExitCode::SchemaError); } + if (it->schema == nullptr) { + continue; + } if (it->schema->encoding != "protobuf" || it->schema->name != "foxglove.CompressedVideo") { continue; } diff --git a/src/testers/mcap_replay_tester.cpp b/src/testers/mcap_replay_tester.cpp index 005deaa..dc5dc15 100644 --- a/src/testers/mcap_replay_tester.cpp +++ b/src/testers/mcap_replay_tester.cpp @@ -97,11 +97,14 @@ std::expected, TesterExitCode> load_messages(const Co std::vector messages{}; auto view = reader.readMessages(); for (auto it = view.begin(); it != view.end(); ++it) { - if (it->schema == nullptr || it->channel == nullptr) { - spdlog::error("MCAP message missing schema or channel metadata"); + if (it->channel == nullptr) { + spdlog::error("MCAP message missing channel metadata"); reader.close(); return std::unexpected(TesterExitCode::SchemaError); } + if (it->schema == nullptr) { + continue; + } if (it->schema->encoding != "protobuf" || it->schema->name != "foxglove.CompressedVideo") { continue; }