diff --git a/.gitmodules b/.gitmodules index e52838d..50416cf 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "lib/CLI11"] path = lib/CLI11 url = https://github.com/CLIUtils/CLI11 +[submodule "lib/proxy"] + path = lib/proxy + url = https://github.com/ngcpp/proxy diff --git a/CMakeLists.txt b/CMakeLists.txt index 082a931..57838db 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,7 +28,7 @@ if (EXISTS "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/CMakeLists.txt") add_subdirectory("${CMAKE_CURRENT_LIST_DIR}/lib/CLI11" "${CMAKE_CURRENT_BINARY_DIR}/vendor/cli11") endif() -pkg_check_modules(FFMPEG REQUIRED IMPORTED_TARGET libavcodec libavutil libswscale) +pkg_check_modules(FFMPEG REQUIRED IMPORTED_TARGET libavcodec libavformat libavutil libswscale) pkg_check_modules(PROTOBUF_PKG QUIET IMPORTED_TARGET protobuf) pkg_check_modules(ZSTD REQUIRED IMPORTED_TARGET libzstd) pkg_check_modules(LZ4 REQUIRED IMPORTED_TARGET liblz4) @@ -50,6 +50,11 @@ if (NOT TARGET cvmmap::client) endif() endif() +set(CVMMAP_PROXY_INCLUDE_DIR "${CMAKE_CURRENT_LIST_DIR}/lib/proxy/include") +if (NOT EXISTS "${CVMMAP_PROXY_INCLUDE_DIR}/proxy/proxy.h") + message(FATAL_ERROR "proxy headers not found at ${CVMMAP_PROXY_INCLUDE_DIR}") +endif() + set(CVMMAP_STREAMER_HAS_GSTREAMER 0) if (ENABLE_GSTREAMER_LEGACY) pkg_check_modules(GSTREAMER IMPORTED_TARGET gstreamer-1.0>=1.14 gstreamer-video-1.0>=1.14 gstreamer-app-1.0>=1.14) @@ -95,6 +100,7 @@ add_library(cvmmap_streamer_common STATIC src/protocol/wire_codec.cpp src/metrics/latency_tracker.cpp src/pipeline/pipeline_runtime.cpp + src/protocol/rtmp_output.cpp src/protocol/rtmp_publisher.cpp src/protocol/rtp_publisher.cpp src/encode/encoder_backend.cpp @@ -109,6 +115,7 @@ target_include_directories(cvmmap_streamer_common "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/include" "${CMAKE_CURRENT_LIST_DIR}/lib/tomlplusplus/include" "${CMAKE_CURRENT_LIST_DIR}/lib/mcap/include" + "${CVMMAP_PROXY_INCLUDE_DIR}" "${CMAKE_CURRENT_BINARY_DIR}") target_compile_definitions(cvmmap_streamer_common @@ -164,6 +171,7 @@ function(add_cvmmap_binary target source) "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/include" "${CMAKE_CURRENT_LIST_DIR}/lib/tomlplusplus/include" "${CMAKE_CURRENT_LIST_DIR}/lib/mcap/include" + "${CVMMAP_PROXY_INCLUDE_DIR}" "${CMAKE_CURRENT_BINARY_DIR}") target_link_libraries(${target} PRIVATE @@ -174,6 +182,7 @@ endfunction() add_cvmmap_binary(cvmmap_streamer src/main_streamer.cpp) add_cvmmap_binary(rtp_receiver_tester src/testers/rtp_receiver_tester.cpp) add_cvmmap_binary(rtmp_stub_tester src/testers/rtmp_stub_tester.cpp) +add_cvmmap_binary(rtmp_output_tester src/testers/rtmp_output_tester.cpp) add_cvmmap_binary(ipc_snapshot_tester src/testers/ipc_snapshot_tester.cpp) add_executable(mcap_reader_tester src/testers/mcap_reader_tester.cpp) @@ -182,6 +191,7 @@ target_include_directories(mcap_reader_tester "${CMAKE_CURRENT_LIST_DIR}/include" "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/include" "${CMAKE_CURRENT_LIST_DIR}/lib/mcap/include" + "${CVMMAP_PROXY_INCLUDE_DIR}" "${CMAKE_CURRENT_BINARY_DIR}") target_link_libraries(mcap_reader_tester PRIVATE diff --git a/include/cvmmap_streamer/config/runtime_config.hpp b/include/cvmmap_streamer/config/runtime_config.hpp index 05c304e..dbd3cfe 100644 --- a/include/cvmmap_streamer/config/runtime_config.hpp +++ b/include/cvmmap_streamer/config/runtime_config.hpp @@ -25,6 +25,12 @@ enum class RtmpMode { Domestic, }; +enum class RtmpTransportType { + Libavformat, + FfmpegProcess, + LegacyCustom, +}; + enum class EncoderBackendType { Auto, FFmpeg, @@ -58,6 +64,8 @@ struct EncoderConfig { struct RtmpOutputConfig { bool enabled{false}; std::vector urls{}; + RtmpTransportType transport{RtmpTransportType::Libavformat}; + std::string ffmpeg_path{"ffmpeg"}; RtmpMode mode{RtmpMode::Enhanced}; }; @@ -112,6 +120,7 @@ struct RuntimeConfig { std::string_view to_string(CodecType codec); std::string_view to_string(RunMode mode); std::string_view to_string(RtmpMode mode); +std::string_view to_string(RtmpTransportType transport); std::string_view to_string(EncoderBackendType backend); std::string_view to_string(EncoderDeviceType device); std::string_view to_string(McapCompression compression); diff --git a/include/cvmmap_streamer/core/status.hpp b/include/cvmmap_streamer/core/status.hpp new file mode 100644 index 0000000..bcf125c --- /dev/null +++ b/include/cvmmap_streamer/core/status.hpp @@ -0,0 +1,114 @@ +#pragma once + +#include +#include +#include +#include + +namespace cvmmap_streamer { + +using error_t = int; + +enum class ErrorCode : error_t { + Ok = 0, + InvalidArgument, + InvalidConfig, + Unsupported, + NotReady, + BackendUnavailable, + AllocationFailed, + Io, + Network, + Protocol, + Encoder, + ExternalLibrary, + Serialization, + ChildProcess, + EndOfStream, + Internal, +}; + +inline constexpr ErrorCode ERR_OK = ErrorCode::Ok; +inline constexpr ErrorCode ERR_INVALID_ARGUMENT = ErrorCode::InvalidArgument; +inline constexpr ErrorCode ERR_INVALID_CONFIG = ErrorCode::InvalidConfig; +inline constexpr ErrorCode ERR_UNSUPPORTED = ErrorCode::Unsupported; +inline constexpr ErrorCode ERR_NOT_READY = ErrorCode::NotReady; +inline constexpr ErrorCode ERR_BACKEND_UNAVAILABLE = ErrorCode::BackendUnavailable; +inline constexpr ErrorCode ERR_ALLOCATION_FAILED = ErrorCode::AllocationFailed; +inline constexpr ErrorCode ERR_IO = ErrorCode::Io; +inline constexpr ErrorCode ERR_NETWORK = ErrorCode::Network; +inline constexpr ErrorCode ERR_PROTOCOL = ErrorCode::Protocol; +inline constexpr ErrorCode ERR_ENCODER = ErrorCode::Encoder; +inline constexpr ErrorCode ERR_EXTERNAL_LIBRARY = ErrorCode::ExternalLibrary; +inline constexpr ErrorCode ERR_SERIALIZATION = ErrorCode::Serialization; +inline constexpr ErrorCode ERR_CHILD_PROCESS = ErrorCode::ChildProcess; +inline constexpr ErrorCode ERR_END_OF_STREAM = ErrorCode::EndOfStream; +inline constexpr ErrorCode ERR_INTERNAL = ErrorCode::Internal; + +struct Error { + ErrorCode code{ERR_OK}; + std::string detail{}; +}; + +template +using Result = std::expected; + +using Status = Result; + +[[nodiscard]] +inline std::unexpected unexpected_error(ErrorCode code, std::string detail = {}) { + return std::unexpected(Error{ + .code = code, + .detail = std::move(detail), + }); +} + +[[nodiscard]] +inline std::string_view error_code_name(ErrorCode code) { + switch (code) { + case ERR_OK: + return "ok"; + case ERR_INVALID_ARGUMENT: + return "invalid_argument"; + case ERR_INVALID_CONFIG: + return "invalid_config"; + case ERR_UNSUPPORTED: + return "unsupported"; + case ERR_NOT_READY: + return "not_ready"; + case ERR_BACKEND_UNAVAILABLE: + return "backend_unavailable"; + case ERR_ALLOCATION_FAILED: + return "allocation_failed"; + case ERR_IO: + return "io"; + case ERR_NETWORK: + return "network"; + case ERR_PROTOCOL: + return "protocol"; + case ERR_ENCODER: + return "encoder"; + case ERR_EXTERNAL_LIBRARY: + return "external_library"; + case ERR_SERIALIZATION: + return "serialization"; + case ERR_CHILD_PROCESS: + return "child_process"; + case ERR_END_OF_STREAM: + return "end_of_stream"; + case ERR_INTERNAL: + return "internal"; + default: + return "unknown"; + } +} + +[[nodiscard]] +inline std::string format_error(const Error &error) { + if (error.detail.empty()) { + return std::string(error_code_name(error.code)); + } + return std::string(error_code_name(error.code)) + ": " + error.detail; +} + +} diff --git a/include/cvmmap_streamer/encode/encoded_access_unit.hpp b/include/cvmmap_streamer/encode/encoded_access_unit.hpp index 3887375..307eaaf 100644 --- a/include/cvmmap_streamer/encode/encoded_access_unit.hpp +++ b/include/cvmmap_streamer/encode/encoded_access_unit.hpp @@ -7,6 +7,22 @@ namespace cvmmap_streamer::encode { +enum class EncodedBitstreamFormat { + AnnexB, +}; + +struct EncodedStreamInfo { + CodecType codec{CodecType::H264}; + std::uint32_t width{0}; + std::uint32_t height{0}; + std::uint32_t time_base_num{1}; + std::uint32_t time_base_den{1'000'000'000u}; + std::uint32_t frame_rate_num{30}; + std::uint32_t frame_rate_den{1}; + EncodedBitstreamFormat bitstream_format{EncodedBitstreamFormat::AnnexB}; + std::vector decoder_config{}; +}; + struct EncodedAccessUnit { CodecType codec{CodecType::H264}; std::uint64_t source_timestamp_ns{0}; diff --git a/include/cvmmap_streamer/encode/encoder_backend.hpp b/include/cvmmap_streamer/encode/encoder_backend.hpp index 4a028a7..77e2150 100644 --- a/include/cvmmap_streamer/encode/encoder_backend.hpp +++ b/include/cvmmap_streamer/encode/encoder_backend.hpp @@ -1,17 +1,17 @@ #pragma once #include "cvmmap_streamer/config/runtime_config.hpp" +#include "cvmmap_streamer/core/status.hpp" #include "cvmmap_streamer/encode/encoded_access_unit.hpp" #include "cvmmap_streamer/ipc/contracts.hpp" #include -#include -#include #include -#include #include #include +#include + namespace cvmmap_streamer::encode { struct RawVideoFrame { @@ -20,35 +20,31 @@ struct RawVideoFrame { std::span bytes{}; }; -class EncoderBackend { -public: - virtual ~EncoderBackend() = default; +PRO_DEF_MEM_DISPATCH(MemBackendName, backend_name); +PRO_DEF_MEM_DISPATCH(MemUsingHardware, using_hardware); +PRO_DEF_MEM_DISPATCH(MemInit, init); +PRO_DEF_MEM_DISPATCH(MemStreamInfo, stream_info); +PRO_DEF_MEM_DISPATCH(MemPoll, poll); +PRO_DEF_MEM_DISPATCH(MemPushFrame, push_frame); +PRO_DEF_MEM_DISPATCH(MemDrain, drain); +PRO_DEF_MEM_DISPATCH(MemFlush, flush); +PRO_DEF_MEM_DISPATCH(MemShutdown, shutdown); - [[nodiscard]] - virtual std::string_view backend_name() const = 0; +struct EncoderBackendFacade : pro::facade_builder + ::add_convention + ::add_convention + ::add_convention + ::add_convention() const> + ::add_convention + ::add_convention + ::add_convention>()> + ::add_convention>()> + ::add_convention + ::build {}; - [[nodiscard]] - virtual bool using_hardware() const = 0; - - [[nodiscard]] - virtual std::expected init(const RuntimeConfig &config, const ipc::FrameInfo &frame_info) = 0; - - [[nodiscard]] - virtual std::expected poll() = 0; - - [[nodiscard]] - virtual std::expected push_frame(const RawVideoFrame &frame) = 0; - - [[nodiscard]] - virtual std::expected, std::string> drain() = 0; - - [[nodiscard]] - virtual std::expected, std::string> flush() = 0; - - virtual void shutdown() = 0; -}; +using EncoderBackend = pro::proxy; [[nodiscard]] -std::expected, std::string> make_encoder_backend(const RuntimeConfig &config); +Result make_encoder_backend(const RuntimeConfig &config); } diff --git a/include/cvmmap_streamer/protocol/rtmp_output.hpp b/include/cvmmap_streamer/protocol/rtmp_output.hpp new file mode 100644 index 0000000..5b072a3 --- /dev/null +++ b/include/cvmmap_streamer/protocol/rtmp_output.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include "cvmmap_streamer/config/runtime_config.hpp" +#include "cvmmap_streamer/core/status.hpp" +#include "cvmmap_streamer/encode/encoded_access_unit.hpp" + +#include + +#include + +namespace cvmmap_streamer::protocol { + +PRO_DEF_MEM_DISPATCH(MemBackendName, backend_name); +PRO_DEF_MEM_DISPATCH(MemPublishAccessUnit, publish_access_unit); +PRO_DEF_MEM_DISPATCH(MemLogMetrics, log_metrics); + +struct RtmpOutputFacade : pro::facade_builder + ::add_convention + ::add_convention + ::add_convention + ::build {}; + +using RtmpOutput = pro::proxy; + +[[nodiscard]] +Result make_rtmp_output( + const RuntimeConfig &config, + const encode::EncodedStreamInfo &stream_info); + +} diff --git a/lib/proxy b/lib/proxy new file mode 160000 index 0000000..1a3f464 --- /dev/null +++ b/lib/proxy @@ -0,0 +1 @@ +Subproject commit 1a3f4646d5e3f8610d7338904677f7a0b9bff5b6 diff --git a/scripts/rtmp_srs_smoke.sh b/scripts/rtmp_srs_smoke.sh new file mode 100755 index 0000000..8daac97 --- /dev/null +++ b/scripts/rtmp_srs_smoke.sh @@ -0,0 +1,99 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")/.." && pwd)" +BUILD_DIR="${BUILD_DIR:-$ROOT_DIR/build}" +SRS_ROOT="${SRS_ROOT:-$HOME/Code/srs}" +SRS_BIN="${SRS_BIN:-$SRS_ROOT/trunk/objs/srs}" +SRS_CONF="${SRS_CONF:-$SRS_ROOT/trunk/conf/srs.conf}" +FFPROBE_BIN="${FFPROBE_BIN:-ffprobe}" + +wait_for_port() { + local host="$1" + local port="$2" + for _ in $(seq 1 80); do + if (echo >"/dev/tcp/${host}/${port}") >/dev/null 2>&1; then + return 0 + fi + sleep 0.25 + done + return 1 +} + +probe_stream() { + local stream_name="$1" + local expected_codec="$2" + "${FFPROBE_BIN}" \ + -v error \ + -select_streams v:0 \ + -show_entries stream=codec_name,width,height \ + -of default=noprint_wrappers=1 \ + "http://127.0.0.1:8080/live/${stream_name}.flv" | + tee /dev/stderr | + grep -q "^codec_name=${expected_codec}$" +} + +wait_for_stream() { + local stream_name="$1" + local expected_codec="$2" + for _ in $(seq 1 40); do + if probe_stream "$stream_name" "$expected_codec"; then + return 0 + fi + sleep 0.25 + done + return 1 +} + +run_case() { + local transport="$1" + local codec="$2" + local stream_name="cvmmap_${transport}_${codec}" + local expected_codec="$codec" + if [[ "$codec" == "h265" ]]; then + expected_codec="hevc" + fi + + "${BUILD_DIR}/rtmp_output_tester" \ + --transport "$transport" \ + --codec "$codec" \ + --rtmp-url "rtmp://127.0.0.1/live/${stream_name}" \ + --frames 36 \ + --frame-interval-ms 33 \ + --linger-ms 4000 & + local tester_pid=$! + + wait_for_stream "$stream_name" "$expected_codec" + wait "$tester_pid" +} + +if [[ ! -x "$SRS_BIN" ]]; then + echo "SRS binary not found: $SRS_BIN" >&2 + exit 1 +fi + +mkdir -p "$BUILD_DIR" +SRS_LOG="${BUILD_DIR}/srs-smoke.log" + +pushd "${SRS_ROOT}/trunk" >/dev/null +"$SRS_BIN" -c "$SRS_CONF" >"$SRS_LOG" 2>&1 & +SRS_PID=$! +popd >/dev/null + +cleanup() { + if kill -0 "$SRS_PID" >/dev/null 2>&1; then + kill "$SRS_PID" >/dev/null 2>&1 || true + wait "$SRS_PID" >/dev/null 2>&1 || true + fi +} +trap cleanup EXIT + +wait_for_port 127.0.0.1 1935 +wait_for_port 127.0.0.1 8080 + +run_case libavformat h264 +run_case ffmpeg_process h264 +run_case libavformat h265 +run_case ffmpeg_process h265 + +echo "RTMP SRS smoke tests completed successfully" diff --git a/src/config/runtime_config.cpp b/src/config/runtime_config.cpp index 0c64064..b0e8d7a 100644 --- a/src/config/runtime_config.cpp +++ b/src/config/runtime_config.cpp @@ -123,6 +123,20 @@ std::expected parse_rtmp_mode(std::string_view raw) { return std::unexpected("invalid rtmp mode: '" + std::string(raw) + "' (expected: enhanced|domestic)"); } +std::expected parse_rtmp_transport(std::string_view raw) { + if (raw == "libavformat") { + return RtmpTransportType::Libavformat; + } + if (raw == "ffmpeg_process" || raw == "ffmpeg-process") { + return RtmpTransportType::FfmpegProcess; + } + if (raw == "legacy_custom" || raw == "legacy-custom") { + return RtmpTransportType::LegacyCustom; + } + return std::unexpected( + "invalid rtmp transport: '" + std::string(raw) + "' (expected: libavformat|ffmpeg_process|legacy_custom)"); +} + std::expected parse_encoder_backend(std::string_view raw) { if (raw == "auto") { return EncoderBackendType::Auto; @@ -328,6 +342,16 @@ std::expected apply_toml_file(RuntimeConfig &config, const st config.outputs.rtmp.enabled = true; config.outputs.rtmp.urls = std::move(*values); } + if (auto value = toml_value(table, "outputs.rtmp.transport")) { + auto parsed = parse_rtmp_transport(*value); + if (!parsed) { + return std::unexpected(parsed.error()); + } + config.outputs.rtmp.transport = *parsed; + } + if (auto value = toml_value(table, "outputs.rtmp.ffmpeg_path")) { + config.outputs.rtmp.ffmpeg_path = *value; + } if (auto value = toml_value(table, "outputs.rtmp.mode")) { auto parsed = parse_rtmp_mode(*value); if (!parsed) { @@ -484,6 +508,18 @@ std::string_view to_string(RtmpMode mode) { return "unknown"; } +std::string_view to_string(RtmpTransportType transport) { + switch (transport) { + case RtmpTransportType::Libavformat: + return "libavformat"; + case RtmpTransportType::FfmpegProcess: + return "ffmpeg_process"; + case RtmpTransportType::LegacyCustom: + return "legacy_custom"; + } + return "unknown"; +} + std::string_view to_string(EncoderBackendType backend) { switch (backend) { case EncoderBackendType::Auto: @@ -530,6 +566,8 @@ std::expected parse_runtime_config(int argc, char ** std::string encoder_backend_raw{}; std::string encoder_device_raw{}; std::string rtmp_mode_raw{}; + std::string rtmp_transport_raw{}; + std::string rtmp_ffmpeg_path_raw{}; std::vector rtmp_urls_raw{}; std::string rtp_endpoint_raw{}; std::string rtp_payload_type_raw{}; @@ -565,6 +603,8 @@ std::expected parse_runtime_config(int argc, char ** app.add_option("--encoder-device", encoder_device_raw); app.add_flag("--rtmp", rtmp_enabled); app.add_option("--rtmp-url", rtmp_urls_raw); + app.add_option("--rtmp-transport", rtmp_transport_raw); + app.add_option("--rtmp-ffmpeg", rtmp_ffmpeg_path_raw); app.add_option("--rtmp-mode", rtmp_mode_raw); app.add_flag("--rtp", rtp_enabled); app.add_option("--rtp-endpoint", rtp_endpoint_raw); @@ -642,6 +682,16 @@ std::expected parse_runtime_config(int argc, char ** config.outputs.rtmp.enabled = true; config.outputs.rtmp.urls = std::move(rtmp_urls_raw); } + if (!rtmp_transport_raw.empty()) { + auto parsed = parse_rtmp_transport(rtmp_transport_raw); + if (!parsed) { + return std::unexpected(parsed.error()); + } + config.outputs.rtmp.transport = *parsed; + } + if (!rtmp_ffmpeg_path_raw.empty()) { + config.outputs.rtmp.ffmpeg_path = rtmp_ffmpeg_path_raw; + } if (!rtmp_mode_raw.empty()) { auto parsed = parse_rtmp_mode(rtmp_mode_raw); if (!parsed) { @@ -781,15 +831,29 @@ std::expected validate_runtime_config(const RuntimeConfig &co return std::unexpected("invalid RTMP config: URL must not be empty"); } } - if (config.outputs.rtmp.mode == RtmpMode::Domestic && config.encoder.codec != CodecType::H265) { - return std::unexpected("invalid mode matrix: domestic RTMP mode requires codec h265"); - } - if (config.encoder.backend == EncoderBackendType::FFmpeg && config.outputs.rtmp.enabled) { - return std::unexpected("invalid backend/output matrix: RTMP is only supported by gstreamer_legacy in this build"); - } if (config.encoder.backend == EncoderBackendType::GStreamerLegacy && config.record.mcap.enabled) { return std::unexpected("invalid backend/output matrix: MCAP recording requires the ffmpeg encoded access-unit path"); } + if (config.outputs.rtmp.enabled) { + if (config.outputs.rtmp.transport == RtmpTransportType::LegacyCustom) { + if (config.outputs.rtmp.mode == RtmpMode::Domestic && config.encoder.codec != CodecType::H265) { + return std::unexpected("invalid mode matrix: domestic RTMP mode requires codec h265"); + } + if (config.encoder.backend != EncoderBackendType::GStreamerLegacy) { + return std::unexpected("invalid backend/output matrix: legacy_custom RTMP requires encoder.backend=gstreamer_legacy"); + } + } else { + if (config.outputs.rtmp.mode != RtmpMode::Enhanced) { + return std::unexpected("invalid RTMP config: non-legacy RTMP transports only support rtmp.mode=enhanced"); + } + if (config.encoder.backend != EncoderBackendType::FFmpeg) { + return std::unexpected("invalid backend/output matrix: RTMP transports libavformat and ffmpeg_process require encoder.backend=ffmpeg"); + } + if (config.outputs.rtmp.transport == RtmpTransportType::FfmpegProcess && config.outputs.rtmp.ffmpeg_path.empty()) { + return std::unexpected("invalid RTMP config: ffmpeg_process transport requires a non-empty ffmpeg path"); + } + } + } if (config.outputs.rtp.enabled) { if (!config.outputs.rtp.endpoint || config.outputs.rtp.endpoint->empty()) { @@ -831,8 +895,8 @@ std::expected validate_runtime_config(const RuntimeConfig &co if (config.encoder.backend == EncoderBackendType::GStreamerLegacy) { return std::unexpected("invalid backend config: gstreamer_legacy backend requested but GStreamer support is not compiled"); } - if (config.outputs.rtmp.enabled) { - return std::unexpected("invalid output config: RTMP requires GStreamer legacy support, which is not compiled"); + if (config.outputs.rtmp.enabled && config.outputs.rtmp.transport == RtmpTransportType::LegacyCustom) { + return std::unexpected("invalid output config: legacy_custom RTMP requires GStreamer support, which is not compiled"); } #endif @@ -849,6 +913,7 @@ std::string summarize_runtime_config(const RuntimeConfig &config) { ss << ", encoder.gop=" << config.encoder.gop; ss << ", encoder.b_frames=" << config.encoder.b_frames; ss << ", rtmp.enabled=" << (config.outputs.rtmp.enabled ? "true" : "false"); + ss << ", rtmp.transport=" << to_string(config.outputs.rtmp.transport); ss << ", rtmp.mode=" << to_string(config.outputs.rtmp.mode); ss << ", rtmp.urls=" << config.outputs.rtmp.urls.size(); ss << ", rtp.enabled=" << (config.outputs.rtp.enabled ? "true" : "false"); diff --git a/src/encode/encoder_backend.cpp b/src/encode/encoder_backend.cpp index 023985d..538ebc8 100644 --- a/src/encode/encoder_backend.cpp +++ b/src/encode/encoder_backend.cpp @@ -1,36 +1,35 @@ #include "cvmmap_streamer/encode/encoder_backend.hpp" -#include -#include - namespace cvmmap_streamer::encode { -std::unique_ptr make_ffmpeg_backend(); -std::unique_ptr make_gstreamer_legacy_backend(); +EncoderBackend make_ffmpeg_backend(); +EncoderBackend make_gstreamer_legacy_backend(); -std::expected, std::string> make_encoder_backend(const RuntimeConfig &config) { +Result make_encoder_backend(const RuntimeConfig &config) { switch (config.encoder.backend) { case EncoderBackendType::FFmpeg: return make_ffmpeg_backend(); case EncoderBackendType::GStreamerLegacy: { auto backend = make_gstreamer_legacy_backend(); if (!backend) { - return std::unexpected("legacy GStreamer backend is not compiled in this build"); + return unexpected_error(ERR_BACKEND_UNAVAILABLE, "legacy GStreamer backend is not compiled in this build"); } return backend; } case EncoderBackendType::Auto: - if (config.outputs.rtmp.enabled) { + if (config.outputs.rtmp.enabled && config.outputs.rtmp.transport == RtmpTransportType::LegacyCustom) { auto backend = make_gstreamer_legacy_backend(); if (!backend) { - return std::unexpected("RTMP requires the legacy GStreamer backend, but it is not compiled"); + return unexpected_error( + ERR_BACKEND_UNAVAILABLE, + "legacy_custom RTMP requires the GStreamer backend, but it is not compiled"); } return backend; } return make_ffmpeg_backend(); } - return std::unexpected("unknown encoder backend"); + return unexpected_error(ERR_INTERNAL, "unknown encoder backend"); } } diff --git a/src/encode/ffmpeg_encoder_backend.cpp b/src/encode/ffmpeg_encoder_backend.cpp index eb207fa..72be0ee 100644 --- a/src/encode/ffmpeg_encoder_backend.cpp +++ b/src/encode/ffmpeg_encoder_backend.cpp @@ -12,8 +12,6 @@ extern "C" { #include #include -#include -#include #include #include #include @@ -27,24 +25,31 @@ namespace cvmmap_streamer::encode { namespace { -class FfmpegEncoderBackend final : public EncoderBackend { +[[nodiscard]] +std::string av_error_string(int error_code) { + char buffer[AV_ERROR_MAX_STRING_SIZE]{}; + av_strerror(error_code, buffer, sizeof(buffer)); + return std::string(buffer); +} + +class FfmpegEncoderBackend { public: - ~FfmpegEncoderBackend() override { + ~FfmpegEncoderBackend() { shutdown(); } [[nodiscard]] - std::string_view backend_name() const override { + std::string_view backend_name() const { return "ffmpeg"; } [[nodiscard]] - bool using_hardware() const override { + bool using_hardware() const { return using_hardware_; } [[nodiscard]] - std::expected init(const RuntimeConfig &config, const ipc::FrameInfo &frame_info) override { + Status init(const RuntimeConfig &config, const ipc::FrameInfo &frame_info) { shutdown(); config_ = &config; @@ -56,7 +61,6 @@ public: if (!input_pixel_format) { return std::unexpected(input_pixel_format.error()); } - input_pix_fmt_ = *input_pixel_format; auto encoder_name = pick_encoder_name(config); @@ -67,33 +71,36 @@ public: const auto *encoder = avcodec_find_encoder_by_name(encoder_name->c_str()); if (encoder == nullptr) { - return std::unexpected("FFmpeg encoder '" + *encoder_name + "' is unavailable"); + return unexpected_error(ERR_BACKEND_UNAVAILABLE, "FFmpeg encoder '" + *encoder_name + "' is unavailable"); } context_ = avcodec_alloc_context3(encoder); if (context_ == nullptr) { - return std::unexpected("failed to allocate FFmpeg encoder context"); + return unexpected_error(ERR_ALLOCATION_FAILED, "failed to allocate FFmpeg encoder context"); } - context_->codec_type = AVMEDIA_TYPE_VIDEO; - context_->codec_id = encoder->id; - context_->width = static_cast(frame_info.width); - context_->height = static_cast(frame_info.height); - context_->pix_fmt = encoder_pix_fmt_; - context_->time_base = AVRational{1, 1000000000}; - context_->framerate = AVRational{30, 1}; - context_->gop_size = static_cast(config.encoder.gop); + context_->codec_type = AVMEDIA_TYPE_VIDEO; + context_->codec_id = encoder->id; + context_->width = static_cast(frame_info.width); + context_->height = static_cast(frame_info.height); + context_->pix_fmt = encoder_pix_fmt_; + context_->flags |= AV_CODEC_FLAG_GLOBAL_HEADER; + context_->time_base = AVRational{1, 1000000000}; + context_->framerate = AVRational{30, 1}; + context_->gop_size = static_cast(config.encoder.gop); context_->max_b_frames = static_cast(config.encoder.b_frames); context_->thread_count = 1; auto codec_setup = configure_codec(*encoder_name, config); if (!codec_setup) { - return std::unexpected(codec_setup.error()); + return codec_setup; } const auto open_result = avcodec_open2(context_, encoder, nullptr); if (open_result < 0) { - return std::unexpected("failed to open FFmpeg encoder '" + *encoder_name + "': " + av_error_string(open_result)); + return unexpected_error( + ERR_ENCODER, + "failed to open FFmpeg encoder '" + *encoder_name + "': " + av_error_string(open_result)); } scaler_ = sws_getCachedContext( @@ -109,36 +116,40 @@ public: nullptr, nullptr); if (scaler_ == nullptr) { - return std::unexpected("failed to create swscale conversion context"); + return unexpected_error(ERR_EXTERNAL_LIBRARY, "failed to create swscale conversion context"); } frame_ = av_frame_alloc(); if (frame_ == nullptr) { - return std::unexpected("failed to allocate FFmpeg frame"); + return unexpected_error(ERR_ALLOCATION_FAILED, "failed to allocate FFmpeg frame"); } frame_->format = encoder_pix_fmt_; frame_->width = context_->width; frame_->height = context_->height; const auto frame_buffer = av_frame_get_buffer(frame_, 32); if (frame_buffer < 0) { - return std::unexpected("failed to allocate FFmpeg frame buffer: " + av_error_string(frame_buffer)); + return unexpected_error( + ERR_ALLOCATION_FAILED, + "failed to allocate FFmpeg frame buffer: " + av_error_string(frame_buffer)); } packet_ = av_packet_alloc(); if (packet_ == nullptr) { - return std::unexpected("failed to allocate FFmpeg packet"); + return unexpected_error(ERR_ALLOCATION_FAILED, "failed to allocate FFmpeg packet"); } filtered_packet_ = av_packet_alloc(); if (filtered_packet_ == nullptr) { - return std::unexpected("failed to allocate FFmpeg filtered packet"); + return unexpected_error(ERR_ALLOCATION_FAILED, "failed to allocate FFmpeg filtered packet"); } auto bitstream_filter = create_bitstream_filter(); if (!bitstream_filter) { - return std::unexpected(bitstream_filter.error()); + return bitstream_filter; } + stream_info_ = build_stream_info(); + spdlog::info( "FFMPEG_ENCODER_PATH codec={} device={} encoder={} pix_fmt={}", cvmmap_streamer::to_string(codec_), @@ -149,14 +160,22 @@ public: } [[nodiscard]] - std::expected poll() override { + Result stream_info() const { + if (!stream_info_) { + return unexpected_error(ERR_NOT_READY, "FFmpeg backend stream info is unavailable before initialization"); + } + return *stream_info_; + } + + [[nodiscard]] + Status poll() { return {}; } [[nodiscard]] - std::expected push_frame(const RawVideoFrame &frame) override { + Status push_frame(const RawVideoFrame &frame) { if (context_ == nullptr || frame_ == nullptr || scaler_ == nullptr) { - return std::unexpected("FFmpeg backend not initialized"); + return unexpected_error(ERR_NOT_READY, "FFmpeg backend not initialized"); } if (frame.bytes.empty()) { return {}; @@ -164,7 +183,7 @@ public: const auto make_writable = av_frame_make_writable(frame_); if (make_writable < 0) { - return std::unexpected("failed to make FFmpeg frame writable: " + av_error_string(make_writable)); + return unexpected_error(ERR_EXTERNAL_LIBRARY, "failed to make FFmpeg frame writable: " + av_error_string(make_writable)); } AVFrame input_frame{}; @@ -179,7 +198,7 @@ public: input_frame.width, input_frame.height, 1) < 0) { - return std::unexpected("failed to map input frame into FFmpeg image arrays"); + return unexpected_error(ERR_INVALID_ARGUMENT, "failed to map input frame into FFmpeg image arrays"); } sws_scale( @@ -198,29 +217,29 @@ public: frame_->pts = static_cast(frame.source_timestamp_ns - *first_source_timestamp_ns_); const auto send_result = avcodec_send_frame(context_, frame_); if (send_result < 0) { - return std::unexpected("failed to send frame to FFmpeg encoder: " + av_error_string(send_result)); + return unexpected_error(ERR_ENCODER, "failed to send frame to FFmpeg encoder: " + av_error_string(send_result)); } return {}; } [[nodiscard]] - std::expected, std::string> drain() override { + Result> drain() { return drain_packets(); } [[nodiscard]] - std::expected, std::string> flush() override { + Result> flush() { if (context_ == nullptr) { return std::vector{}; } const auto flush_result = avcodec_send_frame(context_, nullptr); if (flush_result < 0 && flush_result != AVERROR_EOF) { - return std::unexpected("failed to flush FFmpeg encoder: " + av_error_string(flush_result)); + return unexpected_error(ERR_ENCODER, "failed to flush FFmpeg encoder: " + av_error_string(flush_result)); } return drain_packets(); } - void shutdown() override { + void shutdown() { if (bsf_context_ != nullptr) { av_bsf_free(&bsf_context_); } @@ -241,19 +260,13 @@ public: scaler_ = nullptr; } first_source_timestamp_ns_.reset(); + stream_info_.reset(); using_hardware_ = false; } private: [[nodiscard]] - static std::string av_error_string(int error_code) { - char buffer[AV_ERROR_MAX_STRING_SIZE]{}; - av_strerror(error_code, buffer, sizeof(buffer)); - return std::string(buffer); - } - - [[nodiscard]] - static std::expected to_av_pixel_format(ipc::PixelFormat format) { + static Result to_av_pixel_format(ipc::PixelFormat format) { switch (format) { case ipc::PixelFormat::BGR: return AV_PIX_FMT_BGR24; @@ -266,7 +279,9 @@ private: case ipc::PixelFormat::GRAY: return AV_PIX_FMT_GRAY8; default: - return std::unexpected("unsupported raw pixel format for FFmpeg backend (supported: BGR/RGB/BGRA/RGBA/GRAY)"); + return unexpected_error( + ERR_UNSUPPORTED, + "unsupported raw pixel format for FFmpeg backend (supported: BGR/RGB/BGRA/RGBA/GRAY)"); } } @@ -292,7 +307,7 @@ private: } [[nodiscard]] - std::expected pick_encoder_name(const RuntimeConfig &config) const { + Result pick_encoder_name(const RuntimeConfig &config) const { const bool prefer_hardware = config.encoder.device != EncoderDeviceType::Software; const bool prefer_software = config.encoder.device == EncoderDeviceType::Software; if (codec_ == CodecType::H265) { @@ -307,7 +322,9 @@ private: if (!prefer_software && avcodec_find_encoder_by_name("hevc_nvenc") != nullptr) { return std::string("hevc_nvenc"); } - return std::unexpected("no usable FFmpeg encoder found for h265 (looked for hevc_nvenc, libx265)"); + return unexpected_error( + ERR_BACKEND_UNAVAILABLE, + "no usable FFmpeg encoder found for h265 (looked for hevc_nvenc, libx265)"); } if (prefer_hardware && avcodec_find_encoder_by_name("h264_nvenc") != nullptr) { @@ -321,11 +338,13 @@ private: if (!prefer_software && avcodec_find_encoder_by_name("h264_nvenc") != nullptr) { return std::string("h264_nvenc"); } - return std::unexpected("no usable FFmpeg encoder found for h264 (looked for h264_nvenc, libx264)"); + return unexpected_error( + ERR_BACKEND_UNAVAILABLE, + "no usable FFmpeg encoder found for h264 (looked for h264_nvenc, libx264)"); } [[nodiscard]] - std::expected configure_codec(std::string_view encoder_name, const RuntimeConfig &config) { + Status configure_codec(std::string_view encoder_name, const RuntimeConfig &config) { av_opt_set(context_->priv_data, "preset", encoder_name.find("nvenc") != std::string_view::npos ? "llhq" : "veryfast", 0); if (encoder_name.find("nvenc") != std::string_view::npos) { av_opt_set(context_->priv_data, "tune", "ull", 0); @@ -343,33 +362,57 @@ private: } [[nodiscard]] - std::expected create_bitstream_filter() { + Status create_bitstream_filter() { const char *filter_name = codec_ == CodecType::H265 ? "hevc_mp4toannexb" : "h264_mp4toannexb"; const auto *filter = av_bsf_get_by_name(filter_name); if (filter == nullptr) { - return std::unexpected(std::string("required FFmpeg bitstream filter '") + filter_name + "' is unavailable"); + return unexpected_error( + ERR_BACKEND_UNAVAILABLE, + std::string("required FFmpeg bitstream filter '") + filter_name + "' is unavailable"); } const auto alloc_result = av_bsf_alloc(filter, &bsf_context_); if (alloc_result < 0) { - return std::unexpected("failed to allocate FFmpeg bitstream filter: " + av_error_string(alloc_result)); + return unexpected_error( + ERR_ALLOCATION_FAILED, + "failed to allocate FFmpeg bitstream filter: " + av_error_string(alloc_result)); } const auto copy_result = avcodec_parameters_from_context(bsf_context_->par_in, context_); if (copy_result < 0) { - return std::unexpected("failed to copy codec parameters into bitstream filter: " + av_error_string(copy_result)); + return unexpected_error( + ERR_EXTERNAL_LIBRARY, + "failed to copy codec parameters into bitstream filter: " + av_error_string(copy_result)); } bsf_context_->time_base_in = context_->time_base; const auto init_result = av_bsf_init(bsf_context_); if (init_result < 0) { - return std::unexpected("failed to initialize FFmpeg bitstream filter: " + av_error_string(init_result)); + return unexpected_error( + ERR_EXTERNAL_LIBRARY, + "failed to initialize FFmpeg bitstream filter: " + av_error_string(init_result)); } return {}; } [[nodiscard]] - std::expected, std::string> drain_packets() { + EncodedStreamInfo build_stream_info() const { + EncodedStreamInfo info{}; + info.codec = codec_; + info.width = frame_info_.width; + info.height = frame_info_.height; + info.time_base_num = static_cast(context_ != nullptr ? context_->time_base.num : 1); + info.time_base_den = static_cast(context_ != nullptr ? context_->time_base.den : 1'000'000'000); + info.frame_rate_num = static_cast(context_ != nullptr ? context_->framerate.num : 30); + info.frame_rate_den = static_cast(context_ != nullptr ? context_->framerate.den : 1); + if (context_ != nullptr && context_->extradata != nullptr && context_->extradata_size > 0) { + info.decoder_config.assign(context_->extradata, context_->extradata + context_->extradata_size); + } + return info; + } + + [[nodiscard]] + Result> drain_packets() { std::vector access_units{}; while (true) { const auto receive_result = avcodec_receive_packet(context_, packet_); @@ -377,13 +420,15 @@ private: break; } if (receive_result < 0) { - return std::unexpected("failed to receive FFmpeg packet: " + av_error_string(receive_result)); + return unexpected_error(ERR_ENCODER, "failed to receive FFmpeg packet: " + av_error_string(receive_result)); } const auto bsf_send_result = av_bsf_send_packet(bsf_context_, packet_); if (bsf_send_result < 0) { av_packet_unref(packet_); - return std::unexpected("failed to send packet to bitstream filter: " + av_error_string(bsf_send_result)); + return unexpected_error( + ERR_EXTERNAL_LIBRARY, + "failed to send packet to bitstream filter: " + av_error_string(bsf_send_result)); } av_packet_unref(packet_); @@ -393,7 +438,9 @@ private: break; } if (bsf_receive_result < 0) { - return std::unexpected("failed to receive filtered packet: " + av_error_string(bsf_receive_result)); + return unexpected_error( + ERR_EXTERNAL_LIBRARY, + "failed to receive filtered packet: " + av_error_string(bsf_receive_result)); } EncodedAccessUnit access_unit{}; @@ -421,13 +468,14 @@ private: AVPixelFormat input_pix_fmt_{AV_PIX_FMT_NONE}; AVPixelFormat encoder_pix_fmt_{AV_PIX_FMT_NONE}; std::optional first_source_timestamp_ns_{}; + std::optional stream_info_{}; bool using_hardware_{false}; }; } -std::unique_ptr make_ffmpeg_backend() { - return std::make_unique(); +EncoderBackend make_ffmpeg_backend() { + return pro::make_proxy(); } } diff --git a/src/encode/gstreamer_legacy_backend.cpp b/src/encode/gstreamer_legacy_backend.cpp index d5fa873..47f7302 100644 --- a/src/encode/gstreamer_legacy_backend.cpp +++ b/src/encode/gstreamer_legacy_backend.cpp @@ -27,7 +27,7 @@ namespace { #if CVMMAP_STREAMER_HAS_GSTREAMER [[nodiscard]] -std::expected pixel_format_to_caps(ipc::PixelFormat format) { +Result pixel_format_to_caps(ipc::PixelFormat format) { switch (format) { case ipc::PixelFormat::BGR: return "BGR"; @@ -40,7 +40,7 @@ std::expected pixel_format_to_caps(ipc::PixelFormat f case ipc::PixelFormat::GRAY: return "GRAY8"; default: - return std::unexpected("unsupported raw pixel format for legacy GStreamer backend"); + return unexpected_error(ERR_UNSUPPORTED, "unsupported raw pixel format for legacy GStreamer backend"); } } @@ -79,10 +79,12 @@ std::vector encoder_candidates(CodecType codec, bool prefer_nv } [[nodiscard]] -std::expected pick_encoder_choice(CodecType codec, bool prefer_nvenc) { +Result pick_encoder_choice(CodecType codec, bool prefer_nvenc) { const std::string parser_name = selected_parser_name(codec); if (gst_element_factory_find(parser_name.c_str()) == nullptr) { - return std::unexpected("required GStreamer parser element '" + parser_name + "' is unavailable"); + return unexpected_error( + ERR_BACKEND_UNAVAILABLE, + "required GStreamer parser element '" + parser_name + "' is unavailable"); } for (const auto candidate : encoder_candidates(codec, prefer_nvenc)) { @@ -96,7 +98,7 @@ std::expected pick_encoder_choice(CodecType codec, b return choice; } - return std::unexpected("no usable GStreamer encoder available"); + return unexpected_error(ERR_BACKEND_UNAVAILABLE, "no usable GStreamer encoder available"); } [[nodiscard]] @@ -124,27 +126,29 @@ bool set_property_arg_if_exists(GObject *object, const char *name, const std::st return true; } -class GstreamerLegacyBackend final : public EncoderBackend { +class GstreamerLegacyBackend { public: - GstreamerLegacyBackend() = default; - ~GstreamerLegacyBackend() override { + GstreamerLegacyBackend() = default; + + ~GstreamerLegacyBackend() { shutdown(); } [[nodiscard]] - std::string_view backend_name() const override { + std::string_view backend_name() const { return "gstreamer_legacy"; } [[nodiscard]] - bool using_hardware() const override { + bool using_hardware() const { return using_hardware_; } [[nodiscard]] - std::expected init(const RuntimeConfig &config, const ipc::FrameInfo &frame_info) override { + Status init(const RuntimeConfig &config, const ipc::FrameInfo &frame_info) { shutdown(); - config_ = &config; + config_ = &config; + frame_info_ = frame_info; ensure_gst_initialized(); bool prefer_nvenc = config.encoder.device != EncoderDeviceType::Software; @@ -156,7 +160,7 @@ public: return std::unexpected(encoder_choice.error()); } - using_hardware_ = encoder_choice->is_nvenc; + using_hardware_ = encoder_choice->is_nvenc; active_encoder_name_ = encoder_choice->encoder_name; active_parser_name_ = encoder_choice->parser_name; @@ -185,17 +189,17 @@ public: if (error != nullptr) { const std::string message = "failed to create GStreamer pipeline: " + std::string(error->message); g_error_free(error); - return std::unexpected(message); + return unexpected_error(ERR_EXTERNAL_LIBRARY, message); } if (pipeline_ == nullptr) { - return std::unexpected("failed to create GStreamer pipeline"); + return unexpected_error(ERR_EXTERNAL_LIBRARY, "failed to create GStreamer pipeline"); } - appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_), "ingest_src"); + appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_), "ingest_src"); appsink_ = gst_bin_get_by_name(GST_BIN(pipeline_), "encoded_sink"); encoder_ = gst_bin_get_by_name(GST_BIN(pipeline_), "encoder"); if (appsrc_ == nullptr || appsink_ == nullptr || encoder_ == nullptr) { - return std::unexpected("failed to locate GStreamer pipeline elements"); + return unexpected_error(ERR_EXTERNAL_LIBRARY, "failed to locate GStreamer pipeline elements"); } const auto caps_string = @@ -208,7 +212,7 @@ public: ",framerate=(fraction)30/1"; GstCaps *caps = gst_caps_from_string(caps_string.c_str()); if (caps == nullptr) { - return std::unexpected("failed to create GStreamer caps: " + caps_string); + return unexpected_error(ERR_EXTERNAL_LIBRARY, "failed to create GStreamer caps: " + caps_string); } gst_app_src_set_caps(GST_APP_SRC(appsrc_), caps); gst_caps_unref(caps); @@ -228,7 +232,7 @@ public: bus_ = gst_element_get_bus(pipeline_); if (gst_element_set_state(pipeline_, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { - return std::unexpected("failed to set GStreamer pipeline to PLAYING"); + return unexpected_error(ERR_EXTERNAL_LIBRARY, "failed to set GStreamer pipeline to PLAYING"); } spdlog::info( @@ -240,7 +244,20 @@ public: } [[nodiscard]] - std::expected poll() override { + Result stream_info() const { + if (config_ == nullptr) { + return unexpected_error(ERR_NOT_READY, "legacy GStreamer backend stream info is unavailable before initialization"); + } + + EncodedStreamInfo info{}; + info.codec = config_->encoder.codec; + info.width = frame_info_.width; + info.height = frame_info_.height; + return info; + } + + [[nodiscard]] + Status poll() { if (bus_ == nullptr) { return {}; } @@ -267,7 +284,7 @@ public: if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_EOS) { gst_message_unref(message); - return std::unexpected("legacy backend reached EOS"); + return unexpected_error(ERR_END_OF_STREAM, "legacy backend reached EOS"); } GError *error = nullptr; @@ -286,26 +303,26 @@ public: g_free(debug); } gst_message_unref(message); - return std::unexpected(message_text); + return unexpected_error(ERR_EXTERNAL_LIBRARY, message_text); } return {}; } [[nodiscard]] - std::expected push_frame(const RawVideoFrame &frame) override { + Status push_frame(const RawVideoFrame &frame) { if (appsrc_ == nullptr) { - return std::unexpected("legacy backend appsrc is null"); + return unexpected_error(ERR_NOT_READY, "legacy backend appsrc is null"); } auto *buffer = gst_buffer_new_allocate(nullptr, frame.bytes.size(), nullptr); if (buffer == nullptr) { - return std::unexpected("failed to allocate GStreamer buffer"); + return unexpected_error(ERR_ALLOCATION_FAILED, "failed to allocate GStreamer buffer"); } GstMapInfo map{}; if (!gst_buffer_map(buffer, &map, GST_MAP_WRITE)) { gst_buffer_unref(buffer); - return std::unexpected("failed to map GStreamer buffer"); + return unexpected_error(ERR_EXTERNAL_LIBRARY, "failed to map GStreamer buffer"); } std::memcpy(map.data, frame.bytes.data(), frame.bytes.size()); gst_buffer_unmap(buffer, &map); @@ -322,25 +339,27 @@ public: const auto flow = gst_app_src_push_buffer(GST_APP_SRC(appsrc_), buffer); if (flow != GST_FLOW_OK) { - return std::unexpected("legacy backend push failed with flow=" + std::to_string(static_cast(flow))); + return unexpected_error( + ERR_EXTERNAL_LIBRARY, + "legacy backend push failed with flow=" + std::to_string(static_cast(flow))); } return {}; } [[nodiscard]] - std::expected, std::string> drain() override { + Result> drain() { return pull_samples(); } [[nodiscard]] - std::expected, std::string> flush() override { + Result> flush() { if (appsrc_ != nullptr) { (void)gst_app_src_end_of_stream(GST_APP_SRC(appsrc_)); } return pull_samples(); } - void shutdown() override { + void shutdown() { if (pipeline_ != nullptr) { gst_element_set_state(pipeline_, GST_STATE_NULL); } @@ -366,13 +385,14 @@ public: } active_encoder_name_.clear(); active_parser_name_.clear(); + frame_info_ = ipc::FrameInfo{}; first_source_timestamp_ns_.reset(); using_hardware_ = false; } private: [[nodiscard]] - std::expected, std::string> pull_samples() { + Result> pull_samples() { std::vector access_units{}; if (appsink_ == nullptr || config_ == nullptr) { return access_units; @@ -388,7 +408,7 @@ private: GstMapInfo map{}; if (!gst_buffer_map(buffer, &map, GST_MAP_READ)) { gst_sample_unref(sample); - return std::unexpected("failed to map legacy encoded buffer"); + return unexpected_error(ERR_EXTERNAL_LIBRARY, "failed to map legacy encoded buffer"); } EncodedAccessUnit access_unit{}; @@ -397,9 +417,8 @@ private: if (pts != GST_CLOCK_TIME_NONE) { access_unit.stream_pts_ns = static_cast(pts); } - access_unit.source_timestamp_ns = - first_source_timestamp_ns_.value_or(0) + access_unit.stream_pts_ns; - access_unit.keyframe = !GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT); + access_unit.source_timestamp_ns = first_source_timestamp_ns_.value_or(0) + access_unit.stream_pts_ns; + access_unit.keyframe = !GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT); access_unit.annexb_bytes.assign(map.data, map.data + map.size); access_units.push_back(std::move(access_unit)); @@ -411,6 +430,7 @@ private: } const RuntimeConfig *config_{nullptr}; + ipc::FrameInfo frame_info_{}; GstElement *pipeline_{nullptr}; GstElement *appsrc_{nullptr}; GstElement *appsink_{nullptr}; @@ -426,9 +446,9 @@ private: } -std::unique_ptr make_gstreamer_legacy_backend() { +EncoderBackend make_gstreamer_legacy_backend() { #if CVMMAP_STREAMER_HAS_GSTREAMER - return std::make_unique(); + return pro::make_proxy(); #else return {}; #endif diff --git a/src/ipc/help.cpp b/src/ipc/help.cpp index 6e44ec0..f51ca93 100644 --- a/src/ipc/help.cpp +++ b/src/ipc/help.cpp @@ -9,7 +9,7 @@ namespace cvmmap_streamer { namespace { -constexpr std::array kHelpLines{ +constexpr std::array kHelpLines{ "Usage:", " --help, -h\tshow this message", "", @@ -26,9 +26,11 @@ constexpr std::array kHelpLines{ " --rtp\t\tenable RTP output", " --rtp-endpoint \tRTP destination", " --rtp-payload-type \tRTP payload type (96-127)", - " --rtp-sdp \twrite SDP sidecar", + " --rtp-sdp \twrite optional SDP sidecar", " --rtmp\t\tenable RTMP output", " --rtmp-url \tadd RTMP destination (repeatable)", + " --rtmp-transport \tlibavformat|ffmpeg_process|legacy_custom", + " --rtmp-ffmpeg \tffmpeg binary for ffmpeg_process transport", " --rtmp-mode \tenhanced|domestic", " --mcap\t\tenable MCAP recording", " --mcap-path \tMCAP output file", diff --git a/src/main_streamer.cpp b/src/main_streamer.cpp index 7fe4318..b753917 100644 --- a/src/main_streamer.cpp +++ b/src/main_streamer.cpp @@ -9,23 +9,37 @@ int run_pipeline(const RuntimeConfig &config); } +namespace { + +enum class MainExitCode : int { + Success = 0, + ConfigError = 2, +}; + +[[nodiscard]] +constexpr int exit_code(MainExitCode code) { + return static_cast(code); +} + +} + int main(int argc, char **argv) { auto config = cvmmap_streamer::parse_runtime_config(argc, argv); if (!config) { if (config.error() == "help") { - return 0; + return exit_code(MainExitCode::Success); } if (config.error() == "parse_error") { - return 2; + return exit_code(MainExitCode::ConfigError); } spdlog::error("{}", config.error()); - return 2; + return exit_code(MainExitCode::ConfigError); } auto validation = cvmmap_streamer::validate_runtime_config(*config); if (!validation) { spdlog::error("{}", validation.error()); - return 2; + return exit_code(MainExitCode::ConfigError); } spdlog::info("runtime config: {}", cvmmap_streamer::summarize_runtime_config(*config)); @@ -38,5 +52,5 @@ int main(int argc, char **argv) { } spdlog::error("unknown run mode"); - return 2; + return exit_code(MainExitCode::ConfigError); } diff --git a/src/pipeline/pipeline_runtime.cpp b/src/pipeline/pipeline_runtime.cpp index 8ae2f3e..44bc6ae 100644 --- a/src/pipeline/pipeline_runtime.cpp +++ b/src/pipeline/pipeline_runtime.cpp @@ -3,7 +3,7 @@ #include "cvmmap_streamer/encode/encoder_backend.hpp" #include "cvmmap_streamer/ipc/contracts.hpp" #include "cvmmap_streamer/metrics/latency_tracker.hpp" -#include "cvmmap_streamer/protocol/rtmp_publisher.hpp" +#include "cvmmap_streamer/protocol/rtmp_output.hpp" #include "cvmmap_streamer/protocol/rtp_publisher.hpp" #include "cvmmap_streamer/record/mcap_record_sink.hpp" @@ -42,6 +42,20 @@ namespace { namespace ipc = cvmmap_streamer::ipc; +enum class PipelineExitCode : int { + Success = 0, + InputError = 2, + SharedMemoryError = 3, + SubscriberError = 4, + InitializationError = 5, + RuntimeError = 6, +}; + +[[nodiscard]] +constexpr int exit_code(PipelineExitCode code) { + return static_cast(code); +} + struct ResolvedInputEndpoints { std::string shm_name; std::string zmq_endpoint; @@ -179,12 +193,12 @@ bool frame_info_equal(const ipc::FrameInfo &lhs, const ipc::FrameInfo &rhs) { } [[nodiscard]] -std::expected publish_access_units( +Status publish_access_units( const RuntimeConfig &config, std::vector &&access_units, PipelineStats &stats, protocol::UdpRtpPublisher *rtp_publisher, - protocol::RtmpPublisher *rtmp_publisher, + protocol::RtmpOutput *rtmp_output, record::McapRecordSink *mcap_sink, metrics::IngestEmitLatencyTracker &latency_tracker) { for (auto &access_unit : access_units) { @@ -200,8 +214,8 @@ std::expected publish_access_units( if (rtp_publisher != nullptr) { rtp_publisher->publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns); } - if (rtmp_publisher != nullptr) { - auto publish = rtmp_publisher->publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns); + if (rtmp_output != nullptr) { + auto publish = (*rtmp_output)->publish_access_unit(access_unit); if (!publish) { return std::unexpected(publish.error()); } @@ -209,7 +223,7 @@ std::expected publish_access_units( if (mcap_sink != nullptr) { auto write = mcap_sink->write_access_unit(access_unit); if (!write) { - return std::unexpected(write.error()); + return unexpected_error(ERR_SERIALIZATION, write.error()); } } @@ -229,16 +243,16 @@ std::expected publish_access_units( } [[nodiscard]] -std::expected drain_encoder( +Status drain_encoder( const RuntimeConfig &config, encode::EncoderBackend &backend, bool flushing, PipelineStats &stats, protocol::UdpRtpPublisher *rtp_publisher, - protocol::RtmpPublisher *rtmp_publisher, + protocol::RtmpOutput *rtmp_output, record::McapRecordSink *mcap_sink, metrics::IngestEmitLatencyTracker &latency_tracker) { - auto drained = flushing ? backend.flush() : backend.drain(); + auto drained = flushing ? backend->flush() : backend->drain(); if (!drained) { return std::unexpected(drained.error()); } @@ -247,7 +261,7 @@ std::expected drain_encoder( std::move(*drained), stats, rtp_publisher, - rtmp_publisher, + rtmp_output, mcap_sink, latency_tracker); } @@ -258,35 +272,35 @@ int run_pipeline(const RuntimeConfig &config) { auto input_endpoints = resolve_input_endpoints(config); if (!input_endpoints) { spdlog::error("{}", input_endpoints.error()); - return 2; + return exit_code(PipelineExitCode::InputError); } auto source = make_frame_source(config); if (!source) { spdlog::error("pipeline input source selection failed: {}", source.error()); - return 2; + return exit_code(PipelineExitCode::InputError); } auto source_prepare = (*source)->prepare_runtime(); if (!source_prepare) { spdlog::error("pipeline source backend '{}' setup failed: {}", (*source)->backend_name(), source_prepare.error()); - return 2; + return exit_code(PipelineExitCode::InputError); } auto backend = encode::make_encoder_backend(config); if (!backend) { - spdlog::error("pipeline encoder backend selection failed: {}", backend.error()); - return 5; + spdlog::error("pipeline encoder backend selection failed: {}", format_error(backend.error())); + return exit_code(PipelineExitCode::InitializationError); } auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name); if (!shm) { spdlog::error("pipeline open shared memory failed: {}", shm.error()); - return 3; + return exit_code(PipelineExitCode::SharedMemoryError); } if (shm->bytes <= ipc::kShmPayloadOffset) { spdlog::error("pipeline invalid shared memory size: {}", shm->bytes); - return 3; + return exit_code(PipelineExitCode::SharedMemoryError); } std::vector snapshot_buffer( @@ -301,36 +315,27 @@ int run_pipeline(const RuntimeConfig &config) { subscriber.connect(input_endpoints->zmq_endpoint); } catch (const zmq::error_t &e) { spdlog::error("pipeline subscribe failed on '{}': {}", input_endpoints->zmq_endpoint, e.what()); - return 4; + return exit_code(PipelineExitCode::SubscriberError); } std::optional rtp_publisher{}; - std::optional rtmp_publisher{}; + std::optional rtmp_output{}; std::optional mcap_sink{}; if (config.outputs.rtp.enabled) { auto created = protocol::UdpRtpPublisher::create(config); if (!created) { spdlog::error("pipeline RTP publisher init failed: {}", created.error()); - return 5; + return exit_code(PipelineExitCode::InitializationError); } rtp_publisher.emplace(std::move(*created)); } - if (config.outputs.rtmp.enabled) { - auto created = protocol::RtmpPublisher::create(config); - if (!created) { - spdlog::error("pipeline RTMP publisher init failed: {}", created.error()); - return 5; - } - rtmp_publisher.emplace(std::move(*created)); - } - if (config.record.mcap.enabled) { auto created = record::McapRecordSink::create(config); if (!created) { spdlog::error("pipeline MCAP sink init failed: {}", created.error()); - return 5; + return exit_code(PipelineExitCode::InitializationError); } mcap_sink.emplace(std::move(*created)); } @@ -356,17 +361,31 @@ int run_pipeline(const RuntimeConfig &config) { started = false; restart_pending = true; restart_target_info = target_info; - if (rtmp_publisher) { - rtmp_publisher->on_stream_reset(); - } + rtmp_output.reset(); }; - const auto attempt_backend_start = [&](const ipc::FrameInfo &target_info) -> std::expected { + const auto attempt_backend_start = [&](const ipc::FrameInfo &target_info) -> Status { (*backend)->shutdown(); + rtmp_output.reset(); auto init = (*backend)->init(config, target_info); if (!init) { return std::unexpected(init.error()); } + if (config.outputs.rtmp.enabled) { + auto stream_info = (*backend)->stream_info(); + if (!stream_info) { + return unexpected_error( + stream_info.error().code, + "pipeline RTMP output stream info unavailable: " + format_error(stream_info.error())); + } + auto created = protocol::make_rtmp_output(config, *stream_info); + if (!created) { + return unexpected_error( + created.error().code, + "pipeline RTMP output init failed: " + format_error(created.error())); + } + rtmp_output.emplace(std::move(*created)); + } started = true; restart_pending = false; restart_target_info.reset(); @@ -380,7 +399,8 @@ int run_pipeline(const RuntimeConfig &config) { while (true) { auto poll = (*backend)->poll(); if (!poll) { - restart_backend(poll.error(), active_info); + const auto reason = format_error(poll.error()); + restart_backend(reason, active_info); } zmq::message_t message; @@ -390,8 +410,8 @@ int run_pipeline(const RuntimeConfig &config) { if (restart_pending && restart_target_info) { auto start_result = attempt_backend_start(*restart_target_info); if (!start_result) { - spdlog::error("pipeline backend restart failed: {}", start_result.error()); - return 6; + spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); + return exit_code(PipelineExitCode::RuntimeError); } } if (now - last_event >= idle_timeout) { @@ -401,15 +421,16 @@ int run_pipeline(const RuntimeConfig &config) { if (!producer_offline && started) { auto drain = drain_encoder( config, - **backend, + *backend, false, stats, rtp_publisher ? &*rtp_publisher : nullptr, - rtmp_publisher ? &*rtmp_publisher : nullptr, + rtmp_output ? &*rtmp_output : nullptr, mcap_sink ? &*mcap_sink : nullptr, latency_tracker); if (!drain) { - restart_backend(drain.error(), active_info); + const auto reason = format_error(drain.error()); + restart_backend(reason, active_info); } } continue; @@ -456,8 +477,8 @@ int run_pipeline(const RuntimeConfig &config) { const auto target_info = restart_target_info.value_or(snapshot->metadata.info); auto start_result = attempt_backend_start(target_info); if (!start_result) { - spdlog::error("pipeline backend init failed: {}", start_result.error()); - return 5; + spdlog::error("pipeline backend init failed: {}", format_error(start_result.error())); + return exit_code(PipelineExitCode::InitializationError); } } @@ -469,22 +490,24 @@ int run_pipeline(const RuntimeConfig &config) { .bytes = std::span(snapshot_buffer.data(), snapshot->bytes_copied), }); if (!push) { - restart_backend(push.error(), active_info); + const auto reason = format_error(push.error()); + restart_backend(reason, active_info); continue; } stats.pushed_frames += 1; auto drain = drain_encoder( config, - **backend, + *backend, false, stats, rtp_publisher ? &*rtp_publisher : nullptr, - rtmp_publisher ? &*rtmp_publisher : nullptr, + rtmp_output ? &*rtmp_output : nullptr, mcap_sink ? &*mcap_sink : nullptr, latency_tracker); if (!drain) { - restart_backend(drain.error(), active_info); + const auto reason = format_error(drain.error()); + restart_backend(reason, active_info); continue; } @@ -517,9 +540,7 @@ int run_pipeline(const RuntimeConfig &config) { restart_pending = false; restart_target_info.reset(); active_info.reset(); - if (rtmp_publisher) { - rtmp_publisher->on_stream_reset(); - } + rtmp_output.reset(); } continue; } @@ -530,16 +551,16 @@ int run_pipeline(const RuntimeConfig &config) { if (started) { auto drain = drain_encoder( config, - **backend, + *backend, true, stats, rtp_publisher ? &*rtp_publisher : nullptr, - rtmp_publisher ? &*rtmp_publisher : nullptr, + rtmp_output ? &*rtmp_output : nullptr, mcap_sink ? &*mcap_sink : nullptr, latency_tracker); - if (!drain) { - spdlog::error("pipeline publish failed during flush: {}", drain.error()); - return 6; + if (!drain) { + spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error())); + return exit_code(PipelineExitCode::RuntimeError); } } @@ -591,11 +612,11 @@ int run_pipeline(const RuntimeConfig &config) { if (rtp_publisher) { rtp_publisher->log_metrics(); } - if (rtmp_publisher) { - rtmp_publisher->log_metrics(); + if (rtmp_output) { + (*rtmp_output)->log_metrics(); } - return 0; + return exit_code(PipelineExitCode::Success); } } diff --git a/src/protocol/rtmp_output.cpp b/src/protocol/rtmp_output.cpp new file mode 100644 index 0000000..b5fc211 --- /dev/null +++ b/src/protocol/rtmp_output.cpp @@ -0,0 +1,684 @@ +#include "cvmmap_streamer/protocol/rtmp_output.hpp" + +#include "cvmmap_streamer/protocol/rtmp_publisher.hpp" + +extern "C" { +#include +#include +#include +} + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace cvmmap_streamer::protocol { + +namespace { + +constexpr std::uint64_t kNanosPerSecond = 1'000'000'000ull; + +[[nodiscard]] +std::string av_error_string(int error_code) { + char buffer[AV_ERROR_MAX_STRING_SIZE]{}; + av_strerror(error_code, buffer, sizeof(buffer)); + return std::string(buffer); +} + +[[nodiscard]] +AVCodecID to_avcodec_id(CodecType codec) { + return codec == CodecType::H265 ? AV_CODEC_ID_HEVC : AV_CODEC_ID_H264; +} + +[[nodiscard]] +const char *ffmpeg_input_format(CodecType codec) { + return codec == CodecType::H265 ? "hevc" : "h264"; +} + +[[nodiscard]] +AVRational stream_time_base(const encode::EncodedStreamInfo &stream_info) { + return AVRational{ + static_cast(stream_info.time_base_num), + static_cast(stream_info.time_base_den), + }; +} + +[[nodiscard]] +Status copy_decoder_config( + AVCodecParameters *codecpar, + std::span decoder_config) { + if (codecpar == nullptr) { + return unexpected_error(ERR_INVALID_ARGUMENT, "RTMP stream codec parameters are null"); + } + if (decoder_config.empty()) { + return {}; + } + + codecpar->extradata = static_cast(av_mallocz(decoder_config.size() + AV_INPUT_BUFFER_PADDING_SIZE)); + if (codecpar->extradata == nullptr) { + return unexpected_error(ERR_ALLOCATION_FAILED, "failed to allocate RTMP codec extradata"); + } + std::memcpy(codecpar->extradata, decoder_config.data(), decoder_config.size()); + codecpar->extradata_size = static_cast(decoder_config.size()); + return {}; +} + +void append_start_code(std::vector &output) { + output.push_back(0x00); + output.push_back(0x00); + output.push_back(0x00); + output.push_back(0x01); +} + +[[nodiscard]] +Result read_be16(std::span bytes, std::size_t offset) { + if (offset + 2 > bytes.size()) { + return unexpected_error(ERR_PROTOCOL, "decoder config truncated"); + } + return static_cast((static_cast(bytes[offset]) << 8) | bytes[offset + 1]); +} + +[[nodiscard]] +Result> avcc_to_annexb(std::span decoder_config) { + if (decoder_config.size() < 7 || decoder_config[0] != 1) { + return unexpected_error(ERR_PROTOCOL, "invalid AVC decoder config"); + } + + std::vector annexb{}; + std::size_t offset = 5; + const auto sps_count = static_cast(decoder_config[offset++] & 0x1fu); + for (std::size_t i = 0; i < sps_count; ++i) { + auto size = read_be16(decoder_config, offset); + if (!size) { + return std::unexpected(size.error()); + } + offset += 2; + if (offset + *size > decoder_config.size()) { + return unexpected_error(ERR_PROTOCOL, "invalid AVC decoder config payload"); + } + append_start_code(annexb); + annexb.insert(annexb.end(), decoder_config.begin() + static_cast(offset), decoder_config.begin() + static_cast(offset + *size)); + offset += *size; + } + + if (offset >= decoder_config.size()) { + return unexpected_error(ERR_PROTOCOL, "invalid AVC decoder config: missing PPS count"); + } + const auto pps_count = static_cast(decoder_config[offset++]); + for (std::size_t i = 0; i < pps_count; ++i) { + auto size = read_be16(decoder_config, offset); + if (!size) { + return std::unexpected(size.error()); + } + offset += 2; + if (offset + *size > decoder_config.size()) { + return unexpected_error(ERR_PROTOCOL, "invalid AVC decoder config payload"); + } + append_start_code(annexb); + annexb.insert(annexb.end(), decoder_config.begin() + static_cast(offset), decoder_config.begin() + static_cast(offset + *size)); + offset += *size; + } + + return annexb; +} + +[[nodiscard]] +Result> hvcc_to_annexb(std::span decoder_config) { + if (decoder_config.size() < 23 || decoder_config[0] != 1) { + return unexpected_error(ERR_PROTOCOL, "invalid HEVC decoder config"); + } + + std::vector annexb{}; + std::size_t offset = 22; + const auto array_count = static_cast(decoder_config[offset++]); + for (std::size_t array_index = 0; array_index < array_count; ++array_index) { + if (offset + 3 > decoder_config.size()) { + return unexpected_error(ERR_PROTOCOL, "invalid HEVC decoder config arrays"); + } + offset += 1; + auto nal_count = read_be16(decoder_config, offset); + if (!nal_count) { + return std::unexpected(nal_count.error()); + } + offset += 2; + + for (std::size_t nal_index = 0; nal_index < *nal_count; ++nal_index) { + auto nal_size = read_be16(decoder_config, offset); + if (!nal_size) { + return std::unexpected(nal_size.error()); + } + offset += 2; + if (offset + *nal_size > decoder_config.size()) { + return unexpected_error(ERR_PROTOCOL, "invalid HEVC decoder config payload"); + } + append_start_code(annexb); + annexb.insert(annexb.end(), decoder_config.begin() + static_cast(offset), decoder_config.begin() + static_cast(offset + *nal_size)); + offset += *nal_size; + } + } + + return annexb; +} + +[[nodiscard]] +bool looks_like_annexb(std::span bytes) { + if (bytes.size() >= 4 && + bytes[0] == 0x00 && + bytes[1] == 0x00 && + bytes[2] == 0x00 && + bytes[3] == 0x01) { + return true; + } + return bytes.size() >= 3 && + bytes[0] == 0x00 && + bytes[1] == 0x00 && + bytes[2] == 0x01; +} + +[[nodiscard]] +Result> decoder_config_to_annexb( + CodecType codec, + std::span decoder_config) { + if (decoder_config.empty()) { + return unexpected_error(ERR_PROTOCOL, "decoder config is required"); + } + if (looks_like_annexb(decoder_config)) { + return std::vector(decoder_config.begin(), decoder_config.end()); + } + if (codec == CodecType::H265) { + return hvcc_to_annexb(decoder_config); + } + return avcc_to_annexb(decoder_config); +} + +[[nodiscard]] +Status write_all(int fd, std::span bytes) { + std::size_t written{0}; + while (written < bytes.size()) { + const auto result = ::write(fd, bytes.data() + written, bytes.size() - written); + if (result < 0) { + if (errno == EINTR) { + continue; + } + return unexpected_error(ERR_IO, std::strerror(errno)); + } + if (result == 0) { + return unexpected_error(ERR_IO, "short write to ffmpeg process stdin"); + } + written += static_cast(result); + } + return {}; +} + +class LegacyCustomRtmpOutput { +public: + explicit LegacyCustomRtmpOutput(RtmpPublisher &&publisher) + : publisher_(std::move(publisher)) {} + + [[nodiscard]] + std::string_view backend_name() const { + return "legacy_custom"; + } + + [[nodiscard]] + Status publish_access_unit(const encode::EncodedAccessUnit &access_unit) { + auto publish = publisher_.publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns); + if (!publish) { + return unexpected_error(ERR_PROTOCOL, publish.error()); + } + return {}; + } + + void log_metrics() const { + publisher_.log_metrics(); + } + +private: + RtmpPublisher publisher_{}; +}; + +class LibavformatRtmpOutput { +public: + struct Session { + std::string url{}; + AVFormatContext *format_context{nullptr}; + AVStream *video_stream{nullptr}; + }; + + LibavformatRtmpOutput() = default; + LibavformatRtmpOutput(const LibavformatRtmpOutput &) = delete; + LibavformatRtmpOutput &operator=(const LibavformatRtmpOutput &) = delete; + LibavformatRtmpOutput(LibavformatRtmpOutput &&) noexcept = default; + LibavformatRtmpOutput &operator=(LibavformatRtmpOutput &&) noexcept = default; + + ~LibavformatRtmpOutput() { + for (auto &session : sessions_) { + close_session(session); + } + } + + [[nodiscard]] + static Result create( + const RuntimeConfig &config, + const encode::EncodedStreamInfo &stream_info) { + if (stream_info.decoder_config.empty()) { + return unexpected_error(ERR_PROTOCOL, "libavformat RTMP requires encoder decoder_config/extradata"); + } + + avformat_network_init(); + + LibavformatRtmpOutput output{}; + output.codec_ = stream_info.codec; + + for (const auto &url : config.outputs.rtmp.urls) { + auto session = create_session(url, stream_info); + if (!session) { + return std::unexpected(session.error()); + } + output.sessions_.push_back(std::move(*session)); + } + + return pro::make_proxy(std::move(output)); + } + + [[nodiscard]] + std::string_view backend_name() const { + return "libavformat"; + } + + [[nodiscard]] + Status publish_access_unit(const encode::EncodedAccessUnit &access_unit) { + for (auto &session : sessions_) { + auto packet = av_packet_alloc(); + if (packet == nullptr) { + return unexpected_error(ERR_ALLOCATION_FAILED, "failed to allocate RTMP AVPacket"); + } + const auto packet_result = av_new_packet(packet, static_cast(access_unit.annexb_bytes.size())); + if (packet_result < 0) { + av_packet_free(&packet); + return unexpected_error(ERR_ALLOCATION_FAILED, "failed to allocate RTMP packet payload: " + av_error_string(packet_result)); + } + + std::memcpy(packet->data, access_unit.annexb_bytes.data(), access_unit.annexb_bytes.size()); + packet->stream_index = session.video_stream->index; + packet->flags = access_unit.keyframe ? AV_PKT_FLAG_KEY : 0; + packet->pts = av_rescale_q( + static_cast(access_unit.stream_pts_ns), + AVRational{1, static_cast(kNanosPerSecond)}, + session.video_stream->time_base); + packet->dts = packet->pts; + + const auto write_result = av_interleaved_write_frame(session.format_context, packet); + av_packet_free(&packet); + if (write_result < 0) { + return unexpected_error( + ERR_NETWORK, + "libavformat RTMP write failed for '" + session.url + "': " + av_error_string(write_result)); + } + + video_messages_ += 1; + bytes_sent_ += access_unit.annexb_bytes.size(); + } + + access_units_ += 1; + return {}; + } + + void log_metrics() const { + spdlog::info( + "RTMP_OUTPUT_METRICS backend={} codec={} urls={} access_units={} video_messages={} bytes_sent={}", + backend_name(), + to_string(codec_), + sessions_.size(), + access_units_, + video_messages_, + bytes_sent_); + } + +private: + [[nodiscard]] + static Result create_session( + const std::string &url, + const encode::EncodedStreamInfo &stream_info) { + Session session{}; + session.url = url; + + const auto alloc_result = avformat_alloc_output_context2(&session.format_context, nullptr, "flv", url.c_str()); + if (alloc_result < 0 || session.format_context == nullptr) { + return unexpected_error( + ERR_ALLOCATION_FAILED, + "failed to allocate RTMP output context for '" + url + "': " + av_error_string(alloc_result)); + } + + session.format_context->flags |= AVFMT_FLAG_FLUSH_PACKETS; + + session.video_stream = avformat_new_stream(session.format_context, nullptr); + if (session.video_stream == nullptr) { + close_session(session); + return unexpected_error(ERR_ALLOCATION_FAILED, "failed to allocate RTMP output stream for '" + url + "'"); + } + + session.video_stream->time_base = stream_time_base(stream_info); + session.video_stream->avg_frame_rate = AVRational{ + static_cast(stream_info.frame_rate_num), + static_cast(stream_info.frame_rate_den), + }; + + auto *codecpar = session.video_stream->codecpar; + codecpar->codec_type = AVMEDIA_TYPE_VIDEO; + codecpar->codec_id = to_avcodec_id(stream_info.codec); + codecpar->width = static_cast(stream_info.width); + codecpar->height = static_cast(stream_info.height); + + auto extradata_copy = copy_decoder_config(codecpar, stream_info.decoder_config); + if (!extradata_copy) { + close_session(session); + return std::unexpected(extradata_copy.error()); + } + + if (!(session.format_context->oformat->flags & AVFMT_NOFILE)) { + const auto open_result = avio_open2(&session.format_context->pb, url.c_str(), AVIO_FLAG_WRITE, nullptr, nullptr); + if (open_result < 0) { + close_session(session); + return unexpected_error( + ERR_NETWORK, + "failed to open RTMP output '" + url + "': " + av_error_string(open_result)); + } + } + + const auto header_result = avformat_write_header(session.format_context, nullptr); + if (header_result < 0) { + close_session(session); + return unexpected_error( + ERR_PROTOCOL, + "failed to write RTMP header for '" + url + "': " + av_error_string(header_result)); + } + + spdlog::info( + "RTMP_OUTPUT_READY backend=libavformat codec={} url={}", + to_string(stream_info.codec), + url); + return session; + } + + static void close_session(Session &session) { + if (session.format_context == nullptr) { + return; + } + + av_write_trailer(session.format_context); + if (!(session.format_context->oformat->flags & AVFMT_NOFILE) && session.format_context->pb != nullptr) { + avio_closep(&session.format_context->pb); + } + avformat_free_context(session.format_context); + session.format_context = nullptr; + session.video_stream = nullptr; + } + + CodecType codec_{CodecType::H264}; + std::vector sessions_{}; + std::uint64_t access_units_{0}; + std::uint64_t video_messages_{0}; + std::uint64_t bytes_sent_{0}; +}; + +class FfmpegProcessRtmpOutput { +public: + struct Session { + std::string url{}; + pid_t pid{-1}; + int stdin_fd{-1}; + }; + + FfmpegProcessRtmpOutput() = default; + FfmpegProcessRtmpOutput(const FfmpegProcessRtmpOutput &) = delete; + FfmpegProcessRtmpOutput &operator=(const FfmpegProcessRtmpOutput &) = delete; + FfmpegProcessRtmpOutput(FfmpegProcessRtmpOutput &&) noexcept = default; + FfmpegProcessRtmpOutput &operator=(FfmpegProcessRtmpOutput &&) noexcept = default; + + ~FfmpegProcessRtmpOutput() { + for (auto &session : sessions_) { + close_session(session); + } + } + + [[nodiscard]] + static Result create( + const RuntimeConfig &config, + const encode::EncodedStreamInfo &stream_info) { + FfmpegProcessRtmpOutput output{}; + output.codec_ = stream_info.codec; + output.ffmpeg_path_ = config.outputs.rtmp.ffmpeg_path; + output.frame_rate_ = AVRational{ + static_cast(stream_info.frame_rate_num), + static_cast(stream_info.frame_rate_den), + }; + auto decoder_config_annexb = decoder_config_to_annexb(stream_info.codec, stream_info.decoder_config); + if (!decoder_config_annexb) { + return unexpected_error( + ERR_PROTOCOL, + "ffmpeg_process RTMP requires decoder config: " + format_error(decoder_config_annexb.error())); + } + output.decoder_config_annexb_ = std::move(*decoder_config_annexb); + + for (const auto &url : config.outputs.rtmp.urls) { + auto session = output.spawn_session(url); + if (!session) { + return std::unexpected(session.error()); + } + output.sessions_.push_back(std::move(*session)); + } + + return pro::make_proxy(std::move(output)); + } + + [[nodiscard]] + std::string_view backend_name() const { + return "ffmpeg_process"; + } + + [[nodiscard]] + Status publish_access_unit(const encode::EncodedAccessUnit &access_unit) { + for (auto &session : sessions_) { + auto process_state = poll_session(session); + if (!process_state) { + return std::unexpected(process_state.error()); + } + + auto write_result = write_all(session.stdin_fd, access_unit.annexb_bytes); + if (!write_result) { + return unexpected_error( + ERR_IO, + "ffmpeg_process RTMP write failed for '" + session.url + "': " + format_error(write_result.error())); + } + + video_messages_ += 1; + bytes_sent_ += access_unit.annexb_bytes.size(); + } + + access_units_ += 1; + return {}; + } + + void log_metrics() const { + spdlog::info( + "RTMP_OUTPUT_METRICS backend={} codec={} urls={} access_units={} video_messages={} bytes_sent={} ffmpeg_path={}", + backend_name(), + to_string(codec_), + sessions_.size(), + access_units_, + video_messages_, + bytes_sent_, + ffmpeg_path_); + } + +private: + [[nodiscard]] + Result spawn_session(const std::string &url) const { + int stdin_pipe[2]{-1, -1}; + if (pipe(stdin_pipe) != 0) { + return unexpected_error(ERR_IO, "failed to create ffmpeg stdin pipe: " + std::string(std::strerror(errno))); + } + + const auto child = fork(); + if (child < 0) { + close(stdin_pipe[0]); + close(stdin_pipe[1]); + return unexpected_error(ERR_CHILD_PROCESS, "failed to fork ffmpeg child: " + std::string(std::strerror(errno))); + } + + if (child == 0) { + dup2(stdin_pipe[0], STDIN_FILENO); + close(stdin_pipe[0]); + close(stdin_pipe[1]); + + const int null_fd = open("/dev/null", O_WRONLY); + if (null_fd >= 0) { + dup2(null_fd, STDOUT_FILENO); + close(null_fd); + } + + const std::string frame_rate = + std::to_string(frame_rate_.num) + "/" + std::to_string(std::max(frame_rate_.den, 1)); + + std::vector args_storage{ + ffmpeg_path_, + "-hide_banner", + "-loglevel", + "warning", + "-fflags", + "+genpts+nobuffer", + "-use_wallclock_as_timestamps", + "1", + "-framerate", + frame_rate, + "-f", + ffmpeg_input_format(codec_), + "-i", + "pipe:0", + "-an", + "-c:v", + "copy", + "-f", + "flv", + url, + }; + + std::vector argv{}; + argv.reserve(args_storage.size() + 1); + for (auto &arg : args_storage) { + argv.push_back(arg.data()); + } + argv.push_back(nullptr); + + execvp(argv[0], argv.data()); + ::_exit(127); + } + + close(stdin_pipe[0]); + + Session session{}; + session.url = url; + session.pid = child; + session.stdin_fd = stdin_pipe[1]; + + auto preamble_write = write_all( + session.stdin_fd, + std::span(decoder_config_annexb_.data(), decoder_config_annexb_.size())); + if (!preamble_write) { + close_session(session); + return unexpected_error( + ERR_IO, + "failed to seed ffmpeg_process decoder config: " + format_error(preamble_write.error())); + } + + spdlog::info( + "RTMP_OUTPUT_READY backend=ffmpeg_process codec={} url={} ffmpeg_path={} pid={}", + to_string(codec_), + url, + ffmpeg_path_, + static_cast(session.pid)); + return session; + } + + [[nodiscard]] + static Status poll_session(Session &session) { + if (session.pid <= 0) { + return unexpected_error(ERR_CHILD_PROCESS, "ffmpeg child is not running"); + } + + int status{0}; + const auto wait_result = waitpid(session.pid, &status, WNOHANG); + if (wait_result == 0) { + return {}; + } + if (wait_result < 0) { + return unexpected_error(ERR_CHILD_PROCESS, "failed to poll ffmpeg child: " + std::string(std::strerror(errno))); + } + + if (session.stdin_fd >= 0) { + close(session.stdin_fd); + session.stdin_fd = -1; + } + session.pid = -1; + return unexpected_error(ERR_CHILD_PROCESS, "ffmpeg child exited before publish completed"); + } + + static void close_session(Session &session) { + if (session.stdin_fd >= 0) { + close(session.stdin_fd); + session.stdin_fd = -1; + } + if (session.pid > 0) { + kill(session.pid, SIGTERM); + (void)waitpid(session.pid, nullptr, 0); + session.pid = -1; + } + } + + CodecType codec_{CodecType::H264}; + std::string ffmpeg_path_{}; + AVRational frame_rate_{30, 1}; + std::vector decoder_config_annexb_{}; + std::vector sessions_{}; + std::uint64_t access_units_{0}; + std::uint64_t video_messages_{0}; + std::uint64_t bytes_sent_{0}; +}; + +} + +Result make_rtmp_output( + const RuntimeConfig &config, + const encode::EncodedStreamInfo &stream_info) { + switch (config.outputs.rtmp.transport) { + case RtmpTransportType::Libavformat: + return LibavformatRtmpOutput::create(config, stream_info); + case RtmpTransportType::FfmpegProcess: + return FfmpegProcessRtmpOutput::create(config, stream_info); + case RtmpTransportType::LegacyCustom: { + auto publisher = RtmpPublisher::create(config); + if (!publisher) { + return unexpected_error(ERR_PROTOCOL, publisher.error()); + } + return pro::make_proxy(std::move(*publisher)); + } + } + + return unexpected_error(ERR_INTERNAL, "unknown RTMP transport"); +} + +} diff --git a/src/protocol/rtp_publisher.cpp b/src/protocol/rtp_publisher.cpp index dd40e84..2c5211a 100644 --- a/src/protocol/rtp_publisher.cpp +++ b/src/protocol/rtp_publisher.cpp @@ -250,56 +250,55 @@ std::expected UdpRtpPublisher::create(const Runtim return std::unexpected("RTP socket non-blocking setup failed: " + std::string(std::strerror(errno))); } - const std::string codec_name = config.encoder.codec == CodecType::H265 ? "h265" : "h264"; if (config.outputs.rtp.sdp_path && !config.outputs.rtp.sdp_path->empty()) { publisher.sdp_path_ = *config.outputs.rtp.sdp_path; - } else { - publisher.sdp_path_ = - "/tmp/cvmmap_streamer_" + - codec_name + - "_" + - std::to_string(publisher.destination_port_) + - ".sdp"; - } - std::filesystem::path sdp_path{publisher.sdp_path_}; - if (sdp_path.has_parent_path() && !sdp_path.parent_path().empty()) { - std::error_code ec; - std::filesystem::create_directories(sdp_path.parent_path(), ec); - if (ec) { - return std::unexpected("RTP SDP directory create failed: " + ec.message()); + std::filesystem::path sdp_path{publisher.sdp_path_}; + if (sdp_path.has_parent_path() && !sdp_path.parent_path().empty()) { + std::error_code ec; + std::filesystem::create_directories(sdp_path.parent_path(), ec); + if (ec) { + return std::unexpected("RTP SDP directory create failed: " + ec.message()); + } } + + std::ofstream sdp(publisher.sdp_path_, std::ios::trunc); + if (!sdp.is_open()) { + return std::unexpected("RTP SDP open failed: " + publisher.sdp_path_); + } + + const auto endpoint_ip = publisher.destination_ip_.empty() ? publisher.destination_host_ : publisher.destination_ip_; + sdp << "v=0\n"; + sdp << "o=- 0 0 IN IP4 " << endpoint_ip << "\n"; + sdp << "s=cvmmap-streamer\n"; + sdp << "c=IN IP4 " << endpoint_ip << "\n"; + sdp << "t=0 0\n"; + sdp << "m=video " << publisher.destination_port_ << " RTP/AVP " << static_cast(publisher.payload_type_) << "\n"; + sdp << "a=rtpmap:" << static_cast(publisher.payload_type_) << " " << rtp_encoding_name(publisher.codec_) << "/" << kRtpVideoClockRate << "\n"; + sdp << rtp_fmtp_line(publisher.codec_, publisher.payload_type_) << "\n"; + sdp << "a=sendonly\n"; + sdp << "a=control:streamid=0\n"; + + if (!sdp.good()) { + return std::unexpected("RTP SDP write failed: " + publisher.sdp_path_); + } + + spdlog::info( + "RTP_SDP_WRITTEN codec={} payload_type={} destination={}:{} path={}", + to_string(publisher.codec_), + static_cast(publisher.payload_type_), + endpoint_ip, + publisher.destination_port_, + publisher.sdp_path_); + } else { + spdlog::info( + "RTP_SDP_SKIPPED codec={} payload_type={} destination={}:{} reason='no sdp_path configured'", + to_string(publisher.codec_), + static_cast(publisher.payload_type_), + publisher.destination_host_, + publisher.destination_port_); } - std::ofstream sdp(publisher.sdp_path_, std::ios::trunc); - if (!sdp.is_open()) { - return std::unexpected("RTP SDP open failed: " + publisher.sdp_path_); - } - - const auto endpoint_ip = publisher.destination_ip_.empty() ? publisher.destination_host_ : publisher.destination_ip_; - sdp << "v=0\n"; - sdp << "o=- 0 0 IN IP4 " << endpoint_ip << "\n"; - sdp << "s=cvmmap-streamer\n"; - sdp << "c=IN IP4 " << endpoint_ip << "\n"; - sdp << "t=0 0\n"; - sdp << "m=video " << publisher.destination_port_ << " RTP/AVP " << static_cast(publisher.payload_type_) << "\n"; - sdp << "a=rtpmap:" << static_cast(publisher.payload_type_) << " " << rtp_encoding_name(publisher.codec_) << "/" << kRtpVideoClockRate << "\n"; - sdp << rtp_fmtp_line(publisher.codec_, publisher.payload_type_) << "\n"; - sdp << "a=sendonly\n"; - sdp << "a=control:streamid=0\n"; - - if (!sdp.good()) { - return std::unexpected("RTP SDP write failed: " + publisher.sdp_path_); - } - - spdlog::info( - "RTP_SDP_WRITTEN codec={} payload_type={} destination={}:{} path={}", - to_string(publisher.codec_), - static_cast(publisher.payload_type_), - endpoint_ip, - publisher.destination_port_, - publisher.sdp_path_); - return publisher; } diff --git a/src/testers/ipc_snapshot_tester.cpp b/src/testers/ipc_snapshot_tester.cpp index cd8f4d2..890f573 100644 --- a/src/testers/ipc_snapshot_tester.cpp +++ b/src/testers/ipc_snapshot_tester.cpp @@ -11,6 +11,19 @@ namespace { +enum class TesterExitCode : int { + Success = 0, + ReadError = 2, + VerificationError = 3, + TornReadAccepted = 4, + TornReadWrongError = 5, +}; + +[[nodiscard]] +constexpr int exit_code(TesterExitCode code) { + return static_cast(code); +} + constexpr std::size_t kMagicOffset = 0; constexpr std::size_t kVersionMajorOffset = 8; constexpr std::size_t kVersionMinorOffset = 9; @@ -67,7 +80,7 @@ void write_metadata( int main(int argc, char **argv) { if (argc <= 1 || cvmmap_streamer::has_help_flag(argc, argv)) { cvmmap_streamer::print_help("ipc_snapshot_tester"); - return 0; + return exit_code(TesterExitCode::Success); } std::array shm{}; @@ -82,12 +95,12 @@ int main(int argc, char **argv) { auto valid = cvmmap_streamer::ipc::read_coherent_snapshot(shm_view, destination); if (!valid) { spdlog::error("coherent snapshot should succeed: {}", cvmmap_streamer::ipc::to_string(valid.error())); - return 2; + return exit_code(TesterExitCode::ReadError); } if (valid->bytes_copied != 32 || valid->metadata.frame_count != 7 || valid->metadata.timestamp_ns != 2222) { spdlog::error("valid snapshot verification failed"); - return 3; + return exit_code(TesterExitCode::VerificationError); } const auto torn = cvmmap_streamer::ipc::read_coherent_snapshot( @@ -99,13 +112,13 @@ int main(int argc, char **argv) { if (torn) { spdlog::error("torn read should be rejected"); - return 4; + return exit_code(TesterExitCode::TornReadAccepted); } if (torn.error() != cvmmap_streamer::ipc::SnapshotError::TornRead) { spdlog::error("unexpected torn read error: {}", cvmmap_streamer::ipc::to_string(torn.error())); - return 5; + return exit_code(TesterExitCode::TornReadWrongError); } spdlog::info("snapshot path valid and torn-read rejection verified"); - return 0; + return exit_code(TesterExitCode::Success); } diff --git a/src/testers/mcap_reader_tester.cpp b/src/testers/mcap_reader_tester.cpp index 07535cd..0fce27b 100644 --- a/src/testers/mcap_reader_tester.cpp +++ b/src/testers/mcap_reader_tester.cpp @@ -15,6 +15,23 @@ namespace { +enum class TesterExitCode : int { + Success = 0, + OpenError = 2, + SchemaError = 3, + TopicMismatch = 4, + TimestampError = 5, + ParseError = 6, + FormatMismatch = 7, + EmptyPayload = 8, + ThresholdError = 9, +}; + +[[nodiscard]] +constexpr int exit_code(TesterExitCode code) { + return static_cast(code); +} + struct Config { std::string input_path{}; std::optional expected_topic{}; @@ -51,7 +68,7 @@ int main(int argc, char **argv) { const auto open_status = reader.open(config->input_path); if (!open_status.ok()) { spdlog::error("failed to open MCAP file '{}': {}", config->input_path, open_status.message); - return 2; + return exit_code(TesterExitCode::OpenError); } std::uint64_t message_count{0}; @@ -63,7 +80,7 @@ int main(int argc, char **argv) { if (it->schema == nullptr || it->channel == nullptr) { spdlog::error("MCAP message missing schema or channel metadata"); reader.close(); - return 3; + return exit_code(TesterExitCode::SchemaError); } if (it->schema->encoding != "protobuf" || it->schema->name != "foxglove.CompressedVideo") { continue; @@ -71,34 +88,34 @@ int main(int argc, char **argv) { if (it->channel->messageEncoding != "protobuf") { spdlog::error("unexpected MCAP message encoding: {}", it->channel->messageEncoding); reader.close(); - return 3; + return exit_code(TesterExitCode::SchemaError); } if (config->expected_topic && it->channel->topic != *config->expected_topic) { spdlog::error("unexpected topic: expected '{}' got '{}'", *config->expected_topic, it->channel->topic); reader.close(); - return 4; + return exit_code(TesterExitCode::TopicMismatch); } if (saw_log_time && it->message.logTime < previous_log_time) { spdlog::error("non-monotonic logTime detected: {} < {}", it->message.logTime, previous_log_time); reader.close(); - return 5; + return exit_code(TesterExitCode::TimestampError); } foxglove::CompressedVideo message{}; if (!message.ParseFromArray(it->message.data, static_cast(it->message.dataSize))) { spdlog::error("failed to parse foxglove.CompressedVideo payload"); reader.close(); - return 6; + return exit_code(TesterExitCode::ParseError); } if (config->expected_format && message.format() != *config->expected_format) { spdlog::error("unexpected format: expected '{}' got '{}'", *config->expected_format, message.format()); reader.close(); - return 7; + return exit_code(TesterExitCode::FormatMismatch); } if (message.data().empty()) { spdlog::error("compressed video payload is empty"); reader.close(); - return 8; + return exit_code(TesterExitCode::EmptyPayload); } previous_log_time = it->message.logTime; @@ -110,9 +127,9 @@ int main(int argc, char **argv) { if (message_count < config->min_messages) { spdlog::error("message threshold not met: {} < {}", message_count, config->min_messages); - return 9; + return exit_code(TesterExitCode::ThresholdError); } spdlog::info("validated {} foxglove.CompressedVideo MCAP messages", message_count); - return 0; + return exit_code(TesterExitCode::Success); } diff --git a/src/testers/rtmp_output_tester.cpp b/src/testers/rtmp_output_tester.cpp new file mode 100644 index 0000000..b65d96a --- /dev/null +++ b/src/testers/rtmp_output_tester.cpp @@ -0,0 +1,238 @@ +#include "cvmmap_streamer/config/runtime_config.hpp" +#include "cvmmap_streamer/encode/encoder_backend.hpp" +#include "cvmmap_streamer/ipc/contracts.hpp" +#include "cvmmap_streamer/protocol/rtmp_output.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace { + +enum class TesterExitCode : int { + Success = 0, + InvalidArgument = 2, + BackendSelectionError = 3, + BackendInitError = 4, + StreamInfoError = 5, + OutputInitError = 6, + PushError = 7, + DrainError = 8, + PublishError = 9, + FlushError = 10, + FlushPublishError = 11, +}; + +[[nodiscard]] +constexpr int exit_code(TesterExitCode code) { + return static_cast(code); +} + +struct Config { + std::string rtmp_url{"rtmp://127.0.0.1/live/cvmmap_streamer_test"}; + std::string transport{"libavformat"}; + std::string codec{"h264"}; + std::string ffmpeg_path{"ffmpeg"}; + std::uint32_t frames{48}; + std::uint32_t width{320}; + std::uint32_t height{240}; + std::uint32_t frame_interval_ms{33}; + std::uint32_t linger_ms{3000}; +}; + +[[nodiscard]] +std::expected parse_args(int argc, char **argv) { + Config config{}; + CLI::App app{"rtmp_output_tester - publish synthetic encoded video to RTMP using the configured sink"}; + app.add_option("--rtmp-url", config.rtmp_url, "RTMP destination URL")->required(); + app.add_option("--transport", config.transport, "RTMP transport backend (libavformat|ffmpeg_process)") + ->check(CLI::IsMember({"libavformat", "ffmpeg_process", "legacy_custom"})); + app.add_option("--codec", config.codec, "Video codec (h264|h265)") + ->check(CLI::IsMember({"h264", "h265"})); + app.add_option("--ffmpeg-path", config.ffmpeg_path, "ffmpeg binary path for ffmpeg_process transport"); + app.add_option("--frames", config.frames, "Number of frames to publish")->check(CLI::PositiveNumber); + app.add_option("--width", config.width, "Frame width")->check(CLI::PositiveNumber); + app.add_option("--height", config.height, "Frame height")->check(CLI::PositiveNumber); + app.add_option("--frame-interval-ms", config.frame_interval_ms, "Frame interval in milliseconds")->check(CLI::PositiveNumber); + app.add_option("--linger-ms", config.linger_ms, "How long to keep the RTMP output open after flush")->check(CLI::NonNegativeNumber); + + try { + app.parse(argc, argv); + } catch (const CLI::ParseError &e) { + return std::unexpected(app.exit(e)); + } + + return config; +} + +[[nodiscard]] +std::expected parse_codec(std::string_view raw) { + if (raw == "h264") { + return cvmmap_streamer::CodecType::H264; + } + if (raw == "h265") { + return cvmmap_streamer::CodecType::H265; + } + return std::unexpected("unsupported codec"); +} + +[[nodiscard]] +std::expected parse_transport(std::string_view raw) { + if (raw == "libavformat") { + return cvmmap_streamer::RtmpTransportType::Libavformat; + } + if (raw == "ffmpeg_process") { + return cvmmap_streamer::RtmpTransportType::FfmpegProcess; + } + if (raw == "legacy_custom") { + return cvmmap_streamer::RtmpTransportType::LegacyCustom; + } + return std::unexpected("unsupported transport"); +} + +void fill_pattern(std::vector &buffer, std::uint32_t width, std::uint32_t height, std::uint32_t frame_index) { + for (std::uint32_t y = 0; y < height; ++y) { + for (std::uint32_t x = 0; x < width; ++x) { + const std::size_t pixel = static_cast(y) * width * 3 + static_cast(x) * 3; + buffer[pixel + 0] = static_cast((x + frame_index * 3) & 0xffu); + buffer[pixel + 1] = static_cast((y * 2 + frame_index * 5) & 0xffu); + buffer[pixel + 2] = static_cast(((x + y) / 2 + frame_index * 7) & 0xffu); + } + } +} + +} + +int main(int argc, char **argv) { + auto args = parse_args(argc, argv); + if (!args) { + return args.error(); + } + + auto codec = parse_codec(args->codec); + if (!codec) { + spdlog::error("{}", codec.error()); + return exit_code(TesterExitCode::InvalidArgument); + } + + auto transport = parse_transport(args->transport); + if (!transport) { + spdlog::error("{}", transport.error()); + return exit_code(TesterExitCode::InvalidArgument); + } + + cvmmap_streamer::RuntimeConfig config = cvmmap_streamer::RuntimeConfig::defaults(); + config.encoder.backend = cvmmap_streamer::EncoderBackendType::FFmpeg; + config.encoder.device = cvmmap_streamer::EncoderDeviceType::Software; + config.encoder.codec = *codec; + config.encoder.gop = 15; + config.encoder.b_frames = 0; + config.outputs.rtmp.enabled = true; + config.outputs.rtmp.urls = {args->rtmp_url}; + config.outputs.rtmp.transport = *transport; + config.outputs.rtmp.ffmpeg_path = args->ffmpeg_path; + + if (config.outputs.rtmp.transport == cvmmap_streamer::RtmpTransportType::LegacyCustom) { + config.encoder.backend = cvmmap_streamer::EncoderBackendType::GStreamerLegacy; + } + + cvmmap_streamer::ipc::FrameInfo frame_info{ + .width = static_cast(args->width), + .height = static_cast(args->height), + .channels = 3, + .depth = cvmmap_streamer::ipc::Depth::U8, + .pixel_format = cvmmap_streamer::ipc::PixelFormat::BGR, + .buffer_size = args->width * args->height * 3, + }; + + auto backend = cvmmap_streamer::encode::make_encoder_backend(config); + if (!backend) { + spdlog::error("failed to select encoder backend: {}", cvmmap_streamer::format_error(backend.error())); + return exit_code(TesterExitCode::BackendSelectionError); + } + + auto init = (*backend)->init(config, frame_info); + if (!init) { + spdlog::error("failed to initialize encoder backend: {}", cvmmap_streamer::format_error(init.error())); + return exit_code(TesterExitCode::BackendInitError); + } + + auto stream_info = (*backend)->stream_info(); + if (!stream_info) { + spdlog::error("failed to get encoder stream info: {}", cvmmap_streamer::format_error(stream_info.error())); + return exit_code(TesterExitCode::StreamInfoError); + } + + auto output = cvmmap_streamer::protocol::make_rtmp_output(config, *stream_info); + if (!output) { + spdlog::error("failed to initialize RTMP output: {}", cvmmap_streamer::format_error(output.error())); + return exit_code(TesterExitCode::OutputInitError); + } + + std::vector frame_bytes(frame_info.buffer_size, 0); + const auto frame_interval = std::chrono::milliseconds(args->frame_interval_ms); + std::uint64_t timestamp_ns{0}; + + for (std::uint32_t frame_index = 0; frame_index < args->frames; ++frame_index) { + fill_pattern(frame_bytes, args->width, args->height, frame_index); + + auto push = (*backend)->push_frame(cvmmap_streamer::encode::RawVideoFrame{ + .info = frame_info, + .source_timestamp_ns = timestamp_ns, + .bytes = std::span(frame_bytes.data(), frame_bytes.size()), + }); + if (!push) { + spdlog::error("encoder push failed at frame {}: {}", frame_index, cvmmap_streamer::format_error(push.error())); + return exit_code(TesterExitCode::PushError); + } + + auto drained = (*backend)->drain(); + if (!drained) { + spdlog::error("encoder drain failed at frame {}: {}", frame_index, cvmmap_streamer::format_error(drained.error())); + return exit_code(TesterExitCode::DrainError); + } + for (const auto &access_unit : *drained) { + auto publish = (*output)->publish_access_unit(access_unit); + if (!publish) { + spdlog::error("RTMP publish failed at frame {}: {}", frame_index, cvmmap_streamer::format_error(publish.error())); + return exit_code(TesterExitCode::PublishError); + } + } + + std::this_thread::sleep_for(frame_interval); + timestamp_ns += static_cast(args->frame_interval_ms) * 1'000'000ull; + } + + auto flushed = (*backend)->flush(); + if (!flushed) { + spdlog::error("encoder flush failed: {}", cvmmap_streamer::format_error(flushed.error())); + return exit_code(TesterExitCode::FlushError); + } + for (const auto &access_unit : *flushed) { + auto publish = (*output)->publish_access_unit(access_unit); + if (!publish) { + spdlog::error("RTMP publish failed during flush: {}", cvmmap_streamer::format_error(publish.error())); + return exit_code(TesterExitCode::FlushPublishError); + } + } + + spdlog::info( + "rtmp_output_tester completed publish: transport={} codec={} frames={} linger_ms={}", + args->transport, + args->codec, + args->frames, + args->linger_ms); + std::this_thread::sleep_for(std::chrono::milliseconds(args->linger_ms)); + + (*output)->log_metrics(); + (*backend)->shutdown(); + return exit_code(TesterExitCode::Success); +} diff --git a/src/testers/rtp_receiver_tester.cpp b/src/testers/rtp_receiver_tester.cpp index 093bce4..bc237b9 100644 --- a/src/testers/rtp_receiver_tester.cpp +++ b/src/testers/rtp_receiver_tester.cpp @@ -26,6 +26,19 @@ namespace { +enum class TesterExitCode : int { + Success = 0, + SocketError = 2, + PayloadTypeMismatch = 3, + PacketThresholdError = 4, + SdpValidationError = 5, +}; + +[[nodiscard]] +constexpr int exit_code(TesterExitCode code) { + return static_cast(code); +} + // RFC3550 RTP header constants constexpr std::size_t kRtpHeaderMinSize = 12; constexpr std::uint8_t kRtpVersion = 2; @@ -300,7 +313,7 @@ int main(int argc, char **argv) { sdpInfo = parseSdpFile(*config.sdpFile); if (!sdpInfo) { spdlog::error("Failed to parse SDP file: {}", *config.sdpFile); - return 5; + return exit_code(TesterExitCode::SdpValidationError); } spdlog::info("SDP parsed: encoding={}, clock-rate={}, PT={}", sdpInfo->encodingName, @@ -312,7 +325,7 @@ int main(int argc, char **argv) { spdlog::error("Expected PT({}) does not match SDP PT({})", *config.expectedPt, sdpInfo->payloadType); - return 5; + return exit_code(TesterExitCode::SdpValidationError); } } @@ -320,7 +333,7 @@ int main(int argc, char **argv) { auto sockResult = createUdpSocket(config.port); if (!sockResult) { spdlog::error("Socket error: {}", sockResult.error()); - return 2; + return exit_code(TesterExitCode::SocketError); } int sock = *sockResult; @@ -473,16 +486,16 @@ int main(int argc, char **argv) { spdlog::error("FAIL: Payload type mismatch detected (expected {}, got {})", config.expectedPt.value(), *stats.ptMismatchError); - return 3; + return exit_code(TesterExitCode::PayloadTypeMismatch); } if (stats.packetsReceived < config.packetThreshold) { spdlog::error("FAIL: Packet threshold not met (received {}, required {})", stats.packetsReceived, config.packetThreshold); - return 4; + return exit_code(TesterExitCode::PacketThresholdError); } spdlog::info("PASS: All validations successful"); - return 0; + return exit_code(TesterExitCode::Success); }