From 3e1ee23e42a975e1491129b2223a5cb74759c522 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Fri, 6 Mar 2026 08:50:15 +0800 Subject: [PATCH] feat(input): add real and dummy FrameSource backends --- CMakeLists.txt | 14 +- include/cvmmap_streamer/core/frame_source.hpp | 26 ++ lib/cvmmap-client-cpp | 2 +- src/core/frame_source.cpp | 430 ++++++++++++++++++ src/core/ingest_runtime.cpp | 61 ++- src/pipeline/pipeline_stub.cpp | 67 ++- 6 files changed, 584 insertions(+), 16 deletions(-) create mode 100644 include/cvmmap_streamer/core/frame_source.hpp create mode 100644 src/core/frame_source.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index defa604..0fbdcf1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,9 +33,13 @@ endif() add_library(cvmmap_streamer_common STATIC src/ipc/help.cpp src/config/runtime_config.cpp + src/core/frame_source.cpp src/core/ingest_runtime.cpp src/ipc/contracts.cpp src/ipc/ipc_stub.cpp + src/sim/wire.cpp + lib/cvmmap-client-cpp/app_cvmmap_client.cpp + lib/cvmmap-client-cpp/app_cvmmap_parser.cpp src/metrics/latency_tracker.cpp src/pipeline/pipeline_stub.cpp src/protocol/rtmp_publisher.cpp @@ -43,7 +47,10 @@ add_library(cvmmap_streamer_common STATIC target_include_directories(cvmmap_streamer_common PUBLIC - "${CMAKE_CURRENT_LIST_DIR}/include") + "${CMAKE_CURRENT_LIST_DIR}/include" + PRIVATE + "${CMAKE_CURRENT_LIST_DIR}/lib/cvmmap-client-cpp" + "${CMAKE_CURRENT_LIST_DIR}/lib/cvmmap-client-cpp/include") set(CVMAP_STREAMER_LINK_DEPS Threads::Threads) if (TARGET cppzmq::cppzmq) @@ -98,11 +105,6 @@ function(add_cvmmap_binary target source) endfunction() add_cvmmap_binary(cvmmap_streamer src/main_streamer.cpp) -add_cvmmap_binary( - cvmmap_sim - src/main_sim.cpp - src/sim/options.cpp - src/sim/wire.cpp) add_cvmmap_binary(rtp_receiver_tester src/testers/rtp_receiver_tester.cpp) add_cvmmap_binary(rtmp_stub_tester src/testers/rtmp_stub_tester.cpp) add_cvmmap_binary(ipc_snapshot_tester src/testers/ipc_snapshot_tester.cpp) diff --git a/include/cvmmap_streamer/core/frame_source.hpp b/include/cvmmap_streamer/core/frame_source.hpp new file mode 100644 index 0000000..95618f2 --- /dev/null +++ b/include/cvmmap_streamer/core/frame_source.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include "cvmmap_streamer/config/runtime_config.hpp" + +#include +#include +#include +#include + +namespace cvmmap_streamer::core { + +class FrameSource { +public: + virtual ~FrameSource() = default; + + [[nodiscard]] + virtual std::string_view backend_name() const = 0; + + [[nodiscard]] + virtual std::expected prepare_runtime() const = 0; +}; + +[[nodiscard]] +std::expected, std::string> make_frame_source(const RuntimeConfig &config); + +} diff --git a/lib/cvmmap-client-cpp b/lib/cvmmap-client-cpp index 186c054..f792552 160000 --- a/lib/cvmmap-client-cpp +++ b/lib/cvmmap-client-cpp @@ -1 +1 @@ -Subproject commit 186c0544b1e5b98d5e30a808c1ba77b025b38295 +Subproject commit f7925528983f42fce870b01491ab5e772546f485 diff --git a/src/core/frame_source.cpp b/src/core/frame_source.cpp new file mode 100644 index 0000000..7d894c2 --- /dev/null +++ b/src/core/frame_source.cpp @@ -0,0 +1,430 @@ +#include "cvmmap_streamer/core/frame_source.hpp" + +#include "cvmmap_streamer/ipc/contracts.hpp" +#include "cvmmap_streamer/sim/wire.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace cvmmap_streamer::core { + +namespace { + +namespace ipc = cvmmap_streamer::ipc; + +[[nodiscard]] +std::string resolve_client_target(const RuntimeConfig &config) { + if (config.input.shm_name.starts_with("cvmmap://")) { + return config.input.shm_name; + } + + if (!config.input.shm_name.empty() && config.input.shm_name.front() == '/') { + return config.input.shm_name.substr(1); + } + + return config.input.shm_name; +} + +[[nodiscard]] +std::string_view to_string(app::cvmmap::ModuleStatus status) { + switch (status) { + case app::cvmmap::ModuleStatus::Online: + return "online"; + case app::cvmmap::ModuleStatus::Offline: + return "offline"; + case app::cvmmap::ModuleStatus::StreamReset: + return "stream_reset"; + } + return "unknown"; +} + +[[nodiscard]] +std::string normalized_shm_name(std::string_view raw_name) { + if (!raw_name.empty() && raw_name.front() == '/') { + return std::string(raw_name); + } + return "/" + std::string(raw_name); +} + +void cleanup_zmq_ipc_endpoint_path(std::string_view endpoint) { + constexpr std::string_view kPrefix{"ipc://"}; + if (!endpoint.starts_with(kPrefix)) { + return; + } + const auto path = endpoint.substr(kPrefix.size()); + if (!path.empty()) { + unlink(std::string(path).c_str()); + } +} + +class SharedMemoryRegion { +public: + static std::expected create(const std::string &name, std::size_t bytes) { + const int fd = shm_open(name.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); + if (fd < 0) { + return std::unexpected("shm_open failed"); + } + + if (ftruncate(fd, static_cast(bytes)) != 0) { + close(fd); + shm_unlink(name.c_str()); + return std::unexpected("ftruncate failed"); + } + + auto *mapped = static_cast(mmap(nullptr, bytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); + if (mapped == MAP_FAILED) { + close(fd); + shm_unlink(name.c_str()); + return std::unexpected("mmap failed"); + } + + std::memset(mapped, 0, bytes); + return SharedMemoryRegion(name, fd, mapped, bytes); + } + + SharedMemoryRegion() = default; + + SharedMemoryRegion(const SharedMemoryRegion &) = delete; + SharedMemoryRegion &operator=(const SharedMemoryRegion &) = delete; + + SharedMemoryRegion(SharedMemoryRegion &&other) noexcept + : name_(std::move(other.name_)), + fd_(std::exchange(other.fd_, -1)), + ptr_(std::exchange(other.ptr_, nullptr)), + bytes_(std::exchange(other.bytes_, 0)) {} + + SharedMemoryRegion &operator=(SharedMemoryRegion &&other) noexcept { + if (this == &other) { + return *this; + } + cleanup(); + name_ = std::move(other.name_); + fd_ = std::exchange(other.fd_, -1); + ptr_ = std::exchange(other.ptr_, nullptr); + bytes_ = std::exchange(other.bytes_, 0); + return *this; + } + + ~SharedMemoryRegion() { + cleanup(); + } + + [[nodiscard]] + std::span metadata() { + return std::span(ptr_, ipc::kShmPayloadOffset); + } + + [[nodiscard]] + std::span payload(std::size_t bytes) { + return std::span(ptr_ + ipc::kShmPayloadOffset, bytes); + } + +private: + SharedMemoryRegion(std::string name, int fd, std::uint8_t *ptr, std::size_t bytes) + : name_(std::move(name)), fd_(fd), ptr_(ptr), bytes_(bytes) {} + + void cleanup() { + if (ptr_ != nullptr && bytes_ > 0) { + munmap(ptr_, bytes_); + ptr_ = nullptr; + } + if (fd_ >= 0) { + close(fd_); + fd_ = -1; + } + if (!name_.empty()) { + shm_unlink(name_.c_str()); + } + } + + std::string name_{}; + int fd_{-1}; + std::uint8_t *ptr_{nullptr}; + std::size_t bytes_{0}; +}; + +class RealFrameSource final : public FrameSource { +public: + explicit RealFrameSource(const RuntimeConfig &config) + : client_target_(resolve_client_target(config)) {} + + ~RealFrameSource() override { + if (client_ != nullptr) { + client_->Stop(); + spdlog::info( + "real source backend stopped: target='{}' frames={} events={}", + client_target_, + observed_frames_.load(std::memory_order_relaxed), + observed_events_.load(std::memory_order_relaxed)); + } + } + + [[nodiscard]] + std::string_view backend_name() const override { + return "real"; + } + + [[nodiscard]] + std::expected prepare_runtime() const override { + if (prepared_.exchange(true, std::memory_order_acq_rel)) { + return {}; + } + + try { + auto client = std::make_unique(client_target_); + client->SetEventCallback([this](app::cvmmap::ModuleStatus status) { + const auto events = observed_events_.fetch_add(1, std::memory_order_relaxed) + 1; + spdlog::info( + "real source event status={} total_events={}", + to_string(status), + events); + }); + client->SetFrameCallback([this](const app::cvmmap::frame_metadata_t &metadata, std::span payload) { + const auto frames = observed_frames_.fetch_add(1, std::memory_order_relaxed) + 1; + if (frames <= 1 || (frames % 60) == 0) { + spdlog::debug( + "real source frame frame_count={} timestamp_ns={} payload_bytes={} observed_frames={}", + metadata.frame_count, + metadata.timestamp_ns, + payload.size(), + frames); + } + }); + client->Start(); + client_ = std::move(client); + } catch (const std::exception &e) { + prepared_.store(false, std::memory_order_release); + return std::unexpected(std::string("cvmmap-client-cpp init failed: ") + e.what()); + } + + spdlog::info( + "real source backend initialized via cvmmap-client-cpp: target='{}'", + client_target_); + return {}; + } + +private: + std::string client_target_; + mutable std::unique_ptr client_{nullptr}; + mutable std::atomic_bool prepared_{false}; + mutable std::atomic observed_frames_{0}; + mutable std::atomic observed_events_{0}; +}; + +class DummyFrameSource final : public FrameSource { +public: + explicit DummyFrameSource(const RuntimeConfig &config) + : config_(config), + shm_name_(normalized_shm_name(config.input.shm_name)), + zmq_endpoint_(config.input.zmq_endpoint) {} + + ~DummyFrameSource() override { + shutdown(); + } + + [[nodiscard]] + std::string_view backend_name() const override { + return "dummy"; + } + + [[nodiscard]] + std::expected prepare_runtime() const override { + if (prepared_.exchange(true, std::memory_order_acq_rel)) { + return {}; + } + + const auto payload_bytes = static_cast(config_.dummy.width) * + static_cast(config_.dummy.height) * + static_cast(config_.dummy.channels); + auto shm_region = SharedMemoryRegion::create(shm_name_, ipc::kShmPayloadOffset + payload_bytes); + if (!shm_region) { + prepared_.store(false, std::memory_order_release); + return std::unexpected("dummy shm setup failed: " + shm_region.error()); + } + + cleanup_zmq_ipc_endpoint_path(zmq_endpoint_); + try { + zmq_context_.emplace(1); + publisher_.emplace(*zmq_context_, zmq::socket_type::pub); + publisher_->bind(zmq_endpoint_); + } catch (const std::exception &e) { + publisher_.reset(); + zmq_context_.reset(); + cleanup_zmq_ipc_endpoint_path(zmq_endpoint_); + prepared_.store(false, std::memory_order_release); + return std::unexpected(std::string("dummy zmq bind failed: ") + e.what()); + } + + shm_.emplace(std::move(*shm_region)); + stop_requested_.store(false, std::memory_order_release); + + try { + producer_thread_ = std::thread([this]() { + run_producer_loop(); + }); + } catch (const std::exception &e) { + publisher_.reset(); + zmq_context_.reset(); + shm_.reset(); + cleanup_zmq_ipc_endpoint_path(zmq_endpoint_); + prepared_.store(false, std::memory_order_release); + return std::unexpected(std::string("dummy producer thread start failed: ") + e.what()); + } + + spdlog::info( + "dummy source backend initialized in-process: shm='{}' zmq='{}' label='{}'", + shm_name_, + zmq_endpoint_, + config_.dummy.label); + return {}; + } + +private: + void shutdown() const { + if (!prepared_.exchange(false, std::memory_order_acq_rel)) { + return; + } + + stop_requested_.store(true, std::memory_order_release); + if (producer_thread_.joinable()) { + producer_thread_.join(); + } + + publisher_.reset(); + zmq_context_.reset(); + shm_.reset(); + cleanup_zmq_ipc_endpoint_path(zmq_endpoint_); + } + + void sleep_with_stop(std::chrono::nanoseconds duration) const { + constexpr auto kSlice = std::chrono::milliseconds(10); + auto remaining = duration; + while (remaining.count() > 0 && !stop_requested_.load(std::memory_order_relaxed)) { + auto step = std::min(remaining, std::chrono::duration_cast(kSlice)); + std::this_thread::sleep_for(step); + remaining -= step; + } + } + + void send_status( + std::array &buffer, + ipc::ModuleStatus status, + std::uint32_t frame_count) const { + sim::write_module_status_message(buffer, config_.dummy.label, status); + publisher_->send(zmq::buffer(buffer), zmq::send_flags::none); + if (status == ipc::ModuleStatus::Offline || status == ipc::ModuleStatus::Online) { + spdlog::info("dummy source status={} frame_count={}", static_cast(status), frame_count); + } + } + + void run_producer_loop() const { + std::array sync_buffer{}; + std::array status_buffer{}; + + const auto payload_bytes = static_cast(config_.dummy.width) * + static_cast(config_.dummy.height) * + static_cast(config_.dummy.channels); + const auto tick_ns = + (config_.dummy.fps == 0) + ? 0ull + : std::max(1ull, 1'000'000'000ull / static_cast(config_.dummy.fps)); + std::uint64_t timestamp_ns = config_.dummy.start_timestamp_ns; + std::uint32_t frame_count = 0; + + if (config_.dummy.startup_delay_ms > 0) { + sleep_with_stop(std::chrono::milliseconds(config_.dummy.startup_delay_ms)); + } + + try { + send_status(status_buffer, ipc::ModuleStatus::Online, 0); + + while (!stop_requested_.load(std::memory_order_relaxed)) { + frame_count += 1; + auto payload = shm_->payload(payload_bytes); + sim::write_deterministic_payload(payload, frame_count, config_.dummy.width, config_.dummy.height, config_.dummy.channels); + + sim::write_frame_metadata( + shm_->metadata(), + ipc::FrameInfo{ + .width = config_.dummy.width, + .height = config_.dummy.height, + .channels = config_.dummy.channels, + .depth = config_.dummy.depth, + .pixel_format = config_.dummy.pixel_format, + .buffer_size = static_cast(payload_bytes)}, + frame_count, + timestamp_ns); + + sim::write_sync_message(sync_buffer, config_.dummy.label, frame_count, timestamp_ns); + publisher_->send(zmq::buffer(sync_buffer), zmq::send_flags::none); + + if (config_.dummy.emit_reset_at && *config_.dummy.emit_reset_at == frame_count) { + send_status(status_buffer, ipc::ModuleStatus::StreamReset, frame_count); + } + + if (config_.dummy.emit_reset_every && *config_.dummy.emit_reset_every > 0 && + (frame_count % *config_.dummy.emit_reset_every) == 0) { + send_status(status_buffer, ipc::ModuleStatus::StreamReset, frame_count); + } + + timestamp_ns += tick_ns; + if (config_.dummy.fps > 0) { + sleep_with_stop(std::chrono::nanoseconds(tick_ns)); + } + } + } catch (const std::exception &e) { + spdlog::error("dummy source producer failed: {}", e.what()); + } + + if (publisher_) { + send_status(status_buffer, ipc::ModuleStatus::Offline, frame_count); + } + } + + RuntimeConfig config_; + std::string shm_name_; + std::string zmq_endpoint_; + + mutable std::optional shm_{}; + mutable std::optional zmq_context_{}; + mutable std::optional publisher_{}; + mutable std::thread producer_thread_{}; + mutable std::atomic_bool prepared_{false}; + mutable std::atomic_bool stop_requested_{false}; +}; + +} + +std::expected, std::string> make_frame_source(const RuntimeConfig &config) { + switch (config.input_mode) { + case InputMode::Real: + return std::unique_ptr(new RealFrameSource(config)); + case InputMode::Dummy: + return std::unique_ptr(new DummyFrameSource(config)); + } + + return std::unexpected("unsupported input mode"); +} + +} diff --git a/src/core/ingest_runtime.cpp b/src/core/ingest_runtime.cpp index 4ae62ce..3cd4d62 100644 --- a/src/core/ingest_runtime.cpp +++ b/src/core/ingest_runtime.cpp @@ -1,12 +1,16 @@ #include "cvmmap_streamer/config/runtime_config.hpp" +#include "cvmmap_streamer/core/frame_source.hpp" #include "cvmmap_streamer/ipc/contracts.hpp" +#include "include/app/cvmmap/cvmmap_client.hpp" + #include #include #include #include #include #include +#include #include #include #include @@ -31,6 +35,37 @@ namespace { using namespace std::chrono_literals; namespace ipc = cvmmap_streamer::ipc; + struct ResolvedInputEndpoints { + std::string shm_name; + std::string zmq_endpoint; + }; + + [[nodiscard]] + std::expected resolve_input_endpoints(const RuntimeConfig &config) { + ResolvedInputEndpoints resolved{ + .shm_name = config.input.shm_name, + .zmq_endpoint = config.input.zmq_endpoint, + }; + + if (config.input_mode != InputMode::Real || !config.input.shm_name.starts_with("cvmmap://")) { + return resolved; + } + + try { + auto target = app::cvmmap::resolve_cvmmap_target_or_throw(config.input.shm_name); + resolved.shm_name = target.shm_name; + resolved.zmq_endpoint = target.zmq_addr; + spdlog::info( + "ingest real-input URI resolved: shm_name='{}' zmq_endpoint='{}'", + resolved.shm_name, + resolved.zmq_endpoint); + } catch (const std::exception &e) { + return std::unexpected(std::string("invalid cvmmap uri in --shm-name: ") + e.what()); + } + + return resolved; + } + struct SharedMemoryView { SharedMemoryView() = default; @@ -153,7 +188,27 @@ namespace { } int run_ingest_loop(const RuntimeConfig &config) { - auto shm = SharedMemoryView::open_readonly(config.input.shm_name); + auto input_endpoints = resolve_input_endpoints(config); + if (!input_endpoints) { + spdlog::error("{}", input_endpoints.error()); + return 2; + } + + auto source = make_frame_source(config); + if (!source) { + spdlog::error("ingest input source selection failed: {}", source.error()); + return 2; + } + + auto source_prepare = (*source)->prepare_runtime(); + if (!source_prepare) { + spdlog::error("ingest source backend '{}' setup failed: {}", (*source)->backend_name(), source_prepare.error()); + return 2; + } + + spdlog::info("ingest source backend selected: {}", (*source)->backend_name()); + + auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name); if (!shm) { spdlog::error("ingest open shared memory failed: {}", shm.error()); return 3; @@ -172,9 +227,9 @@ int run_ingest_loop(const RuntimeConfig &config) { try { subscriber.set(zmq::sockopt::subscribe, ""); subscriber.set(zmq::sockopt::rcvtimeo, 20); - subscriber.connect(config.input.zmq_endpoint); + subscriber.connect(input_endpoints->zmq_endpoint); } catch (const zmq::error_t &e) { - spdlog::error("ingest subscribe failed on '{}': {}", config.input.zmq_endpoint, e.what()); + spdlog::error("ingest subscribe failed on '{}': {}", input_endpoints->zmq_endpoint, e.what()); return 4; } diff --git a/src/pipeline/pipeline_stub.cpp b/src/pipeline/pipeline_stub.cpp index 14e5e15..8393fb5 100644 --- a/src/pipeline/pipeline_stub.cpp +++ b/src/pipeline/pipeline_stub.cpp @@ -1,12 +1,12 @@ #include "cvmmap_streamer/config/runtime_config.hpp" #include "cvmmap_streamer/ipc/contracts.hpp" -#include "cvmmap_streamer/metrics/latency_tracker.hpp" -#include "cvmmap_streamer/protocol/rtmp_publisher.hpp" -#include "cvmmap_streamer/protocol/rtp_publisher.hpp" + +#include "include/app/cvmmap/cvmmap_client.hpp" #include #include #include +#include #include #include #include @@ -22,6 +22,10 @@ #if __has_include() && __has_include() && __has_include() #define CVMMAP_STREAMER_HAS_GSTREAMER 1 +#include "cvmmap_streamer/core/frame_source.hpp" +#include "cvmmap_streamer/metrics/latency_tracker.hpp" +#include "cvmmap_streamer/protocol/rtmp_publisher.hpp" +#include "cvmmap_streamer/protocol/rtp_publisher.hpp" #include #include #include @@ -47,6 +51,37 @@ namespace { namespace ipc = cvmmap_streamer::ipc; + struct ResolvedInputEndpoints { + std::string shm_name; + std::string zmq_endpoint; + }; + + [[nodiscard]] + std::expected resolve_input_endpoints(const RuntimeConfig &config) { + ResolvedInputEndpoints resolved{ + .shm_name = config.input.shm_name, + .zmq_endpoint = config.input.zmq_endpoint, + }; + + if (config.input_mode != InputMode::Real || !config.input.shm_name.starts_with("cvmmap://")) { + return resolved; + } + + try { + auto target = app::cvmmap::resolve_cvmmap_target_or_throw(config.input.shm_name); + resolved.shm_name = target.shm_name; + resolved.zmq_endpoint = target.zmq_addr; + spdlog::info( + "pipeline real-input URI resolved: shm_name='{}' zmq_endpoint='{}'", + resolved.shm_name, + resolved.zmq_endpoint); + } catch (const std::exception &e) { + return std::unexpected(std::string("invalid cvmmap uri in --shm-name: ") + e.what()); + } + + return resolved; + } + struct SharedMemoryView { SharedMemoryView() = default; @@ -695,11 +730,31 @@ int run_nvenc_pipeline(const RuntimeConfig &config) { "GStreamer development/runtime libraries are unavailable; NVENC pipeline core requires gstreamer-1.0, gstreamer-app-1.0, and gstreamer-video-1.0"); return 5; #else + auto input_endpoints = resolve_input_endpoints(config); + if (!input_endpoints) { + spdlog::error("{}", input_endpoints.error()); + return 2; + } + + auto source = make_frame_source(config); + if (!source) { + spdlog::error("pipeline input source selection failed: {}", source.error()); + return 2; + } + + auto source_prepare = (*source)->prepare_runtime(); + if (!source_prepare) { + spdlog::error("pipeline source backend '{}' setup failed: {}", (*source)->backend_name(), source_prepare.error()); + return 2; + } + + spdlog::info("pipeline source backend selected: {}", (*source)->backend_name()); + GStreamerPipeline pipeline; pipeline.configured_codec = config.codec; ensure_gst_initialized(); - auto shm = SharedMemoryView::open_readonly(config.input.shm_name); + auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name); if (!shm) { spdlog::error("pipeline open shared memory failed: {}", shm.error()); return 3; @@ -719,9 +774,9 @@ int run_nvenc_pipeline(const RuntimeConfig &config) { try { subscriber.set(zmq::sockopt::subscribe, ""); subscriber.set(zmq::sockopt::rcvtimeo, 20); - subscriber.connect(config.input.zmq_endpoint); + subscriber.connect(input_endpoints->zmq_endpoint); } catch (const zmq::error_t &e) { - spdlog::error("pipeline subscribe failed on '{}': {}", config.input.zmq_endpoint, e.what()); + spdlog::error("pipeline subscribe failed on '{}': {}", input_endpoints->zmq_endpoint, e.what()); return 4; }