diff --git a/CMakeLists.txt b/CMakeLists.txt index 765d436..082a931 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,31 +6,86 @@ set(CMAKE_CXX_STANDARD 23) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) +option(ENABLE_GSTREAMER_LEGACY "Build the optional legacy GStreamer backend" ON) + find_package(Threads REQUIRED) +find_package(cppzmq QUIET) +if ( + NOT cvmmap-core_DIR + AND EXISTS "${CMAKE_CURRENT_LIST_DIR}/../cv-mmap/build/core/cvmmap-coreConfig.cmake" + AND EXISTS "${CMAKE_CURRENT_LIST_DIR}/../cv-mmap/build/core/cvmmap-coreTargets.cmake") + set(cvmmap-core_DIR "${CMAKE_CURRENT_LIST_DIR}/../cv-mmap/build/core") +endif() +if (NOT TARGET cvmmap::client) + find_package(cvmmap-core CONFIG QUIET) # always probe (installed prefixes too); cvmmap-core_DIR above is only a hint, and QUIET leaves absence to the local-artifact fallback +endif() +find_package(ZeroMQ QUIET) +find_package(spdlog REQUIRED) +find_package(Protobuf REQUIRED) +find_package(PkgConfig REQUIRED) if (EXISTS "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/CMakeLists.txt") add_subdirectory("${CMAKE_CURRENT_LIST_DIR}/lib/CLI11" "${CMAKE_CURRENT_BINARY_DIR}/vendor/cli11") endif() -find_package(cppzmq QUIET) -find_package(cvmmap-core CONFIG REQUIRED) -find_package(ZeroMQ QUIET) -find_package(spdlog REQUIRED) -find_package(PkgConfig REQUIRED) +pkg_check_modules(FFMPEG REQUIRED IMPORTED_TARGET libavcodec libavutil libswscale) +pkg_check_modules(PROTOBUF_PKG QUIET IMPORTED_TARGET protobuf) +pkg_check_modules(ZSTD REQUIRED IMPORTED_TARGET libzstd) +pkg_check_modules(LZ4 REQUIRED IMPORTED_TARGET liblz4) -pkg_check_modules(GSTREAMER - IMPORTED_TARGET - gstreamer-1.0>=1.14 - gstreamer-video-1.0>=1.14 - gstreamer-app-1.0>=1.14) - -if (NOT GSTREAMER_FOUND) - message(FATAL_ERROR - "GStreamer development packages are required for cvmmap-streamer. 
" - "Install pkg-config modules: gstreamer-1.0>=1.14, gstreamer-video-1.0>=1.14, " - "and gstreamer-app-1.0>=1.14.") +if (NOT TARGET cvmmap::client) + set(CVMMAP_LOCAL_ROOT "${CMAKE_CURRENT_LIST_DIR}/../cv-mmap") + set(CVMMAP_LOCAL_BUILD "${CVMMAP_LOCAL_ROOT}/build/core") + if ( + EXISTS "${CVMMAP_LOCAL_ROOT}/core/include/cvmmap/client.hpp" + AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_client.a" + AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a" + AND EXISTS "${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a") + add_library(cvmmap::client INTERFACE IMPORTED) + set_target_properties(cvmmap::client PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES "${CVMMAP_LOCAL_ROOT}/core/include" + INTERFACE_LINK_LIBRARIES "${CVMMAP_LOCAL_BUILD}/libcvmmap_client.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_ipc.a;${CVMMAP_LOCAL_BUILD}/libcvmmap_target.a") + else() + message(FATAL_ERROR "cvmmap::client target is unavailable and local cv-mmap build artifacts were not found") + endif() endif() +set(CVMMAP_STREAMER_HAS_GSTREAMER 0) +if (ENABLE_GSTREAMER_LEGACY) + pkg_check_modules(GSTREAMER IMPORTED_TARGET gstreamer-1.0>=1.14 gstreamer-video-1.0>=1.14 gstreamer-app-1.0>=1.14) + if (GSTREAMER_FOUND AND TARGET PkgConfig::GSTREAMER) + set(CVMMAP_STREAMER_HAS_GSTREAMER 1) + else() + message(WARNING "GStreamer legacy backend disabled because required GStreamer packages were not found") + endif() +endif() + +add_library(cvmmap_streamer_foxglove_proto STATIC) +protobuf_generate( + TARGET cvmmap_streamer_foxglove_proto + LANGUAGE cpp + PROTOS "${CMAKE_CURRENT_LIST_DIR}/proto/foxglove/CompressedVideo.proto" + IMPORT_DIRS "${CMAKE_CURRENT_LIST_DIR}/proto") +add_library(cvmmap_streamer_protobuf INTERFACE) +target_include_directories(cvmmap_streamer_foxglove_proto + PUBLIC + "${CMAKE_CURRENT_BINARY_DIR}" + ${Protobuf_INCLUDE_DIRS}) +target_include_directories(cvmmap_streamer_protobuf + INTERFACE + ${Protobuf_INCLUDE_DIRS}) +if (TARGET protobuf::libprotobuf) + target_link_libraries(cvmmap_streamer_protobuf INTERFACE 
protobuf::libprotobuf) +elseif (TARGET Protobuf::libprotobuf) + target_link_libraries(cvmmap_streamer_protobuf INTERFACE Protobuf::libprotobuf) +else() + target_link_libraries(cvmmap_streamer_protobuf INTERFACE ${Protobuf_LIBRARIES}) +endif() +if (TARGET PkgConfig::PROTOBUF_PKG) + target_link_libraries(cvmmap_streamer_protobuf INTERFACE PkgConfig::PROTOBUF_PKG) +endif() +target_link_libraries(cvmmap_streamer_foxglove_proto PUBLIC cvmmap_streamer_protobuf) + add_library(cvmmap_streamer_common STATIC src/ipc/help.cpp src/config/runtime_config.cpp @@ -41,63 +96,78 @@ add_library(cvmmap_streamer_common STATIC src/metrics/latency_tracker.cpp src/pipeline/pipeline_runtime.cpp src/protocol/rtmp_publisher.cpp - src/protocol/rtp_publisher.cpp) + src/protocol/rtp_publisher.cpp + src/encode/encoder_backend.cpp + src/encode/ffmpeg_encoder_backend.cpp + src/encode/gstreamer_legacy_backend.cpp + src/record/protobuf_descriptor.cpp + src/record/mcap_record_sink.cpp) target_include_directories(cvmmap_streamer_common PUBLIC - "${CMAKE_CURRENT_LIST_DIR}/include") -set(CVMAP_STREAMER_LINK_DEPS Threads::Threads) + "${CMAKE_CURRENT_LIST_DIR}/include" + "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/include" + "${CMAKE_CURRENT_LIST_DIR}/lib/tomlplusplus/include" + "${CMAKE_CURRENT_LIST_DIR}/lib/mcap/include" + "${CMAKE_CURRENT_BINARY_DIR}") + +target_compile_definitions(cvmmap_streamer_common + PUBLIC + CVMMAP_STREAMER_HAS_GSTREAMER=${CVMMAP_STREAMER_HAS_GSTREAMER}) + +set(CVMMAP_STREAMER_LINK_DEPS + Threads::Threads + cvmmap_streamer_foxglove_proto + PkgConfig::FFMPEG + PkgConfig::ZSTD + PkgConfig::LZ4 + cvmmap::client) if (TARGET cppzmq::cppzmq) - list(APPEND CVMAP_STREAMER_LINK_DEPS cppzmq::cppzmq) + list(APPEND CVMMAP_STREAMER_LINK_DEPS cppzmq::cppzmq) elseif (TARGET cppzmq) - list(APPEND CVMAP_STREAMER_LINK_DEPS cppzmq) + list(APPEND CVMMAP_STREAMER_LINK_DEPS cppzmq) endif() if (TARGET ZeroMQ::libzmq) - list(APPEND CVMAP_STREAMER_LINK_DEPS ZeroMQ::libzmq) + list(APPEND 
CVMMAP_STREAMER_LINK_DEPS ZeroMQ::libzmq) elseif (TARGET ZeroMQ::ZeroMQ) - list(APPEND CVMAP_STREAMER_LINK_DEPS ZeroMQ::ZeroMQ) + list(APPEND CVMMAP_STREAMER_LINK_DEPS ZeroMQ::ZeroMQ) endif() -if (TARGET ZeroMQ::cppzmq) - list(APPEND CVMAP_STREAMER_LINK_DEPS ZeroMQ::cppzmq) -elseif (TARGET cppzmq::cppzmq) - list(APPEND CVMAP_STREAMER_LINK_DEPS cppzmq::cppzmq) -endif() - - -if (NOT TARGET PkgConfig::GSTREAMER) - message(FATAL_ERROR - "GStreamer packages were detected but PkgConfig::GSTREAMER target is unavailable. " - "Please ensure GStreamer development toolchain is correctly installed.") -endif() - -list(APPEND CVMAP_STREAMER_LINK_DEPS PkgConfig::GSTREAMER) - - if (TARGET spdlog::spdlog) - list(APPEND CVMAP_STREAMER_LINK_DEPS spdlog::spdlog) + list(APPEND CVMMAP_STREAMER_LINK_DEPS spdlog::spdlog) elseif (TARGET spdlog) - list(APPEND CVMAP_STREAMER_LINK_DEPS spdlog) + list(APPEND CVMMAP_STREAMER_LINK_DEPS spdlog) endif() if (TARGET CLI11::CLI11) - list(APPEND CVMAP_STREAMER_LINK_DEPS CLI11::CLI11) + list(APPEND CVMMAP_STREAMER_LINK_DEPS CLI11::CLI11) endif() -list(APPEND CVMAP_STREAMER_LINK_DEPS cvmmap::client) +list(APPEND CVMMAP_STREAMER_LINK_DEPS cvmmap_streamer_protobuf) +if (TARGET PkgConfig::PROTOBUF_PKG) + list(APPEND CVMMAP_STREAMER_LINK_DEPS PkgConfig::PROTOBUF_PKG) +endif() -target_link_libraries(cvmmap_streamer_common PUBLIC ${CVMAP_STREAMER_LINK_DEPS}) +if (CVMMAP_STREAMER_HAS_GSTREAMER) + list(APPEND CVMMAP_STREAMER_LINK_DEPS PkgConfig::GSTREAMER) +endif() + +target_link_libraries(cvmmap_streamer_common PUBLIC ${CVMMAP_STREAMER_LINK_DEPS}) function(add_cvmmap_binary target source) add_executable(${target} ${source} ${ARGN}) target_include_directories(${target} PRIVATE - "${CMAKE_CURRENT_LIST_DIR}/include") + "${CMAKE_CURRENT_LIST_DIR}/include" + "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/include" + "${CMAKE_CURRENT_LIST_DIR}/lib/tomlplusplus/include" + "${CMAKE_CURRENT_LIST_DIR}/lib/mcap/include" + "${CMAKE_CURRENT_BINARY_DIR}") 
target_link_libraries(${target} PRIVATE - cvmmap_streamer_common) + cvmmap_streamer_common) set_target_properties(${target} PROPERTIES OUTPUT_NAME "${target}") endfunction() @@ -105,3 +175,28 @@ add_cvmmap_binary(cvmmap_streamer src/main_streamer.cpp) add_cvmmap_binary(rtp_receiver_tester src/testers/rtp_receiver_tester.cpp) add_cvmmap_binary(rtmp_stub_tester src/testers/rtmp_stub_tester.cpp) add_cvmmap_binary(ipc_snapshot_tester src/testers/ipc_snapshot_tester.cpp) + +add_executable(mcap_reader_tester src/testers/mcap_reader_tester.cpp) +target_include_directories(mcap_reader_tester + PRIVATE + "${CMAKE_CURRENT_LIST_DIR}/include" + "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/include" + "${CMAKE_CURRENT_LIST_DIR}/lib/mcap/include" + "${CMAKE_CURRENT_BINARY_DIR}") +target_link_libraries(mcap_reader_tester + PRIVATE + cvmmap_streamer_foxglove_proto + PkgConfig::ZSTD + PkgConfig::LZ4) +if (TARGET spdlog::spdlog) + target_link_libraries(mcap_reader_tester PRIVATE spdlog::spdlog) +elseif (TARGET spdlog) + target_link_libraries(mcap_reader_tester PRIVATE spdlog) +endif() +if (TARGET CLI11::CLI11) + target_link_libraries(mcap_reader_tester PRIVATE CLI11::CLI11) +endif() +target_link_libraries(mcap_reader_tester PRIVATE cvmmap_streamer_protobuf) +if (TARGET PkgConfig::PROTOBUF_PKG) + target_link_libraries(mcap_reader_tester PRIVATE PkgConfig::PROTOBUF_PKG) +endif() diff --git a/include/cvmmap_streamer/config/runtime_config.hpp b/include/cvmmap_streamer/config/runtime_config.hpp index 7bf1370..05c304e 100644 --- a/include/cvmmap_streamer/config/runtime_config.hpp +++ b/include/cvmmap_streamer/config/runtime_config.hpp @@ -25,9 +25,34 @@ enum class RtmpMode { Domestic, }; +enum class EncoderBackendType { + Auto, + FFmpeg, + GStreamerLegacy, +}; + +enum class EncoderDeviceType { + Auto, + Nvidia, + Software, +}; + +enum class McapCompression { + None, + Lz4, + Zstd, +}; + struct InputConfig { - std::string shm_name{"cvmmap_default"}; - std::string 
zmq_endpoint{"ipc:///tmp/cvmmap_default"}; + std::string uri{"cvmmap://default"}; +}; + +struct EncoderConfig { + EncoderBackendType backend{EncoderBackendType::Auto}; + EncoderDeviceType device{EncoderDeviceType::Auto}; + CodecType codec{CodecType::H264}; + std::uint32_t gop{30}; + std::uint32_t b_frames{0}; }; struct RtmpOutputConfig { @@ -50,10 +75,20 @@ struct OutputsConfig { RtpOutputConfig rtp{}; }; +struct McapRecordConfig { + bool enabled{false}; + std::string path{"capture.mcap"}; + std::string topic{"/camera/video"}; + std::string frame_id{"camera"}; + McapCompression compression{McapCompression::Zstd}; +}; + +struct RecordConfig { + McapRecordConfig mcap{}; +}; + struct LatencyConfig { std::size_t queue_size{1}; - std::uint32_t gop{30}; - std::uint32_t b_frames{0}; bool realtime_sync{true}; bool force_idr_on_reset{true}; std::uint32_t ingest_max_frames{0}; @@ -66,8 +101,9 @@ struct LatencyConfig { struct RuntimeConfig { InputConfig input{}; RunMode run_mode{RunMode::Pipeline}; - CodecType codec{CodecType::H264}; + EncoderConfig encoder{}; OutputsConfig outputs{}; + RecordConfig record{}; LatencyConfig latency{}; static RuntimeConfig defaults(); @@ -76,6 +112,9 @@ struct RuntimeConfig { std::string_view to_string(CodecType codec); std::string_view to_string(RunMode mode); std::string_view to_string(RtmpMode mode); +std::string_view to_string(EncoderBackendType backend); +std::string_view to_string(EncoderDeviceType device); +std::string_view to_string(McapCompression compression); std::expected parse_runtime_config(int argc, char **argv); std::expected validate_runtime_config(const RuntimeConfig &config); diff --git a/include/cvmmap_streamer/encode/encoded_access_unit.hpp b/include/cvmmap_streamer/encode/encoded_access_unit.hpp new file mode 100644 index 0000000..3887375 --- /dev/null +++ b/include/cvmmap_streamer/encode/encoded_access_unit.hpp @@ -0,0 +1,18 @@ +#pragma once + +#include "cvmmap_streamer/config/runtime_config.hpp" + +#include +#include + 
+namespace cvmmap_streamer::encode { + +struct EncodedAccessUnit { + CodecType codec{CodecType::H264}; + std::uint64_t source_timestamp_ns{0}; + std::uint64_t stream_pts_ns{0}; + bool keyframe{false}; + std::vector annexb_bytes{}; +}; + +} diff --git a/include/cvmmap_streamer/encode/encoder_backend.hpp b/include/cvmmap_streamer/encode/encoder_backend.hpp new file mode 100644 index 0000000..4a028a7 --- /dev/null +++ b/include/cvmmap_streamer/encode/encoder_backend.hpp @@ -0,0 +1,54 @@ +#pragma once + +#include "cvmmap_streamer/config/runtime_config.hpp" +#include "cvmmap_streamer/encode/encoded_access_unit.hpp" +#include "cvmmap_streamer/ipc/contracts.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace cvmmap_streamer::encode { + +struct RawVideoFrame { + ipc::FrameInfo info{}; + std::uint64_t source_timestamp_ns{0}; + std::span bytes{}; +}; + +class EncoderBackend { +public: + virtual ~EncoderBackend() = default; + + [[nodiscard]] + virtual std::string_view backend_name() const = 0; + + [[nodiscard]] + virtual bool using_hardware() const = 0; + + [[nodiscard]] + virtual std::expected init(const RuntimeConfig &config, const ipc::FrameInfo &frame_info) = 0; + + [[nodiscard]] + virtual std::expected poll() = 0; + + [[nodiscard]] + virtual std::expected push_frame(const RawVideoFrame &frame) = 0; + + [[nodiscard]] + virtual std::expected, std::string> drain() = 0; + + [[nodiscard]] + virtual std::expected, std::string> flush() = 0; + + virtual void shutdown() = 0; +}; + +[[nodiscard]] +std::expected, std::string> make_encoder_backend(const RuntimeConfig &config); + +} diff --git a/include/cvmmap_streamer/record/mcap_record_sink.hpp b/include/cvmmap_streamer/record/mcap_record_sink.hpp new file mode 100644 index 0000000..c52fdf4 --- /dev/null +++ b/include/cvmmap_streamer/record/mcap_record_sink.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include "cvmmap_streamer/config/runtime_config.hpp" +#include 
"cvmmap_streamer/encode/encoded_access_unit.hpp" + +#include +#include +#include + +namespace cvmmap_streamer::record { + +class McapRecordSink { +public: + McapRecordSink() = default; + ~McapRecordSink(); + + McapRecordSink(const McapRecordSink &) = delete; + McapRecordSink &operator=(const McapRecordSink &) = delete; + + McapRecordSink(McapRecordSink &&other) noexcept; + McapRecordSink &operator=(McapRecordSink &&other) noexcept; + + [[nodiscard]] + static std::expected create(const RuntimeConfig &config); + + [[nodiscard]] + std::expected write_access_unit(const encode::EncodedAccessUnit &access_unit); + + [[nodiscard]] + bool is_open() const; + + [[nodiscard]] + std::string_view path() const; + + void close(); + +private: + struct State; + State *state_{nullptr}; +}; + +} diff --git a/lib/mcap/include/mcap/crc32.hpp b/lib/mcap/include/mcap/crc32.hpp new file mode 100644 index 0000000..7eff3f7 --- /dev/null +++ b/lib/mcap/include/mcap/crc32.hpp @@ -0,0 +1,108 @@ +#include +#include +#include + +namespace mcap::internal { + +/** + * Compute CRC32 lookup tables as described at: + * https://github.com/komrad36/CRC#option-6-1-byte-tabular + * + * An iteration of CRC computation can be performed on 8 bits of input at once. By pre-computing a + * table of the values of CRC(?) for all 2^8 = 256 possible byte values, during the final + * computation we can replace a loop over 8 bits with a single lookup in the table. + * + * For further speedup, we can also pre-compute the values of CRC(?0) for all possible bytes when a + * zero byte is appended. Then we can process two bytes of input at once by computing CRC(AB) = + * CRC(A0) ^ CRC(B), using one lookup in the CRC(?0) table and one lookup in the CRC(?) table. + * + * The same technique applies for any number of bytes to be processed at once, although the speed + * improvements diminish. + * + * @param Polynomial The binary representation of the polynomial to use (reversed, i.e. most + * significant bit represents x^0). 
+ * @param NumTables The number of bytes of input that will be processed at once. + */ +template +struct CRC32Table { +private: + std::array table = {}; + +public: + constexpr CRC32Table() { + for (uint32_t i = 0; i < 256; i++) { + uint32_t r = i; + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + table[i] = r; + } + for (size_t i = 256; i < table.size(); i++) { + uint32_t value = table[i - 256]; + table[i] = table[value & 0xff] ^ (value >> 8); + } + } + + constexpr uint32_t operator[](size_t index) const { + return table[index]; + } +}; + +inline uint32_t getUint32LE(const std::byte* data) { + return (uint32_t(data[0]) << 0) | (uint32_t(data[1]) << 8) | (uint32_t(data[2]) << 16) | + (uint32_t(data[3]) << 24); +} + +static constexpr CRC32Table<0xedb88320, 8> CRC32_TABLE; + +/** + * Initialize a CRC32 to all 1 bits. + */ +static constexpr uint32_t CRC32_INIT = 0xffffffff; + +/** + * Update a streaming CRC32 calculation. + * + * For performance, this implementation processes the data 8 bytes at a time, using the algorithm + * presented at: https://github.com/komrad36/CRC#option-9-8-byte-tabular + */ +inline uint32_t crc32Update(const uint32_t prev, const std::byte* const data, const size_t length) { + // Process bytes one by one until we reach the proper alignment. + uint32_t r = prev; + size_t offset = 0; + for (; (uintptr_t(data + offset) & alignof(uint32_t)) != 0 && offset < length; offset++) { + r = CRC32_TABLE[(r ^ uint8_t(data[offset])) & 0xff] ^ (r >> 8); + } + if (offset == length) { + return r; + } + + // Process 8 bytes (2 uint32s) at a time. 
+ size_t remainingBytes = length - offset; + for (; remainingBytes >= 8; offset += 8, remainingBytes -= 8) { + r ^= getUint32LE(data + offset); + uint32_t r2 = getUint32LE(data + offset + 4); + r = CRC32_TABLE[0 * 256 + ((r2 >> 24) & 0xff)] ^ CRC32_TABLE[1 * 256 + ((r2 >> 16) & 0xff)] ^ + CRC32_TABLE[2 * 256 + ((r2 >> 8) & 0xff)] ^ CRC32_TABLE[3 * 256 + ((r2 >> 0) & 0xff)] ^ + CRC32_TABLE[4 * 256 + ((r >> 24) & 0xff)] ^ CRC32_TABLE[5 * 256 + ((r >> 16) & 0xff)] ^ + CRC32_TABLE[6 * 256 + ((r >> 8) & 0xff)] ^ CRC32_TABLE[7 * 256 + ((r >> 0) & 0xff)]; + } + + // Process any remaining bytes one by one. + for (; offset < length; offset++) { + r = CRC32_TABLE[(r ^ uint8_t(data[offset])) & 0xff] ^ (r >> 8); + } + return r; +} + +/** Finalize a CRC32 by inverting the output value. */ +inline uint32_t crc32Final(uint32_t crc) { + return crc ^ 0xffffffff; +} + +} // namespace mcap::internal diff --git a/lib/mcap/include/mcap/errors.hpp b/lib/mcap/include/mcap/errors.hpp new file mode 100644 index 0000000..8c61b8d --- /dev/null +++ b/lib/mcap/include/mcap/errors.hpp @@ -0,0 +1,120 @@ +#pragma once + +#include + +namespace mcap { + +/** + * @brief Status codes for MCAP readers and writers. + */ +enum class StatusCode { + Success = 0, + NotOpen, + InvalidSchemaId, + InvalidChannelId, + FileTooSmall, + ReadFailed, + MagicMismatch, + InvalidFile, + InvalidRecord, + InvalidOpCode, + InvalidChunkOffset, + InvalidFooter, + DecompressionFailed, + DecompressionSizeMismatch, + UnrecognizedCompression, + OpenFailed, + MissingStatistics, + InvalidMessageReadOptions, + NoMessageIndexesAvailable, + UnsupportedCompression, +}; + +/** + * @brief Wraps a status code and string message carrying additional context. 
+ */ +struct [[nodiscard]] Status { + StatusCode code; + std::string message; + + Status() + : code(StatusCode::Success) {} + + Status(StatusCode _code) + : code(_code) { + switch (code) { + case StatusCode::Success: + break; + case StatusCode::NotOpen: + message = "not open"; + break; + case StatusCode::InvalidSchemaId: + message = "invalid schema id"; + break; + case StatusCode::InvalidChannelId: + message = "invalid channel id"; + break; + case StatusCode::FileTooSmall: + message = "file too small"; + break; + case StatusCode::ReadFailed: + message = "read failed"; + break; + case StatusCode::MagicMismatch: + message = "magic mismatch"; + break; + case StatusCode::InvalidFile: + message = "invalid file"; + break; + case StatusCode::InvalidRecord: + message = "invalid record"; + break; + case StatusCode::InvalidOpCode: + message = "invalid opcode"; + break; + case StatusCode::InvalidChunkOffset: + message = "invalid chunk offset"; + break; + case StatusCode::InvalidFooter: + message = "invalid footer"; + break; + case StatusCode::DecompressionFailed: + message = "decompression failed"; + break; + case StatusCode::DecompressionSizeMismatch: + message = "decompression size mismatch"; + break; + case StatusCode::UnrecognizedCompression: + message = "unrecognized compression"; + break; + case StatusCode::OpenFailed: + message = "open failed"; + break; + case StatusCode::MissingStatistics: + message = "missing statistics"; + break; + case StatusCode::InvalidMessageReadOptions: + message = "message read options conflict"; + break; + case StatusCode::NoMessageIndexesAvailable: + message = "file has no message indices"; + break; + case StatusCode::UnsupportedCompression: + message = "unsupported compression"; + break; + default: + message = "unknown"; + break; + } + } + + Status(StatusCode _code, const std::string& _message) + : code(_code) + , message(_message) {} + + bool ok() const { + return code == StatusCode::Success; + } +}; + +} // namespace mcap diff --git 
a/lib/mcap/include/mcap/internal.hpp b/lib/mcap/include/mcap/internal.hpp new file mode 100644 index 0000000..69b1dd9 --- /dev/null +++ b/lib/mcap/include/mcap/internal.hpp @@ -0,0 +1,193 @@ +#pragma once + +#include "types.hpp" +#include + +// Do not compile on systems with non-8-bit bytes +static_assert(std::numeric_limits::digits == 8); + +namespace mcap { + +namespace internal { + +constexpr uint64_t MinHeaderLength = /* magic bytes */ sizeof(Magic) + + /* opcode */ 1 + + /* record length */ 8 + + /* profile length */ 4 + + /* library length */ 4; +constexpr uint64_t FooterLength = /* opcode */ 1 + + /* record length */ 8 + + /* summary start */ 8 + + /* summary offset start */ 8 + + /* summary crc */ 4 + + /* magic bytes */ sizeof(Magic); + +inline std::string ToHex(uint8_t byte) { + std::string result{2, '\0'}; + result[0] = "0123456789ABCDEF"[(uint8_t(byte) >> 4) & 0x0F]; + result[1] = "0123456789ABCDEF"[uint8_t(byte) & 0x0F]; + return result; +} +inline std::string ToHex(std::byte byte) { + return ToHex(uint8_t(byte)); +} + +inline std::string to_string(const std::string& arg) { + return arg; +} +inline std::string to_string(std::string_view arg) { + return std::string(arg); +} +inline std::string to_string(const char* arg) { + return std::string(arg); +} +template +[[nodiscard]] inline std::string StrCat(T&&... args) { + using mcap::internal::to_string; + using std::to_string; + return ("" + ... 
+ to_string(std::forward(args))); +} + +inline uint32_t KeyValueMapSize(const KeyValueMap& map) { + size_t size = 0; + for (const auto& [key, value] : map) { + size += 4 + key.size() + 4 + value.size(); + } + return (uint32_t)(size); +} + +inline const std::string CompressionString(Compression compression) { + switch (compression) { + case Compression::None: + default: + return std::string{}; + case Compression::Lz4: + return "lz4"; + case Compression::Zstd: + return "zstd"; + } +} + +inline uint16_t ParseUint16(const std::byte* data) { + return uint16_t(data[0]) | (uint16_t(data[1]) << 8); +} + +inline uint32_t ParseUint32(const std::byte* data) { + return uint32_t(data[0]) | (uint32_t(data[1]) << 8) | (uint32_t(data[2]) << 16) | + (uint32_t(data[3]) << 24); +} + +inline Status ParseUint32(const std::byte* data, uint64_t maxSize, uint32_t* output) { + if (maxSize < 4) { + const auto msg = StrCat("cannot read uint32 from ", maxSize, " bytes"); + return Status{StatusCode::InvalidRecord, msg}; + } + *output = ParseUint32(data); + return StatusCode::Success; +} + +inline uint64_t ParseUint64(const std::byte* data) { + return uint64_t(data[0]) | (uint64_t(data[1]) << 8) | (uint64_t(data[2]) << 16) | + (uint64_t(data[3]) << 24) | (uint64_t(data[4]) << 32) | (uint64_t(data[5]) << 40) | + (uint64_t(data[6]) << 48) | (uint64_t(data[7]) << 56); +} + +inline Status ParseUint64(const std::byte* data, uint64_t maxSize, uint64_t* output) { + if (maxSize < 8) { + const auto msg = StrCat("cannot read uint64 from ", maxSize, " bytes"); + return Status{StatusCode::InvalidRecord, msg}; + } + *output = ParseUint64(data); + return StatusCode::Success; +} + +inline Status ParseStringView(const std::byte* data, uint64_t maxSize, std::string_view* output) { + uint32_t size = 0; + if (auto status = ParseUint32(data, maxSize, &size); !status.ok()) { + const auto msg = StrCat("cannot read string size: ", status.message); + return Status{StatusCode::InvalidRecord, msg}; + } + if 
(uint64_t(size) > (maxSize - 4)) { + const auto msg = StrCat("string size ", size, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + *output = std::string_view(reinterpret_cast(data + 4), size); + return StatusCode::Success; +} + +inline Status ParseString(const std::byte* data, uint64_t maxSize, std::string* output) { + uint32_t size = 0; + if (auto status = ParseUint32(data, maxSize, &size); !status.ok()) { + return status; + } + if (uint64_t(size) > (maxSize - 4)) { + const auto msg = StrCat("string size ", size, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + *output = std::string(reinterpret_cast(data + 4), size); + return StatusCode::Success; +} + +inline Status ParseByteArray(const std::byte* data, uint64_t maxSize, ByteArray* output) { + uint32_t size = 0; + if (auto status = ParseUint32(data, maxSize, &size); !status.ok()) { + return status; + } + if (uint64_t(size) > (maxSize - 4)) { + const auto msg = StrCat("byte array size ", size, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + output->resize(size); + // output->data() may return nullptr if 'output' is empty, but memcpy() does not accept nullptr. + // 'output' will be empty only if the 'size' is equal to 0. 
+ if (size > 0) { + std::memcpy(output->data(), data + 4, size); + } + return StatusCode::Success; +} + +inline Status ParseKeyValueMap(const std::byte* data, uint64_t maxSize, KeyValueMap* output) { + uint32_t sizeInBytes = 0; + if (auto status = ParseUint32(data, maxSize, &sizeInBytes); !status.ok()) { + return status; + } + if (sizeInBytes > (maxSize - 4)) { + const auto msg = + StrCat("key-value map size ", sizeInBytes, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + + // End of the map payload measured from `data`, including the 4-byte size prefix. + // Computed in 64 bits: `sizeInBytes + 4` would wrap in uint32_t for sizes near UINT32_MAX. + const uint64_t endPos = uint64_t(sizeInBytes) + 4; + + output->clear(); + uint64_t pos = 4; + while (pos < endPos) { + std::string_view key; + if (auto status = ParseStringView(data + pos, endPos - pos, &key); !status.ok()) { + const auto msg = StrCat("cannot read key-value map key at pos ", pos, ": ", status.message); + return Status{StatusCode::InvalidRecord, msg}; + } + pos += 4 + key.size(); + std::string_view value; + if (auto status = ParseStringView(data + pos, endPos - pos, &value); !status.ok()) { + const auto msg = StrCat("cannot read key-value map value for key \"", key, "\" at pos ", pos, + ": ", status.message); + return Status{StatusCode::InvalidRecord, msg}; + } + pos += 4 + value.size(); + output->emplace(key, value); + } + return StatusCode::Success; +} + +inline std::string MagicToHex(const std::byte* data) { + return internal::ToHex(data[0]) + internal::ToHex(data[1]) + internal::ToHex(data[2]) + + internal::ToHex(data[3]) + internal::ToHex(data[4]) + internal::ToHex(data[5]) + + internal::ToHex(data[6]) + internal::ToHex(data[7]); +} + +} // namespace internal + +} // namespace mcap diff --git a/lib/mcap/include/mcap/intervaltree.hpp b/lib/mcap/include/mcap/intervaltree.hpp new file mode 100644 index 0000000..267eb7a --- /dev/null +++ b/lib/mcap/include/mcap/intervaltree.hpp @@ -0,0 +1,303 @@ +// Adapted from + +#pragma once + +#include
+#include +#include +#include +#include +#include + +namespace mcap::internal { + +template +class Interval { +public: + Scalar start; + Scalar stop; + Value value; + Interval(const Scalar& s, const Scalar& e, const Value& v) + : start(std::min(s, e)) + , stop(std::max(s, e)) + , value(v) {} +}; + +template +Value intervalStart(const Interval& i) { + return i.start; +} + +template +Value intervalStop(const Interval& i) { + return i.stop; +} + +template +std::ostream& operator<<(std::ostream& out, const Interval& i) { + out << "Interval(" << i.start << ", " << i.stop << "): " << i.value; + return out; +} + +template +class IntervalTree { +public: + using interval = Interval; + using interval_vector = std::vector; + + struct IntervalStartCmp { + bool operator()(const interval& a, const interval& b) { + return a.start < b.start; + } + }; + + struct IntervalStopCmp { + bool operator()(const interval& a, const interval& b) { + return a.stop < b.stop; + } + }; + + IntervalTree() + : left(nullptr) + , right(nullptr) + , center(Scalar(0)) {} + + ~IntervalTree() = default; + + std::unique_ptr clone() const { + return std::unique_ptr(new IntervalTree(*this)); + } + + IntervalTree(const IntervalTree& other) + : intervals(other.intervals) + , left(other.left ? other.left->clone() : nullptr) + , right(other.right ? other.right->clone() : nullptr) + , center(other.center) {} + + IntervalTree& operator=(IntervalTree&&) = default; + IntervalTree(IntervalTree&&) = default; + + IntervalTree& operator=(const IntervalTree& other) { + center = other.center; + intervals = other.intervals; + left = other.left ? other.left->clone() : nullptr; + right = other.right ? 
other.right->clone() : nullptr; + return *this; + } + + IntervalTree(interval_vector&& ivals, std::size_t depth = 16, std::size_t minbucket = 64, + std::size_t maxbucket = 512, Scalar leftextent = 0, Scalar rightextent = 0) + : left(nullptr) + , right(nullptr) { + --depth; + const auto minmaxStop = std::minmax_element(ivals.begin(), ivals.end(), IntervalStopCmp()); + const auto minmaxStart = std::minmax_element(ivals.begin(), ivals.end(), IntervalStartCmp()); + if (!ivals.empty()) { + center = (minmaxStart.first->start + minmaxStop.second->stop) / 2; + } + if (leftextent == 0 && rightextent == 0) { + // sort intervals by start + std::sort(ivals.begin(), ivals.end(), IntervalStartCmp()); + } else { + assert(std::is_sorted(ivals.begin(), ivals.end(), IntervalStartCmp())); + } + if (depth == 0 || (ivals.size() < minbucket && ivals.size() < maxbucket)) { + std::sort(ivals.begin(), ivals.end(), IntervalStartCmp()); + intervals = std::move(ivals); + assert(is_valid().first); + return; + } else { + Scalar leftp = 0; + Scalar rightp = 0; + + if (leftextent || rightextent) { + leftp = leftextent; + rightp = rightextent; + } else { + leftp = ivals.front().start; + rightp = std::max_element(ivals.begin(), ivals.end(), IntervalStopCmp())->stop; + } + + interval_vector lefts; + interval_vector rights; + + for (typename interval_vector::const_iterator i = ivals.begin(); i != ivals.end(); ++i) { + const interval& cur = *i; + if (cur.stop < center) { + lefts.push_back(cur); + } else if (cur.start > center) { + rights.push_back(cur); + } else { + assert(cur.start <= center); + assert(center <= cur.stop); + intervals.push_back(cur); + } + } + + if (!lefts.empty()) { + left.reset(new IntervalTree(std::move(lefts), depth, minbucket, maxbucket, leftp, center)); + } + if (!rights.empty()) { + right.reset( + new IntervalTree(std::move(rights), depth, minbucket, maxbucket, center, rightp)); + } + } + assert(is_valid().first); + } + + // Call f on all intervals near the range [start, 
stop]: + template + void visit_near(const Scalar& start, const Scalar& stop, UnaryFunction f) const { + if (!intervals.empty() && !(stop < intervals.front().start)) { + for (auto& i : intervals) { + f(i); + } + } + if (left && start <= center) { + left->visit_near(start, stop, f); + } + if (right && stop >= center) { + right->visit_near(start, stop, f); + } + } + + // Call f on all intervals crossing pos + template + void visit_overlapping(const Scalar& pos, UnaryFunction f) const { + visit_overlapping(pos, pos, f); + } + + // Call f on all intervals overlapping [start, stop] + template + void visit_overlapping(const Scalar& start, const Scalar& stop, UnaryFunction f) const { + auto filterF = [&](const interval& cur) { + if (cur.stop >= start && cur.start <= stop) { + // Only apply f if overlapping + f(cur); + } + }; + visit_near(start, stop, filterF); + } + + // Call f on all intervals contained within [start, stop] + template + void visit_contained(const Scalar& start, const Scalar& stop, UnaryFunction f) const { + auto filterF = [&](const interval& cur) { + if (start <= cur.start && cur.stop <= stop) { + f(cur); + } + }; + visit_near(start, stop, filterF); + } + + interval_vector find_overlapping(const Scalar& start, const Scalar& stop) const { + interval_vector result; + visit_overlapping(start, stop, [&](const interval& cur) { + result.emplace_back(cur); + }); + return result; + } + + interval_vector find_contained(const Scalar& start, const Scalar& stop) const { + interval_vector result; + visit_contained(start, stop, [&](const interval& cur) { + result.push_back(cur); + }); + return result; + } + + bool empty() const { + if (left && !left->empty()) { + return false; + } + if (!intervals.empty()) { + return false; + } + if (right && !right->empty()) { + return false; + } + return true; + } + + template + void visit_all(UnaryFunction f) const { + if (left) { + left->visit_all(f); + } + std::for_each(intervals.begin(), intervals.end(), f); + if (right) { + 
right->visit_all(f); + } + } + + std::pair extent() const { + struct Extent { + std::pair x{std::numeric_limits::max(), + std::numeric_limits::min()}; + void operator()(const interval& cur) { + x.first = std::min(x.first, cur.start); + x.second = std::max(x.second, cur.stop); + } + }; + Extent extent; + + visit_all([&](const interval& cur) { + extent(cur); + }); + return extent.x; + } + + // Check all constraints. + // If first is false, second is invalid. + std::pair> is_valid() const { + const auto minmaxStop = + std::minmax_element(intervals.begin(), intervals.end(), IntervalStopCmp()); + const auto minmaxStart = + std::minmax_element(intervals.begin(), intervals.end(), IntervalStartCmp()); + + std::pair> result = { + true, {std::numeric_limits::max(), std::numeric_limits::min()}}; + if (!intervals.empty()) { + result.second.first = std::min(result.second.first, minmaxStart.first->start); + result.second.second = std::min(result.second.second, minmaxStop.second->stop); + } + if (left) { + auto valid = left->is_valid(); + result.first &= valid.first; + result.second.first = std::min(result.second.first, valid.second.first); + result.second.second = std::min(result.second.second, valid.second.second); + if (!result.first) { + return result; + } + if (valid.second.second >= center) { + result.first = false; + return result; + } + } + if (right) { + auto valid = right->is_valid(); + result.first &= valid.first; + result.second.first = std::min(result.second.first, valid.second.first); + result.second.second = std::min(result.second.second, valid.second.second); + if (!result.first) { + return result; + } + if (valid.second.first <= center) { + result.first = false; + return result; + } + } + if (!std::is_sorted(intervals.begin(), intervals.end(), IntervalStartCmp())) { + result.first = false; + } + return result; + } + +private: + interval_vector intervals; + std::unique_ptr left; + std::unique_ptr right; + Scalar center; +}; + +} // namespace mcap::internal diff 
--git a/lib/mcap/include/mcap/mcap.hpp b/lib/mcap/include/mcap/mcap.hpp new file mode 100644 index 0000000..71f479c --- /dev/null +++ b/lib/mcap/include/mcap/mcap.hpp @@ -0,0 +1,4 @@ +#pragma once + +#include "reader.hpp" +#include "writer.hpp" diff --git a/lib/mcap/include/mcap/read_job_queue.hpp b/lib/mcap/include/mcap/read_job_queue.hpp new file mode 100644 index 0000000..7faf446 --- /dev/null +++ b/lib/mcap/include/mcap/read_job_queue.hpp @@ -0,0 +1,147 @@ +#pragma once + +#include "types.hpp" +#include +#include + +namespace mcap::internal { + +// Helper for writing compile-time exhaustive variant visitors. +template +inline constexpr bool always_false_v = false; + +/** + * @brief A job to read a specific message at offset `offset` from the decompressed chunk + * stored in `chunkReaderIndex`. A timestamp is provided to order this job relative to other jobs. + */ +struct ReadMessageJob { + Timestamp timestamp; + RecordOffset offset; + size_t chunkReaderIndex; +}; + +/** + * @brief A job to decompress the chunk starting at `chunkStartOffset`. The message indices + * starting directly after the chunk record and ending at `messageIndexEndOffset` will be used to + * find specific messages within the chunk. + */ +struct DecompressChunkJob { + Timestamp messageStartTime; + Timestamp messageEndTime; + ByteOffset chunkStartOffset; + ByteOffset messageIndexEndOffset; +}; + +/** + * @brief A union of jobs that an indexed MCAP reader executes. + */ +using ReadJob = std::variant; + +/** + * @brief A priority queue of jobs for an indexed MCAP reader to execute. + */ +struct ReadJobQueue { +private: + bool reverse_ = false; + std::vector heap_; + + /** + * @brief return the timestamp key that should be used to compare jobs. 
+ */ + static Timestamp TimeComparisonKey(const ReadJob& job, bool reverse) { + Timestamp result = 0; + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + result = arg.timestamp; + } else if constexpr (std::is_same_v) { + if (reverse) { + result = arg.messageEndTime; + } else { + result = arg.messageStartTime; + } + } else { + static_assert(always_false_v, "non-exhaustive visitor!"); + } + }, + job); + return result; + } + static RecordOffset PositionComparisonKey(const ReadJob& job, bool reverse) { + RecordOffset result; + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + result = arg.offset; + } else if constexpr (std::is_same_v) { + if (reverse) { + result.offset = arg.messageIndexEndOffset; + } else { + result.offset = arg.chunkStartOffset; + } + } else { + static_assert(always_false_v, "non-exhaustive visitor!"); + } + }, + job); + return result; + } + + static bool CompareForward(const ReadJob& a, const ReadJob& b) { + auto aTimestamp = TimeComparisonKey(a, false); + auto bTimestamp = TimeComparisonKey(b, false); + if (aTimestamp == bTimestamp) { + return PositionComparisonKey(a, false) > PositionComparisonKey(b, false); + } + return aTimestamp > bTimestamp; + } + + static bool CompareReverse(const ReadJob& a, const ReadJob& b) { + auto aTimestamp = TimeComparisonKey(a, true); + auto bTimestamp = TimeComparisonKey(b, true); + if (aTimestamp == bTimestamp) { + return PositionComparisonKey(a, true) < PositionComparisonKey(b, true); + } + return aTimestamp < bTimestamp; + } + +public: + explicit ReadJobQueue(bool reverse) + : reverse_(reverse) {} + void push(DecompressChunkJob&& decompressChunkJob) { + heap_.emplace_back(std::move(decompressChunkJob)); + if (!reverse_) { + std::push_heap(heap_.begin(), heap_.end(), CompareForward); + } else { + std::push_heap(heap_.begin(), heap_.end(), CompareReverse); + } + } + + void push(ReadMessageJob&& readMessageJob) { + 
heap_.emplace_back(std::move(readMessageJob)); + if (!reverse_) { + std::push_heap(heap_.begin(), heap_.end(), CompareForward); + } else { + std::push_heap(heap_.begin(), heap_.end(), CompareReverse); + } + } + + ReadJob pop() { + if (!reverse_) { + std::pop_heap(heap_.begin(), heap_.end(), CompareForward); + } else { + std::pop_heap(heap_.begin(), heap_.end(), CompareReverse); + } + auto popped = heap_.back(); + heap_.pop_back(); + return popped; + } + + size_t len() const { + return heap_.size(); + } +}; + +} // namespace mcap::internal diff --git a/lib/mcap/include/mcap/reader.hpp b/lib/mcap/include/mcap/reader.hpp new file mode 100644 index 0000000..38eae7f --- /dev/null +++ b/lib/mcap/include/mcap/reader.hpp @@ -0,0 +1,743 @@ +#pragma once + +#include "intervaltree.hpp" +#include "read_job_queue.hpp" +#include "types.hpp" +#include "visibility.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace mcap { + +enum struct ReadSummaryMethod { + /** + * @brief Parse the Summary section to produce seeking indexes and summary + * statistics. If the Summary section is not present or corrupt, a failure + * Status is returned and the seeking indexes and summary statistics are not + * populated. + */ + NoFallbackScan, + /** + * @brief If the Summary section is missing or incomplete, allow falling back + * to reading the file sequentially to produce seeking indexes and summary + * statistics. + */ + AllowFallbackScan, + /** + * @brief Read the file sequentially from Header to DataEnd to produce seeking + * indexes and summary statistics. + */ + ForceScan, +}; + +/** + * @brief An abstract interface for reading MCAP data. + */ +struct MCAP_PUBLIC IReadable { + virtual ~IReadable() = default; + + /** + * @brief Returns the size of the file in bytes. + * + * @return uint64_t The total number of bytes in the MCAP file. 
+ */ + virtual uint64_t size() const = 0; + /** + * @brief This method is called by MCAP reader classes when they need to read + * a portion of the file. + * + * @param output A pointer to a pointer to the buffer to write to. This method + * is expected to either maintain an internal buffer, read data into it, and + * update this pointer to point at the internal buffer, or update this + * pointer to point directly at the source data if possible. The pointer and + * data must remain valid and unmodified until the next call to read(). + * @param offset The offset in bytes from the beginning of the file to read. + * @param size The number of bytes to read. + * @return uint64_t Number of bytes actually read. This may be less than the + * requested size if the end of the file is reached. The output pointer must + * be readable from `output` to `output + size`. If the read fails, this + * method should return 0. + */ + virtual uint64_t read(std::byte** output, uint64_t offset, uint64_t size) = 0; +}; + +/** + * @brief IReadable implementation wrapping a FILE* pointer created by fopen() + * and a read buffer. + */ +class MCAP_PUBLIC FileReader final : public IReadable { +public: + FileReader(std::FILE* file); + + uint64_t size() const override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + +private: + // Numeric type returned by the tell/seek operations. Necessary because long on Windows is 32 + // bits so the standard C library interfaces don't work for files larger than 2GiB. +#if defined _WIN32 || defined __CYGWIN__ + typedef __int64 offset_type; +#else + typedef long offset_type; +#endif + + static_assert((offset_type)(uint64_t)std::numeric_limits::max() == + std::numeric_limits::max(), + "offset_type should fit in uint64_t"); + + std::FILE* file_; + std::vector buffer_; + uint64_t size_; + uint64_t position_; +}; + +/** + * @brief IReadable implementation wrapping a std::ifstream input file stream. 
+ */ +class MCAP_PUBLIC FileStreamReader final : public IReadable { +public: + FileStreamReader(std::ifstream& stream); + + uint64_t size() const override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + +private: + std::ifstream& stream_; + std::vector buffer_; + uint64_t size_; + uint64_t position_; +}; + +/** + * @brief An abstract interface for compressed readers. + */ +class MCAP_PUBLIC ICompressedReader : public IReadable { +public: + virtual ~ICompressedReader() override = default; + + /** + * @brief Reset the reader state, clearing any internal buffers and state, and + * initialize with new compressed data. + * + * @param data Compressed data to read from. + * @param size Size of the compressed data in bytes. + * @param uncompressedSize Size of the data in bytes after decompression. A + * buffer of this size will be allocated for the uncompressed data. + */ + virtual void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) = 0; + /** + * @brief Report the current status of decompression. A StatusCode other than + * `StatusCode::Success` after `reset()` is called indicates the decompression + * was not successful and the reader is in an invalid state. + */ + virtual Status status() const = 0; +}; + +/** + * @brief A "null" compressed reader that directly passes through uncompressed + * data. No internal buffers are allocated. 
+ */ +class MCAP_PUBLIC BufferReader final : public ICompressedReader { +public: + void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + uint64_t size() const override; + Status status() const override; + + BufferReader() = default; + BufferReader(const BufferReader&) = delete; + BufferReader& operator=(const BufferReader&) = delete; + BufferReader(BufferReader&&) = delete; + BufferReader& operator=(BufferReader&&) = delete; + +private: + const std::byte* data_; + uint64_t size_; +}; + +#ifndef MCAP_COMPRESSION_NO_ZSTD +/** + * @brief ICompressedReader implementation that decompresses Zstandard + * (https://facebook.github.io/zstd/) data. + */ +class MCAP_PUBLIC ZStdReader final : public ICompressedReader { +public: + void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + uint64_t size() const override; + Status status() const override; + + /** + * @brief Decompresses an entire Zstd-compressed chunk into `output`. + * + * @param data The Zstd-compressed input chunk. + * @param compressedSize The size of the Zstd-compressed input. + * @param uncompressedSize The size of the data once uncompressed. + * @param output The output vector. This will be resized to `uncompressedSize` to fit the data, + * or 0 if the decompression encountered an error. 
+ * @return Status + */ + static Status DecompressAll(const std::byte* data, uint64_t compressedSize, + uint64_t uncompressedSize, ByteArray* output); + ZStdReader() = default; + ZStdReader(const ZStdReader&) = delete; + ZStdReader& operator=(const ZStdReader&) = delete; + ZStdReader(ZStdReader&&) = delete; + ZStdReader& operator=(ZStdReader&&) = delete; + +private: + Status status_; + ByteArray uncompressedData_; +}; +#endif + +#ifndef MCAP_COMPRESSION_NO_LZ4 +/** + * @brief ICompressedReader implementation that decompresses LZ4 + * (https://lz4.github.io/lz4/) data. + */ +class MCAP_PUBLIC LZ4Reader final : public ICompressedReader { +public: + void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + uint64_t size() const override; + Status status() const override; + + /** + * @brief Decompresses an entire LZ4-encoded chunk into `output`. + * + * @param data The LZ4-compressed input chunk. + * @param size The size of the LZ4-compressed input. + * @param uncompressedSize The size of the data once uncompressed. + * @param output The output vector. This will be resized to `uncompressedSize` to fit the data, + * or 0 if the decompression encountered an error. + * @return Status + */ + Status decompressAll(const std::byte* data, uint64_t size, uint64_t uncompressedSize, + ByteArray* output); + LZ4Reader(); + LZ4Reader(const LZ4Reader&) = delete; + LZ4Reader& operator=(const LZ4Reader&) = delete; + LZ4Reader(LZ4Reader&&) = delete; + LZ4Reader& operator=(LZ4Reader&&) = delete; + ~LZ4Reader() override; + +private: + void* decompressionContext_ = nullptr; // LZ4F_dctx* + Status status_; + const std::byte* compressedData_; + ByteArray uncompressedData_; + uint64_t compressedSize_; + uint64_t uncompressedSize_; +}; +#endif + +struct LinearMessageView; + +/** + * @brief Options for reading messages out of an MCAP file. 
+ */ +struct MCAP_PUBLIC ReadMessageOptions { +public: + /** + * @brief Only messages with log timestamps greater or equal to startTime will be included. + */ + Timestamp startTime = 0; + /** + * @brief Only messages with log timestamps less than endTime will be included. + */ + Timestamp endTime = MaxTime; + /** + * @brief If provided, `topicFilter` is called on all topics found in the MCAP file. If + * `topicFilter` returns true for a given channel, messages from that channel will be included. + * if not provided, messages from all channels are provided. + */ + std::function topicFilter; + enum struct ReadOrder { FileOrder, LogTimeOrder, ReverseLogTimeOrder }; + /** + * @brief Set the expected order that messages should be returned in. + * if readOrder == FileOrder, messages will be returned in the order they appear in the MCAP file. + * if readOrder == LogTimeOrder, messages will be returned in ascending log time order. + * if readOrder == ReverseLogTimeOrder, messages will be returned in descending log time order. + */ + ReadOrder readOrder = ReadOrder::FileOrder; + + ReadMessageOptions(Timestamp start, Timestamp end) + : startTime(start) + , endTime(end) {} + + ReadMessageOptions() = default; + + /** + * @brief validate the configuration. + */ + Status validate() const; +}; + +/** + * @brief Provides a read interface to an MCAP file. + */ +class MCAP_PUBLIC McapReader final { +public: + ~McapReader(); + + /** + * @brief Opens an MCAP file for reading from an already constructed IReadable + * implementation. + * + * @param reader An implementation of the IReader interface that provides raw + * MCAP data. + * @return Status StatusCode::Success on success. If a non-success Status is + * returned, the data source is not considered open and McapReader is not + * usable until `open()` is called and a success response is returned. + */ + Status open(IReadable& reader); + /** + * @brief Opens an MCAP file for reading from a given filename. 
+ * + * @param filename Filename to open. + * @return Status StatusCode::Success on success. If a non-success Status is + * returned, the data source is not considered open and McapReader is not + * usable until `open()` is called and a success response is returned. + */ + Status open(std::string_view filename); + /** + * @brief Opens an MCAP file for reading from a std::ifstream input file + * stream. + * + * @param stream Input file stream to read MCAP data from. + * @return Status StatusCode::Success on success. If a non-success Status is + * returned, the file is not considered open and McapReader is not usable + * until `open()` is called and a success response is returned. + */ + Status open(std::ifstream& stream); + + /** + * @brief Closes the MCAP file, clearing any internal data structures and + * state and dropping the data source reference. + * + */ + void close(); + + /** + * @brief Read and parse the Summary section at the end of the MCAP file, if + * available. This will populate internal indexes to allow for efficient + * summarization and random access. This method will automatically be called + * upon requesting summary data or first seek if Summary section parsing is + * allowed by the configuration options. + */ + Status readSummary( + ReadSummaryMethod method, const ProblemCallback& onProblem = [](const Status&) {}); + + /** + * @brief Returns an iterable view with `begin()` and `end()` methods for + * iterating Messages in the MCAP file. If a non-zero `startTime` is provided, + * this will first parse the Summary section (by calling `readSummary()`) if + * allowed by the configuration options and it has not been parsed yet. + * + * @param startTime Optional start time in nanoseconds. Messages before this + * time will not be returned. + * @param endTime Optional end time in nanoseconds. Messages equal to or after + * this time will not be returned. 
+ */ + LinearMessageView readMessages(Timestamp startTime = 0, Timestamp endTime = MaxTime); + /** + * @brief Returns an iterable view with `begin()` and `end()` methods for + * iterating Messages in the MCAP file. If a non-zero `startTime` is provided, + * this will first parse the Summary section (by calling `readSummary()`) if + * allowed by the configuration options and it has not been parsed yet. + * + * @param onProblem A callback that will be called when a parsing error + * occurs. Problems can either be recoverable, indicating some data could + * not be read, or non-recoverable, stopping the iteration. + * @param startTime Optional start time in nanoseconds. Messages before this + * time will not be returned. + * @param endTime Optional end time in nanoseconds. Messages equal to or after + * this time will not be returned. + */ + LinearMessageView readMessages(const ProblemCallback& onProblem, Timestamp startTime = 0, + Timestamp endTime = MaxTime); + + /** + * @brief Returns an iterable view with `begin()` and `end()` methods for + * iterating Messages in the MCAP file. + * Uses the options from `options` to select the messages that are yielded. + */ + LinearMessageView readMessages(const ProblemCallback& onProblem, + const ReadMessageOptions& options); + + /** + * @brief Returns starting and ending byte offsets that must be read to + * iterate all messages in the given time range. If `readSummary()` has been + * successfully called and the recording contains Chunk records, this range + * will be narrowed to Chunk records that contain messages in the given time + * range. Otherwise, this range will be the entire Data section if the Data + * End record has been found or the entire file otherwise. + * + * This method is automatically used by `readMessages()`, and only needs to be + * called directly if the caller is manually constructing an iterator. + * + * @param startTime Start time in nanoseconds. + * @param endTime Optional end time in nanoseconds. 
+ * @return Start and end byte offsets. + */ + std::pair byteRange(Timestamp startTime, + Timestamp endTime = MaxTime) const; + + /** + * @brief Returns a pointer to the IReadable data source backing this reader. + * Will return nullptr if the reader is not open. + */ + IReadable* dataSource(); + + /** + * @brief Returns the parsed Header record, if it has been encountered. + */ + const std::optional
& header() const; + /** + * @brief Returns the parsed Footer record, if it has been encountered. + */ + const std::optional