feat(record): add depth RVL recording to MCAP

This commit is contained in:
2026-03-11 21:15:25 +08:00
parent 782af9481c
commit 59ff8b79d9
15 changed files with 826 additions and 35 deletions
+14
View File
@@ -358,6 +358,10 @@ std::expected<void, std::string> apply_toml_file(RuntimeConfig &config, const st
config.record.mcap.enabled = true;
config.record.mcap.topic = *value;
}
if (auto value = toml_value<std::string>(table, "record.mcap.depth_topic")) {
config.record.mcap.enabled = true;
config.record.mcap.depth_topic = *value;
}
if (auto value = toml_value<std::string>(table, "record.mcap.frame_id")) {
config.record.mcap.enabled = true;
config.record.mcap.frame_id = *value;
@@ -555,6 +559,7 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
std::string rtp_sdp_raw{};
std::string mcap_path_raw{};
std::string mcap_topic_raw{};
std::string mcap_depth_topic_raw{};
std::string mcap_frame_id_raw{};
std::string mcap_compression_raw{};
std::string queue_size_raw{};
@@ -594,6 +599,7 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
app.add_flag("--mcap", mcap_enabled);
app.add_option("--mcap-path", mcap_path_raw);
app.add_option("--mcap-topic", mcap_topic_raw);
app.add_option("--mcap-depth-topic", mcap_depth_topic_raw);
app.add_option("--mcap-frame-id", mcap_frame_id_raw);
app.add_option("--mcap-compression", mcap_compression_raw);
app.add_option("--queue-size", queue_size_raw);
@@ -702,6 +708,10 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **
config.record.mcap.enabled = true;
config.record.mcap.topic = mcap_topic_raw;
}
if (!mcap_depth_topic_raw.empty()) {
config.record.mcap.enabled = true;
config.record.mcap.depth_topic = mcap_depth_topic_raw;
}
if (!mcap_frame_id_raw.empty()) {
config.record.mcap.enabled = true;
config.record.mcap.frame_id = mcap_frame_id_raw;
@@ -833,6 +843,9 @@ std::expected<void, std::string> validate_runtime_config(const RuntimeConfig &co
if (config.record.mcap.topic.empty()) {
return std::unexpected("invalid MCAP config: topic must not be empty");
}
if (config.record.mcap.depth_topic.empty()) {
return std::unexpected("invalid MCAP config: depth_topic must not be empty");
}
if (config.record.mcap.frame_id.empty()) {
return std::unexpected("invalid MCAP config: frame_id must not be empty");
}
@@ -871,6 +884,7 @@ std::string summarize_runtime_config(const RuntimeConfig &config) {
ss << ", mcap.enabled=" << (config.record.mcap.enabled ? "true" : "false");
ss << ", mcap.path=" << config.record.mcap.path;
ss << ", mcap.topic=" << config.record.mcap.topic;
ss << ", mcap.depth_topic=" << config.record.mcap.depth_topic;
ss << ", mcap.frame_id=" << config.record.mcap.frame_id;
ss << ", mcap.compression=" << to_string(config.record.mcap.compression);
ss << ", latency.queue_size=" << config.latency.queue_size;
+92 -1
View File
@@ -97,6 +97,19 @@ namespace {
return static_cast<PixelFormat>(pixel_format_raw);
}
[[nodiscard]]
DepthUnit from_core_depth_unit(const cvmmap::DepthUnit unit) {
switch (unit) {
case cvmmap::DepthUnit::Millimeter:
return DepthUnit::Millimeter;
case cvmmap::DepthUnit::Meter:
return DepthUnit::Meter;
case cvmmap::DepthUnit::Unknown:
default:
return DepthUnit::Unknown;
}
}
[[nodiscard]]
std::expected<ModuleStatus, ParseError> validate_module_status(std::int32_t status_raw) {
switch (status_raw) {
@@ -119,6 +132,9 @@ namespace {
if (error.contains("version")) {
return ParseError::UnsupportedVersion;
}
if (error.contains("depth_unit")) {
return ParseError::InvalidDepthUnit;
}
if (error.contains("depth")) {
return ParseError::InvalidDepth;
}
@@ -145,6 +161,58 @@ namespace {
return converted;
}
[[nodiscard]]
FrameInfo from_core_frame_info(const cvmmap::frame_info_t &info) {
FrameInfo converted{};
converted.width = info.width;
converted.height = info.height;
converted.channels = info.channels;
converted.depth = static_cast<Depth>(info.depth);
converted.pixel_format = static_cast<PixelFormat>(info.pixel_format);
converted.buffer_size = info.buffer_size;
return converted;
}
[[nodiscard]]
std::size_t span_offset(
const std::span<const std::uint8_t> outer,
const std::span<const std::uint8_t> inner) {
if (inner.empty()) {
return 0;
}
return static_cast<std::size_t>(inner.data() - outer.data());
}
[[nodiscard]]
std::size_t payload_bytes_for(
const std::span<const std::uint8_t> payload_region,
const cvmmap::parsed_frame_metadata_t &metadata) {
std::size_t payload_bytes = metadata.left_plane.size();
const auto consider = [&](std::span<const std::uint8_t> plane) {
if (plane.empty()) {
return;
}
payload_bytes = std::max(
payload_bytes,
span_offset(payload_region, plane) + plane.size());
};
consider(metadata.depth_plane);
consider(metadata.confidence_plane);
return payload_bytes;
}
[[nodiscard]]
std::span<const std::uint8_t> translate_span(
const std::span<const std::uint8_t> source_payload,
const std::span<const std::uint8_t> destination_payload,
const std::span<const std::uint8_t> source_plane) {
if (source_plane.empty()) {
return {};
}
const auto offset = span_offset(source_payload, source_plane);
return destination_payload.subspan(offset, source_plane.size());
}
}
std::string_view to_string(ParseError error) {
@@ -159,6 +227,8 @@ std::string_view to_string(ParseError error) {
return "unsupported version";
case ParseError::InvalidDepth:
return "invalid depth";
case ParseError::InvalidDepthUnit:
return "invalid depth unit";
case ParseError::InvalidPixelFormat:
return "invalid pixel format";
case ParseError::InvalidModuleStatus:
@@ -344,9 +414,22 @@ std::expected<ValidatedShmView, ParseError> validate_shm_region(std::span<const
return std::unexpected(map_core_parser_error(metadata_result.error()));
}
const auto payload_region = shm_region.subspan(kShmPayloadOffset);
const auto payload_bytes = payload_bytes_for(payload_region, *metadata_result);
return ValidatedShmView{
.metadata = from_core_metadata(metadata_result->normalized_metadata),
.payload = metadata_result->left_plane};
.depth_unit = from_core_depth_unit(metadata_result->depth_unit),
.payload = payload_region.first(payload_bytes),
.left = metadata_result->left_plane,
.depth_info = metadata_result->depth_info
? std::optional<FrameInfo>(from_core_frame_info(*metadata_result->depth_info))
: std::nullopt,
.depth = metadata_result->depth_plane,
.confidence_info = metadata_result->confidence_info
? std::optional<FrameInfo>(from_core_frame_info(*metadata_result->confidence_info))
: std::nullopt,
.confidence = metadata_result->confidence_plane};
}
std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot(
@@ -378,8 +461,16 @@ std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot(
return std::unexpected(SnapshotError::TornRead);
}
const auto copied_payload = std::span<const std::uint8_t>(destination.data(), first->payload.size());
return CoherentSnapshot{
.metadata = first->metadata,
.depth_unit = first->depth_unit,
.left = translate_span(first->payload, copied_payload, first->left),
.depth_info = first->depth_info,
.depth = translate_span(first->payload, copied_payload, first->depth),
.confidence_info = first->confidence_info,
.confidence = translate_span(first->payload, copied_payload, first->confidence),
.bytes_copied = first->payload.size()};
}
+1
View File
@@ -34,6 +34,7 @@ constexpr std::array<std::string_view, 32> kHelpLines{
" --mcap\t\tenable MCAP recording",
" --mcap-path <path>\tMCAP output file",
" --mcap-topic <topic>\tMCAP topic name",
" --mcap-depth-topic <topic>\tMCAP depth topic name",
" --mcap-frame-id <id>\tFoxglove CompressedVideo frame_id",
" --mcap-compression <mode>\tnone|lz4|zstd",
"",
+64
View File
@@ -192,6 +192,44 @@ bool frame_info_equal(const ipc::FrameInfo &lhs, const ipc::FrameInfo &rhs) {
lhs.buffer_size == rhs.buffer_size;
}
[[nodiscard]]
std::expected<record::RawDepthMapView, std::string> make_depth_map_view(const ipc::CoherentSnapshot &snapshot) {
if (!snapshot.depth_info) {
return std::unexpected("depth plane metadata is missing");
}
if (snapshot.depth.empty()) {
return std::unexpected("depth plane bytes are missing");
}
if (snapshot.depth_unit == ipc::DepthUnit::Unknown) {
return std::unexpected("depth plane unit is unknown");
}
const auto &depth_info = *snapshot.depth_info;
if (depth_info.depth != ipc::Depth::F32 || depth_info.pixel_format != ipc::PixelFormat::GRAY) {
return std::unexpected("depth plane must be GRAY/F32");
}
const auto pixel_count = static_cast<std::size_t>(depth_info.width) * static_cast<std::size_t>(depth_info.height);
const auto expected_bytes = pixel_count * sizeof(float);
if (snapshot.depth.size() != expected_bytes) {
return std::unexpected(
"depth plane byte size does not match width*height*sizeof(float)");
}
if ((reinterpret_cast<std::uintptr_t>(snapshot.depth.data()) % alignof(float)) != 0) {
return std::unexpected("depth plane is not aligned for float access");
}
return record::RawDepthMapView{
.timestamp_ns = snapshot.metadata.timestamp_ns,
.width = depth_info.width,
.height = depth_info.height,
.source_unit = snapshot.depth_unit,
.pixels = std::span<const float>(
reinterpret_cast<const float *>(snapshot.depth.data()),
pixel_count),
};
}
[[nodiscard]]
Status publish_access_units(
const RuntimeConfig &config,
@@ -338,6 +376,7 @@ int run_pipeline(const RuntimeConfig &config) {
std::optional<ipc::FrameInfo> active_info{};
std::optional<ipc::FrameInfo> restart_target_info{};
bool restart_pending{false};
bool warned_unknown_depth_unit{false};
const auto restart_backend = [&](std::string_view reason, std::optional<ipc::FrameInfo> target_info) {
if (started) {
@@ -352,6 +391,7 @@ int run_pipeline(const RuntimeConfig &config) {
started = false;
restart_pending = true;
restart_target_info = target_info;
warned_unknown_depth_unit = false;
rtmp_output.reset();
};
@@ -404,6 +444,7 @@ int run_pipeline(const RuntimeConfig &config) {
started = true;
restart_pending = false;
restart_target_info.reset();
warned_unknown_depth_unit = false;
active_info = target_info;
return {};
};
@@ -510,6 +551,29 @@ int run_pipeline(const RuntimeConfig &config) {
continue;
}
if (mcap_sink.has_value() && !snapshot->depth.empty()) {
if (snapshot->depth_unit == ipc::DepthUnit::Unknown) {
if (!warned_unknown_depth_unit) {
spdlog::warn("pipeline depth plane present but depth_unit is unknown; skipping depth MCAP recording");
warned_unknown_depth_unit = true;
}
} else {
auto depth_map = make_depth_map_view(*snapshot);
if (!depth_map) {
const auto reason = "pipeline depth snapshot invalid: " + depth_map.error();
restart_backend(reason, active_info);
continue;
}
auto write_depth = mcap_sink->write_depth_map(*depth_map);
if (!write_depth) {
const auto reason = "pipeline depth MCAP write failed: " + write_depth.error();
restart_backend(reason, active_info);
continue;
}
}
}
stats.pushed_frames += 1;
auto drain = drain_encoder(
config,
+192 -6
View File
@@ -1,16 +1,19 @@
#define MCAP_IMPLEMENTATION
#include <mcap/writer.hpp>
#include "cvmmap_streamer/record/mcap_record_sink.hpp"
#include "protobuf_descriptor.hpp"
#include "cvmmap_streamer/DepthMap.pb.h"
#include "foxglove/CompressedVideo.pb.h"
#include <rvl/rvl.hpp>
#include <google/protobuf/timestamp.pb.h>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <expected>
#include <limits>
#include <memory>
#include <span>
#include <string>
@@ -22,11 +25,20 @@ namespace cvmmap_streamer::record {
namespace {
constexpr float kRvlDepthQuantization = 200.0f;
constexpr float kMinDepthMaxMeters = 20.0f;
[[nodiscard]]
std::string codec_format(CodecType codec) {
return codec == CodecType::H265 ? "h265" : "h264";
}
struct EncodedDepthPayload {
DepthEncoding encoding{DepthEncoding::RvlF32};
ipc::DepthUnit storage_unit{ipc::DepthUnit::Unknown};
std::vector<std::uint8_t> bytes{};
};
[[nodiscard]]
mcap::Compression to_mcap_compression(McapCompression compression) {
switch (compression) {
@@ -48,6 +60,43 @@ google::protobuf::Timestamp to_proto_timestamp(std::uint64_t timestamp_ns) {
return timestamp;
}
[[nodiscard]]
cvmmap_streamer::DepthMap::DepthUnit to_proto_depth_unit(ipc::DepthUnit unit) {
switch (unit) {
case ipc::DepthUnit::Millimeter:
return cvmmap_streamer::DepthMap::DEPTH_UNIT_MILLIMETER;
case ipc::DepthUnit::Meter:
return cvmmap_streamer::DepthMap::DEPTH_UNIT_METER;
case ipc::DepthUnit::Unknown:
default:
return cvmmap_streamer::DepthMap::DEPTH_UNIT_UNKNOWN;
}
}
[[nodiscard]]
cvmmap_streamer::DepthMap::StorageUnit to_proto_storage_unit(ipc::DepthUnit unit) {
switch (unit) {
case ipc::DepthUnit::Millimeter:
return cvmmap_streamer::DepthMap::STORAGE_UNIT_MILLIMETER;
case ipc::DepthUnit::Meter:
return cvmmap_streamer::DepthMap::STORAGE_UNIT_METER;
case ipc::DepthUnit::Unknown:
default:
return cvmmap_streamer::DepthMap::STORAGE_UNIT_UNKNOWN;
}
}
[[nodiscard]]
cvmmap_streamer::DepthMap::Encoding to_proto_depth_encoding(DepthEncoding encoding) {
switch (encoding) {
case DepthEncoding::RvlU16Lossless:
return cvmmap_streamer::DepthMap::RVL_U16_LOSSLESS;
case DepthEncoding::RvlF32:
default:
return cvmmap_streamer::DepthMap::RVL_F32;
}
}
void append_start_code(std::vector<std::uint8_t> &output) {
output.push_back(0x00);
output.push_back(0x00);
@@ -176,14 +225,96 @@ std::expected<std::vector<std::uint8_t>, std::string> decoder_config_to_annexb(
return avcc_to_annexb(decoder_config);
}
[[nodiscard]]
bool can_encode_lossless_u16_mm(const RawDepthMapView &depth_map) {
if (depth_map.source_unit != ipc::DepthUnit::Millimeter) {
return false;
}
for (const float sample : depth_map.pixels) {
if (!std::isfinite(sample) || sample <= 0.0f) {
continue;
}
if (sample > static_cast<float>(std::numeric_limits<std::uint16_t>::max())) {
return false;
}
if (std::fabs(sample - std::round(sample)) > 1e-3f) {
return false;
}
}
return true;
}
[[nodiscard]]
std::expected<EncodedDepthPayload, std::string> encode_depth_payload(const RawDepthMapView &depth_map) {
const auto pixel_count = static_cast<std::size_t>(depth_map.width) * static_cast<std::size_t>(depth_map.height);
if (depth_map.width == 0 || depth_map.height == 0) {
return std::unexpected("depth map dimensions must be non-zero");
}
if (pixel_count != depth_map.pixels.size()) {
return std::unexpected("depth map dimensions do not match the pixel buffer");
}
if (depth_map.source_unit == ipc::DepthUnit::Unknown) {
return std::unexpected("depth source unit is unknown");
}
try {
if (can_encode_lossless_u16_mm(depth_map)) {
std::vector<std::uint16_t> pixels(pixel_count, 0);
for (std::size_t index = 0; index < pixel_count; ++index) {
const float sample = depth_map.pixels[index];
if (!std::isfinite(sample) || sample <= 0.0f) {
continue;
}
pixels[index] = static_cast<std::uint16_t>(std::lrint(sample));
}
return EncodedDepthPayload{
.encoding = DepthEncoding::RvlU16Lossless,
.storage_unit = ipc::DepthUnit::Millimeter,
.bytes = rvl::compress_image(pixels, depth_map.height, depth_map.width),
};
}
std::vector<float> depth_m(pixel_count, std::numeric_limits<float>::quiet_NaN());
float finite_max_m = 0.0f;
for (std::size_t index = 0; index < pixel_count; ++index) {
float sample = depth_map.pixels[index];
if (depth_map.source_unit == ipc::DepthUnit::Millimeter && std::isfinite(sample)) {
sample *= 0.001f;
}
if (!std::isfinite(sample) || sample <= 0.0f) {
continue;
}
depth_m[index] = sample;
finite_max_m = std::max(finite_max_m, sample);
}
const auto parameters = rvl::make_quantization_parameters(
std::max(finite_max_m, kMinDepthMaxMeters),
kRvlDepthQuantization);
return EncodedDepthPayload{
.encoding = DepthEncoding::RvlF32,
.storage_unit = ipc::DepthUnit::Meter,
.bytes = rvl::compress_float_image(depth_m, depth_map.height, depth_map.width, parameters),
};
} catch (const std::exception &error) {
return std::unexpected(std::string("failed to RVL-encode depth map: ") + error.what());
}
}
}
struct McapRecordSink::State {
mcap::McapWriter writer{};
std::string path{};
std::string frame_id{};
mcap::ChannelId channel_id{0};
std::uint32_t sequence{0};
mcap::ChannelId video_channel_id{0};
mcap::ChannelId depth_channel_id{0};
std::uint32_t video_sequence{0};
std::uint32_t depth_sequence{0};
CodecType codec{CodecType::H264};
std::vector<std::uint8_t> keyframe_preamble{};
};
@@ -232,7 +363,20 @@ std::expected<McapRecordSink, std::string> McapRecordSink::create(
mcap::Channel channel(config.record.mcap.topic, "protobuf", schema.id);
state->writer.addChannel(channel);
state->channel_id = channel.id;
state->video_channel_id = channel.id;
const auto depth_descriptor_set = build_file_descriptor_set(cvmmap_streamer::DepthMap::descriptor());
std::string depth_schema_bytes{};
if (!depth_descriptor_set.SerializeToString(&depth_schema_bytes)) {
return std::unexpected("failed to serialize cvmmap_streamer.DepthMap descriptor set");
}
mcap::Schema depth_schema("cvmmap_streamer.DepthMap", "protobuf", depth_schema_bytes);
state->writer.addSchema(depth_schema);
mcap::Channel depth_channel(config.record.mcap.depth_topic, "protobuf", depth_schema.id);
state->writer.addChannel(depth_channel);
state->depth_channel_id = depth_channel.id;
sink.state_ = state.release();
auto update = sink.update_stream_info(stream_info);
@@ -281,8 +425,8 @@ std::expected<void, std::string> McapRecordSink::write_access_unit(const encode:
}
mcap::Message record{};
record.channelId = state_->channel_id;
record.sequence = state_->sequence++;
record.channelId = state_->video_channel_id;
record.sequence = state_->video_sequence++;
record.logTime = access_unit.source_timestamp_ns;
record.publishTime = access_unit.source_timestamp_ns;
record.data = reinterpret_cast<const std::byte *>(serialized.data());
@@ -295,6 +439,48 @@ std::expected<void, std::string> McapRecordSink::write_access_unit(const encode:
return {};
}
std::expected<void, std::string> McapRecordSink::write_depth_map(const RawDepthMapView &depth_map) {
if (state_ == nullptr) {
return std::unexpected("MCAP sink is not open");
}
auto encoded = encode_depth_payload(depth_map);
if (!encoded) {
return std::unexpected(encoded.error());
}
cvmmap_streamer::DepthMap message{};
*message.mutable_timestamp() = to_proto_timestamp(depth_map.timestamp_ns);
message.set_frame_id(state_->frame_id);
message.set_width(depth_map.width);
message.set_height(depth_map.height);
message.set_source_unit(to_proto_depth_unit(depth_map.source_unit));
message.set_storage_unit(to_proto_storage_unit(encoded->storage_unit));
message.set_encoding(to_proto_depth_encoding(encoded->encoding));
message.set_data(
reinterpret_cast<const char *>(encoded->bytes.data()),
static_cast<int>(encoded->bytes.size()));
std::string serialized{};
if (!message.SerializeToString(&serialized)) {
return std::unexpected("failed to serialize cvmmap_streamer.DepthMap");
}
mcap::Message record{};
record.channelId = state_->depth_channel_id;
record.sequence = state_->depth_sequence++;
record.logTime = depth_map.timestamp_ns;
record.publishTime = depth_map.timestamp_ns;
record.data = reinterpret_cast<const std::byte *>(serialized.data());
record.dataSize = serialized.size();
const auto write_status = state_->writer.write(record);
if (!write_status.ok()) {
return std::unexpected("failed to write MCAP depth message: " + write_status.message);
}
return {};
}
bool McapRecordSink::is_open() const {
return state_ != nullptr;
}
+3
View File
@@ -0,0 +1,3 @@
#define MCAP_IMPLEMENTATION
#include <mcap/reader.hpp>
#include <mcap/writer.hpp>
+90 -20
View File
@@ -2,6 +2,7 @@
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <span>
#include <spdlog/spdlog.h>
@@ -29,12 +30,19 @@ constexpr std::size_t kVersionMajorOffset = 8;
constexpr std::size_t kVersionMinorOffset = 9;
constexpr std::size_t kFrameCountOffset = 12;
constexpr std::size_t kTimestampOffset = 16;
constexpr std::size_t kInfoWidthOffset = 24;
constexpr std::size_t kInfoHeightOffset = 26;
constexpr std::size_t kInfoChannelsOffset = 28;
constexpr std::size_t kInfoDepthOffset = 29;
constexpr std::size_t kInfoPixelFormatOffset = 30;
constexpr std::size_t kInfoBufferSizeOffset = 32;
constexpr std::size_t kPublishSeqOffset = 24;
constexpr std::size_t kPlaneCountOffset = 32;
constexpr std::size_t kPlaneMaskOffset = 33;
constexpr std::size_t kDescriptorOffsetField = 34;
constexpr std::size_t kDescriptorSizeField = 36;
constexpr std::size_t kDescriptorCapField = 38;
constexpr std::size_t kPayloadSizeOffset = 40;
constexpr std::size_t kDepthUnitOffset = 44;
constexpr std::size_t kDescriptorsOffset = 64;
constexpr std::size_t kDescriptorSizeBytes = 24;
constexpr std::size_t kLeftSizeBytes = 4;
constexpr std::size_t kDepthSizeBytes = 16;
constexpr std::size_t kPayloadSizeBytes = kLeftSizeBytes + kDepthSizeBytes;
void write_u16_le(std::span<std::uint8_t> buffer, std::size_t offset, std::uint16_t value) {
buffer[offset] = static_cast<std::uint8_t>(value & 0xffu);
@@ -54,25 +62,71 @@ void write_u64_le(std::span<std::uint8_t> buffer, std::size_t offset, std::uint6
}
}
void write_descriptor(
std::span<std::uint8_t> buffer,
std::size_t descriptor_index,
std::uint8_t plane_type,
std::uint8_t pixel_format,
std::uint8_t depth,
std::uint32_t width,
std::uint32_t height,
std::uint32_t stride_bytes,
std::uint32_t offset_bytes,
std::uint32_t size_bytes) {
const auto base = kDescriptorsOffset + descriptor_index * kDescriptorSizeBytes;
buffer[base + 0] = plane_type;
buffer[base + 1] = pixel_format;
buffer[base + 2] = depth;
write_u32_le(buffer, base + 4, width);
write_u32_le(buffer, base + 8, height);
write_u32_le(buffer, base + 12, stride_bytes);
write_u32_le(buffer, base + 16, offset_bytes);
write_u32_le(buffer, base + 20, size_bytes);
}
void write_metadata(
std::span<std::uint8_t> buffer,
std::uint32_t frame_count,
std::uint64_t timestamp_ns,
std::uint32_t payload_size) {
std::uint64_t timestamp_ns) {
std::copy(
cvmmap_streamer::ipc::kFrameMetadataMagic.begin(),
cvmmap_streamer::ipc::kFrameMetadataMagic.end(),
buffer.begin() + kMagicOffset);
buffer[kVersionMajorOffset] = cvmmap_streamer::ipc::kVersionMajor;
buffer[kVersionMajorOffset] = 2;
buffer[kVersionMinorOffset] = cvmmap_streamer::ipc::kVersionMinor;
write_u32_le(buffer, kFrameCountOffset, frame_count);
write_u64_le(buffer, kTimestampOffset, timestamp_ns);
write_u16_le(buffer, kInfoWidthOffset, 2);
write_u16_le(buffer, kInfoHeightOffset, 2);
buffer[kInfoChannelsOffset] = 1;
buffer[kInfoDepthOffset] = static_cast<std::uint8_t>(cvmmap_streamer::ipc::Depth::U8);
buffer[kInfoPixelFormatOffset] = static_cast<std::uint8_t>(cvmmap_streamer::ipc::PixelFormat::GRAY);
write_u32_le(buffer, kInfoBufferSizeOffset, payload_size);
write_u64_le(buffer, kPublishSeqOffset, frame_count);
buffer[kPlaneCountOffset] = 2;
buffer[kPlaneMaskOffset] = 0x03;
write_u16_le(buffer, kDescriptorOffsetField, static_cast<std::uint16_t>(kDescriptorsOffset));
write_u16_le(buffer, kDescriptorSizeField, static_cast<std::uint16_t>(kDescriptorSizeBytes));
write_u16_le(buffer, kDescriptorCapField, 4);
write_u32_le(buffer, kPayloadSizeOffset, static_cast<std::uint32_t>(kPayloadSizeBytes));
buffer[kDepthUnitOffset] = static_cast<std::uint8_t>(cvmmap_streamer::ipc::DepthUnit::Millimeter);
write_descriptor(
buffer,
0,
0,
static_cast<std::uint8_t>(cvmmap_streamer::ipc::PixelFormat::GRAY),
static_cast<std::uint8_t>(cvmmap_streamer::ipc::Depth::U8),
2,
2,
2,
0,
static_cast<std::uint32_t>(kLeftSizeBytes));
write_descriptor(
buffer,
1,
1,
static_cast<std::uint8_t>(cvmmap_streamer::ipc::PixelFormat::GRAY),
static_cast<std::uint8_t>(cvmmap_streamer::ipc::Depth::F32),
2,
2,
8,
static_cast<std::uint32_t>(kLeftSizeBytes),
static_cast<std::uint32_t>(kDepthSizeBytes));
}
}
@@ -83,22 +137,38 @@ int main(int argc, char **argv) {
return exit_code(TesterExitCode::Success);
}
std::array<std::uint8_t, cvmmap_streamer::ipc::kShmPayloadOffset + 32> shm{};
std::array<std::uint8_t, cvmmap_streamer::ipc::kShmPayloadOffset + kPayloadSizeBytes> shm{};
auto shm_view = std::span<std::uint8_t>(shm);
write_metadata(shm_view, 7, 2222, 32);
for (std::size_t i = 0; i < 32; ++i) {
write_metadata(shm_view, 7, 2222);
for (std::size_t i = 0; i < kLeftSizeBytes; ++i) {
shm[cvmmap_streamer::ipc::kShmPayloadOffset + i] = static_cast<std::uint8_t>(i + 1);
}
const float depth_samples[4] = {1000.0f, 2000.0f, 0.0f, -1.0f};
std::memcpy(
shm.data() + cvmmap_streamer::ipc::kShmPayloadOffset + kLeftSizeBytes,
depth_samples,
sizeof(depth_samples));
std::array<std::uint8_t, 32> destination{};
std::array<std::uint8_t, kPayloadSizeBytes> destination{};
auto valid = cvmmap_streamer::ipc::read_coherent_snapshot(shm_view, destination);
if (!valid) {
spdlog::error("coherent snapshot should succeed: {}", cvmmap_streamer::ipc::to_string(valid.error()));
return exit_code(TesterExitCode::ReadError);
}
if (valid->bytes_copied != 32 || valid->metadata.frame_count != 7 || valid->metadata.timestamp_ns != 2222) {
float copied_depth_first = 0.0f;
if (valid->depth.size() >= sizeof(float)) {
std::memcpy(&copied_depth_first, valid->depth.data(), sizeof(float));
}
if (valid->bytes_copied != kPayloadSizeBytes ||
valid->metadata.frame_count != 7 ||
valid->metadata.timestamp_ns != 2222 ||
valid->depth_unit != cvmmap_streamer::ipc::DepthUnit::Millimeter ||
valid->left.size() != kLeftSizeBytes ||
valid->depth.size() != kDepthSizeBytes ||
!valid->depth_info.has_value() ||
copied_depth_first != 1000.0f) {
spdlog::error("valid snapshot verification failed");
return exit_code(TesterExitCode::VerificationError);
}
+238
View File
@@ -0,0 +1,238 @@
#include <mcap/reader.hpp>
#include "cvmmap_streamer/DepthMap.pb.h"
#include "cvmmap_streamer/common.h"
#include "cvmmap_streamer/record/mcap_record_sink.hpp"
#include "foxglove/CompressedVideo.pb.h"
#include <rvl/rvl.hpp>
#include <spdlog/spdlog.h>
#include <cmath>
#include <cstdint>
#include <filesystem>
#include <limits>
#include <optional>
#include <string>
#include <vector>
namespace {
enum class TesterExitCode : int {
Success = 0,
CreateError = 2,
WriteError = 3,
OpenError = 4,
VerificationError = 5,
};
[[nodiscard]]
constexpr int exit_code(TesterExitCode code) {
return static_cast<int>(code);
}
[[nodiscard]]
bool approx_equal(float lhs, float rhs, float tolerance) {
return std::fabs(lhs - rhs) <= tolerance;
}
}
int main(int argc, char **argv) {
if (cvmmap_streamer::has_help_flag(argc, argv)) {
cvmmap_streamer::print_help("mcap_depth_record_tester");
return exit_code(TesterExitCode::Success);
}
const std::filesystem::path output_path =
argc > 1
? std::filesystem::path(argv[1])
: std::filesystem::temp_directory_path() / "cvmmap_streamer_depth_record_test.mcap";
if (output_path.has_parent_path()) {
std::filesystem::create_directories(output_path.parent_path());
}
cvmmap_streamer::RuntimeConfig config = cvmmap_streamer::RuntimeConfig::defaults();
config.record.mcap.enabled = true;
config.record.mcap.path = output_path.string();
config.record.mcap.topic = "/camera/video";
config.record.mcap.depth_topic = "/camera/depth";
config.record.mcap.frame_id = "camera";
config.record.mcap.compression = cvmmap_streamer::McapCompression::None;
cvmmap_streamer::encode::EncodedStreamInfo stream_info{};
stream_info.codec = cvmmap_streamer::CodecType::H264;
auto sink = cvmmap_streamer::record::McapRecordSink::create(config, stream_info);
if (!sink) {
spdlog::error("failed to create MCAP sink: {}", sink.error());
return exit_code(TesterExitCode::CreateError);
}
cvmmap_streamer::encode::EncodedAccessUnit access_unit{};
access_unit.codec = cvmmap_streamer::CodecType::H264;
access_unit.source_timestamp_ns = 10;
access_unit.stream_pts_ns = 10;
access_unit.keyframe = false;
access_unit.annexb_bytes = {0x00, 0x00, 0x00, 0x01, 0x09, 0x10};
if (auto write = sink->write_access_unit(access_unit); !write) {
spdlog::error("failed to write video access unit: {}", write.error());
return exit_code(TesterExitCode::WriteError);
}
const float depth_mm_pixels[4] = {
1000.0f,
2000.0f,
std::numeric_limits<float>::quiet_NaN(),
0.0f,
};
cvmmap_streamer::record::RawDepthMapView depth_mm{};
depth_mm.timestamp_ns = 11;
depth_mm.width = 2;
depth_mm.height = 2;
depth_mm.source_unit = cvmmap_streamer::ipc::DepthUnit::Millimeter;
depth_mm.pixels = depth_mm_pixels;
if (auto write = sink->write_depth_map(depth_mm); !write) {
spdlog::error("failed to write millimeter depth map: {}", write.error());
return exit_code(TesterExitCode::WriteError);
}
const float depth_m_pixels[4] = {
1.0f,
2.0f,
std::numeric_limits<float>::quiet_NaN(),
0.0f,
};
cvmmap_streamer::record::RawDepthMapView depth_m{};
depth_m.timestamp_ns = 12;
depth_m.width = 2;
depth_m.height = 2;
depth_m.source_unit = cvmmap_streamer::ipc::DepthUnit::Meter;
depth_m.pixels = depth_m_pixels;
if (auto write = sink->write_depth_map(depth_m); !write) {
spdlog::error("failed to write meter depth map: {}", write.error());
return exit_code(TesterExitCode::WriteError);
}
sink->close();
mcap::McapReader reader{};
const auto open_status = reader.open(output_path.string());
if (!open_status.ok()) {
spdlog::error("failed to open MCAP file '{}': {}", output_path.string(), open_status.message);
return exit_code(TesterExitCode::OpenError);
}
std::uint64_t video_messages{0};
std::vector<cvmmap_streamer::DepthMap> depth_messages{};
auto messages = reader.readMessages();
for (auto it = messages.begin(); it != messages.end(); ++it) {
if (it->schema == nullptr || it->channel == nullptr) {
spdlog::error("MCAP message missing schema or channel");
reader.close();
return exit_code(TesterExitCode::VerificationError);
}
if (it->schema->encoding != "protobuf" || it->channel->messageEncoding != "protobuf") {
spdlog::error("unexpected schema encoding in MCAP file");
reader.close();
return exit_code(TesterExitCode::VerificationError);
}
if (it->schema->name == "foxglove.CompressedVideo") {
foxglove::CompressedVideo video{};
if (!video.ParseFromArray(it->message.data, static_cast<int>(it->message.dataSize))) {
spdlog::error("failed to parse foxglove.CompressedVideo payload");
reader.close();
return exit_code(TesterExitCode::VerificationError);
}
if (it->channel->topic != "/camera/video" || video.frame_id() != "camera" || video.data().empty()) {
spdlog::error("video MCAP payload verification failed");
reader.close();
return exit_code(TesterExitCode::VerificationError);
}
video_messages += 1;
continue;
}
if (it->schema->name == "cvmmap_streamer.DepthMap") {
cvmmap_streamer::DepthMap depth{};
if (!depth.ParseFromArray(it->message.data, static_cast<int>(it->message.dataSize))) {
spdlog::error("failed to parse cvmmap_streamer.DepthMap payload");
reader.close();
return exit_code(TesterExitCode::VerificationError);
}
if (it->channel->topic != "/camera/depth" || depth.frame_id() != "camera") {
spdlog::error("depth MCAP payload verification failed");
reader.close();
return exit_code(TesterExitCode::VerificationError);
}
depth_messages.push_back(std::move(depth));
}
}
reader.close();
if (video_messages != 1 || depth_messages.size() != 2) {
spdlog::error(
"unexpected message counts: video={} depth={}",
video_messages,
depth_messages.size());
return exit_code(TesterExitCode::VerificationError);
}
const auto &mm_message = depth_messages[0];
if (mm_message.source_unit() != cvmmap_streamer::DepthMap::DEPTH_UNIT_MILLIMETER ||
mm_message.storage_unit() != cvmmap_streamer::DepthMap::STORAGE_UNIT_MILLIMETER ||
mm_message.encoding() != cvmmap_streamer::DepthMap::RVL_U16_LOSSLESS) {
spdlog::error("millimeter depth metadata verification failed");
return exit_code(TesterExitCode::VerificationError);
}
const auto mm_info = rvl::inspect_image(std::span<const std::uint8_t>(
reinterpret_cast<const std::uint8_t *>(mm_message.data().data()),
mm_message.data().size()));
const auto mm_decoded = rvl::decompress_image(std::span<const std::uint8_t>(
reinterpret_cast<const std::uint8_t *>(mm_message.data().data()),
mm_message.data().size()));
if (mm_info.format != rvl::ImageFormat::UInt16Lossless ||
mm_info.rows != 2 ||
mm_info.cols != 2 ||
mm_decoded.pixels.size() != 4 ||
mm_decoded.pixels[0] != 1000 ||
mm_decoded.pixels[1] != 2000 ||
mm_decoded.pixels[2] != 0 ||
mm_decoded.pixels[3] != 0) {
spdlog::error("millimeter RVL round-trip verification failed");
return exit_code(TesterExitCode::VerificationError);
}
const auto &m_message = depth_messages[1];
if (m_message.source_unit() != cvmmap_streamer::DepthMap::DEPTH_UNIT_METER ||
m_message.storage_unit() != cvmmap_streamer::DepthMap::STORAGE_UNIT_METER ||
m_message.encoding() != cvmmap_streamer::DepthMap::RVL_F32) {
spdlog::error("meter depth metadata verification failed");
return exit_code(TesterExitCode::VerificationError);
}
const auto m_info = rvl::inspect_image(std::span<const std::uint8_t>(
reinterpret_cast<const std::uint8_t *>(m_message.data().data()),
m_message.data().size()));
const auto m_decoded = rvl::decompress_float_image(std::span<const std::uint8_t>(
reinterpret_cast<const std::uint8_t *>(m_message.data().data()),
m_message.data().size()));
if (m_info.format != rvl::ImageFormat::Float32InverseDepth ||
m_info.rows != 2 ||
m_info.cols != 2 ||
m_decoded.pixels.size() != 4 ||
!approx_equal(m_decoded.pixels[0], 1.0f, 0.02f) ||
!approx_equal(m_decoded.pixels[1], 2.0f, 0.02f) ||
!std::isnan(m_decoded.pixels[2]) ||
!std::isnan(m_decoded.pixels[3])) {
spdlog::error("meter RVL round-trip verification failed");
return exit_code(TesterExitCode::VerificationError);
}
spdlog::info(
"validated same-file MCAP video+depth recording at '{}'",
output_path.string());
return exit_code(TesterExitCode::Success);
}
-1
View File
@@ -1,4 +1,3 @@
#define MCAP_IMPLEMENTATION
#include <mcap/reader.hpp>
#include <foxglove/CompressedVideo.pb.h>
-1
View File
@@ -1,4 +1,3 @@
#define MCAP_IMPLEMENTATION
#include <mcap/reader.hpp>
#include <foxglove/CompressedVideo.pb.h>