feat(mcap): add paced replay tooling

This commit is contained in:
2026-03-11 15:51:38 +08:00
parent bc1b619dee
commit ed3f32ff6e
7 changed files with 605 additions and 13 deletions
+21
View File
@@ -7,6 +7,7 @@
#include <cstdint>
#include <expected>
#include <fstream>
#include <iostream>
#include <optional>
#include <string>
@@ -25,6 +26,7 @@ enum class TesterExitCode : int {
FormatMismatch = 7,
EmptyPayload = 8,
ThresholdError = 9,
DumpError = 10,
};
[[nodiscard]]
@@ -36,6 +38,7 @@ struct Config {
std::string input_path{};
std::optional<std::string> expected_topic{};
std::optional<std::string> expected_format{};
std::optional<std::string> dump_annexb_output{};
std::uint32_t min_messages{1};
};
@@ -46,6 +49,7 @@ std::expected<Config, int> parse_args(int argc, char **argv) {
app.add_option("input", config.input_path, "Input MCAP path")->required();
app.add_option("--expect-topic", config.expected_topic, "Expected MCAP topic");
app.add_option("--expect-format", config.expected_format, "Expected CompressedVideo format");
app.add_option("--dump-annexb-output", config.dump_annexb_output, "Write concatenated CompressedVideo.data payloads to a file");
app.add_option("--min-messages", config.min_messages, "Minimum expected message count")->check(CLI::PositiveNumber);
try {
@@ -74,6 +78,15 @@ int main(int argc, char **argv) {
std::uint64_t message_count{0};
std::uint64_t previous_log_time{0};
bool saw_log_time{false};
std::optional<std::ofstream> dump_stream{};
if (config->dump_annexb_output) {
dump_stream.emplace(*config->dump_annexb_output, std::ios::binary | std::ios::trunc);
if (!dump_stream->is_open()) {
spdlog::error("failed to open dump output '{}'", *config->dump_annexb_output);
reader.close();
return exit_code(TesterExitCode::DumpError);
}
}
auto message_view = reader.readMessages();
for (auto it = message_view.begin(); it != message_view.end(); ++it) {
@@ -117,6 +130,14 @@ int main(int argc, char **argv) {
reader.close();
return exit_code(TesterExitCode::EmptyPayload);
}
if (dump_stream) {
dump_stream->write(message.data().data(), static_cast<std::streamsize>(message.data().size()));
if (!dump_stream->good()) {
spdlog::error("failed to write Annex B dump to '{}'", *config->dump_annexb_output);
reader.close();
return exit_code(TesterExitCode::DumpError);
}
}
previous_log_time = it->message.logTime;
saw_log_time = true;
+298
View File
@@ -0,0 +1,298 @@
#define MCAP_IMPLEMENTATION
#include <mcap/reader.hpp>
#include <foxglove/CompressedVideo.pb.h>
#include <CLI/CLI.hpp>
#include <chrono>
#include <cstdint>
#include <expected>
#include <optional>
#include <span>
#include <string>
#include <string_view>
#include <system_error>
#include <thread>
#include <vector>
#include <cerrno>
#include <cstring>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <spdlog/spdlog.h>
namespace {
enum class TesterExitCode : int {
Success = 0,
OpenError = 2,
SchemaError = 3,
TopicMismatch = 4,
ParseError = 5,
FormatMismatch = 6,
EmptyPayload = 7,
ProcessError = 8,
WriteError = 9,
WaitError = 10,
};
[[nodiscard]]
constexpr int exit_code(TesterExitCode code) {
return static_cast<int>(code);
}
struct Config {
std::string input_path{};
std::optional<std::string> expected_topic{};
std::string expected_format{"h264"};
std::string ffplay_path{"ffplay"};
std::vector<std::string> ffplay_args{};
double speed{1.0};
bool no_pace{false};
};
struct ReplayMessage {
std::uint64_t timestamp_ns{0};
std::vector<std::uint8_t> payload{};
};
[[nodiscard]]
std::expected<Config, int> parse_args(int argc, char **argv) {
Config config{};
CLI::App app{"mcap_replay_tester - replay foxglove.CompressedVideo MCAP with recorded pacing"};
app.add_option("input", config.input_path, "Input MCAP path")->required();
app.add_option("--expect-topic", config.expected_topic, "Expected MCAP topic");
app.add_option("--expect-format", config.expected_format, "Expected CompressedVideo format")
->check(CLI::IsMember({"h264", "h265"}));
app.add_option("--ffplay-path", config.ffplay_path, "ffplay binary path");
app.add_option("--ffplay-arg", config.ffplay_args, "Extra ffplay argument (repeatable)");
app.add_option("--speed", config.speed, "Playback speed multiplier")->check(CLI::PositiveNumber);
app.add_flag("--no-pace", config.no_pace, "Write frames as fast as possible instead of using recorded timestamps");
try {
app.parse(argc, argv);
} catch (const CLI::ParseError &e) {
return std::unexpected(app.exit(e));
}
return config;
}
[[nodiscard]]
std::uint64_t proto_timestamp_ns(const google::protobuf::Timestamp &timestamp) {
return static_cast<std::uint64_t>(timestamp.seconds()) * 1000000000ull + static_cast<std::uint64_t>(timestamp.nanos());
}
[[nodiscard]]
std::expected<std::vector<ReplayMessage>, TesterExitCode> load_messages(const Config &config) {
mcap::McapReader reader{};
const auto open_status = reader.open(config.input_path);
if (!open_status.ok()) {
spdlog::error("failed to open MCAP file '{}': {}", config.input_path, open_status.message);
return std::unexpected(TesterExitCode::OpenError);
}
std::vector<ReplayMessage> messages{};
auto view = reader.readMessages();
for (auto it = view.begin(); it != view.end(); ++it) {
if (it->schema == nullptr || it->channel == nullptr) {
spdlog::error("MCAP message missing schema or channel metadata");
reader.close();
return std::unexpected(TesterExitCode::SchemaError);
}
if (it->schema->encoding != "protobuf" || it->schema->name != "foxglove.CompressedVideo") {
continue;
}
if (it->channel->messageEncoding != "protobuf") {
spdlog::error("unexpected MCAP message encoding: {}", it->channel->messageEncoding);
reader.close();
return std::unexpected(TesterExitCode::SchemaError);
}
if (config.expected_topic && it->channel->topic != *config.expected_topic) {
spdlog::error("unexpected topic: expected '{}' got '{}'", *config.expected_topic, it->channel->topic);
reader.close();
return std::unexpected(TesterExitCode::TopicMismatch);
}
foxglove::CompressedVideo message{};
if (!message.ParseFromArray(it->message.data, static_cast<int>(it->message.dataSize))) {
spdlog::error("failed to parse foxglove.CompressedVideo payload");
reader.close();
return std::unexpected(TesterExitCode::ParseError);
}
if (message.format() != config.expected_format) {
spdlog::error("unexpected format: expected '{}' got '{}'", config.expected_format, message.format());
reader.close();
return std::unexpected(TesterExitCode::FormatMismatch);
}
if (message.data().empty()) {
spdlog::error("compressed video payload is empty");
reader.close();
return std::unexpected(TesterExitCode::EmptyPayload);
}
auto timestamp_ns = proto_timestamp_ns(message.timestamp());
if (timestamp_ns == 0) {
timestamp_ns = it->message.logTime;
}
ReplayMessage replay{};
replay.timestamp_ns = timestamp_ns;
replay.payload.assign(
reinterpret_cast<const std::uint8_t *>(message.data().data()),
reinterpret_cast<const std::uint8_t *>(message.data().data()) + message.data().size());
messages.push_back(std::move(replay));
}
reader.close();
if (messages.empty()) {
spdlog::error("no foxglove.CompressedVideo messages found in '{}'", config.input_path);
return std::unexpected(TesterExitCode::EmptyPayload);
}
return messages;
}
[[nodiscard]]
const char *ffplay_input_format(std::string_view format) {
return format == "h265" ? "hevc" : "h264";
}
[[nodiscard]]
std::expected<pid_t, TesterExitCode> spawn_ffplay(
const Config &config,
int &stdin_write_fd) {
int pipe_fds[2]{-1, -1};
if (::pipe(pipe_fds) != 0) {
spdlog::error("failed to create ffplay stdin pipe: {}", std::strerror(errno));
return std::unexpected(TesterExitCode::ProcessError);
}
const auto pid = ::fork();
if (pid < 0) {
spdlog::error("failed to fork ffplay: {}", std::strerror(errno));
::close(pipe_fds[0]);
::close(pipe_fds[1]);
return std::unexpected(TesterExitCode::ProcessError);
}
if (pid == 0) {
::dup2(pipe_fds[0], STDIN_FILENO);
::close(pipe_fds[0]);
::close(pipe_fds[1]);
std::vector<std::string> owned_args{};
owned_args.push_back(config.ffplay_path);
owned_args.push_back("-hide_banner");
owned_args.push_back("-loglevel");
owned_args.push_back("warning");
owned_args.push_back("-fflags");
owned_args.push_back("nobuffer");
owned_args.push_back("-flags");
owned_args.push_back("low_delay");
owned_args.push_back("-f");
owned_args.push_back(ffplay_input_format(config.expected_format));
owned_args.insert(owned_args.end(), config.ffplay_args.begin(), config.ffplay_args.end());
owned_args.push_back("pipe:0");
std::vector<char *> argv{};
argv.reserve(owned_args.size() + 1);
for (auto &arg : owned_args) {
argv.push_back(arg.data());
}
argv.push_back(nullptr);
::execvp(config.ffplay_path.c_str(), argv.data());
::perror("execvp ffplay");
::_exit(127);
}
::close(pipe_fds[0]);
stdin_write_fd = pipe_fds[1];
return pid;
}
[[nodiscard]]
std::expected<void, TesterExitCode> write_all(int fd, std::span<const std::uint8_t> bytes) {
std::size_t written{0};
while (written < bytes.size()) {
const auto result = ::write(fd, bytes.data() + written, bytes.size() - written);
if (result < 0) {
if (errno == EINTR) {
continue;
}
spdlog::error("failed to write replay data to ffplay: {}", std::strerror(errno));
return std::unexpected(TesterExitCode::WriteError);
}
if (result == 0) {
spdlog::error("short write to ffplay stdin");
return std::unexpected(TesterExitCode::WriteError);
}
written += static_cast<std::size_t>(result);
}
return {};
}
}
int main(int argc, char **argv) {
auto config = parse_args(argc, argv);
if (!config) {
return config.error();
}
auto messages = load_messages(*config);
if (!messages) {
return exit_code(messages.error());
}
int stdin_write_fd{-1};
auto child_pid = spawn_ffplay(*config, stdin_write_fd);
if (!child_pid) {
return exit_code(child_pid.error());
}
const auto start_wall = std::chrono::steady_clock::now();
const auto first_timestamp = messages->front().timestamp_ns;
for (const auto &message : *messages) {
if (!config->no_pace) {
const auto delta_ns = message.timestamp_ns > first_timestamp ? message.timestamp_ns - first_timestamp : 0ull;
const auto scaled_ns = static_cast<std::uint64_t>(static_cast<long double>(delta_ns) / config->speed);
std::this_thread::sleep_until(start_wall + std::chrono::nanoseconds(scaled_ns));
}
auto write = write_all(stdin_write_fd, std::span<const std::uint8_t>(message.payload.data(), message.payload.size()));
if (!write) {
::close(stdin_write_fd);
int status = 0;
::waitpid(*child_pid, &status, 0);
return exit_code(write.error());
}
}
::close(stdin_write_fd);
int status{0};
if (::waitpid(*child_pid, &status, 0) < 0) {
spdlog::error("failed to wait for ffplay: {}", std::strerror(errno));
return exit_code(TesterExitCode::WaitError);
}
if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
spdlog::info("replayed {} MCAP video messages", messages->size());
return exit_code(TesterExitCode::Success);
}
if (WIFSIGNALED(status)) {
spdlog::error("ffplay exited on signal {}", WTERMSIG(status));
return exit_code(TesterExitCode::WaitError);
}
if (WIFEXITED(status)) {
spdlog::error("ffplay exited with status {}", WEXITSTATUS(status));
return WEXITSTATUS(status);
}
spdlog::error("ffplay exited unexpectedly");
return exit_code(TesterExitCode::WaitError);
}