diff --git a/CMakeLists.txt b/CMakeLists.txt index 57838db..17b968f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -210,3 +210,30 @@ target_link_libraries(mcap_reader_tester PRIVATE cvmmap_streamer_protobuf) if (TARGET PkgConfig::PROTOBUF_PKG) target_link_libraries(mcap_reader_tester PRIVATE PkgConfig::PROTOBUF_PKG) endif() + +add_executable(mcap_replay_tester src/testers/mcap_replay_tester.cpp) +target_include_directories(mcap_replay_tester + PRIVATE + "${CMAKE_CURRENT_LIST_DIR}/include" + "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/include" + "${CMAKE_CURRENT_LIST_DIR}/lib/mcap/include" + "${CVMMAP_PROXY_INCLUDE_DIR}" + "${CMAKE_CURRENT_BINARY_DIR}") +target_link_libraries(mcap_replay_tester + PRIVATE + Threads::Threads + cvmmap_streamer_foxglove_proto + PkgConfig::ZSTD + PkgConfig::LZ4) +if (TARGET spdlog::spdlog) + target_link_libraries(mcap_replay_tester PRIVATE spdlog::spdlog) +elseif (TARGET spdlog) + target_link_libraries(mcap_replay_tester PRIVATE spdlog) +endif() +if (TARGET CLI11::CLI11) + target_link_libraries(mcap_replay_tester PRIVATE CLI11::CLI11) +endif() +target_link_libraries(mcap_replay_tester PRIVATE cvmmap_streamer_protobuf) +if (TARGET PkgConfig::PROTOBUF_PKG) + target_link_libraries(mcap_replay_tester PRIVATE PkgConfig::PROTOBUF_PKG) +endif() diff --git a/include/cvmmap_streamer/record/mcap_record_sink.hpp b/include/cvmmap_streamer/record/mcap_record_sink.hpp index c52fdf4..f5356a5 100644 --- a/include/cvmmap_streamer/record/mcap_record_sink.hpp +++ b/include/cvmmap_streamer/record/mcap_record_sink.hpp @@ -21,7 +21,12 @@ public: McapRecordSink &operator=(McapRecordSink &&other) noexcept; [[nodiscard]] - static std::expected create(const RuntimeConfig &config); + static std::expected create( + const RuntimeConfig &config, + const encode::EncodedStreamInfo &stream_info); + + [[nodiscard]] + std::expected update_stream_info(const encode::EncodedStreamInfo &stream_info); [[nodiscard]] std::expected write_access_unit(const encode::EncodedAccessUnit &access_unit); diff --git a/scripts/replay_mcap.sh b/scripts/replay_mcap.sh new file mode 100755 index 0000000..660ce84 --- /dev/null +++ b/scripts/replay_mcap.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")/.." && pwd)" +BUILD_DIR="${BUILD_DIR:-$ROOT_DIR/build}" +REPLAY_BIN="${REPLAY_BIN:-$BUILD_DIR/mcap_replay_tester}" +FFPLAY_BIN="${FFPLAY_BIN:-ffplay}" +FFPLAY_ARGS="${FFPLAY_ARGS:-}" + +usage() { + cat <<'EOF' +Usage: + replay_mcap.sh [h264|h265] + +This replays MCAP video using the recorded timestamps and pipes frames to ffplay. +The optional codec argument defaults to h264. Extra ffplay flags may be supplied +via FFPLAY_ARGS, for example: FFPLAY_ARGS='-window_title replay'. +EOF +} + +if [[ "${1:-}" == "--help" || "${1:-}" == "-h" ]]; then + usage + exit 0 +fi + +if [[ $# -lt 1 || $# -gt 2 ]]; then + usage >&2 + exit 1 +fi + +INPUT_PATH="$1" +FORMAT="${2:-h264}" +if [[ "$FORMAT" != "h264" && "$FORMAT" != "h265" ]]; then + echo "unsupported format: $FORMAT (expected h264 or h265)" >&2 + exit 1 +fi + +if [[ ! -f "$INPUT_PATH" ]]; then + echo "input MCAP not found: $INPUT_PATH" >&2 + exit 1 +fi +if [[ ! -x "$REPLAY_BIN" ]]; then + echo "replay binary not found: $REPLAY_BIN" >&2 + exit 1 +fi +if ! command -v "$FFPLAY_BIN" >/dev/null 2>&1; then + echo "ffplay not found: $FFPLAY_BIN" >&2 + exit 1 +fi + +extra_args=() +if [[ -n "$FFPLAY_ARGS" ]]; then + # shellcheck disable=SC2206 + extra_args=($FFPLAY_ARGS) +fi + +cmd=( + "$REPLAY_BIN" + "$INPUT_PATH" + --expect-format "$FORMAT" + --ffplay-path "$FFPLAY_BIN" +) + +for arg in "${extra_args[@]}"; do + cmd+=(--ffplay-arg "$arg") +done + +"${cmd[@]}" diff --git a/src/pipeline/pipeline_runtime.cpp b/src/pipeline/pipeline_runtime.cpp index 44bc6ae..a3a76b5 100644 --- a/src/pipeline/pipeline_runtime.cpp +++ b/src/pipeline/pipeline_runtime.cpp @@ -331,15 +331,6 @@ int run_pipeline(const RuntimeConfig &config) { rtp_publisher.emplace(std::move(*created)); } - if (config.record.mcap.enabled) { - auto created = record::McapRecordSink::create(config); - if (!created) { - spdlog::error("pipeline MCAP sink init failed: {}", created.error()); - return exit_code(PipelineExitCode::InitializationError); - } - mcap_sink.emplace(std::move(*created)); - } - PipelineStats stats{}; metrics::IngestEmitLatencyTracker latency_tracker{}; bool producer_offline{false}; @@ -386,6 +377,30 @@ int run_pipeline(const RuntimeConfig &config) { } rtmp_output.emplace(std::move(*created)); } + if (config.record.mcap.enabled) { + auto stream_info = (*backend)->stream_info(); + if (!stream_info) { + return unexpected_error( + stream_info.error().code, + "pipeline MCAP stream info unavailable: " + format_error(stream_info.error())); + } + if (!mcap_sink) { + auto created = record::McapRecordSink::create(config, *stream_info); + if (!created) { + return unexpected_error( + ERR_INTERNAL, + "pipeline MCAP sink init failed: " + created.error()); + } + mcap_sink.emplace(std::move(*created)); + } else { + auto updated = mcap_sink->update_stream_info(*stream_info); + if (!updated) { + return unexpected_error( + ERR_INTERNAL, + "pipeline MCAP stream update failed: " + updated.error()); + } + } + } started = true; restart_pending = false; restart_target_info.reset(); diff --git a/src/record/mcap_record_sink.cpp b/src/record/mcap_record_sink.cpp index 09e6c2f..0791e5e 100644 --- a/src/record/mcap_record_sink.cpp +++ b/src/record/mcap_record_sink.cpp @@ -12,9 +12,11 @@ #include #include #include +#include #include #include #include +#include namespace cvmmap_streamer::record { @@ -46,6 +48,134 @@ google::protobuf::Timestamp to_proto_timestamp(std::uint64_t timestamp_ns) { return timestamp; } +void append_start_code(std::vector &output) { + output.push_back(0x00); + output.push_back(0x00); + output.push_back(0x00); + output.push_back(0x01); +} + +[[nodiscard]] +std::expected read_be16(std::span bytes, std::size_t offset) { + if (offset + 2 > bytes.size()) { + return std::unexpected("decoder config truncated"); + } + return static_cast((static_cast(bytes[offset]) << 8) | bytes[offset + 1]); +} + +[[nodiscard]] +bool looks_like_annexb(std::span bytes) { + if (bytes.size() >= 4 && + bytes[0] == 0x00 && + bytes[1] == 0x00 && + bytes[2] == 0x00 && + bytes[3] == 0x01) { + return true; + } + return bytes.size() >= 3 && + bytes[0] == 0x00 && + bytes[1] == 0x00 && + bytes[2] == 0x01; +} + +[[nodiscard]] +std::expected, std::string> avcc_to_annexb(std::span decoder_config) { + if (decoder_config.size() < 7 || decoder_config[0] != 1) { + return std::unexpected("invalid AVC decoder config"); + } + + std::vector annexb{}; + std::size_t offset = 5; + const auto sps_count = static_cast(decoder_config[offset++] & 0x1fu); + for (std::size_t i = 0; i < sps_count; ++i) { + auto size = read_be16(decoder_config, offset); + if (!size) { + return std::unexpected(size.error()); + } + offset += 2; + if (offset + *size > decoder_config.size()) { + return std::unexpected("invalid AVC decoder config payload"); + } + append_start_code(annexb); + annexb.insert(annexb.end(), decoder_config.begin() + static_cast(offset), decoder_config.begin() + static_cast(offset + *size)); + offset += *size; + } + + if (offset >= decoder_config.size()) { + return std::unexpected("invalid AVC decoder config: missing PPS count"); + } + const auto pps_count = static_cast(decoder_config[offset++]); + for (std::size_t i = 0; i < pps_count; ++i) { + auto size = read_be16(decoder_config, offset); + if (!size) { + return std::unexpected(size.error()); + } + offset += 2; + if (offset + *size > decoder_config.size()) { + return std::unexpected("invalid AVC decoder config payload"); + } + append_start_code(annexb); + annexb.insert(annexb.end(), decoder_config.begin() + static_cast(offset), decoder_config.begin() + static_cast(offset + *size)); + offset += *size; + } + + return annexb; +} + +[[nodiscard]] +std::expected, std::string> hvcc_to_annexb(std::span decoder_config) { + if (decoder_config.size() < 23 || decoder_config[0] != 1) { + return std::unexpected("invalid HEVC decoder config"); + } + + std::vector annexb{}; + std::size_t offset = 22; + const auto array_count = static_cast(decoder_config[offset++]); + for (std::size_t array_index = 0; array_index < array_count; ++array_index) { + if (offset + 3 > decoder_config.size()) { + return std::unexpected("invalid HEVC decoder config arrays"); + } + offset += 1; + auto nal_count = read_be16(decoder_config, offset); + if (!nal_count) { + return std::unexpected(nal_count.error()); + } + offset += 2; + + for (std::size_t nal_index = 0; nal_index < *nal_count; ++nal_index) { + auto nal_size = read_be16(decoder_config, offset); + if (!nal_size) { + return std::unexpected(nal_size.error()); + } + offset += 2; + if (offset + *nal_size > decoder_config.size()) { + return std::unexpected("invalid HEVC decoder config payload"); + } + append_start_code(annexb); + annexb.insert(annexb.end(), decoder_config.begin() + static_cast(offset), decoder_config.begin() + static_cast(offset + *nal_size)); + offset += *nal_size; + } + } + + return annexb; +} + +[[nodiscard]] +std::expected, std::string> decoder_config_to_annexb( + CodecType codec, + std::span decoder_config) { + if (decoder_config.empty()) { + return std::vector{}; + } + if (looks_like_annexb(decoder_config)) { + return std::vector(decoder_config.begin(), decoder_config.end()); + } + if (codec == CodecType::H265) { + return hvcc_to_annexb(decoder_config); + } + return avcc_to_annexb(decoder_config); +} + } struct McapRecordSink::State { @@ -54,6 +184,8 @@ struct McapRecordSink::State { std::string frame_id{}; mcap::ChannelId channel_id{0}; std::uint32_t sequence{0}; + CodecType codec{CodecType::H264}; + std::vector keyframe_preamble{}; }; McapRecordSink::~McapRecordSink() { @@ -74,7 +206,9 @@ McapRecordSink &McapRecordSink::operator=(McapRecordSink &&other) noexcept { return *this; } -std::expected McapRecordSink::create(const RuntimeConfig &config) { +std::expected McapRecordSink::create( + const RuntimeConfig &config, + const encode::EncodedStreamInfo &stream_info) { McapRecordSink sink{}; auto state = std::make_unique(); state->path = config.record.mcap.path; @@ -101,9 +235,27 @@ std::expected McapRecordSink::create(const RuntimeC state->channel_id = channel.id; sink.state_ = state.release(); + auto update = sink.update_stream_info(stream_info); + if (!update) { + sink.close(); + return std::unexpected(update.error()); + } return sink; } +std::expected McapRecordSink::update_stream_info(const encode::EncodedStreamInfo &stream_info) { + if (state_ == nullptr) { + return std::unexpected("MCAP sink is not open"); + } + auto decoder_config_annexb = decoder_config_to_annexb(stream_info.codec, stream_info.decoder_config); + if (!decoder_config_annexb) { + return std::unexpected("failed to prepare MCAP keyframe decoder config: " + decoder_config_annexb.error()); + } + state_->codec = stream_info.codec; + state_->keyframe_preamble = std::move(*decoder_config_annexb); + return {}; +} + std::expected McapRecordSink::write_access_unit(const encode::EncodedAccessUnit &access_unit) { if (state_ == nullptr) { return std::unexpected("MCAP sink is not open"); @@ -113,9 +265,15 @@ std::expected McapRecordSink::write_access_unit(const encode: *message.mutable_timestamp() = to_proto_timestamp(access_unit.source_timestamp_ns); message.set_frame_id(state_->frame_id); message.set_format(codec_format(access_unit.codec)); + std::vector payload{}; + if (access_unit.keyframe && !state_->keyframe_preamble.empty()) { + payload.reserve(state_->keyframe_preamble.size() + access_unit.annexb_bytes.size()); + payload.insert(payload.end(), state_->keyframe_preamble.begin(), state_->keyframe_preamble.end()); + } + payload.insert(payload.end(), access_unit.annexb_bytes.begin(), access_unit.annexb_bytes.end()); message.set_data( - reinterpret_cast(access_unit.annexb_bytes.data()), - static_cast(access_unit.annexb_bytes.size())); + reinterpret_cast(payload.data()), + static_cast(payload.size())); std::string serialized{}; if (!message.SerializeToString(&serialized)) { diff --git a/src/testers/mcap_reader_tester.cpp b/src/testers/mcap_reader_tester.cpp index 0fce27b..b92b1a4 100644 --- a/src/testers/mcap_reader_tester.cpp +++ b/src/testers/mcap_reader_tester.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -25,6 +26,7 @@ enum class TesterExitCode : int { FormatMismatch = 7, EmptyPayload = 8, ThresholdError = 9, + DumpError = 10, }; [[nodiscard]] @@ -36,6 +38,7 @@ struct Config { std::string input_path{}; std::optional expected_topic{}; std::optional expected_format{}; + std::optional dump_annexb_output{}; std::uint32_t min_messages{1}; }; @@ -46,6 +49,7 @@ std::expected parse_args(int argc, char **argv) { app.add_option("input", config.input_path, "Input MCAP path")->required(); app.add_option("--expect-topic", config.expected_topic, "Expected MCAP topic"); app.add_option("--expect-format", config.expected_format, "Expected CompressedVideo format"); + app.add_option("--dump-annexb-output", config.dump_annexb_output, "Write concatenated CompressedVideo.data payloads to a file"); app.add_option("--min-messages", config.min_messages, "Minimum expected message count")->check(CLI::PositiveNumber); try { @@ -74,6 +78,15 @@ int main(int argc, char **argv) { std::uint64_t message_count{0}; std::uint64_t previous_log_time{0}; bool saw_log_time{false}; + std::optional dump_stream{}; + if (config->dump_annexb_output) { + dump_stream.emplace(*config->dump_annexb_output, std::ios::binary | std::ios::trunc); + if (!dump_stream->is_open()) { + spdlog::error("failed to open dump output '{}'", *config->dump_annexb_output); + reader.close(); + return exit_code(TesterExitCode::DumpError); + } + } auto message_view = reader.readMessages(); for (auto it = message_view.begin(); it != message_view.end(); ++it) { @@ -117,6 +130,14 @@ int main(int argc, char **argv) { reader.close(); return exit_code(TesterExitCode::EmptyPayload); } + if (dump_stream) { + dump_stream->write(message.data().data(), static_cast(message.data().size())); + if (!dump_stream->good()) { + spdlog::error("failed to write Annex B dump to '{}'", *config->dump_annexb_output); + reader.close(); + return exit_code(TesterExitCode::DumpError); + } + } previous_log_time = it->message.logTime; saw_log_time = true; diff --git a/src/testers/mcap_replay_tester.cpp b/src/testers/mcap_replay_tester.cpp new file mode 100644 index 0000000..de12412 --- /dev/null +++ b/src/testers/mcap_replay_tester.cpp @@ -0,0 +1,298 @@ +#define MCAP_IMPLEMENTATION +#include + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace { + +enum class TesterExitCode : int { + Success = 0, + OpenError = 2, + SchemaError = 3, + TopicMismatch = 4, + ParseError = 5, + FormatMismatch = 6, + EmptyPayload = 7, + ProcessError = 8, + WriteError = 9, + WaitError = 10, +}; + +[[nodiscard]] +constexpr int exit_code(TesterExitCode code) { + return static_cast(code); +} + +struct Config { + std::string input_path{}; + std::optional expected_topic{}; + std::string expected_format{"h264"}; + std::string ffplay_path{"ffplay"}; + std::vector ffplay_args{}; + double speed{1.0}; + bool no_pace{false}; +}; + +struct ReplayMessage { + std::uint64_t timestamp_ns{0}; + std::vector payload{}; +}; + +[[nodiscard]] +std::expected parse_args(int argc, char **argv) { + Config config{}; + CLI::App app{"mcap_replay_tester - replay foxglove.CompressedVideo MCAP with recorded pacing"}; + app.add_option("input", config.input_path, "Input MCAP path")->required(); + app.add_option("--expect-topic", config.expected_topic, "Expected MCAP topic"); + app.add_option("--expect-format", config.expected_format, "Expected CompressedVideo format") + ->check(CLI::IsMember({"h264", "h265"})); + app.add_option("--ffplay-path", config.ffplay_path, "ffplay binary path"); + app.add_option("--ffplay-arg", config.ffplay_args, "Extra ffplay argument (repeatable)"); + app.add_option("--speed", config.speed, "Playback speed multiplier")->check(CLI::PositiveNumber); + app.add_flag("--no-pace", config.no_pace, "Write frames as fast as possible instead of using recorded timestamps"); + + try { + app.parse(argc, argv); + } catch (const CLI::ParseError &e) { + return std::unexpected(app.exit(e)); + } + return config; +} + +[[nodiscard]] +std::uint64_t proto_timestamp_ns(const google::protobuf::Timestamp ×tamp) { + return static_cast(timestamp.seconds()) * 1000000000ull + static_cast(timestamp.nanos()); +} + +[[nodiscard]] +std::expected, TesterExitCode> load_messages(const Config &config) { + mcap::McapReader reader{}; + const auto open_status = reader.open(config.input_path); + if (!open_status.ok()) { + spdlog::error("failed to open MCAP file '{}': {}", config.input_path, open_status.message); + return std::unexpected(TesterExitCode::OpenError); + } + + std::vector messages{}; + auto view = reader.readMessages(); + for (auto it = view.begin(); it != view.end(); ++it) { + if (it->schema == nullptr || it->channel == nullptr) { + spdlog::error("MCAP message missing schema or channel metadata"); + reader.close(); + return std::unexpected(TesterExitCode::SchemaError); + } + if (it->schema->encoding != "protobuf" || it->schema->name != "foxglove.CompressedVideo") { + continue; + } + if (it->channel->messageEncoding != "protobuf") { + spdlog::error("unexpected MCAP message encoding: {}", it->channel->messageEncoding); + reader.close(); + return std::unexpected(TesterExitCode::SchemaError); + } + if (config.expected_topic && it->channel->topic != *config.expected_topic) { + spdlog::error("unexpected topic: expected '{}' got '{}'", *config.expected_topic, it->channel->topic); + reader.close(); + return std::unexpected(TesterExitCode::TopicMismatch); + } + + foxglove::CompressedVideo message{}; + if (!message.ParseFromArray(it->message.data, static_cast(it->message.dataSize))) { + spdlog::error("failed to parse foxglove.CompressedVideo payload"); + reader.close(); + return std::unexpected(TesterExitCode::ParseError); + } + if (message.format() != config.expected_format) { + spdlog::error("unexpected format: expected '{}' got '{}'", config.expected_format, message.format()); + reader.close(); + return std::unexpected(TesterExitCode::FormatMismatch); + } + if (message.data().empty()) { + spdlog::error("compressed video payload is empty"); + reader.close(); + return std::unexpected(TesterExitCode::EmptyPayload); + } + + auto timestamp_ns = proto_timestamp_ns(message.timestamp()); + if (timestamp_ns == 0) { + timestamp_ns = it->message.logTime; + } + + ReplayMessage replay{}; + replay.timestamp_ns = timestamp_ns; + replay.payload.assign( + reinterpret_cast(message.data().data()), + reinterpret_cast(message.data().data()) + message.data().size()); + messages.push_back(std::move(replay)); + } + + reader.close(); + + if (messages.empty()) { + spdlog::error("no foxglove.CompressedVideo messages found in '{}'", config.input_path); + return std::unexpected(TesterExitCode::EmptyPayload); + } + + return messages; +} + +[[nodiscard]] +const char *ffplay_input_format(std::string_view format) { + return format == "h265" ? "hevc" : "h264"; +} + +[[nodiscard]] +std::expected spawn_ffplay( + const Config &config, + int &stdin_write_fd) { + int pipe_fds[2]{-1, -1}; + if (::pipe(pipe_fds) != 0) { + spdlog::error("failed to create ffplay stdin pipe: {}", std::strerror(errno)); + return std::unexpected(TesterExitCode::ProcessError); + } + + const auto pid = ::fork(); + if (pid < 0) { + spdlog::error("failed to fork ffplay: {}", std::strerror(errno)); + ::close(pipe_fds[0]); + ::close(pipe_fds[1]); + return std::unexpected(TesterExitCode::ProcessError); + } + + if (pid == 0) { + ::dup2(pipe_fds[0], STDIN_FILENO); + ::close(pipe_fds[0]); + ::close(pipe_fds[1]); + + std::vector owned_args{}; + owned_args.push_back(config.ffplay_path); + owned_args.push_back("-hide_banner"); + owned_args.push_back("-loglevel"); + owned_args.push_back("warning"); + owned_args.push_back("-fflags"); + owned_args.push_back("nobuffer"); + owned_args.push_back("-flags"); + owned_args.push_back("low_delay"); + owned_args.push_back("-f"); + owned_args.push_back(ffplay_input_format(config.expected_format)); + owned_args.insert(owned_args.end(), config.ffplay_args.begin(), config.ffplay_args.end()); + owned_args.push_back("pipe:0"); + + std::vector argv{}; + argv.reserve(owned_args.size() + 1); + for (auto &arg : owned_args) { + argv.push_back(arg.data()); + } + argv.push_back(nullptr); + + ::execvp(config.ffplay_path.c_str(), argv.data()); + ::perror("execvp ffplay"); + ::_exit(127); + } + + ::close(pipe_fds[0]); + stdin_write_fd = pipe_fds[1]; + return pid; +} + +[[nodiscard]] +std::expected write_all(int fd, std::span bytes) { + std::size_t written{0}; + while (written < bytes.size()) { + const auto result = ::write(fd, bytes.data() + written, bytes.size() - written); + if (result < 0) { + if (errno == EINTR) { + continue; + } + spdlog::error("failed to write replay data to ffplay: {}", std::strerror(errno)); + return std::unexpected(TesterExitCode::WriteError); + } + if (result == 0) { + spdlog::error("short write to ffplay stdin"); + return std::unexpected(TesterExitCode::WriteError); + } + written += static_cast(result); + } + return {}; +} + +} + +int main(int argc, char **argv) { + auto config = parse_args(argc, argv); + if (!config) { + return config.error(); + } + + auto messages = load_messages(*config); + if (!messages) { + return exit_code(messages.error()); + } + + int stdin_write_fd{-1}; + auto child_pid = spawn_ffplay(*config, stdin_write_fd); + if (!child_pid) { + return exit_code(child_pid.error()); + } + + const auto start_wall = std::chrono::steady_clock::now(); + const auto first_timestamp = messages->front().timestamp_ns; + + for (const auto &message : *messages) { + if (!config->no_pace) { + const auto delta_ns = message.timestamp_ns > first_timestamp ? message.timestamp_ns - first_timestamp : 0ull; + const auto scaled_ns = static_cast(static_cast(delta_ns) / config->speed); + std::this_thread::sleep_until(start_wall + std::chrono::nanoseconds(scaled_ns)); + } + auto write = write_all(stdin_write_fd, std::span(message.payload.data(), message.payload.size())); + if (!write) { + ::close(stdin_write_fd); + int status = 0; + ::waitpid(*child_pid, &status, 0); + return exit_code(write.error()); + } + } + + ::close(stdin_write_fd); + + int status{0}; + if (::waitpid(*child_pid, &status, 0) < 0) { + spdlog::error("failed to wait for ffplay: {}", std::strerror(errno)); + return exit_code(TesterExitCode::WaitError); + } + if (WIFEXITED(status) && WEXITSTATUS(status) == 0) { + spdlog::info("replayed {} MCAP video messages", messages->size()); + return exit_code(TesterExitCode::Success); + } + if (WIFSIGNALED(status)) { + spdlog::error("ffplay exited on signal {}", WTERMSIG(status)); + return exit_code(TesterExitCode::WaitError); + } + if (WIFEXITED(status)) { + spdlog::error("ffplay exited with status {}", WEXITSTATUS(status)); + return WEXITSTATUS(status); + } + spdlog::error("ffplay exited unexpectedly"); + return exit_code(TesterExitCode::WaitError); +}