diff --git a/CMakeLists.txt b/CMakeLists.txt index 16d6fe9..aca0004 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,14 @@ if (NOT TARGET OpenSSL::Crypto AND DEFINED OPENSSL_CRYPTO_LIBRARY) INTERFACE_INCLUDE_DIRECTORIES "${OPENSSL_INCLUDE_DIR}") endif() find_package(cppzmq QUIET) +set(CPPZMQ_LOCAL_ROOT "${CMAKE_CURRENT_LIST_DIR}/../cppzmq" CACHE PATH "Path to a local cppzmq checkout") +if (NOT TARGET cppzmq::cppzmq AND NOT TARGET cppzmq) + if (EXISTS "${CPPZMQ_LOCAL_ROOT}/zmq.hpp") + add_library(cppzmq::cppzmq INTERFACE IMPORTED GLOBAL) + set_target_properties(cppzmq::cppzmq PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES "${CPPZMQ_LOCAL_ROOT}") + endif() +endif() if (DEFINED CVMMAP_STREAMER_USE_SYSTEM_CNATS) message(FATAL_ERROR "CVMMAP_STREAMER_USE_SYSTEM_CNATS was removed; use CVMMAP_CNATS_PROVIDER=system") @@ -161,8 +169,10 @@ protobuf_generate( TARGET cvmmap_streamer_depth_proto LANGUAGE cpp PROTOC_OUT_DIR "${CMAKE_CURRENT_BINARY_DIR}" - PROTOS "${CMAKE_CURRENT_LIST_DIR}/proto/cvmmap_streamer/DepthMap.proto" - IMPORT_DIRS "${CMAKE_CURRENT_LIST_DIR}") + PROTOS + "${CMAKE_CURRENT_LIST_DIR}/proto/cvmmap_streamer/DepthMap.proto" + "${CMAKE_CURRENT_LIST_DIR}/proto/cvmmap_streamer/BundleManifest.proto" + IMPORT_DIRS "${CMAKE_CURRENT_LIST_DIR}/proto") add_library(cvmmap_streamer_protobuf INTERFACE) target_include_directories(cvmmap_streamer_foxglove_proto PUBLIC diff --git a/README.md b/README.md index b836c4c..49f0cd1 100644 --- a/README.md +++ b/README.md @@ -265,9 +265,11 @@ uv run python scripts/zed_batch_svo_to_mcap.py \ The batch MCAP wrapper writes `/.mcap` by default, skips existing outputs unless told otherwise, and returns a nonzero exit code if any segment fails. The repo includes a minimal pose config at `config/zed_pose_config.toml` so MCAP conversion does not depend on a separate `cv-mmap` checkout. -In bundled multi-camera mode, `--start-frame` and `--end-frame` mean the first and last emitted synced frame-group indices from the common start timestamp, inclusive. +In bundled multi-camera mode, `--start-frame` and `--end-frame` mean the first and last emitted bundle indices from the common start timestamp, inclusive. When stderr is attached to a TTY, `zed_batch_svo_to_mcap.py` uses a `progress-table` view by default; otherwise it emits line-oriented start/completion/failure logs plus periodic heartbeat summaries. Use `--progress-ui table` or `--progress-ui text` to override the automatic mode selection. +Bundled MCAP export now defaults to `--bundle-policy nearest`. That mode emits one `/bundle` manifest message per bundle timestamp on the common timeline and keeps the original per-camera timestamps on `/zedN/video`, `/zedN/depth`, and optional `/zedN/pose`. Consumers that care about grouping should follow `/bundle` instead of inferring bundle membership from identical message timestamps. Use `--bundle-policy strict` when you want the older thresholded sync behavior, and `--sync-tolerance-ms` only applies in that strict mode. + ### Why Mixed Hardware/Software Mode Exists Bundled MCAP export opens one video encoder per camera stream. A four-camera segment therefore consumes four H.264/H.265 encoder sessions at once. diff --git a/include/cvmmap_streamer/record/mcap_record_sink.hpp b/include/cvmmap_streamer/record/mcap_record_sink.hpp index cdcd31c..f3afd59 100644 --- a/include/cvmmap_streamer/record/mcap_record_sink.hpp +++ b/include/cvmmap_streamer/record/mcap_record_sink.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -19,6 +20,16 @@ enum class DepthEncoding { RvlF32, }; +enum class BundlePolicy { + Nearest, + Strict, +}; + +enum class BundleMemberStatus { + Present, + CorruptedGap, +}; + struct RawDepthMapView { std::uint64_t timestamp_ns{0}; std::uint32_t width{0}; @@ -57,6 +68,21 @@ struct RawBodyTrackingMessageView { std::span bytes{}; }; +struct RawBundleMemberView { + std::string_view camera_label{}; + BundleMemberStatus status{BundleMemberStatus::Present}; + std::optional timestamp_ns{}; + std::int64_t delta_ns{0}; + std::uint32_t corrupted_frames_skipped{0}; +}; + +struct RawBundleManifestView { + std::uint64_t timestamp_ns{0}; + std::uint64_t bundle_index{0}; + BundlePolicy policy{BundlePolicy::Nearest}; + std::span members{}; +}; + struct McapRecordStreamConfig { std::string topic{"/camera/video"}; std::string depth_topic{"/camera/depth"}; @@ -137,7 +163,8 @@ public: [[nodiscard]] static std::expected create( std::string path, - McapCompression compression); + McapCompression compression, + std::string bundle_topic = "/bundle"); [[nodiscard]] std::expected add_stream( @@ -179,6 +206,10 @@ public: StreamId stream_id, const RawPoseView &pose); + [[nodiscard]] + std::expected write_bundle_manifest( + const RawBundleManifestView &bundle); + [[nodiscard]] std::expected write_body_tracking_message( StreamId stream_id, diff --git a/proto/cvmmap_streamer/BundleManifest.proto b/proto/cvmmap_streamer/BundleManifest.proto new file mode 100644 index 0000000..fa316e3 --- /dev/null +++ b/proto/cvmmap_streamer/BundleManifest.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package cvmmap_streamer; + +import "google/protobuf/timestamp.proto"; + +message BundleManifest { + enum BundlePolicy { + BUNDLE_POLICY_UNKNOWN = 0; + BUNDLE_POLICY_NEAREST = 1; + BUNDLE_POLICY_STRICT = 2; + } + + enum BundleMemberStatus { + BUNDLE_MEMBER_STATUS_UNKNOWN = 0; + BUNDLE_MEMBER_STATUS_PRESENT = 1; + BUNDLE_MEMBER_STATUS_CORRUPTED_GAP = 2; + } + + message BundleMember { + string camera_label = 1; + google.protobuf.Timestamp timestamp = 2; + sint64 delta_ns = 3; + BundleMemberStatus status = 4; + uint32 corrupted_frames_skipped = 5; + } + + google.protobuf.Timestamp timestamp = 1; + uint64 bundle_index = 2; + BundlePolicy policy = 3; + repeated BundleMember members = 4; +} diff --git a/scripts/zed_batch_svo_to_mcap.py b/scripts/zed_batch_svo_to_mcap.py index 6d798a6..26121e9 100644 --- a/scripts/zed_batch_svo_to_mcap.py +++ b/scripts/zed_batch_svo_to_mcap.py @@ -10,6 +10,7 @@ import re import subprocess import sys import time +from collections import Counter from dataclasses import dataclass from pathlib import Path @@ -34,6 +35,8 @@ class BatchConfig: mcap_compression: str depth_mode: str depth_size: str + bundle_policy: str + bundle_topic: str | None with_pose: bool pose_config: Path | None world_frame_id: str | None @@ -102,6 +105,7 @@ class ActiveJobState: _MCAP_READER_MODULE = None +_BUNDLE_MANIFEST_CLASS_CACHE: dict[bytes, tuple[object, int | None]] = {} TABLE_REFRESH_SECONDS = 1.0 TEXT_HEARTBEAT_SECONDS = 30.0 @@ -500,7 +504,11 @@ def command_for_job(job: ConversionJob, config: BatchConfig, encoder_device: str config.depth_mode, "--depth-size", config.depth_size, + "--bundle-policy", + config.bundle_policy, ] + if config.bundle_topic: + command.extend(["--bundle-topic", config.bundle_topic]) if config.with_pose: command.append("--with-pose") if config.pose_config is not None: @@ -569,22 +577,131 @@ def required_topics_for(camera_labels: tuple[str, ...]) -> set[str]: return topics -def probe_output(output_path: Path, camera_labels: tuple[str, ...]) -> OutputProbeResult: +def load_bundle_manifest_type(schema_data: bytes) -> tuple[object, int | None]: + cached = _BUNDLE_MANIFEST_CLASS_CACHE.get(schema_data) + if cached is not None: + return cached + + from google.protobuf import descriptor_pb2, descriptor_pool, message_factory, timestamp_pb2 + + descriptor_set = descriptor_pb2.FileDescriptorSet() + descriptor_set.ParseFromString(schema_data) + pool = descriptor_pool.DescriptorPool() + has_embedded_timestamp = any( + file_descriptor.name == "google/protobuf/timestamp.proto" + for file_descriptor in descriptor_set.file + ) + if has_embedded_timestamp: + for file_descriptor in descriptor_set.file: + if file_descriptor.name == "google/protobuf/timestamp.proto": + pool.Add(file_descriptor) + break + else: + pool.AddSerializedFile(timestamp_pb2.DESCRIPTOR.serialized_pb) + for file_descriptor in descriptor_set.file: + if file_descriptor.name == "google/protobuf/timestamp.proto": + continue + pool.Add(file_descriptor) + message_descriptor = pool.FindMessageTypeByName("cvmmap_streamer.BundleManifest") + message_class = message_factory.GetMessageClass(message_descriptor) + present_value = None + if "BundleMemberStatus" in message_descriptor.enum_types_by_name: + status_enum = message_descriptor.enum_types_by_name["BundleMemberStatus"] + present_value = status_enum.values_by_name["BUNDLE_MEMBER_STATUS_PRESENT"].number + _BUNDLE_MANIFEST_CLASS_CACHE[schema_data] = (message_class, present_value) + return message_class, present_value + + +def probe_output( + output_path: Path, + camera_labels: tuple[str, ...], + *, + bundle_topic: str | None, +) -> OutputProbeResult: if not output_path.is_file(): return OutputProbeResult(output_path=output_path, status="missing") reader_module = load_mcap_reader() expected_topics = required_topics_for(camera_labels) + require_bundle = len(camera_labels) > 1 and bool(bundle_topic) + if require_bundle: + expected_topics.add(bundle_topic or "/bundle") found_topics: set[str] = set() + video_counts: Counter[str] = Counter() + depth_counts: Counter[str] = Counter() + bundle_present_counts: Counter[str] = Counter() + expected_camera_labels = set(camera_labels) try: with output_path.open("rb") as stream: reader = reader_module.make_reader(stream) - for _schema, channel, _message in reader.iter_messages(): + for schema, channel, message in reader.iter_messages(): if channel.topic in expected_topics: found_topics.add(channel.topic) - if found_topics == expected_topics: - return OutputProbeResult(output_path=output_path, status="valid") + if channel.topic.endswith("/video"): + video_counts[channel.topic.removeprefix("/").removesuffix("/video")] += 1 + continue + if channel.topic.endswith("/depth"): + depth_counts[channel.topic.removeprefix("/").removesuffix("/depth")] += 1 + continue + if require_bundle and channel.topic == bundle_topic: + if schema is None or schema.name != "cvmmap_streamer.BundleManifest": + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason=f"bundle topic '{bundle_topic}' is missing the BundleManifest schema", + ) + try: + bundle_class, present_value = load_bundle_manifest_type(schema.data) + bundle = bundle_class() + bundle.ParseFromString(message.data) + except Exception as error: # noqa: BLE001 + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason=f"failed to parse bundle manifest: {error}", + ) + + bundle_labels: set[str] = set() + for member in bundle.members: + label = str(member.camera_label) + if label not in expected_camera_labels: + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason=f"bundle manifest referenced unknown camera label '{label}'", + ) + if label in bundle_labels: + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason=f"bundle manifest duplicated camera label '{label}'", + ) + bundle_labels.add(label) + is_present = member.HasField("timestamp") + if present_value is not None: + is_present = member.status == present_value + if is_present and not member.HasField("timestamp"): + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason=f"bundle member '{label}' is present but missing a timestamp", + ) + if is_present: + bundle_present_counts[label] += 1 + if bundle_labels != expected_camera_labels: + missing_labels = sorted(expected_camera_labels - bundle_labels) + extra_labels = sorted(bundle_labels - expected_camera_labels) + details = [] + if missing_labels: + details.append("missing=" + ",".join(missing_labels)) + if extra_labels: + details.append("extra=" + ",".join(extra_labels)) + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason="bundle manifest camera coverage mismatch: " + " ".join(details), + ) except Exception as error: # noqa: BLE001 return OutputProbeResult(output_path=output_path, status="invalid", reason=str(error)) @@ -595,6 +712,27 @@ def probe_output(output_path: Path, camera_labels: tuple[str, ...]) -> OutputPro status="invalid", reason="missing expected topics: " + ", ".join(missing_topics), ) + if require_bundle: + for label in camera_labels: + present_count = bundle_present_counts[label] + if video_counts[label] != present_count: + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason=( + f"video count mismatch for {label}: " + f"bundle_present={present_count} video_messages={video_counts[label]}" + ), + ) + if depth_counts[label] != present_count: + return OutputProbeResult( + output_path=output_path, + status="invalid", + reason=( + f"depth count mismatch for {label}: " + f"bundle_present={present_count} depth_messages={depth_counts[label]}" + ), + ) return OutputProbeResult(output_path=output_path, status="valid") @@ -635,8 +773,13 @@ def split_lines_for_excerpt(text: str, max_lines: int = 8) -> list[str]: lines = [line.rstrip() for line in text.splitlines() if line.strip()] if len(lines) <= max_lines: return lines - excerpt = lines[:max_lines] - excerpt.append(f"... ({len(lines) - max_lines} more lines)") + head_count = max(1, max_lines // 2) + tail_count = max_lines - head_count + excerpt = lines[:head_count] + omitted = len(lines) - head_count - tail_count + if omitted > 0: + excerpt.append(f"... ({omitted} omitted line(s))") + excerpt.extend(lines[-tail_count:]) return excerpt @@ -1011,6 +1154,19 @@ def build_worker_slots( default="optimal", show_default=True, ) +@click.option( + "--bundle-policy", + type=click.Choice(("nearest", "strict")), + default="nearest", + show_default=True, + help="Bundling policy for multi-camera MCAP export.", +) +@click.option( + "--bundle-topic", + default="/bundle", + show_default=True, + help="Topic used for bundled multi-camera manifest messages.", +) @click.option("--with-pose", is_flag=True, help="Enable per-camera positional tracking export when available.") @click.option( "--pose-config", @@ -1026,19 +1182,19 @@ def build_worker_slots( "--start-frame", type=click.IntRange(min=0), default=None, - help="First synced frame group to export (inclusive) in bundled multi-camera mode.", + help="First bundle index to export (inclusive) in bundled multi-camera mode.", ) @click.option( "--end-frame", type=click.IntRange(min=0), default=None, - help="Last synced frame group to export (inclusive) in bundled multi-camera mode.", + help="Last bundle index to export (inclusive) in bundled multi-camera mode.", ) @click.option( "--sync-tolerance-ms", type=click.FloatRange(min=0.0, min_open=True), default=None, - help="Override the maximum timestamp delta used for bundled multi-camera sync.", + help="Override the maximum timestamp delta used by strict bundled sync.", ) @click.option( "--progress-ui", @@ -1070,6 +1226,8 @@ def main( mcap_compression: str, depth_mode: str, depth_size: str, + bundle_policy: str, + bundle_topic: str, with_pose: bool, pose_config: Path | None, world_frame_id: str | None, @@ -1102,6 +1260,8 @@ def main( mcap_compression=mcap_compression, depth_mode=depth_mode, depth_size=depth_size, + bundle_policy=bundle_policy, + bundle_topic=bundle_topic, with_pose=with_pose, pose_config=pose_config.expanduser().resolve() if pose_config is not None else None, world_frame_id=world_frame_id, @@ -1156,7 +1316,7 @@ def main( continue if report_existing: - probe_result = probe_output(output_path, job.camera_labels) + probe_result = probe_output(output_path, job.camera_labels, bundle_topic=config.bundle_topic) if probe_result.status == "valid": valid_existing.append(probe_result) elif probe_result.status == "invalid": @@ -1171,7 +1331,7 @@ def main( continue if config.probe_existing: - probe_result = probe_output(output_path, job.camera_labels) + probe_result = probe_output(output_path, job.camera_labels, bundle_topic=config.bundle_topic) if probe_result.status == "valid": valid_existing.append(probe_result) skipped_results.append( diff --git a/src/record/mcap_record_sink.cpp b/src/record/mcap_record_sink.cpp index f5dc5d9..445fc87 100644 --- a/src/record/mcap_record_sink.cpp +++ b/src/record/mcap_record_sink.cpp @@ -3,6 +3,7 @@ #include "cvmmap_streamer/record/mcap_record_sink.hpp" #include "protobuf_descriptor.hpp" +#include "proto/cvmmap_streamer/BundleManifest.pb.h" #include "proto/cvmmap_streamer/DepthMap.pb.h" #include "proto/foxglove/CameraCalibration.pb.h" #include "proto/foxglove/CompressedVideo.pb.h" @@ -117,6 +118,28 @@ cvmmap_streamer::DepthMap::Encoding to_proto_depth_encoding(DepthEncoding encodi } } +[[nodiscard]] +cvmmap_streamer::BundleManifest::BundlePolicy to_proto_bundle_policy(BundlePolicy policy) { + switch (policy) { + case BundlePolicy::Strict: + return cvmmap_streamer::BundleManifest::BUNDLE_POLICY_STRICT; + case BundlePolicy::Nearest: + default: + return cvmmap_streamer::BundleManifest::BUNDLE_POLICY_NEAREST; + } +} + +[[nodiscard]] +cvmmap_streamer::BundleManifest::BundleMemberStatus to_proto_bundle_member_status(BundleMemberStatus status) { + switch (status) { + case BundleMemberStatus::CorruptedGap: + return cvmmap_streamer::BundleManifest::BUNDLE_MEMBER_STATUS_CORRUPTED_GAP; + case BundleMemberStatus::Present: + default: + return cvmmap_streamer::BundleManifest::BUNDLE_MEMBER_STATUS_PRESENT; + } +} + void append_start_code(std::vector &output) { output.push_back(0x00); output.push_back(0x00); @@ -464,6 +487,60 @@ std::expected write_calibration_message( return {}; } +[[nodiscard]] +std::expected write_bundle_manifest_message( + mcap::McapWriter &writer, + mcap::ChannelId channel_id, + std::uint32_t &sequence, + const RawBundleManifestView &bundle) { + if (channel_id == 0) { + return std::unexpected("bundle topic is disabled"); + } + if (bundle.members.empty()) { + return std::unexpected("bundle manifest must contain at least one member"); + } + + cvmmap_streamer::BundleManifest message{}; + *message.mutable_timestamp() = to_proto_timestamp(bundle.timestamp_ns); + message.set_bundle_index(bundle.bundle_index); + message.set_policy(to_proto_bundle_policy(bundle.policy)); + for (const auto &member_view : bundle.members) { + if (member_view.camera_label.empty()) { + return std::unexpected("bundle member camera label is empty"); + } + if (member_view.status == BundleMemberStatus::Present && !member_view.timestamp_ns.has_value()) { + return std::unexpected("present bundle member is missing a timestamp"); + } + auto *member = message.add_members(); + member->set_camera_label(std::string(member_view.camera_label)); + member->set_status(to_proto_bundle_member_status(member_view.status)); + if (member_view.timestamp_ns.has_value()) { + *member->mutable_timestamp() = to_proto_timestamp(*member_view.timestamp_ns); + } + member->set_delta_ns(member_view.delta_ns); + member->set_corrupted_frames_skipped(member_view.corrupted_frames_skipped); + } + + std::string serialized{}; + if (!message.SerializeToString(&serialized)) { + return std::unexpected("failed to serialize cvmmap_streamer.BundleManifest"); + } + + mcap::Message record{}; + record.channelId = channel_id; + record.sequence = sequence++; + record.logTime = bundle.timestamp_ns; + record.publishTime = bundle.timestamp_ns; + record.data = reinterpret_cast(serialized.data()); + record.dataSize = serialized.size(); + + const auto write_status = writer.write(record); + if (!write_status.ok()) { + return std::unexpected("failed to write MCAP bundle manifest: " + write_status.message); + } + return {}; +} + } struct McapRecordSink::State { @@ -835,8 +912,12 @@ struct MultiMcapRecordSink::State { std::string path{}; mcap::SchemaId video_schema_id{0}; mcap::SchemaId depth_schema_id{0}; + mcap::SchemaId bundle_schema_id{0}; mcap::SchemaId calibration_schema_id{0}; mcap::SchemaId pose_schema_id{0}; + std::string bundle_topic{}; + mcap::ChannelId bundle_channel_id{0}; + std::uint32_t bundle_sequence{0}; std::vector streams{}; }; @@ -959,10 +1040,12 @@ MultiMcapRecordSink &MultiMcapRecordSink::operator=(MultiMcapRecordSink &&other) std::expected MultiMcapRecordSink::create( std::string path, - McapCompression compression) { + McapCompression compression, + std::string bundle_topic) { MultiMcapRecordSink sink{}; auto state = std::make_unique(); state->path = std::move(path); + state->bundle_topic = std::move(bundle_topic); mcap::McapWriterOptions options(""); options.compression = to_mcap_compression(compression); @@ -989,6 +1072,21 @@ std::expected MultiMcapRecordSink::create( state->writer.addSchema(depth_schema); state->depth_schema_id = depth_schema.id; + if (!state->bundle_topic.empty()) { + const auto bundle_descriptor_set = build_file_descriptor_set(cvmmap_streamer::BundleManifest::descriptor()); + std::string bundle_schema_bytes{}; + if (!bundle_descriptor_set.SerializeToString(&bundle_schema_bytes)) { + return std::unexpected("failed to serialize cvmmap_streamer.BundleManifest descriptor set"); + } + mcap::Schema bundle_schema("cvmmap_streamer.BundleManifest", "protobuf", bundle_schema_bytes); + state->writer.addSchema(bundle_schema); + state->bundle_schema_id = bundle_schema.id; + + mcap::Channel bundle_channel(state->bundle_topic, "protobuf", state->bundle_schema_id); + state->writer.addChannel(bundle_channel); + state->bundle_channel_id = bundle_channel.id; + } + const auto calibration_descriptor_set = build_file_descriptor_set(foxglove::CameraCalibration::descriptor()); std::string calibration_schema_bytes{}; if (!calibration_descriptor_set.SerializeToString(&calibration_schema_bytes)) { @@ -1237,6 +1335,18 @@ std::expected MultiMcapRecordSink::write_pose( return {}; } +std::expected MultiMcapRecordSink::write_bundle_manifest( + const RawBundleManifestView &bundle) { + if (state_ == nullptr) { + return std::unexpected("MCAP sink is not open"); + } + return write_bundle_manifest_message( + state_->writer, + state_->bundle_channel_id, + state_->bundle_sequence, + bundle); +} + std::expected MultiMcapRecordSink::write_body_tracking_message( const StreamId stream_id, const RawBodyTrackingMessageView &body_message) { diff --git a/src/testers/mcap_multi_record_tester.cpp b/src/testers/mcap_multi_record_tester.cpp index b7566ab..4768a0a 100644 --- a/src/testers/mcap_multi_record_tester.cpp +++ b/src/testers/mcap_multi_record_tester.cpp @@ -1,5 +1,6 @@ #include +#include "proto/cvmmap_streamer/BundleManifest.pb.h" #include "proto/cvmmap_streamer/DepthMap.pb.h" #include "cvmmap_streamer/common.h" #include "cvmmap_streamer/record/mcap_record_sink.hpp" @@ -73,7 +74,8 @@ int main(int argc, char **argv) { auto sink = cvmmap_streamer::record::MultiMcapRecordSink::create( output_path.string(), - compression); + compression, + "/bundle"); if (!sink) { spdlog::error("failed to create MCAP sink: {}", sink.error()); return exit_code(TesterExitCode::CreateError); @@ -125,6 +127,31 @@ int main(int argc, char **argv) { 0.0, 500.0, 240.0, 0.0, 0.0, 0.0, 1.0, 0.0, }; + const std::vector bundle_members{ + { + .camera_label = "zed1", + .status = cvmmap_streamer::record::BundleMemberStatus::Present, + .timestamp_ns = 100, + .delta_ns = -5, + }, + { + .camera_label = "zed2", + .status = cvmmap_streamer::record::BundleMemberStatus::CorruptedGap, + .timestamp_ns = std::nullopt, + .delta_ns = 0, + .corrupted_frames_skipped = 3, + }, + }; + + if (auto write = sink->write_bundle_manifest(cvmmap_streamer::record::RawBundleManifestView{ + .timestamp_ns = 105, + .bundle_index = 7, + .policy = cvmmap_streamer::record::BundlePolicy::Nearest, + .members = bundle_members, + }); !write) { + spdlog::error("failed to write bundle manifest: {}", write.error()); + return exit_code(TesterExitCode::WriteError); + } for (const auto [stream_id, label, pose_x] : { std::tuple{*zed1, std::string("zed1"), 1.0}, @@ -242,6 +269,30 @@ int main(int argc, char **argv) { continue; } + if (it->schema->name == "cvmmap_streamer.BundleManifest") { + cvmmap_streamer::BundleManifest bundle{}; + if (!bundle.ParseFromArray(it->message.data, static_cast(it->message.dataSize))) { + spdlog::error("failed to parse cvmmap_streamer.BundleManifest"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + if (bundle.bundle_index() != 7 || bundle.members_size() != 2) { + spdlog::error("bundle manifest contents mismatch"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + if (bundle.members(0).status() != cvmmap_streamer::BundleManifest::BUNDLE_MEMBER_STATUS_PRESENT || + !bundle.members(0).has_timestamp() || + bundle.members(1).status() != cvmmap_streamer::BundleManifest::BUNDLE_MEMBER_STATUS_CORRUPTED_GAP || + bundle.members(1).has_timestamp() || + bundle.members(1).corrupted_frames_skipped() != 3) { + spdlog::error("bundle manifest member status mismatch"); + reader.close(); + return exit_code(TesterExitCode::VerificationError); + } + continue; + } + if (it->schema->name == "foxglove.CameraCalibration") { foxglove::CameraCalibration calibration{}; if (!calibration.ParseFromArray(it->message.data, static_cast(it->message.dataSize))) { @@ -266,6 +317,7 @@ int main(int argc, char **argv) { reader.close(); for (const auto &topic : { + "/bundle", "/zed1/video", "/zed1/depth", "/zed1/calibration", diff --git a/src/tools/zed_svo_to_mcap.cpp b/src/tools/zed_svo_to_mcap.cpp index b952400..302cdc5 100644 --- a/src/tools/zed_svo_to_mcap.cpp +++ b/src/tools/zed_svo_to_mcap.cpp @@ -55,6 +55,8 @@ struct CliOptions { std::string mcap_compression{"zstd"}; std::string depth_mode{"neural_plus"}; std::string depth_size{"optimal"}; + std::string bundle_policy{"nearest"}; + std::string bundle_topic{"/bundle"}; bool with_pose{false}; std::uint32_t start_frame{0}; std::uint32_t end_frame{0}; @@ -126,6 +128,7 @@ struct CameraStream { TrackingSample next_tracking{}; std::uint64_t current_timestamp_ns{0}; std::uint64_t next_timestamp_ns{0}; + std::uint32_t next_corrupted_frames_skipped{0}; std::uint64_t first_timestamp_ns{0}; std::uint64_t last_timestamp_ns{0}; std::uint64_t total_frames{0}; @@ -152,6 +155,24 @@ struct CameraStream { CalibrationData depth_calibration{}; }; +struct BundledFrameSelection { + std::size_t stream_index{0}; + cvmmap_streamer::record::BundleMemberStatus status{ + cvmmap_streamer::record::BundleMemberStatus::Present, + }; + bool use_next{false}; + std::optional timestamp_ns{}; + std::int64_t delta_ns{0}; + std::uint32_t corrupted_frames_skipped{0}; +}; + +struct GrabResult { + int svo_position{-1}; + std::uint32_t corrupted_frames_skipped{0}; + int first_corrupted_position{-1}; + int last_corrupted_position{-1}; +}; + [[nodiscard]] constexpr int exit_code(const ToolExitCode code) { return static_cast(code); @@ -302,6 +323,18 @@ std::expected parse_depth_size(const std::string_vi return sl::Resolution(std::stoi(match[1].str()), std::stoi(match[2].str())); } +[[nodiscard]] +std::expected parse_bundle_policy(const std::string_view raw) { + const auto normalized = lowercase(std::string(raw)); + if (normalized == "nearest") { + return cvmmap_streamer::record::BundlePolicy::Nearest; + } + if (normalized == "strict") { + return cvmmap_streamer::record::BundlePolicy::Strict; + } + return std::unexpected("invalid bundle policy: '" + std::string(raw) + "' (expected: nearest|strict)"); +} + [[nodiscard]] std::uint64_t frame_period_ns(const float fps) { if (!(fps > 0.0f)) { @@ -659,36 +692,118 @@ void maybe_log_tracking_state(CameraStream &stream, const sl::POSITIONAL_TRACKIN } [[nodiscard]] -std::expected read_frame_data( +std::expected grab_next_readable_frame( + sl::Camera &camera, + const sl::RuntimeParameters &runtime, + const std::string_view source_label, + const std::string_view source_path, + const std::uint64_t total_frames, + const std::optional current_timestamp_ns, + const std::optional last_timestamp_ns) { + GrabResult result{}; + while (true) { + const auto grab_status = camera.grab(runtime); + const auto svo_position = camera.getSVOPosition(); + if (grab_status == sl::ERROR_CODE::END_OF_SVOFILE_REACHED) { + return std::unexpected("end-of-svo"); + } + if (grab_status == sl::ERROR_CODE::CORRUPTED_FRAME) { + if (result.first_corrupted_position < 0) { + result.first_corrupted_position = svo_position; + } + result.last_corrupted_position = svo_position; + result.corrupted_frames_skipped += 1; + + const auto last_frame_position = total_frames > 0 + ? static_cast(total_frames - 1) + : 0; + const bool at_or_past_tail = svo_position >= last_frame_position; + const bool beyond_last_readable = + last_timestamp_ns.has_value() && + current_timestamp_ns.has_value() && + *current_timestamp_ns >= *last_timestamp_ns; + if (at_or_past_tail || beyond_last_readable) { + spdlog::warn( + "treating {} unreadable tail frame(s) as end-of-svo for {} last_corrupted_position={} current_timestamp_ns={} last_timestamp_ns={}", + result.corrupted_frames_skipped, + source_path, + svo_position, + current_timestamp_ns.value_or(0), + last_timestamp_ns.value_or(0)); + return std::unexpected("end-of-svo"); + } + + if (svo_position < 0) { + return std::unexpected( + "failed to advance past corrupted frame for " + std::string(source_label) + + ": invalid svo_position=" + std::to_string(svo_position)); + } + + const auto next_position = svo_position + 1; + if (next_position >= static_cast(total_frames)) { + spdlog::warn( + "treating {} unreadable tail frame(s) as end-of-svo for {} last_corrupted_position={} current_timestamp_ns={} last_timestamp_ns={}", + result.corrupted_frames_skipped, + source_path, + svo_position, + current_timestamp_ns.value_or(0), + last_timestamp_ns.value_or(0)); + return std::unexpected("end-of-svo"); + } + camera.setSVOPosition(next_position); + continue; + } + if (grab_status != sl::ERROR_CODE::SUCCESS) { + return std::unexpected( + "failed to grab frame for " + std::string(source_label) + + ": " + zed_status_string(grab_status) + + " svo_position=" + std::to_string(svo_position) + + " total_frames=" + std::to_string(total_frames) + + " current_timestamp_ns=" + std::to_string(current_timestamp_ns.value_or(0)) + + " last_timestamp_ns=" + std::to_string(last_timestamp_ns.value_or(0)) + + " corrupted_frames_skipped=" + std::to_string(result.corrupted_frames_skipped)); + } + + result.svo_position = svo_position; + return result; + } +} + +void maybe_log_recovered_corruption_gap( + const std::string_view source_label, + const GrabResult &grab_result, + const std::uint64_t recovered_timestamp_ns) { + if (grab_result.corrupted_frames_skipped == 0) { + return; + } + spdlog::warn( + "recovered {} after skipping {} corrupted frame(s) positions={}..{} recovered_position={} recovered_timestamp_ns={}", + source_label, + grab_result.corrupted_frames_skipped, + grab_result.first_corrupted_position, + grab_result.last_corrupted_position, + grab_result.svo_position, + recovered_timestamp_ns); +} + +[[nodiscard]] +std::expected read_frame_data( CameraStream &stream, sl::Mat &left_frame, sl::Mat &depth_frame, TrackingSample &tracking_sample, std::optional fallback_timestamp_ns, std::uint64_t ×tamp_ns_out) { - const auto grab_status = stream.camera->grab(stream.runtime); - if (grab_status == sl::ERROR_CODE::END_OF_SVOFILE_REACHED) { - return std::unexpected("end-of-svo"); - } - if (grab_status == sl::ERROR_CODE::CORRUPTED_FRAME && - stream.last_timestamp_ns != 0 && - stream.current_timestamp_ns >= stream.last_timestamp_ns) { - spdlog::warn( - "treating unreadable tail frame as end-of-svo for {} current_timestamp_ns={} last_timestamp_ns={}", - stream.source.path.string(), - stream.current_timestamp_ns, - stream.last_timestamp_ns); - return std::unexpected("end-of-svo"); - } - if (grab_status != sl::ERROR_CODE::SUCCESS) { - const auto svo_position = stream.camera->getSVOPosition(); - return std::unexpected( - "failed to grab frame for " + stream.source.label + - ": " + zed_status_string(grab_status) + - " svo_position=" + std::to_string(svo_position) + - " total_frames=" + std::to_string(stream.total_frames) + - " current_timestamp_ns=" + std::to_string(stream.current_timestamp_ns) + - " last_timestamp_ns=" + std::to_string(stream.last_timestamp_ns)); + auto grab = grab_next_readable_frame( + *stream.camera, + stream.runtime, + stream.source.label, + stream.source.path.string(), + stream.total_frames, + stream.current_timestamp_ns == 0 ? std::nullopt : std::optional{stream.current_timestamp_ns}, + stream.last_timestamp_ns == 0 ? std::nullopt : std::optional{stream.last_timestamp_ns}); + if (!grab) { + return std::unexpected(grab.error()); } const auto image_status = stream.camera->retrieveImage(left_frame, sl::VIEW::LEFT_BGR, sl::MEM::CPU); @@ -718,6 +833,7 @@ std::expected read_frame_data( return std::unexpected(timestamp_ns.error()); } timestamp_ns_out = *timestamp_ns; + maybe_log_recovered_corruption_gap(stream.source.label, *grab, timestamp_ns_out); tracking_sample = {}; if (stream.tracking_enabled) { @@ -741,7 +857,7 @@ std::expected read_frame_data( }; } } - return {}; + return *grab; } [[nodiscard]] @@ -756,11 +872,13 @@ std::expected fill_next_frame(CameraStream &stream) { if (!next) { if (next.error() == "end-of-svo") { stream.has_next = false; + stream.next_corrupted_frames_skipped = 0; return {}; } return std::unexpected(next.error()); } stream.has_next = true; + stream.next_corrupted_frames_skipped = next->corrupted_frames_skipped; return {}; } @@ -775,6 +893,7 @@ std::expected promote_next_frame(CameraStream &stream) { std::swap(stream.current_tracking, stream.next_tracking); std::swap(stream.current_timestamp_ns, stream.next_timestamp_ns); stream.has_next = false; + stream.next_corrupted_frames_skipped = 0; return fill_next_frame(stream); } @@ -1049,30 +1168,51 @@ std::expected sync_streams_to_timestamp( if (log_shutdown_request(shutdown_logged, "multi-camera sync")) { return std::unexpected("interrupted"); } - stream.sync_position = stream.camera->getSVOPositionAtTimestamp(sl::Timestamp{effective_start_ts}); - if (stream.sync_position < 0) { + const auto sdk_position = stream.camera->getSVOPositionAtTimestamp(sl::Timestamp{effective_start_ts}); + if (sdk_position < 0) { return std::unexpected( "failed to compute synced start frame for " + stream.source.path.string() + " at timestamp " + std::to_string(effective_start_ts)); } + stream.sync_position = std::clamp( + sdk_position, + 0, + static_cast(stream.total_frames > 0 ? stream.total_frames - 1 : 0)); - stream.camera->setSVOPosition(stream.sync_position); - auto current = read_frame_data( - stream, - stream.current_left_frame, - stream.current_depth_frame, - stream.current_tracking, - std::nullopt, - stream.current_timestamp_ns); - if (!current) { - return std::unexpected(current.error()); - } - auto next = fill_next_frame(stream); - if (!next) { - return std::unexpected(next.error()); + const auto read_at_position = [&](const int position) -> std::expected { + stream.camera->setSVOPosition(position); + auto current = read_frame_data( + stream, + stream.current_left_frame, + stream.current_depth_frame, + stream.current_tracking, + std::nullopt, + stream.current_timestamp_ns); + if (!current) { + return std::unexpected(current.error()); + } + auto next = fill_next_frame(stream); + if (!next) { + return std::unexpected(next.error()); + } + return {}; + }; + + if (auto loaded = read_at_position(stream.sync_position); !loaded) { + return std::unexpected(loaded.error()); } - while (stream.current_timestamp_ns < effective_start_ts && stream.has_next) { + while (stream.sync_position > 0 && stream.current_timestamp_ns > effective_start_ts) { + if (log_shutdown_request(shutdown_logged, "multi-camera sync")) { + return std::unexpected("interrupted"); + } + stream.sync_position -= 1; + if (auto loaded = read_at_position(stream.sync_position); !loaded) { + return std::unexpected(loaded.error()); + } + } + + while (stream.has_next && stream.next_timestamp_ns <= effective_start_ts) { if (log_shutdown_request(shutdown_logged, "multi-camera sync")) { return std::unexpected("interrupted"); } @@ -1107,6 +1247,119 @@ bool have_sync_window(const std::vector &streams, const std::uint6 return true; } +[[nodiscard]] +std::expected advance_streams_to_timestamp( + std::vector &streams, + const std::uint64_t target_timestamp_ns) { + for (auto &stream : streams) { + while (stream.has_next && stream.next_timestamp_ns <= target_timestamp_ns) { + stream.dropped_frames += 1; + auto promote = promote_next_frame(stream); + if (!promote) { + return std::unexpected(promote.error()); + } + } + } + return {}; +} + +[[nodiscard]] +std::uint64_t timestamp_delta_abs(const std::uint64_t left, const std::uint64_t right) { + return left >= right ? left - right : right - left; +} + +[[nodiscard]] +std::expected, std::string> select_nearest_bundle( + const std::vector &streams, + const std::uint64_t bundle_timestamp_ns, + const std::uint64_t common_end_ts) { + std::vector selections{}; + selections.reserve(streams.size()); + + for (std::size_t stream_index = 0; stream_index < streams.size(); ++stream_index) { + const auto &stream = streams[stream_index]; + if (stream.current_timestamp_ns > common_end_ts) { + return std::unexpected("no bundle frames remain within the common overlap window"); + } + + const bool blocked_by_corruption_gap = + stream.has_next && + stream.next_corrupted_frames_skipped > 0 && + bundle_timestamp_ns > stream.current_timestamp_ns && + bundle_timestamp_ns < stream.next_timestamp_ns; + if (blocked_by_corruption_gap) { + selections.push_back(BundledFrameSelection{ + .stream_index = stream_index, + .status = cvmmap_streamer::record::BundleMemberStatus::CorruptedGap, + .use_next = false, + .timestamp_ns = std::nullopt, + .delta_ns = 0, + .corrupted_frames_skipped = stream.next_corrupted_frames_skipped, + }); + continue; + } + + bool use_next = false; + std::uint64_t selected_timestamp_ns = stream.current_timestamp_ns; + if (stream.has_next && stream.next_timestamp_ns <= common_end_ts) { + const auto current_delta = timestamp_delta_abs(stream.current_timestamp_ns, bundle_timestamp_ns); + const auto next_delta = timestamp_delta_abs(stream.next_timestamp_ns, bundle_timestamp_ns); + if (next_delta <= current_delta) { + use_next = true; + selected_timestamp_ns = stream.next_timestamp_ns; + } + } + + selections.push_back(BundledFrameSelection{ + .stream_index = stream_index, + .status = cvmmap_streamer::record::BundleMemberStatus::Present, + .use_next = use_next, + .timestamp_ns = selected_timestamp_ns, + .delta_ns = static_cast(selected_timestamp_ns) - + static_cast(bundle_timestamp_ns), + .corrupted_frames_skipped = 0, + }); + } + + return selections; +} + +[[nodiscard]] +std::vector make_strict_bundle( + const std::vector &streams, + const std::uint64_t bundle_timestamp_ns) { + std::vector selections{}; + selections.reserve(streams.size()); + for (std::size_t stream_index = 0; stream_index < streams.size(); ++stream_index) { + const auto &stream = streams[stream_index]; + selections.push_back(BundledFrameSelection{ + .stream_index = stream_index, + .status = cvmmap_streamer::record::BundleMemberStatus::Present, + .use_next = false, + .timestamp_ns = stream.current_timestamp_ns, + .delta_ns = static_cast(stream.current_timestamp_ns) - + static_cast(bundle_timestamp_ns), + .corrupted_frames_skipped = 0, + }); + } + return selections; +} + +[[nodiscard]] +const sl::Mat &selected_left_frame(const CameraStream &stream, const BundledFrameSelection &selection) { + return selection.use_next ? stream.next_left_frame : stream.current_left_frame; +} + +[[nodiscard]] +const sl::Mat &selected_depth_frame(const CameraStream &stream, const BundledFrameSelection &selection) { + return selection.use_next ? stream.next_depth_frame : stream.current_depth_frame; +} + +[[nodiscard]] +const TrackingSample &selected_tracking(const CameraStream &stream, const BundledFrameSelection &selection) { + return selection.use_next ? stream.next_tracking : stream.current_tracking; +} + [[nodiscard]] std::expected, std::string> next_synced_group_timestamp( std::vector &streams, @@ -1167,15 +1420,66 @@ std::expected encode_and_write_group( cvmmap_streamer::record::MultiMcapRecordSink &sink, std::vector &streams, const CliOptions &options, - const std::uint64_t group_timestamp_ns) { - for (auto &stream : streams) { - const auto video_step_bytes = stream.current_left_frame.getStepBytes(sl::MEM::CPU); + const cvmmap_streamer::record::BundlePolicy bundle_policy, + const std::uint64_t bundle_index, + const std::uint64_t bundle_timestamp_ns, + const std::span selections) { + if (selections.size() != streams.size()) { + return std::unexpected("bundle selection size does not match stream count"); + } + + std::vector bundle_members{}; + bundle_members.reserve(selections.size()); + for (const auto &selection : selections) { + const auto &stream = streams[selection.stream_index]; + bundle_members.push_back(cvmmap_streamer::record::RawBundleMemberView{ + .camera_label = stream.source.label, + .status = selection.status, + .timestamp_ns = selection.timestamp_ns, + .delta_ns = selection.delta_ns, + .corrupted_frames_skipped = selection.corrupted_frames_skipped, + }); + } + if (auto write = sink.write_bundle_manifest(cvmmap_streamer::record::RawBundleManifestView{ + .timestamp_ns = bundle_timestamp_ns, + .bundle_index = bundle_index, + .policy = bundle_policy, + .members = bundle_members, + }); !write) { + return std::unexpected("failed to write bundle manifest: " + write.error()); + } + + std::vector ordered_selections{}; + ordered_selections.reserve(selections.size()); + for (const auto &selection : selections) { + if (selection.status == cvmmap_streamer::record::BundleMemberStatus::Present && + selection.timestamp_ns.has_value()) { + ordered_selections.push_back(selection); + } + } + std::sort( + ordered_selections.begin(), + ordered_selections.end(), + [](const auto &left, const auto &right) { + if (left.timestamp_ns != right.timestamp_ns) { + return left.timestamp_ns < right.timestamp_ns; + } + return left.stream_index < right.stream_index; + }); + + for (const auto &selection : ordered_selections) { + auto &stream = streams[selection.stream_index]; + const auto &left_frame = selected_left_frame(stream, selection); + const auto &depth_frame = selected_depth_frame(stream, selection); + const auto &tracking = selected_tracking(stream, selection); + + const auto video_step_bytes = left_frame.getStepBytes(sl::MEM::CPU); const auto video_bytes = std::span( - stream.current_left_frame.getPtr(sl::MEM::CPU), - video_step_bytes * stream.current_left_frame.getHeight()); + left_frame.getPtr(sl::MEM::CPU), + video_step_bytes * left_frame.getHeight()); cvmmap_streamer::encode::RawVideoFrame raw_video{ .info = stream.frame_info, - .source_timestamp_ns = group_timestamp_ns, + .source_timestamp_ns = *selection.timestamp_ns, .row_stride_bytes = video_step_bytes, .bytes = video_bytes, }; @@ -1196,7 +1500,7 @@ std::expected encode_and_write_group( if (!stream.calibration_written) { cvmmap_streamer::record::RawCameraCalibrationView calibration{ - .timestamp_ns = group_timestamp_ns, + .timestamp_ns = *selection.timestamp_ns, .width = stream.video_calibration.width, .height = stream.video_calibration.height, .distortion_model = "plumb_bob", @@ -1214,7 +1518,7 @@ std::expected encode_and_write_group( if (!stream.depth_calibration_written && (stream.depth_width != stream.width || stream.depth_height != stream.height)) { cvmmap_streamer::record::RawCameraCalibrationView depth_calibration{ - .timestamp_ns = group_timestamp_ns, + .timestamp_ns = *selection.timestamp_ns, .width = stream.depth_calibration.width, .height = stream.depth_calibration.height, .distortion_model = "plumb_bob", @@ -1229,15 +1533,15 @@ std::expected encode_and_write_group( stream.depth_calibration_written = true; } - const auto depth_width = static_cast(stream.current_depth_frame.getWidth()); - const auto depth_height = static_cast(stream.current_depth_frame.getHeight()); + const auto depth_width = static_cast(depth_frame.getWidth()); + const auto depth_height = static_cast(depth_frame.getHeight()); if (depth_width != stream.depth_width || depth_height != stream.depth_height) { return std::unexpected( "depth resolution changed unexpectedly for " + stream.source.label + ": " + std::to_string(depth_width) + "x" + std::to_string(depth_height) + " vs " + std::to_string(stream.depth_width) + "x" + std::to_string(stream.depth_height)); } - const auto depth_step_bytes = stream.current_depth_frame.getStepBytes(sl::MEM::CPU); + const auto depth_step_bytes = depth_frame.getStepBytes(sl::MEM::CPU); const auto packed_depth_bytes = static_cast(depth_width) * sizeof(std::uint16_t); if (depth_step_bytes < packed_depth_bytes) { return std::unexpected( @@ -1249,15 +1553,15 @@ std::expected encode_and_write_group( std::span depth_pixels{}; if (depth_step_bytes == packed_depth_bytes) { depth_pixels = std::span( - stream.current_depth_frame.getPtr(sl::MEM::CPU), + depth_frame.getPtr(sl::MEM::CPU), static_cast(depth_width) * static_cast(depth_height)); } else { - compact_depth = copy_compact_u16_plane(stream.current_depth_frame); + compact_depth = copy_compact_u16_plane(depth_frame); depth_pixels = *compact_depth; } cvmmap_streamer::record::RawDepthMapU16View depth_map{ - .timestamp_ns = group_timestamp_ns, + .timestamp_ns = *selection.timestamp_ns, .width = depth_width, .height = depth_height, .pixels = depth_pixels, @@ -1266,12 +1570,12 @@ std::expected encode_and_write_group( return std::unexpected("failed to write depth map for " + stream.source.label + ": " + write.error()); } - if (options.with_pose && stream.current_tracking.has_pose) { + if (options.with_pose && tracking.has_pose) { cvmmap_streamer::record::RawPoseView pose_view{ - .timestamp_ns = group_timestamp_ns, + .timestamp_ns = *selection.timestamp_ns, .reference_frame_id = pose_reference_frame_id(stream.pose_tracking, stream.source.label), - .position = stream.current_tracking.position, - .orientation = stream.current_tracking.orientation, + .position = tracking.position, + .orientation = tracking.orientation, }; if (auto write = sink.write_pose(stream.mcap_stream_id, pose_view); !write) { return std::unexpected("failed to write pose for " + stream.source.label + ": " + write.error()); @@ -1281,6 +1585,23 @@ std::expected encode_and_write_group( return {}; } +[[nodiscard]] +std::expected advance_after_nearest_emit( + std::vector &streams, + const std::span selections) { + for (const auto &selection : selections) { + if (selection.status != cvmmap_streamer::record::BundleMemberStatus::Present || !selection.use_next) { + continue; + } + auto &stream = streams[selection.stream_index]; + auto promote = promote_next_frame(stream); + if (!promote) { + return std::unexpected(promote.error()); + } + } + return {}; +} + [[nodiscard]] std::expected advance_after_emit(std::vector &streams) { for (auto &stream : streams) { @@ -1552,31 +1873,33 @@ int run_single_source( const auto total_frames_to_emit = static_cast(last_frame - options.start_frame + 1); ProgressBar progress{total_frames_to_emit}; - while (options.start_frame + emitted_frames <= last_frame) { + while (true) { if (log_shutdown_request(shutdown_logged, "single-camera export")) { interrupted = true; break; } - const auto grab_status = camera.grab(runtime_parameters); - if (grab_status == sl::ERROR_CODE::END_OF_SVOFILE_REACHED) { + auto grab = grab_next_readable_frame( + camera, + runtime_parameters, + input_path.filename().string(), + input_path.string(), + static_cast(total_frames), + last_timestamp_ns, + std::nullopt); + if (!grab && grab.error() == "end-of-svo") { break; } - if (grab_status == sl::ERROR_CODE::CORRUPTED_FRAME && - camera.getSVOPosition() >= static_cast(last_frame)) { - spdlog::warn( - "treating unreadable tail frame as end-of-svo for {} at frame {}", - input_path.string(), - camera.getSVOPosition()); - break; - } - if (grab_status != sl::ERROR_CODE::SUCCESS) { + if (!grab) { progress.finish(emitted_frames, false); sink->close(); backend->shutdown(); close_camera(); - spdlog::error("failed to grab SVO frame: {}", zed_status_string(grab_status)); + spdlog::error("{}", grab.error()); return exit_code(ToolExitCode::RuntimeError); } + if (grab->svo_position > static_cast(last_frame)) { + break; + } const auto image_status = camera.retrieveImage(left_frame, sl::VIEW::LEFT_BGR, sl::MEM::CPU); if (image_status != sl::ERROR_CODE::SUCCESS) { @@ -1621,6 +1944,7 @@ int run_single_source( if (last_timestamp_ns && timestamp_ns <= *last_timestamp_ns) { timestamp_ns = *last_timestamp_ns + 1; } + maybe_log_recovered_corruption_gap(input_path.filename().string(), *grab, timestamp_ns); last_timestamp_ns = timestamp_ns; const auto video_step_bytes = left_frame.getStepBytes(sl::MEM::CPU); @@ -1783,7 +2107,7 @@ int run_single_source( spdlog::warn( "pose tracking state changed to {} at frame {}", zed_tracking_state_string(tracking_state), - options.start_frame + emitted_frames); + grab->svo_position); } } if (tracking_state == sl::POSITIONAL_TRACKING_STATE::OK) { @@ -1861,10 +2185,11 @@ int run_multi_source( const cvmmap_streamer::McapCompression compression, const sl::DEPTH_MODE depth_mode, const sl::Resolution depth_size, - const PoseTrackingOptions &pose_tracking) { + const PoseTrackingOptions &pose_tracking, + const cvmmap_streamer::record::BundlePolicy bundle_policy) { if (options.has_end_frame && options.end_frame < options.start_frame) { spdlog::error( - "invalid bundled group range: start-frame={} end-frame={}", + "invalid bundled range: start-frame={} end-frame={}", options.start_frame, options.end_frame); return exit_code(ToolExitCode::UsageError); @@ -1915,21 +2240,68 @@ int run_multi_source( [](const auto &left, const auto &right) { return left.nominal_frame_period_ns < right.nominal_frame_period_ns; })->nominal_frame_period_ns; + const auto bundle_policy_name = bundle_policy == cvmmap_streamer::record::BundlePolicy::Nearest + ? "nearest" + : "strict"; const auto tolerance_ns = options.has_sync_tolerance ? static_cast(std::llround(options.sync_tolerance_ms * 1'000'000.0)) - : std::max(1, slowest_period_ns / 4); + : std::max(1, slowest_period_ns); spdlog::info( - "multi-camera sync window start_ts={} end_ts={} tolerance_ns={}", + "multi-camera bundle window start_ts={} end_ts={} policy={} bundle_period_ns={} tolerance_ns={}", common_start_ts, common_end_ts, + bundle_policy_name, + slowest_period_ns, tolerance_ns); const auto render_progress = stderr_supports_progress_bar(); - const std::uint64_t selected_total_groups = options.has_end_frame - ? static_cast(options.end_frame - options.start_frame) + 1 + const auto total_timeline_bundles = common_end_ts >= common_start_ts + ? ((common_end_ts - common_start_ts) / slowest_period_ns) + 1 : 0; - const bool exact_group_progress = render_progress && options.has_end_frame; - const bool approximate_time_progress = render_progress && !options.has_end_frame; - if (auto synced = sync_streams_to_timestamp(streams, common_start_ts); !synced) { + if (bundle_policy == cvmmap_streamer::record::BundlePolicy::Nearest && + options.start_frame >= total_timeline_bundles) { + close_camera_streams(streams); + spdlog::error( + "start-frame {} is out of range for bundled multi-camera mode (available bundles: {})", + options.start_frame, + total_timeline_bundles); + return exit_code(ToolExitCode::UsageError); + } + if (bundle_policy == cvmmap_streamer::record::BundlePolicy::Nearest && + options.has_end_frame && + options.end_frame >= total_timeline_bundles) { + close_camera_streams(streams); + spdlog::error( + "end-frame {} is out of range for bundled multi-camera mode (available bundles: {})", + options.end_frame, + total_timeline_bundles); + return exit_code(ToolExitCode::UsageError); + } + + const auto selected_end_bundle = options.has_end_frame + ? static_cast(options.end_frame) + : (bundle_policy == cvmmap_streamer::record::BundlePolicy::Nearest + ? total_timeline_bundles - 1 + : 0); + const auto selected_total_groups = options.has_end_frame + ? static_cast(options.end_frame - options.start_frame) + 1 + : (bundle_policy == cvmmap_streamer::record::BundlePolicy::Nearest + ? total_timeline_bundles - static_cast(options.start_frame) + : 0); + const bool exact_group_progress = + render_progress && + (bundle_policy == cvmmap_streamer::record::BundlePolicy::Nearest || options.has_end_frame); + const bool approximate_time_progress = + render_progress && + bundle_policy == cvmmap_streamer::record::BundlePolicy::Strict && + !options.has_end_frame; + ProgressBar progress{exact_group_progress ? selected_total_groups : 0}; + double last_progress_fraction = 0.0; + std::string last_progress_detail{}; + + const auto initial_target_ts = bundle_policy == cvmmap_streamer::record::BundlePolicy::Nearest + ? common_start_ts + static_cast(options.start_frame) * slowest_period_ns + : common_start_ts; + if (auto synced = sync_streams_to_timestamp(streams, initial_target_ts); !synced) { close_camera_streams(streams); if (synced.error() == "interrupted") { return interrupted_exit_code(); @@ -1937,8 +2309,8 @@ int run_multi_source( spdlog::error("{}", synced.error()); return exit_code(ToolExitCode::RuntimeError); } - auto effective_progress_start_ts = common_start_ts; - if (options.start_frame > 0) { + auto effective_progress_start_ts = initial_target_ts; + if (bundle_policy == cvmmap_streamer::record::BundlePolicy::Strict && options.start_frame > 0) { if (auto skipped_to = skip_bundled_start_groups(streams, options.start_frame, tolerance_ns, common_end_ts); !skipped_to) { close_camera_streams(streams); if (skipped_to.error() == "interrupted") { @@ -1950,11 +2322,11 @@ int run_multi_source( effective_progress_start_ts = *skipped_to; } } - ProgressBar progress{exact_group_progress ? selected_total_groups : 0}; - double last_progress_fraction = 0.0; - std::string last_progress_detail{}; - auto sink = cvmmap_streamer::record::MultiMcapRecordSink::create(output_path.string(), compression); + auto sink = cvmmap_streamer::record::MultiMcapRecordSink::create( + output_path.string(), + compression, + options.bundle_topic); if (!sink) { if (approximate_time_progress) { progress.finish_fraction(last_progress_fraction, false, last_progress_detail); @@ -1978,67 +2350,121 @@ int run_multi_source( } std::uint64_t emitted_groups{0}; - while (true) { - if (log_shutdown_request(shutdown_logged, "multi-camera export")) { - interrupted = true; - break; - } - auto group_timestamp = next_synced_group_timestamp(streams, tolerance_ns, common_end_ts); - if (!group_timestamp) { - if (approximate_time_progress) { - progress.finish_fraction(last_progress_fraction, false, last_progress_detail); - } else { - progress.finish(emitted_groups, false); - } - sink->close(); - close_camera_streams(streams); - if (group_timestamp.error() == "interrupted") { - return interrupted_exit_code(); - } - spdlog::error("sync failed: {}", group_timestamp.error()); - return exit_code(ToolExitCode::RuntimeError); - } - if (!*group_timestamp) { - break; - } - - if (auto write = encode_and_write_group(*sink, streams, options, **group_timestamp); !write) { - if (approximate_time_progress) { - progress.finish_fraction(last_progress_fraction, false, last_progress_detail); - } else { - progress.finish(emitted_groups, false); - } - sink->close(); - close_camera_streams(streams); - spdlog::error("{}", write.error()); - return exit_code(ToolExitCode::RuntimeError); - } - emitted_groups += 1; - if (approximate_time_progress) { - last_progress_fraction = bundled_progress_fraction(effective_progress_start_ts, common_end_ts, **group_timestamp); - last_progress_detail = bundled_progress_detail(effective_progress_start_ts, common_end_ts, **group_timestamp); - progress.update_fraction(last_progress_fraction, last_progress_detail); - } else { - progress.update(emitted_groups); - } - if (options.has_end_frame && emitted_groups >= selected_total_groups) { - break; - } - - auto advance = advance_after_emit(streams); - if (!advance) { - if (advance.error() == "end-of-svo") { + if (bundle_policy == cvmmap_streamer::record::BundlePolicy::Nearest) { + for (std::uint64_t bundle_index = options.start_frame; bundle_index <= selected_end_bundle; ++bundle_index) { + if (log_shutdown_request(shutdown_logged, "multi-camera export")) { + interrupted = true; break; } - if (approximate_time_progress) { - progress.finish_fraction(last_progress_fraction, false, last_progress_detail); - } else { + const auto bundle_timestamp_ns = common_start_ts + bundle_index * slowest_period_ns; + if (auto advanced = advance_streams_to_timestamp(streams, bundle_timestamp_ns); !advanced) { progress.finish(emitted_groups, false); + sink->close(); + close_camera_streams(streams); + spdlog::error("{}", advanced.error()); + return exit_code(ToolExitCode::RuntimeError); + } + auto selections = select_nearest_bundle(streams, bundle_timestamp_ns, common_end_ts); + if (!selections) { + break; + } + + if (auto write = encode_and_write_group( + *sink, + streams, + options, + bundle_policy, + bundle_index, + bundle_timestamp_ns, + *selections); !write) { + progress.finish(emitted_groups, false); + sink->close(); + close_camera_streams(streams); + spdlog::error("{}", write.error()); + return exit_code(ToolExitCode::RuntimeError); + } + if (auto advance = advance_after_nearest_emit(streams, *selections); !advance) { + progress.finish(emitted_groups, false); + sink->close(); + close_camera_streams(streams); + spdlog::error("{}", advance.error()); + return exit_code(ToolExitCode::RuntimeError); + } + emitted_groups += 1; + progress.update(emitted_groups); + } + } else { + while (true) { + if (log_shutdown_request(shutdown_logged, "multi-camera export")) { + interrupted = true; + break; + } + auto group_timestamp = next_synced_group_timestamp(streams, tolerance_ns, common_end_ts); + if (!group_timestamp) { + if (approximate_time_progress) { + progress.finish_fraction(last_progress_fraction, false, last_progress_detail); + } else { + progress.finish(emitted_groups, false); + } + sink->close(); + close_camera_streams(streams); + if (group_timestamp.error() == "interrupted") { + return interrupted_exit_code(); + } + spdlog::error("sync failed: {}", group_timestamp.error()); + return exit_code(ToolExitCode::RuntimeError); + } + if (!*group_timestamp) { + break; + } + + const auto bundle_index = static_cast(options.start_frame) + emitted_groups; + const auto selections = make_strict_bundle(streams, **group_timestamp); + if (auto write = encode_and_write_group( + *sink, + streams, + options, + bundle_policy, + bundle_index, + **group_timestamp, + selections); !write) { + if (approximate_time_progress) { + progress.finish_fraction(last_progress_fraction, false, last_progress_detail); + } else { + progress.finish(emitted_groups, false); + } + sink->close(); + close_camera_streams(streams); + spdlog::error("{}", write.error()); + return exit_code(ToolExitCode::RuntimeError); + } + emitted_groups += 1; + if (approximate_time_progress) { + last_progress_fraction = bundled_progress_fraction(effective_progress_start_ts, common_end_ts, **group_timestamp); + last_progress_detail = bundled_progress_detail(effective_progress_start_ts, common_end_ts, **group_timestamp); + progress.update_fraction(last_progress_fraction, last_progress_detail); + } else { + progress.update(emitted_groups); + } + if (options.has_end_frame && emitted_groups >= selected_total_groups) { + break; + } + + auto advance = advance_after_emit(streams); + if (!advance) { + if (advance.error() == "end-of-svo") { + break; + } + if (approximate_time_progress) { + progress.finish_fraction(last_progress_fraction, false, last_progress_detail); + } else { + progress.finish(emitted_groups, false); + } + sink->close(); + close_camera_streams(streams); + spdlog::error("{}", advance.error()); + return exit_code(ToolExitCode::RuntimeError); } - sink->close(); - close_camera_streams(streams); - spdlog::error("{}", advance.error()); - return exit_code(ToolExitCode::RuntimeError); } } @@ -2083,15 +2509,16 @@ int run_multi_source( remove_error.message()); } spdlog::error( - "no synced frame groups were found across {} camera(s) for '{}'", + "no bundled frame groups were found across {} camera(s) for '{}' using policy={}", sources.size(), - output_path.string()); + output_path.string(), + bundle_policy_name); return exit_code(ToolExitCode::RuntimeError); } if (interrupted) { spdlog::warn( - "gracefully stopped after writing {} synced frame group(s) across {} camera(s) to '{}'", + "gracefully stopped after writing {} bundled frame group(s) across {} camera(s) to '{}'", emitted_groups, sources.size(), output_path.string()); @@ -2099,10 +2526,11 @@ int run_multi_source( } spdlog::info( - "wrote {} synced frame group(s) across {} camera(s) to '{}'", + "wrote {} bundled frame group(s) across {} camera(s) to '{}' using policy={}", emitted_groups, sources.size(), - output_path.string()); + output_path.string(), + bundle_policy_name); return exit_code(ToolExitCode::Success); } @@ -2127,20 +2555,23 @@ int main(int argc, char **argv) { ->check(CLI::IsMember({"neural_light", "neural", "neural_plus"})); app.add_option("--depth-size", options.depth_size, "Depth output size (optimal|native|x)") ->default_val("optimal"); + app.add_option("--bundle-policy", options.bundle_policy, "Bundling policy for multi-camera mode (nearest|strict)") + ->check(CLI::IsMember({"nearest", "strict"})); app.add_option( "--start-frame", options.start_frame, - "First frame/group to export (inclusive): raw SVO frame in single-camera mode, synced group index in bundled multi-camera mode") + "First frame/group to export (inclusive): raw SVO frame in single-camera mode, bundle index in multi-camera mode") ->check(CLI::NonNegativeNumber); auto *end_frame_option = app.add_option( "--end-frame", options.end_frame, - "Last frame/group to export (inclusive): raw SVO frame in single-camera mode, synced group index in bundled multi-camera mode") + "Last frame/group to export (inclusive): raw SVO frame in single-camera mode, bundle index in multi-camera mode") ->check(CLI::NonNegativeNumber); app.add_option("--frame-id", options.frame_id, "Frame id for image and depth topics"); app.add_option("--video-topic", options.video_topic, "MCAP topic for foxglove.CompressedVideo"); app.add_option("--depth-topic", options.depth_topic, "MCAP topic for cvmmap_streamer.DepthMap"); app.add_option("--calibration-topic", options.calibration_topic, "MCAP topic for foxglove.CameraCalibration"); + app.add_option("--bundle-topic", options.bundle_topic, "MCAP topic for bundled multi-camera manifests"); app.add_option( "--depth-calibration-topic", options.depth_calibration_topic, @@ -2155,7 +2586,7 @@ int main(int argc, char **argv) { auto *sync_tolerance_option = app.add_option( "--sync-tolerance-ms", options.sync_tolerance_ms, - "Maximum allowed timestamp delta between cameras in multi-camera mode"); + "Maximum allowed timestamp delta between cameras in strict multi-camera mode"); sync_tolerance_option->check(CLI::PositiveNumber); try { @@ -2191,6 +2622,11 @@ int main(int argc, char **argv) { spdlog::error("{}", depth_size.error()); return exit_code(ToolExitCode::UsageError); } + auto bundle_policy = parse_bundle_policy(options.bundle_policy); + if (!bundle_policy) { + spdlog::error("{}", bundle_policy.error()); + return exit_code(ToolExitCode::UsageError); + } auto pose_tracking = load_pose_tracking_options(options); if (!pose_tracking) { spdlog::error("{}", pose_tracking.error()); @@ -2226,5 +2662,6 @@ int main(int argc, char **argv) { *compression, *depth_mode, *depth_size, - *pose_tracking); + *pose_tracking, + *bundle_policy); }