feat: bundle synced multi-camera ZED SVO streams into MCAP
Extend zed_svo_to_mcap to export multiple SVO inputs or a kindergarten segment directory into a single MCAP with per-camera namespaced topics. Add a MultiMcapRecordSink so one writer can manage independent video, depth, calibration, pose, and body channels for multiple cameras while reusing the existing protobuf schemas and encoded access-unit flow. Implement strict timestamp synchronization for bundled exports by computing the common time window, seeking each SVO to the shared start, dropping unsynced frames, and only emitting groups that fall within the configured tolerance. Load per-camera positional tracking settings from a cv-mmap style TOML pose config, preserve single-camera behavior, and add graceful SIGINT or SIGTERM shutdown so interrupted exports flush encoders and finalize readable MCAP files. Add mcap_multi_record_tester coverage and switch zed_svo_to_mcap defaults to H.265 plus QUALITY depth mode.
This commit is contained in:
@@ -287,6 +287,7 @@ add_cvmmap_binary(mcap_depth_record_tester src/testers/mcap_depth_record_tester.
|
|||||||
add_cvmmap_binary(mcap_body_record_tester src/testers/mcap_body_record_tester.cpp)
|
add_cvmmap_binary(mcap_body_record_tester src/testers/mcap_body_record_tester.cpp)
|
||||||
add_cvmmap_binary(mcap_body_inspector src/testers/mcap_body_inspector.cpp)
|
add_cvmmap_binary(mcap_body_inspector src/testers/mcap_body_inspector.cpp)
|
||||||
add_cvmmap_binary(mcap_pose_record_tester src/testers/mcap_pose_record_tester.cpp)
|
add_cvmmap_binary(mcap_pose_record_tester src/testers/mcap_pose_record_tester.cpp)
|
||||||
|
add_cvmmap_binary(mcap_multi_record_tester src/testers/mcap_multi_record_tester.cpp)
|
||||||
|
|
||||||
add_executable(mcap_reader_tester src/testers/mcap_reader_tester.cpp)
|
add_executable(mcap_reader_tester src/testers/mcap_reader_tester.cpp)
|
||||||
target_include_directories(mcap_reader_tester
|
target_include_directories(mcap_reader_tester
|
||||||
|
|||||||
@@ -57,6 +57,15 @@ struct RawBodyTrackingMessageView {
|
|||||||
std::span<const std::uint8_t> bytes{};
|
std::span<const std::uint8_t> bytes{};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct McapRecordStreamConfig {
|
||||||
|
std::string topic{"/camera/video"};
|
||||||
|
std::string depth_topic{"/camera/depth"};
|
||||||
|
std::string calibration_topic{"/camera/calibration"};
|
||||||
|
std::string pose_topic{"/camera/pose"};
|
||||||
|
std::string body_topic{"/camera/body"};
|
||||||
|
std::string frame_id{"camera"};
|
||||||
|
};
|
||||||
|
|
||||||
class McapRecordSink {
|
class McapRecordSink {
|
||||||
public:
|
public:
|
||||||
McapRecordSink() = default;
|
McapRecordSink() = default;
|
||||||
@@ -107,4 +116,75 @@ private:
|
|||||||
State *state_{nullptr};
|
State *state_{nullptr};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class MultiMcapRecordSink {
|
||||||
|
public:
|
||||||
|
using StreamId = std::size_t;
|
||||||
|
struct State;
|
||||||
|
|
||||||
|
MultiMcapRecordSink() = default;
|
||||||
|
~MultiMcapRecordSink();
|
||||||
|
|
||||||
|
MultiMcapRecordSink(const MultiMcapRecordSink &) = delete;
|
||||||
|
MultiMcapRecordSink &operator=(const MultiMcapRecordSink &) = delete;
|
||||||
|
|
||||||
|
MultiMcapRecordSink(MultiMcapRecordSink &&other) noexcept;
|
||||||
|
MultiMcapRecordSink &operator=(MultiMcapRecordSink &&other) noexcept;
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
static std::expected<MultiMcapRecordSink, std::string> create(
|
||||||
|
std::string path,
|
||||||
|
McapCompression compression);
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<StreamId, std::string> add_stream(
|
||||||
|
const McapRecordStreamConfig &config,
|
||||||
|
const encode::EncodedStreamInfo &stream_info);
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<void, std::string> update_stream_info(
|
||||||
|
StreamId stream_id,
|
||||||
|
const encode::EncodedStreamInfo &stream_info);
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<void, std::string> write_access_unit(
|
||||||
|
StreamId stream_id,
|
||||||
|
const encode::EncodedAccessUnit &access_unit);
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<void, std::string> write_depth_map(
|
||||||
|
StreamId stream_id,
|
||||||
|
const RawDepthMapView &depth_map);
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<void, std::string> write_depth_map_u16(
|
||||||
|
StreamId stream_id,
|
||||||
|
const RawDepthMapU16View &depth_map);
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<void, std::string> write_camera_calibration(
|
||||||
|
StreamId stream_id,
|
||||||
|
const RawCameraCalibrationView &calibration);
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<void, std::string> write_pose(
|
||||||
|
StreamId stream_id,
|
||||||
|
const RawPoseView &pose);
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<void, std::string> write_body_tracking_message(
|
||||||
|
StreamId stream_id,
|
||||||
|
const RawBodyTrackingMessageView &body_message);
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
bool is_open() const;
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::string_view path() const;
|
||||||
|
|
||||||
|
void close();
|
||||||
|
|
||||||
|
private:
|
||||||
|
State *state_{nullptr};
|
||||||
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -772,4 +772,484 @@ void McapRecordSink::close() {
|
|||||||
state_ = nullptr;
|
state_ = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct MultiMcapRecordSink::State {
|
||||||
|
struct StreamState {
|
||||||
|
McapRecordStreamConfig config{};
|
||||||
|
mcap::ChannelId video_channel_id{0};
|
||||||
|
mcap::ChannelId depth_channel_id{0};
|
||||||
|
mcap::ChannelId calibration_channel_id{0};
|
||||||
|
mcap::ChannelId pose_channel_id{0};
|
||||||
|
mcap::ChannelId body_channel_id{0};
|
||||||
|
std::uint32_t video_sequence{0};
|
||||||
|
std::uint32_t depth_sequence{0};
|
||||||
|
std::uint32_t calibration_sequence{0};
|
||||||
|
std::uint32_t pose_sequence{0};
|
||||||
|
std::uint32_t body_sequence{0};
|
||||||
|
CodecType codec{CodecType::H264};
|
||||||
|
std::vector<std::uint8_t> keyframe_preamble{};
|
||||||
|
};
|
||||||
|
|
||||||
|
mcap::McapWriter writer{};
|
||||||
|
std::string path{};
|
||||||
|
mcap::SchemaId video_schema_id{0};
|
||||||
|
mcap::SchemaId depth_schema_id{0};
|
||||||
|
mcap::SchemaId calibration_schema_id{0};
|
||||||
|
mcap::SchemaId pose_schema_id{0};
|
||||||
|
std::vector<StreamState> streams{};
|
||||||
|
};
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
MultiMcapRecordSink::State::StreamState *stream_state(
|
||||||
|
MultiMcapRecordSink::State *state,
|
||||||
|
const MultiMcapRecordSink::StreamId stream_id,
|
||||||
|
std::string &error) {
|
||||||
|
if (state == nullptr) {
|
||||||
|
error = "MCAP sink is not open";
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
if (stream_id >= state->streams.size()) {
|
||||||
|
error = "invalid MCAP stream id";
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
return &state->streams[stream_id];
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<void, std::string> update_stream_info_impl(
|
||||||
|
MultiMcapRecordSink::State::StreamState &stream,
|
||||||
|
const encode::EncodedStreamInfo &stream_info) {
|
||||||
|
auto decoder_config_annexb = decoder_config_to_annexb(stream_info.codec, stream_info.decoder_config);
|
||||||
|
if (!decoder_config_annexb) {
|
||||||
|
return std::unexpected("failed to prepare MCAP keyframe decoder config: " + decoder_config_annexb.error());
|
||||||
|
}
|
||||||
|
stream.codec = stream_info.codec;
|
||||||
|
stream.keyframe_preamble = std::move(*decoder_config_annexb);
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
std::expected<void, std::string> validate_new_stream_config(
|
||||||
|
const MultiMcapRecordSink::State &state,
|
||||||
|
const McapRecordStreamConfig &config) {
|
||||||
|
if (config.topic.empty()) {
|
||||||
|
return std::unexpected("video topic is empty");
|
||||||
|
}
|
||||||
|
if (config.depth_topic.empty()) {
|
||||||
|
return std::unexpected("depth topic is empty");
|
||||||
|
}
|
||||||
|
if (config.frame_id.empty()) {
|
||||||
|
return std::unexpected("frame_id is empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto topic_in_use = [&state](const std::string &topic) {
|
||||||
|
if (topic.empty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (const auto &stream : state.streams) {
|
||||||
|
if (stream.config.topic == topic ||
|
||||||
|
stream.config.depth_topic == topic ||
|
||||||
|
stream.config.calibration_topic == topic ||
|
||||||
|
stream.config.pose_topic == topic ||
|
||||||
|
stream.config.body_topic == topic) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
for (const auto *topic : {
|
||||||
|
&config.topic,
|
||||||
|
&config.depth_topic,
|
||||||
|
&config.calibration_topic,
|
||||||
|
&config.pose_topic,
|
||||||
|
&config.body_topic,
|
||||||
|
}) {
|
||||||
|
if (topic_in_use(*topic)) {
|
||||||
|
return std::unexpected("duplicate MCAP topic: " + *topic);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
MultiMcapRecordSink::~MultiMcapRecordSink() {
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
|
||||||
|
MultiMcapRecordSink::MultiMcapRecordSink(MultiMcapRecordSink &&other) noexcept
|
||||||
|
: state_(other.state_) {
|
||||||
|
other.state_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
MultiMcapRecordSink &MultiMcapRecordSink::operator=(MultiMcapRecordSink &&other) noexcept {
|
||||||
|
if (this != &other) {
|
||||||
|
close();
|
||||||
|
state_ = other.state_;
|
||||||
|
other.state_ = nullptr;
|
||||||
|
}
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<MultiMcapRecordSink, std::string> MultiMcapRecordSink::create(
|
||||||
|
std::string path,
|
||||||
|
McapCompression compression) {
|
||||||
|
MultiMcapRecordSink sink{};
|
||||||
|
auto state = std::make_unique<State>();
|
||||||
|
state->path = std::move(path);
|
||||||
|
|
||||||
|
mcap::McapWriterOptions options("");
|
||||||
|
options.compression = to_mcap_compression(compression);
|
||||||
|
const auto open_status = state->writer.open(state->path, options);
|
||||||
|
if (!open_status.ok()) {
|
||||||
|
return std::unexpected("failed to open MCAP writer at '" + state->path + "': " + open_status.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto video_descriptor_set = build_file_descriptor_set(foxglove::CompressedVideo::descriptor());
|
||||||
|
std::string video_schema_bytes{};
|
||||||
|
if (!video_descriptor_set.SerializeToString(&video_schema_bytes)) {
|
||||||
|
return std::unexpected("failed to serialize foxglove.CompressedVideo descriptor set");
|
||||||
|
}
|
||||||
|
mcap::Schema video_schema("foxglove.CompressedVideo", "protobuf", video_schema_bytes);
|
||||||
|
state->writer.addSchema(video_schema);
|
||||||
|
state->video_schema_id = video_schema.id;
|
||||||
|
|
||||||
|
const auto depth_descriptor_set = build_file_descriptor_set(cvmmap_streamer::DepthMap::descriptor());
|
||||||
|
std::string depth_schema_bytes{};
|
||||||
|
if (!depth_descriptor_set.SerializeToString(&depth_schema_bytes)) {
|
||||||
|
return std::unexpected("failed to serialize cvmmap_streamer.DepthMap descriptor set");
|
||||||
|
}
|
||||||
|
mcap::Schema depth_schema("cvmmap_streamer.DepthMap", "protobuf", depth_schema_bytes);
|
||||||
|
state->writer.addSchema(depth_schema);
|
||||||
|
state->depth_schema_id = depth_schema.id;
|
||||||
|
|
||||||
|
const auto calibration_descriptor_set = build_file_descriptor_set(foxglove::CameraCalibration::descriptor());
|
||||||
|
std::string calibration_schema_bytes{};
|
||||||
|
if (!calibration_descriptor_set.SerializeToString(&calibration_schema_bytes)) {
|
||||||
|
return std::unexpected("failed to serialize foxglove.CameraCalibration descriptor set");
|
||||||
|
}
|
||||||
|
mcap::Schema calibration_schema("foxglove.CameraCalibration", "protobuf", calibration_schema_bytes);
|
||||||
|
state->writer.addSchema(calibration_schema);
|
||||||
|
state->calibration_schema_id = calibration_schema.id;
|
||||||
|
|
||||||
|
const auto pose_descriptor_set = build_file_descriptor_set(foxglove::PoseInFrame::descriptor());
|
||||||
|
std::string pose_schema_bytes{};
|
||||||
|
if (!pose_descriptor_set.SerializeToString(&pose_schema_bytes)) {
|
||||||
|
return std::unexpected("failed to serialize foxglove.PoseInFrame descriptor set");
|
||||||
|
}
|
||||||
|
mcap::Schema pose_schema("foxglove.PoseInFrame", "protobuf", pose_schema_bytes);
|
||||||
|
state->writer.addSchema(pose_schema);
|
||||||
|
state->pose_schema_id = pose_schema.id;
|
||||||
|
|
||||||
|
sink.state_ = state.release();
|
||||||
|
return sink;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<MultiMcapRecordSink::StreamId, std::string> MultiMcapRecordSink::add_stream(
|
||||||
|
const McapRecordStreamConfig &config,
|
||||||
|
const encode::EncodedStreamInfo &stream_info) {
|
||||||
|
if (state_ == nullptr) {
|
||||||
|
return std::unexpected("MCAP sink is not open");
|
||||||
|
}
|
||||||
|
if (auto valid = validate_new_stream_config(*state_, config); !valid) {
|
||||||
|
return std::unexpected(valid.error());
|
||||||
|
}
|
||||||
|
|
||||||
|
State::StreamState stream{};
|
||||||
|
stream.config = config;
|
||||||
|
|
||||||
|
mcap::Channel video_channel(config.topic, "protobuf", state_->video_schema_id);
|
||||||
|
state_->writer.addChannel(video_channel);
|
||||||
|
stream.video_channel_id = video_channel.id;
|
||||||
|
|
||||||
|
mcap::Channel depth_channel(config.depth_topic, "protobuf", state_->depth_schema_id);
|
||||||
|
state_->writer.addChannel(depth_channel);
|
||||||
|
stream.depth_channel_id = depth_channel.id;
|
||||||
|
|
||||||
|
if (auto updated = update_stream_info_impl(stream, stream_info); !updated) {
|
||||||
|
return std::unexpected(updated.error());
|
||||||
|
}
|
||||||
|
|
||||||
|
state_->streams.push_back(std::move(stream));
|
||||||
|
return state_->streams.size() - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<void, std::string> MultiMcapRecordSink::update_stream_info(
|
||||||
|
const StreamId stream_id,
|
||||||
|
const encode::EncodedStreamInfo &stream_info) {
|
||||||
|
std::string error{};
|
||||||
|
auto *stream = stream_state(state_, stream_id, error);
|
||||||
|
if (stream == nullptr) {
|
||||||
|
return std::unexpected(error);
|
||||||
|
}
|
||||||
|
return update_stream_info_impl(*stream, stream_info);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<void, std::string> MultiMcapRecordSink::write_access_unit(
|
||||||
|
const StreamId stream_id,
|
||||||
|
const encode::EncodedAccessUnit &access_unit) {
|
||||||
|
std::string error{};
|
||||||
|
auto *stream = stream_state(state_, stream_id, error);
|
||||||
|
if (stream == nullptr) {
|
||||||
|
return std::unexpected(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
foxglove::CompressedVideo message{};
|
||||||
|
*message.mutable_timestamp() = to_proto_timestamp(access_unit.source_timestamp_ns);
|
||||||
|
message.set_frame_id(stream->config.frame_id);
|
||||||
|
message.set_format(codec_format(access_unit.codec));
|
||||||
|
std::vector<std::uint8_t> payload{};
|
||||||
|
if (access_unit.keyframe && !stream->keyframe_preamble.empty()) {
|
||||||
|
payload.reserve(stream->keyframe_preamble.size() + access_unit.annexb_bytes.size());
|
||||||
|
payload.insert(payload.end(), stream->keyframe_preamble.begin(), stream->keyframe_preamble.end());
|
||||||
|
}
|
||||||
|
payload.insert(payload.end(), access_unit.annexb_bytes.begin(), access_unit.annexb_bytes.end());
|
||||||
|
message.set_data(
|
||||||
|
reinterpret_cast<const char *>(payload.data()),
|
||||||
|
static_cast<int>(payload.size()));
|
||||||
|
|
||||||
|
std::string serialized{};
|
||||||
|
if (!message.SerializeToString(&serialized)) {
|
||||||
|
return std::unexpected("failed to serialize foxglove.CompressedVideo");
|
||||||
|
}
|
||||||
|
|
||||||
|
mcap::Message record{};
|
||||||
|
record.channelId = stream->video_channel_id;
|
||||||
|
record.sequence = stream->video_sequence++;
|
||||||
|
record.logTime = access_unit.source_timestamp_ns;
|
||||||
|
record.publishTime = access_unit.source_timestamp_ns;
|
||||||
|
record.data = reinterpret_cast<const std::byte *>(serialized.data());
|
||||||
|
record.dataSize = serialized.size();
|
||||||
|
|
||||||
|
const auto write_status = state_->writer.write(record);
|
||||||
|
if (!write_status.ok()) {
|
||||||
|
return std::unexpected("failed to write MCAP message: " + write_status.message);
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<void, std::string> MultiMcapRecordSink::write_depth_map(
|
||||||
|
const StreamId stream_id,
|
||||||
|
const RawDepthMapView &depth_map) {
|
||||||
|
std::string error{};
|
||||||
|
auto *stream = stream_state(state_, stream_id, error);
|
||||||
|
if (stream == nullptr) {
|
||||||
|
return std::unexpected(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto source_unit = normalize_depth_source_unit(depth_map.source_unit);
|
||||||
|
auto encoded = encode_depth_payload(depth_map);
|
||||||
|
if (!encoded) {
|
||||||
|
return std::unexpected(encoded.error());
|
||||||
|
}
|
||||||
|
|
||||||
|
return write_depth_message(
|
||||||
|
state_->writer,
|
||||||
|
depth_map.timestamp_ns,
|
||||||
|
stream->config.frame_id,
|
||||||
|
depth_map.width,
|
||||||
|
depth_map.height,
|
||||||
|
source_unit,
|
||||||
|
stream->depth_channel_id,
|
||||||
|
stream->depth_sequence,
|
||||||
|
*encoded);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<void, std::string> MultiMcapRecordSink::write_depth_map_u16(
|
||||||
|
const StreamId stream_id,
|
||||||
|
const RawDepthMapU16View &depth_map) {
|
||||||
|
std::string error{};
|
||||||
|
auto *stream = stream_state(state_, stream_id, error);
|
||||||
|
if (stream == nullptr) {
|
||||||
|
return std::unexpected(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto encoded = encode_depth_payload(depth_map);
|
||||||
|
if (!encoded) {
|
||||||
|
return std::unexpected(encoded.error());
|
||||||
|
}
|
||||||
|
|
||||||
|
return write_depth_message(
|
||||||
|
state_->writer,
|
||||||
|
depth_map.timestamp_ns,
|
||||||
|
stream->config.frame_id,
|
||||||
|
depth_map.width,
|
||||||
|
depth_map.height,
|
||||||
|
ipc::DepthUnit::Millimeter,
|
||||||
|
stream->depth_channel_id,
|
||||||
|
stream->depth_sequence,
|
||||||
|
*encoded);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<void, std::string> MultiMcapRecordSink::write_camera_calibration(
|
||||||
|
const StreamId stream_id,
|
||||||
|
const RawCameraCalibrationView &calibration) {
|
||||||
|
std::string error{};
|
||||||
|
auto *stream = stream_state(state_, stream_id, error);
|
||||||
|
if (stream == nullptr) {
|
||||||
|
return std::unexpected(error);
|
||||||
|
}
|
||||||
|
if (stream->config.calibration_topic.empty()) {
|
||||||
|
return std::unexpected("calibration topic is empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stream->calibration_channel_id == 0) {
|
||||||
|
mcap::Channel calibration_channel(stream->config.calibration_topic, "protobuf", state_->calibration_schema_id);
|
||||||
|
state_->writer.addChannel(calibration_channel);
|
||||||
|
stream->calibration_channel_id = calibration_channel.id;
|
||||||
|
}
|
||||||
|
|
||||||
|
foxglove::CameraCalibration message{};
|
||||||
|
*message.mutable_timestamp() = to_proto_timestamp(calibration.timestamp_ns);
|
||||||
|
message.set_frame_id(stream->config.frame_id);
|
||||||
|
message.set_width(calibration.width);
|
||||||
|
message.set_height(calibration.height);
|
||||||
|
message.set_distortion_model(std::string(calibration.distortion_model));
|
||||||
|
for (const double value : calibration.distortion) {
|
||||||
|
message.add_d(value);
|
||||||
|
}
|
||||||
|
if (auto append = append_repeated_double(message.mutable_k(), calibration.intrinsic_matrix, 9, "K"); !append) {
|
||||||
|
return append;
|
||||||
|
}
|
||||||
|
if (auto append = append_repeated_double(message.mutable_r(), calibration.rectification_matrix, 9, "R"); !append) {
|
||||||
|
return append;
|
||||||
|
}
|
||||||
|
if (auto append = append_repeated_double(message.mutable_p(), calibration.projection_matrix, 12, "P"); !append) {
|
||||||
|
return append;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string serialized{};
|
||||||
|
if (!message.SerializeToString(&serialized)) {
|
||||||
|
return std::unexpected("failed to serialize foxglove.CameraCalibration");
|
||||||
|
}
|
||||||
|
|
||||||
|
mcap::Message record{};
|
||||||
|
record.channelId = stream->calibration_channel_id;
|
||||||
|
record.sequence = stream->calibration_sequence++;
|
||||||
|
record.logTime = calibration.timestamp_ns;
|
||||||
|
record.publishTime = calibration.timestamp_ns;
|
||||||
|
record.data = reinterpret_cast<const std::byte *>(serialized.data());
|
||||||
|
record.dataSize = serialized.size();
|
||||||
|
|
||||||
|
const auto write_status = state_->writer.write(record);
|
||||||
|
if (!write_status.ok()) {
|
||||||
|
return std::unexpected("failed to write MCAP calibration message: " + write_status.message);
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<void, std::string> MultiMcapRecordSink::write_pose(
|
||||||
|
const StreamId stream_id,
|
||||||
|
const RawPoseView &pose) {
|
||||||
|
std::string error{};
|
||||||
|
auto *stream = stream_state(state_, stream_id, error);
|
||||||
|
if (stream == nullptr) {
|
||||||
|
return std::unexpected(error);
|
||||||
|
}
|
||||||
|
if (stream->config.pose_topic.empty()) {
|
||||||
|
return std::unexpected("pose topic is empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stream->pose_channel_id == 0) {
|
||||||
|
mcap::Channel pose_channel(stream->config.pose_topic, "protobuf", state_->pose_schema_id);
|
||||||
|
state_->writer.addChannel(pose_channel);
|
||||||
|
stream->pose_channel_id = pose_channel.id;
|
||||||
|
}
|
||||||
|
|
||||||
|
foxglove::PoseInFrame message{};
|
||||||
|
*message.mutable_timestamp() = to_proto_timestamp(pose.timestamp_ns);
|
||||||
|
message.set_frame_id(std::string(pose.reference_frame_id));
|
||||||
|
auto *pose_message = message.mutable_pose();
|
||||||
|
auto *position = pose_message->mutable_position();
|
||||||
|
position->set_x(pose.position[0]);
|
||||||
|
position->set_y(pose.position[1]);
|
||||||
|
position->set_z(pose.position[2]);
|
||||||
|
auto *orientation = pose_message->mutable_orientation();
|
||||||
|
orientation->set_x(pose.orientation[0]);
|
||||||
|
orientation->set_y(pose.orientation[1]);
|
||||||
|
orientation->set_z(pose.orientation[2]);
|
||||||
|
orientation->set_w(pose.orientation[3]);
|
||||||
|
|
||||||
|
std::string serialized{};
|
||||||
|
if (!message.SerializeToString(&serialized)) {
|
||||||
|
return std::unexpected("failed to serialize foxglove.PoseInFrame");
|
||||||
|
}
|
||||||
|
|
||||||
|
mcap::Message record{};
|
||||||
|
record.channelId = stream->pose_channel_id;
|
||||||
|
record.sequence = stream->pose_sequence++;
|
||||||
|
record.logTime = pose.timestamp_ns;
|
||||||
|
record.publishTime = pose.timestamp_ns;
|
||||||
|
record.data = reinterpret_cast<const std::byte *>(serialized.data());
|
||||||
|
record.dataSize = serialized.size();
|
||||||
|
|
||||||
|
const auto write_status = state_->writer.write(record);
|
||||||
|
if (!write_status.ok()) {
|
||||||
|
return std::unexpected("failed to write MCAP pose message: " + write_status.message);
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<void, std::string> MultiMcapRecordSink::write_body_tracking_message(
|
||||||
|
const StreamId stream_id,
|
||||||
|
const RawBodyTrackingMessageView &body_message) {
|
||||||
|
std::string error{};
|
||||||
|
auto *stream = stream_state(state_, stream_id, error);
|
||||||
|
if (stream == nullptr) {
|
||||||
|
return std::unexpected(error);
|
||||||
|
}
|
||||||
|
if (body_message.bytes.empty()) {
|
||||||
|
return std::unexpected("body tracking payload is empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stream->config.body_topic.empty()) {
|
||||||
|
return std::unexpected("body topic is empty");
|
||||||
|
}
|
||||||
|
if (stream->body_channel_id == 0) {
|
||||||
|
mcap::Channel body_channel(
|
||||||
|
stream->config.body_topic,
|
||||||
|
kBodyTrackingMessageEncoding,
|
||||||
|
0,
|
||||||
|
body_channel_metadata());
|
||||||
|
state_->writer.addChannel(body_channel);
|
||||||
|
stream->body_channel_id = body_channel.id;
|
||||||
|
}
|
||||||
|
|
||||||
|
mcap::Message record{};
|
||||||
|
record.channelId = stream->body_channel_id;
|
||||||
|
record.sequence = stream->body_sequence++;
|
||||||
|
record.logTime = body_message.timestamp_ns;
|
||||||
|
record.publishTime = body_message.timestamp_ns;
|
||||||
|
record.data = reinterpret_cast<const std::byte *>(body_message.bytes.data());
|
||||||
|
record.dataSize = body_message.bytes.size();
|
||||||
|
|
||||||
|
const auto write_status = state_->writer.write(record);
|
||||||
|
if (!write_status.ok()) {
|
||||||
|
return std::unexpected("failed to write MCAP body message: " + write_status.message);
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
bool MultiMcapRecordSink::is_open() const {
|
||||||
|
return state_ != nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string_view MultiMcapRecordSink::path() const {
|
||||||
|
if (state_ == nullptr) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
return state_->path;
|
||||||
|
}
|
||||||
|
|
||||||
|
void MultiMcapRecordSink::close() {
|
||||||
|
if (state_ == nullptr) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
state_->writer.close();
|
||||||
|
delete state_;
|
||||||
|
state_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,247 @@
|
|||||||
|
#include <mcap/reader.hpp>
|
||||||
|
|
||||||
|
#include "proto/cvmmap_streamer/DepthMap.pb.h"
|
||||||
|
#include "cvmmap_streamer/common.h"
|
||||||
|
#include "cvmmap_streamer/record/mcap_record_sink.hpp"
|
||||||
|
#include "proto/foxglove/CameraCalibration.pb.h"
|
||||||
|
#include "proto/foxglove/CompressedVideo.pb.h"
|
||||||
|
#include "proto/foxglove/PoseInFrame.pb.h"
|
||||||
|
|
||||||
|
#include <rvl/rvl.hpp>
|
||||||
|
#include <spdlog/spdlog.h>
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <filesystem>
|
||||||
|
#include <map>
|
||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
enum class TesterExitCode : int {
|
||||||
|
Success = 0,
|
||||||
|
CreateError = 2,
|
||||||
|
WriteError = 3,
|
||||||
|
OpenError = 4,
|
||||||
|
VerificationError = 5,
|
||||||
|
};
|
||||||
|
|
||||||
|
[[nodiscard]]
|
||||||
|
constexpr int exit_code(const TesterExitCode code) {
|
||||||
|
return static_cast<int>(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
if (cvmmap_streamer::has_help_flag(argc, argv)) {
|
||||||
|
cvmmap_streamer::print_help("mcap_multi_record_tester");
|
||||||
|
return exit_code(TesterExitCode::Success);
|
||||||
|
}
|
||||||
|
|
||||||
|
const std::filesystem::path output_path =
|
||||||
|
argc > 1
|
||||||
|
? std::filesystem::path(argv[1])
|
||||||
|
: std::filesystem::temp_directory_path() / "cvmmap_streamer_multi_record_test.mcap";
|
||||||
|
if (output_path.has_parent_path()) {
|
||||||
|
std::filesystem::create_directories(output_path.parent_path());
|
||||||
|
}
|
||||||
|
|
||||||
|
auto sink = cvmmap_streamer::record::MultiMcapRecordSink::create(
|
||||||
|
output_path.string(),
|
||||||
|
cvmmap_streamer::McapCompression::None);
|
||||||
|
if (!sink) {
|
||||||
|
spdlog::error("failed to create MCAP sink: {}", sink.error());
|
||||||
|
return exit_code(TesterExitCode::CreateError);
|
||||||
|
}
|
||||||
|
|
||||||
|
cvmmap_streamer::encode::EncodedStreamInfo stream_info{};
|
||||||
|
stream_info.codec = cvmmap_streamer::CodecType::H264;
|
||||||
|
|
||||||
|
auto zed1 = sink->add_stream(cvmmap_streamer::record::McapRecordStreamConfig{
|
||||||
|
.topic = "/zed1/video",
|
||||||
|
.depth_topic = "/zed1/depth",
|
||||||
|
.calibration_topic = "/zed1/calibration",
|
||||||
|
.pose_topic = "/zed1/pose",
|
||||||
|
.body_topic = "/zed1/body",
|
||||||
|
.frame_id = "zed1",
|
||||||
|
}, stream_info);
|
||||||
|
auto zed2 = sink->add_stream(cvmmap_streamer::record::McapRecordStreamConfig{
|
||||||
|
.topic = "/zed2/video",
|
||||||
|
.depth_topic = "/zed2/depth",
|
||||||
|
.calibration_topic = "/zed2/calibration",
|
||||||
|
.pose_topic = "/zed2/pose",
|
||||||
|
.body_topic = "/zed2/body",
|
||||||
|
.frame_id = "zed2",
|
||||||
|
}, stream_info);
|
||||||
|
if (!zed1 || !zed2) {
|
||||||
|
spdlog::error("failed to add streams: {} {}", zed1 ? "" : zed1.error(), zed2 ? "" : zed2.error());
|
||||||
|
return exit_code(TesterExitCode::CreateError);
|
||||||
|
}
|
||||||
|
|
||||||
|
const std::vector<std::uint16_t> depth_pixels{
|
||||||
|
1000, 2000,
|
||||||
|
1500, 2500,
|
||||||
|
};
|
||||||
|
const std::vector<double> distortion{0.0, 0.0, 0.0, 0.0, 0.0};
|
||||||
|
const std::vector<double> intrinsic_matrix{
|
||||||
|
500.0, 0.0, 320.0,
|
||||||
|
0.0, 500.0, 240.0,
|
||||||
|
0.0, 0.0, 1.0,
|
||||||
|
};
|
||||||
|
const std::vector<double> rectification_matrix{
|
||||||
|
1.0, 0.0, 0.0,
|
||||||
|
0.0, 1.0, 0.0,
|
||||||
|
0.0, 0.0, 1.0,
|
||||||
|
};
|
||||||
|
const std::vector<double> projection_matrix{
|
||||||
|
500.0, 0.0, 320.0, 0.0,
|
||||||
|
0.0, 500.0, 240.0, 0.0,
|
||||||
|
0.0, 0.0, 1.0, 0.0,
|
||||||
|
};
|
||||||
|
|
||||||
|
for (const auto [stream_id, label, pose_x] : {
|
||||||
|
std::tuple{*zed1, std::string("zed1"), 1.0},
|
||||||
|
std::tuple{*zed2, std::string("zed2"), 2.0},
|
||||||
|
}) {
|
||||||
|
cvmmap_streamer::encode::EncodedAccessUnit access_unit{};
|
||||||
|
access_unit.codec = cvmmap_streamer::CodecType::H264;
|
||||||
|
access_unit.source_timestamp_ns = 100;
|
||||||
|
access_unit.stream_pts_ns = 100;
|
||||||
|
access_unit.keyframe = false;
|
||||||
|
access_unit.annexb_bytes = {0x00, 0x00, 0x00, 0x01, 0x09, 0x10};
|
||||||
|
if (auto write = sink->write_access_unit(stream_id, access_unit); !write) {
|
||||||
|
spdlog::error("failed to write video access unit: {}", write.error());
|
||||||
|
return exit_code(TesterExitCode::WriteError);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (auto write = sink->write_depth_map_u16(stream_id, cvmmap_streamer::record::RawDepthMapU16View{
|
||||||
|
.timestamp_ns = 100,
|
||||||
|
.width = 2,
|
||||||
|
.height = 2,
|
||||||
|
.pixels = depth_pixels,
|
||||||
|
}); !write) {
|
||||||
|
spdlog::error("failed to write depth map: {}", write.error());
|
||||||
|
return exit_code(TesterExitCode::WriteError);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (auto write = sink->write_camera_calibration(stream_id, cvmmap_streamer::record::RawCameraCalibrationView{
|
||||||
|
.timestamp_ns = 100,
|
||||||
|
.width = 640,
|
||||||
|
.height = 480,
|
||||||
|
.distortion_model = "plumb_bob",
|
||||||
|
.distortion = distortion,
|
||||||
|
.intrinsic_matrix = intrinsic_matrix,
|
||||||
|
.rectification_matrix = rectification_matrix,
|
||||||
|
.projection_matrix = projection_matrix,
|
||||||
|
}); !write) {
|
||||||
|
spdlog::error("failed to write calibration: {}", write.error());
|
||||||
|
return exit_code(TesterExitCode::WriteError);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (auto write = sink->write_pose(stream_id, cvmmap_streamer::record::RawPoseView{
|
||||||
|
.timestamp_ns = 100,
|
||||||
|
.reference_frame_id = label + "/world",
|
||||||
|
.position = {pose_x, 2.0, 3.0},
|
||||||
|
.orientation = {0.1, 0.2, 0.3, 0.9},
|
||||||
|
}); !write) {
|
||||||
|
spdlog::error("failed to write pose: {}", write.error());
|
||||||
|
return exit_code(TesterExitCode::WriteError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sink->close();
|
||||||
|
|
||||||
|
mcap::McapReader reader{};
|
||||||
|
const auto open_status = reader.open(output_path.string());
|
||||||
|
if (!open_status.ok()) {
|
||||||
|
spdlog::error("failed to open MCAP file '{}': {}", output_path.string(), open_status.message);
|
||||||
|
return exit_code(TesterExitCode::OpenError);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::map<std::string, std::uint64_t> topic_counts{};
|
||||||
|
|
||||||
|
auto messages = reader.readMessages();
|
||||||
|
for (auto it = messages.begin(); it != messages.end(); ++it) {
|
||||||
|
if (it->schema == nullptr || it->channel == nullptr) {
|
||||||
|
spdlog::error("MCAP message missing schema or channel");
|
||||||
|
reader.close();
|
||||||
|
return exit_code(TesterExitCode::VerificationError);
|
||||||
|
}
|
||||||
|
topic_counts[it->channel->topic] += 1;
|
||||||
|
|
||||||
|
if (it->schema->name == "foxglove.CompressedVideo") {
|
||||||
|
foxglove::CompressedVideo video{};
|
||||||
|
if (!video.ParseFromArray(it->message.data, static_cast<int>(it->message.dataSize))) {
|
||||||
|
spdlog::error("failed to parse foxglove.CompressedVideo");
|
||||||
|
reader.close();
|
||||||
|
return exit_code(TesterExitCode::VerificationError);
|
||||||
|
}
|
||||||
|
if (video.data().empty()) {
|
||||||
|
spdlog::error("video payload is empty");
|
||||||
|
reader.close();
|
||||||
|
return exit_code(TesterExitCode::VerificationError);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (it->schema->name == "cvmmap_streamer.DepthMap") {
|
||||||
|
cvmmap_streamer::DepthMap depth{};
|
||||||
|
if (!depth.ParseFromArray(it->message.data, static_cast<int>(it->message.dataSize))) {
|
||||||
|
spdlog::error("failed to parse cvmmap_streamer.DepthMap");
|
||||||
|
reader.close();
|
||||||
|
return exit_code(TesterExitCode::VerificationError);
|
||||||
|
}
|
||||||
|
const auto decoded = rvl::decompress_image(std::span<const std::uint8_t>(
|
||||||
|
reinterpret_cast<const std::uint8_t *>(depth.data().data()),
|
||||||
|
depth.data().size()));
|
||||||
|
if (decoded.pixels.size() != depth_pixels.size()) {
|
||||||
|
spdlog::error("decoded depth pixel count mismatch");
|
||||||
|
reader.close();
|
||||||
|
return exit_code(TesterExitCode::VerificationError);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (it->schema->name == "foxglove.CameraCalibration") {
|
||||||
|
foxglove::CameraCalibration calibration{};
|
||||||
|
if (!calibration.ParseFromArray(it->message.data, static_cast<int>(it->message.dataSize))) {
|
||||||
|
spdlog::error("failed to parse foxglove.CameraCalibration");
|
||||||
|
reader.close();
|
||||||
|
return exit_code(TesterExitCode::VerificationError);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (it->schema->name == "foxglove.PoseInFrame") {
|
||||||
|
foxglove::PoseInFrame pose{};
|
||||||
|
if (!pose.ParseFromArray(it->message.data, static_cast<int>(it->message.dataSize))) {
|
||||||
|
spdlog::error("failed to parse foxglove.PoseInFrame");
|
||||||
|
reader.close();
|
||||||
|
return exit_code(TesterExitCode::VerificationError);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
reader.close();
|
||||||
|
|
||||||
|
for (const auto &topic : {
|
||||||
|
"/zed1/video",
|
||||||
|
"/zed1/depth",
|
||||||
|
"/zed1/calibration",
|
||||||
|
"/zed1/pose",
|
||||||
|
"/zed2/video",
|
||||||
|
"/zed2/depth",
|
||||||
|
"/zed2/calibration",
|
||||||
|
"/zed2/pose",
|
||||||
|
}) {
|
||||||
|
if (topic_counts[topic] != 1) {
|
||||||
|
spdlog::error("expected exactly one message on topic '{}', got {}", topic, topic_counts[topic]);
|
||||||
|
return exit_code(TesterExitCode::VerificationError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spdlog::info("validated multi-stream MCAP recording at '{}'", output_path.string());
|
||||||
|
return exit_code(TesterExitCode::Success);
|
||||||
|
}
|
||||||
+1235
-64
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user