From ec492fe9854aab0f16514f458feb6cb9f14ceca0 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Fri, 6 Mar 2026 14:42:07 +0800 Subject: [PATCH] refactor(input): consume cvmmap-core package --- CMakeLists.txt | 10 +- .../cvmmap_streamer/config/runtime_config.hpp | 25 -- src/config/runtime_config.cpp | 200 +---------- src/core/frame_source.cpp | 332 +----------------- src/core/ingest_runtime.cpp | 6 +- src/ipc/contracts.cpp | 54 ++- src/ipc/help.cpp | 2 +- src/pipeline/pipeline_runtime.cpp | 6 +- 8 files changed, 64 insertions(+), 571 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6f48e12..765d436 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,7 @@ if (EXISTS "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/CMakeLists.txt") endif() find_package(cppzmq QUIET) +find_package(cvmmap-core CONFIG REQUIRED) find_package(ZeroMQ QUIET) find_package(spdlog REQUIRED) find_package(PkgConfig REQUIRED) @@ -37,8 +38,6 @@ add_library(cvmmap_streamer_common STATIC src/core/ingest_runtime.cpp src/ipc/contracts.cpp src/protocol/wire_codec.cpp - lib/cvmmap-client-cpp/app_cvmmap_client.cpp - lib/cvmmap-client-cpp/app_cvmmap_parser.cpp src/metrics/latency_tracker.cpp src/pipeline/pipeline_runtime.cpp src/protocol/rtmp_publisher.cpp @@ -46,10 +45,7 @@ add_library(cvmmap_streamer_common STATIC target_include_directories(cvmmap_streamer_common PUBLIC - "${CMAKE_CURRENT_LIST_DIR}/include" - PRIVATE - "${CMAKE_CURRENT_LIST_DIR}/lib/cvmmap-client-cpp" - "${CMAKE_CURRENT_LIST_DIR}/lib/cvmmap-client-cpp/include") + "${CMAKE_CURRENT_LIST_DIR}/include") set(CVMAP_STREAMER_LINK_DEPS Threads::Threads) if (TARGET cppzmq::cppzmq) @@ -90,6 +86,8 @@ if (TARGET CLI11::CLI11) list(APPEND CVMAP_STREAMER_LINK_DEPS CLI11::CLI11) endif() +list(APPEND CVMAP_STREAMER_LINK_DEPS cvmmap::client) + target_link_libraries(cvmmap_streamer_common PUBLIC ${CVMAP_STREAMER_LINK_DEPS}) function(add_cvmmap_binary target source) diff --git a/include/cvmmap_streamer/config/runtime_config.hpp b/include/cvmmap_streamer/config/runtime_config.hpp index b30e06b..7bf1370 100644 --- a/include/cvmmap_streamer/config/runtime_config.hpp +++ b/include/cvmmap_streamer/config/runtime_config.hpp @@ -8,8 +8,6 @@ #include #include -#include "cvmmap_streamer/ipc/contracts.hpp" - namespace cvmmap_streamer { enum class CodecType { @@ -22,11 +20,6 @@ enum class RunMode { Ingest, }; -enum class InputMode { - Real, - Dummy, -}; - enum class RtmpMode { Enhanced, Domestic, @@ -37,21 +30,6 @@ struct InputConfig { std::string zmq_endpoint{"ipc:///tmp/cvmmap_default"}; }; -struct DummyInputConfig { - std::uint32_t frames{0}; - std::uint32_t fps{60}; - std::uint16_t width{64}; - std::uint16_t height{48}; - std::optional emit_reset_at{}; - std::optional emit_reset_every{}; - std::string label{"dummy"}; - std::uint8_t channels{3}; - ipc::Depth depth{ipc::Depth::U8}; - ipc::PixelFormat pixel_format{ipc::PixelFormat::BGR}; - std::uint64_t start_timestamp_ns{1'000'000'000ull}; - std::uint32_t startup_delay_ms{100}; -}; - struct RtmpOutputConfig { bool enabled{false}; std::vector urls{}; @@ -87,8 +65,6 @@ struct LatencyConfig { struct RuntimeConfig { InputConfig input{}; - DummyInputConfig dummy{}; - InputMode input_mode{InputMode::Real}; RunMode run_mode{RunMode::Pipeline}; CodecType codec{CodecType::H264}; OutputsConfig outputs{}; @@ -99,7 +75,6 @@ struct RuntimeConfig { std::string_view to_string(CodecType codec); std::string_view to_string(RunMode mode); -std::string_view to_string(InputMode mode); std::string_view to_string(RtmpMode mode); std::expected parse_runtime_config(int argc, char **argv); diff --git a/src/config/runtime_config.cpp b/src/config/runtime_config.cpp index b2ef92d..f30c60b 100644 --- a/src/config/runtime_config.cpp +++ b/src/config/runtime_config.cpp @@ -42,20 +42,10 @@ std::string normalize_cli_error(std::string raw_message) { return "unknown argument"; } - constexpr std::array kFlags{"--codec", + constexpr std::array kFlags{"--codec", "--run-mode", - "--input-mode", "--shm-name", "--zmq-endpoint", - "--dummy-frames", - "--dummy-fps", - "--dummy-width", - "--dummy-height", - "--dummy-reset-at", - "--dummy-reset-every", - "--dummy-label", - "--dummy-start-timestamp-ns", - "--dummy-startup-delay-ms", "--rtmp-url", "--rtmp-mode", "--rtp-endpoint", @@ -95,19 +85,6 @@ parse_u32(std::string_view raw, std::string_view flag_name) { return value; } -std::expected -parse_u64(std::string_view raw, std::string_view flag_name) { - std::uint64_t value{0}; - const auto *begin = raw.data(); - const auto *end = raw.data() + raw.size(); - const auto result = std::from_chars(begin, end, value, 10); - if (result.ec != std::errc{} || result.ptr != end) { - return std::unexpected("invalid value for " + std::string(flag_name) + - ": '" + std::string(raw) + "'"); - } - return value; -} - std::expected parse_u16(std::string_view raw, std::string_view flag_name) { std::uint16_t value{0}; @@ -173,17 +150,6 @@ std::expected parse_run_mode(std::string_view raw) { "' (expected: pipeline|ingest)"); } -std::expected parse_input_mode(std::string_view raw) { - if (raw == "real") { - return InputMode::Real; - } - if (raw == "dummy") { - return InputMode::Dummy; - } - return std::unexpected("invalid input mode: '" + std::string(raw) + - "' (expected: real|dummy)"); -} - std::expected parse_rtmp_mode(std::string_view raw) { if (raw == "enhanced") { return RtmpMode::Enhanced; @@ -254,17 +220,6 @@ std::string_view to_string(RunMode mode) { } } -std::string_view to_string(InputMode mode) { - switch (mode) { - case InputMode::Real: - return "real"; - case InputMode::Dummy: - return "dummy"; - default: - return "unknown"; - } -} - std::string_view to_string(RtmpMode mode) { switch (mode) { case RtmpMode::Enhanced: @@ -282,7 +237,6 @@ std::expected parse_runtime_config(int argc, std::string codec_raw; std::string run_mode_raw; - std::string input_mode_raw; std::string shm_name_raw; std::string zmq_endpoint_raw; std::vector rtmp_urls_raw; @@ -300,16 +254,6 @@ std::expected parse_runtime_config(int argc, std::string ingest_consumer_delay_raw; std::string snapshot_copy_delay_raw; std::string emit_stall_raw; - std::string dummy_frames_raw; - std::string dummy_fps_raw; - std::string dummy_width_raw; - std::string dummy_height_raw; - std::string dummy_reset_at_raw; - std::string dummy_reset_every_raw; - std::string dummy_label_raw; - std::string dummy_start_timestamp_ns_raw; - std::string dummy_startup_delay_ms_raw; - bool rtmp_enabled{false}; bool rtp_enabled{false}; bool version_requested{false}; @@ -320,18 +264,8 @@ std::expected parse_runtime_config(int argc, app.add_option("--codec", codec_raw); app.add_option("--run-mode", run_mode_raw); - app.add_option("--input-mode", input_mode_raw); app.add_option("--shm-name", shm_name_raw); app.add_option("--zmq-endpoint", zmq_endpoint_raw); - app.add_option("--dummy-frames", dummy_frames_raw); - app.add_option("--dummy-fps", dummy_fps_raw); - app.add_option("--dummy-width", dummy_width_raw); - app.add_option("--dummy-height", dummy_height_raw); - app.add_option("--dummy-reset-at", dummy_reset_at_raw); - app.add_option("--dummy-reset-every", dummy_reset_every_raw); - app.add_option("--dummy-label", dummy_label_raw); - app.add_option("--dummy-start-timestamp-ns", dummy_start_timestamp_ns_raw); - app.add_option("--dummy-startup-delay-ms", dummy_startup_delay_ms_raw); app.add_flag("--rtmp", rtmp_enabled); app.add_option("--rtmp-url", rtmp_urls_raw); app.add_option("--rtmp-mode", rtmp_mode_raw); @@ -378,14 +312,6 @@ std::expected parse_runtime_config(int argc, config.run_mode = *run_mode; } - if (!input_mode_raw.empty()) { - auto input_mode = parse_input_mode(input_mode_raw); - if (!input_mode) { - return std::unexpected(input_mode.error()); - } - config.input_mode = *input_mode; - } - if (!shm_name_raw.empty()) { config.input.shm_name = shm_name_raw; } @@ -394,84 +320,6 @@ std::expected parse_runtime_config(int argc, config.input.zmq_endpoint = zmq_endpoint_raw; } - if (!dummy_frames_raw.empty()) { - auto parsed = parse_u32(dummy_frames_raw, "--dummy-frames"); - if (!parsed) { - return std::unexpected(parsed.error()); - } - config.dummy.frames = *parsed; - } - - if (!dummy_fps_raw.empty()) { - auto parsed = parse_u32(dummy_fps_raw, "--dummy-fps"); - if (!parsed) { - return std::unexpected(parsed.error()); - } - config.dummy.fps = *parsed; - } - - if (!dummy_width_raw.empty()) { - auto parsed = parse_u32(dummy_width_raw, "--dummy-width"); - if (!parsed) { - return std::unexpected(parsed.error()); - } - if (*parsed > std::numeric_limits::max()) { - return std::unexpected("value out of range for --dummy-width: '" + - dummy_width_raw + "'"); - } - config.dummy.width = static_cast(*parsed); - } - - if (!dummy_height_raw.empty()) { - auto parsed = parse_u32(dummy_height_raw, "--dummy-height"); - if (!parsed) { - return std::unexpected(parsed.error()); - } - if (*parsed > std::numeric_limits::max()) { - return std::unexpected("value out of range for --dummy-height: '" + - dummy_height_raw + "'"); - } - config.dummy.height = static_cast(*parsed); - } - - if (!dummy_reset_at_raw.empty()) { - auto parsed = parse_u32(dummy_reset_at_raw, "--dummy-reset-at"); - if (!parsed) { - return std::unexpected(parsed.error()); - } - config.dummy.emit_reset_at = *parsed; - } - - if (!dummy_reset_every_raw.empty()) { - auto parsed = parse_u32(dummy_reset_every_raw, "--dummy-reset-every"); - if (!parsed) { - return std::unexpected(parsed.error()); - } - config.dummy.emit_reset_every = *parsed; - } - - if (!dummy_label_raw.empty()) { - config.dummy.label = dummy_label_raw; - } - - if (!dummy_start_timestamp_ns_raw.empty()) { - auto parsed = - parse_u64(dummy_start_timestamp_ns_raw, "--dummy-start-timestamp-ns"); - if (!parsed) { - return std::unexpected(parsed.error()); - } - config.dummy.start_timestamp_ns = *parsed; - } - - if (!dummy_startup_delay_ms_raw.empty()) { - auto parsed = - parse_u32(dummy_startup_delay_ms_raw, "--dummy-startup-delay-ms"); - if (!parsed) { - return std::unexpected(parsed.error()); - } - config.dummy.startup_delay_ms = *parsed; - } - config.outputs.rtmp.enabled = rtmp_enabled; if (!rtmp_urls_raw.empty()) { config.outputs.rtmp.enabled = true; @@ -609,13 +457,6 @@ validate_runtime_config(const RuntimeConfig &config) { "invalid input config: --shm-name must not be empty"); } - if (config.input_mode == InputMode::Dummy && - config.input.shm_name.starts_with("cvmmap://")) { - return std::unexpected( - "invalid input config: --input-mode dummy requires POSIX --shm-name, " - "not cvmmap:// URI"); - } - if (config.input.zmq_endpoint.empty()) { return std::unexpected( "invalid input config: --zmq-endpoint must not be empty"); @@ -680,39 +521,6 @@ validate_runtime_config(const RuntimeConfig &config) { "invalid ingest config: --ingest-idle-timeout-ms must be >= 1"); } - if (config.dummy.width == 0 || config.dummy.height == 0) { - return std::unexpected( - "invalid dummy config: --dummy-width and --dummy-height must be >= 1"); - } - - if (config.dummy.channels == 0) { - return std::unexpected("invalid dummy config: channels must be >= 1"); - } - - if (config.dummy.label.empty()) { - return std::unexpected("invalid dummy config: --dummy-label must not be empty"); - } - - if (config.dummy.label.size() > ipc::kLabelLenMax) { - return std::unexpected("invalid dummy config: --dummy-label exceeds 24 bytes"); - } - - if (config.dummy.emit_reset_at && *config.dummy.emit_reset_at == 0) { - return std::unexpected( - "invalid dummy config: --dummy-reset-at must be >= 1"); - } - - if (config.dummy.frames > 0 && config.dummy.emit_reset_at && - *config.dummy.emit_reset_at > config.dummy.frames) { - return std::unexpected("invalid dummy config: --dummy-reset-at must be <= " - "--dummy-frames when --dummy-frames > 0"); - } - - if (config.dummy.emit_reset_every && *config.dummy.emit_reset_every == 0) { - return std::unexpected( - "invalid dummy config: --dummy-reset-every must be >= 1"); - } - return {}; } @@ -721,12 +529,6 @@ std::string summarize_runtime_config(const RuntimeConfig &config) { ss << "input.shm=" << config.input.shm_name; ss << ", input.zmq=" << config.input.zmq_endpoint; ss << ", run_mode=" << to_string(config.run_mode); - ss << ", input_mode=" << to_string(config.input_mode); - ss << ", dummy.frames=" << config.dummy.frames; - ss << ", dummy.fps=" << config.dummy.fps; - ss << ", dummy.size=" << config.dummy.width << "x" << config.dummy.height; - ss << ", dummy.label=" << config.dummy.label; - ss << ", dummy.start_timestamp_ns=" << config.dummy.start_timestamp_ns; ss << ", codec=" << to_string(config.codec); ss << ", rtmp.enabled=" << (config.outputs.rtmp.enabled ? "true" : "false"); ss << ", rtmp.mode=" << to_string(config.outputs.rtmp.mode); diff --git a/src/core/frame_source.cpp b/src/core/frame_source.cpp index 0cb6153..29e0fd5 100644 --- a/src/core/frame_source.cpp +++ b/src/core/frame_source.cpp @@ -1,39 +1,21 @@ #include "cvmmap_streamer/core/frame_source.hpp" -#include "cvmmap_streamer/ipc/contracts.hpp" -#include "cvmmap_streamer/protocol/wire_codec.hpp" +#include -#include - -#include #include -#include -#include #include -#include #include #include -#include #include #include #include -#include -#include -#include #include -#include -#include -#include -#include namespace cvmmap_streamer::core { namespace { -namespace ipc = cvmmap_streamer::ipc; -namespace wire_codec = cvmmap_streamer::protocol::wire_codec; - [[nodiscard]] std::string resolve_client_target(const RuntimeConfig &config) { if (config.input.shm_name.starts_with("cvmmap://")) { @@ -48,123 +30,18 @@ std::string resolve_client_target(const RuntimeConfig &config) { } [[nodiscard]] -std::string_view to_string(app::cvmmap::ModuleStatus status) { +std::string_view to_string(cvmmap::ModuleStatus status) { switch (status) { - case app::cvmmap::ModuleStatus::Online: + case cvmmap::ModuleStatus::Online: return "online"; - case app::cvmmap::ModuleStatus::Offline: + case cvmmap::ModuleStatus::Offline: return "offline"; - case app::cvmmap::ModuleStatus::StreamReset: + case cvmmap::ModuleStatus::StreamReset: return "stream_reset"; } return "unknown"; } -[[nodiscard]] -std::string normalized_shm_name(std::string_view raw_name) { - if (!raw_name.empty() && raw_name.front() == '/') { - return std::string(raw_name); - } - return "/" + std::string(raw_name); -} - -void cleanup_zmq_ipc_endpoint_path(std::string_view endpoint) { - constexpr std::string_view kPrefix{"ipc://"}; - if (!endpoint.starts_with(kPrefix)) { - return; - } - const auto path = endpoint.substr(kPrefix.size()); - if (!path.empty()) { - unlink(std::string(path).c_str()); - } -} - -class SharedMemoryRegion { -public: - static std::expected create(const std::string &name, std::size_t bytes) { - const int fd = shm_open(name.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); - if (fd < 0) { - return std::unexpected("shm_open failed"); - } - - if (ftruncate(fd, static_cast(bytes)) != 0) { - close(fd); - shm_unlink(name.c_str()); - return std::unexpected("ftruncate failed"); - } - - auto *mapped = static_cast(mmap(nullptr, bytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); - if (mapped == MAP_FAILED) { - close(fd); - shm_unlink(name.c_str()); - return std::unexpected("mmap failed"); - } - - std::memset(mapped, 0, bytes); - return SharedMemoryRegion(name, fd, mapped, bytes); - } - - SharedMemoryRegion() = default; - - SharedMemoryRegion(const SharedMemoryRegion &) = delete; - SharedMemoryRegion &operator=(const SharedMemoryRegion &) = delete; - - SharedMemoryRegion(SharedMemoryRegion &&other) noexcept - : name_(std::move(other.name_)), - fd_(std::exchange(other.fd_, -1)), - ptr_(std::exchange(other.ptr_, nullptr)), - bytes_(std::exchange(other.bytes_, 0)) {} - - SharedMemoryRegion &operator=(SharedMemoryRegion &&other) noexcept { - if (this == &other) { - return *this; - } - cleanup(); - name_ = std::move(other.name_); - fd_ = std::exchange(other.fd_, -1); - ptr_ = std::exchange(other.ptr_, nullptr); - bytes_ = std::exchange(other.bytes_, 0); - return *this; - } - - ~SharedMemoryRegion() { - cleanup(); - } - - [[nodiscard]] - std::span metadata() { - return std::span(ptr_, ipc::kShmPayloadOffset); - } - - [[nodiscard]] - std::span payload(std::size_t bytes) { - return std::span(ptr_ + ipc::kShmPayloadOffset, bytes); - } - -private: - SharedMemoryRegion(std::string name, int fd, std::uint8_t *ptr, std::size_t bytes) - : name_(std::move(name)), fd_(fd), ptr_(ptr), bytes_(bytes) {} - - void cleanup() { - if (ptr_ != nullptr && bytes_ > 0) { - munmap(ptr_, bytes_); - ptr_ = nullptr; - } - if (fd_ >= 0) { - close(fd_); - fd_ = -1; - } - if (!name_.empty()) { - shm_unlink(name_.c_str()); - } - } - - std::string name_{}; - int fd_{-1}; - std::uint8_t *ptr_{nullptr}; - std::size_t bytes_{0}; -}; - class RealFrameSource final : public FrameSource { public: explicit RealFrameSource(const RuntimeConfig &config) @@ -193,15 +70,15 @@ public: } try { - auto client = std::make_unique(client_target_); - client->SetEventCallback([this](app::cvmmap::ModuleStatus status) { + auto client = std::make_unique(client_target_); + client->SetEventCallback([this](cvmmap::ModuleStatus status) { const auto events = observed_events_.fetch_add(1, std::memory_order_relaxed) + 1; spdlog::info( "real source event status={} total_events={}", to_string(status), events); }); - client->SetFrameCallback([this](const app::cvmmap::frame_metadata_t &metadata, std::span payload) { + client->SetFrameCallback([this](const cvmmap::frame_metadata_t &metadata, std::span payload) { const auto frames = observed_frames_.fetch_add(1, std::memory_order_relaxed) + 1; if (frames <= 1 || (frames % 60) == 0) { spdlog::debug( @@ -227,205 +104,16 @@ public: private: std::string client_target_; - mutable std::unique_ptr client_{nullptr}; + mutable std::unique_ptr client_{nullptr}; mutable std::atomic_bool prepared_{false}; mutable std::atomic observed_frames_{0}; mutable std::atomic observed_events_{0}; }; -class DummyFrameSource final : public FrameSource { -public: - explicit DummyFrameSource(const RuntimeConfig &config) - : config_(config), - shm_name_(normalized_shm_name(config.input.shm_name)), - zmq_endpoint_(config.input.zmq_endpoint) {} - - ~DummyFrameSource() override { - shutdown(); - } - - [[nodiscard]] - std::string_view backend_name() const override { - return "dummy"; - } - - [[nodiscard]] - std::expected prepare_runtime() const override { - if (prepared_.exchange(true, std::memory_order_acq_rel)) { - return {}; - } - - const auto payload_bytes = static_cast(config_.dummy.width) * - static_cast(config_.dummy.height) * - static_cast(config_.dummy.channels); - auto shm_region = SharedMemoryRegion::create(shm_name_, ipc::kShmPayloadOffset + payload_bytes); - if (!shm_region) { - prepared_.store(false, std::memory_order_release); - return std::unexpected("dummy shm setup failed: " + shm_region.error()); - } - - cleanup_zmq_ipc_endpoint_path(zmq_endpoint_); - try { - zmq_context_.emplace(1); - publisher_.emplace(*zmq_context_, zmq::socket_type::pub); - publisher_->bind(zmq_endpoint_); - } catch (const std::exception &e) { - publisher_.reset(); - zmq_context_.reset(); - cleanup_zmq_ipc_endpoint_path(zmq_endpoint_); - prepared_.store(false, std::memory_order_release); - return std::unexpected(std::string("dummy zmq bind failed: ") + e.what()); - } - - shm_.emplace(std::move(*shm_region)); - stop_requested_.store(false, std::memory_order_release); - - try { - producer_thread_ = std::thread([this]() { - run_producer_loop(); - }); - } catch (const std::exception &e) { - publisher_.reset(); - zmq_context_.reset(); - shm_.reset(); - cleanup_zmq_ipc_endpoint_path(zmq_endpoint_); - prepared_.store(false, std::memory_order_release); - return std::unexpected(std::string("dummy producer thread start failed: ") + e.what()); - } - - spdlog::info( - "dummy source backend initialized in-process: shm='{}' zmq='{}' label='{}'", - shm_name_, - zmq_endpoint_, - config_.dummy.label); - return {}; - } - -private: - void shutdown() const { - if (!prepared_.exchange(false, std::memory_order_acq_rel)) { - return; - } - - stop_requested_.store(true, std::memory_order_release); - if (producer_thread_.joinable()) { - producer_thread_.join(); - } - - publisher_.reset(); - zmq_context_.reset(); - shm_.reset(); - cleanup_zmq_ipc_endpoint_path(zmq_endpoint_); - } - - void sleep_with_stop(std::chrono::nanoseconds duration) const { - constexpr auto kSlice = std::chrono::milliseconds(10); - auto remaining = duration; - while (remaining.count() > 0 && !stop_requested_.load(std::memory_order_relaxed)) { - auto step = std::min(remaining, std::chrono::duration_cast(kSlice)); - std::this_thread::sleep_for(step); - remaining -= step; - } - } - - void send_status( - std::array &buffer, - ipc::ModuleStatus status, - std::uint32_t frame_count) const { - wire_codec::write_module_status_message(buffer, config_.dummy.label, status); - publisher_->send(zmq::buffer(buffer), zmq::send_flags::none); - if (status == ipc::ModuleStatus::Offline || status == ipc::ModuleStatus::Online) { - spdlog::info("dummy source status={} frame_count={}", static_cast(status), frame_count); - } - } - - void run_producer_loop() const { - std::array sync_buffer{}; - std::array status_buffer{}; - - const auto payload_bytes = static_cast(config_.dummy.width) * - static_cast(config_.dummy.height) * - static_cast(config_.dummy.channels); - const auto tick_ns = - (config_.dummy.fps == 0) - ? 0ull - : std::max(1ull, 1'000'000'000ull / static_cast(config_.dummy.fps)); - std::uint64_t timestamp_ns = config_.dummy.start_timestamp_ns; - std::uint32_t frame_count = 0; - - if (config_.dummy.startup_delay_ms > 0) { - sleep_with_stop(std::chrono::milliseconds(config_.dummy.startup_delay_ms)); - } - - try { - send_status(status_buffer, ipc::ModuleStatus::Online, 0); - - while (!stop_requested_.load(std::memory_order_relaxed)) { - frame_count += 1; - auto payload = shm_->payload(payload_bytes); - wire_codec::write_deterministic_payload(payload, frame_count, config_.dummy.width, config_.dummy.height, config_.dummy.channels); - - wire_codec::write_frame_metadata( - shm_->metadata(), - ipc::FrameInfo{ - .width = config_.dummy.width, - .height = config_.dummy.height, - .channels = config_.dummy.channels, - .depth = config_.dummy.depth, - .pixel_format = config_.dummy.pixel_format, - .buffer_size = static_cast(payload_bytes)}, - frame_count, - timestamp_ns); - - wire_codec::write_sync_message(sync_buffer, config_.dummy.label, frame_count, timestamp_ns); - publisher_->send(zmq::buffer(sync_buffer), zmq::send_flags::none); - - if (config_.dummy.emit_reset_at && *config_.dummy.emit_reset_at == frame_count) { - send_status(status_buffer, ipc::ModuleStatus::StreamReset, frame_count); - } - - if (config_.dummy.emit_reset_every && *config_.dummy.emit_reset_every > 0 && - (frame_count % *config_.dummy.emit_reset_every) == 0) { - send_status(status_buffer, ipc::ModuleStatus::StreamReset, frame_count); - } - - timestamp_ns += tick_ns; - if (config_.dummy.fps > 0) { - sleep_with_stop(std::chrono::nanoseconds(tick_ns)); - } - } - } catch (const std::exception &e) { - spdlog::error("dummy source producer failed: {}", e.what()); - } - - if (publisher_) { - send_status(status_buffer, ipc::ModuleStatus::Offline, frame_count); - } - } - - RuntimeConfig config_; - std::string shm_name_; - std::string zmq_endpoint_; - - mutable std::optional shm_{}; - mutable std::optional zmq_context_{}; - mutable std::optional publisher_{}; - mutable std::thread producer_thread_{}; - mutable std::atomic_bool prepared_{false}; - mutable std::atomic_bool stop_requested_{false}; -}; - } std::expected, std::string> make_frame_source(const RuntimeConfig &config) { - switch (config.input_mode) { - case InputMode::Real: - return std::unique_ptr(new RealFrameSource(config)); - case InputMode::Dummy: - return std::unique_ptr(new DummyFrameSource(config)); - } - - return std::unexpected("unsupported input mode"); + return std::unique_ptr(new RealFrameSource(config)); } } diff --git a/src/core/ingest_runtime.cpp b/src/core/ingest_runtime.cpp index 3cd4d62..2993c83 100644 --- a/src/core/ingest_runtime.cpp +++ b/src/core/ingest_runtime.cpp @@ -2,7 +2,7 @@ #include "cvmmap_streamer/core/frame_source.hpp" #include "cvmmap_streamer/ipc/contracts.hpp" -#include "include/app/cvmmap/cvmmap_client.hpp" +#include #include #include @@ -47,12 +47,12 @@ namespace { .zmq_endpoint = config.input.zmq_endpoint, }; - if (config.input_mode != InputMode::Real || !config.input.shm_name.starts_with("cvmmap://")) { + if (!config.input.shm_name.starts_with("cvmmap://")) { return resolved; } try { - auto target = app::cvmmap::resolve_cvmmap_target_or_throw(config.input.shm_name); + auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.shm_name); resolved.shm_name = target.shm_name; resolved.zmq_endpoint = target.zmq_addr; spdlog::info( diff --git a/src/ipc/contracts.cpp b/src/ipc/contracts.cpp index a5148e4..d102efc 100644 --- a/src/ipc/contracts.cpp +++ b/src/ipc/contracts.cpp @@ -1,5 +1,7 @@ #include "cvmmap_streamer/ipc/contracts.hpp" +#include + #include #include @@ -109,6 +111,40 @@ namespace { } } + [[nodiscard]] + ParseError map_core_parser_error(const std::string_view error) { + if (error.contains("magic")) { + return ParseError::InvalidMagic; + } + if (error.contains("version")) { + return ParseError::UnsupportedVersion; + } + if (error.contains("depth")) { + return ParseError::InvalidDepth; + } + if (error.contains("pixel_format")) { + return ParseError::InvalidPixelFormat; + } + return ParseError::InvalidSize; + } + + [[nodiscard]] + FrameMetadata from_core_metadata(const cvmmap::frame_metadata_t &metadata) { + FrameMetadata converted{}; + std::copy_n(metadata.magic, converted.magic.size(), converted.magic.begin()); + converted.versions_major = metadata.versions_major; + converted.versions_minor = metadata.versions_minor; + converted.frame_count = metadata.frame_count; + converted.timestamp_ns = metadata.timestamp_ns; + converted.info.width = metadata.info.width; + converted.info.height = metadata.info.height; + converted.info.channels = metadata.info.channels; + converted.info.depth = static_cast(metadata.info.depth); + converted.info.pixel_format = static_cast(metadata.info.pixel_format); + converted.info.buffer_size = metadata.info.buffer_size; + return converted; + } + } std::string_view to_string(ParseError error) { @@ -301,22 +337,16 @@ std::expected validate_shm_region(std::span(metadata_result->info.buffer_size); - if (payload_size > std::numeric_limits::max() - kShmPayloadOffset) { - return std::unexpected(ParseError::InvalidSize); - } - if (payload_size > shm_region.size() - kShmPayloadOffset) { - return std::unexpected(ParseError::InvalidSize); + return std::unexpected(map_core_parser_error(metadata_result.error())); } return ValidatedShmView{ - .metadata = *metadata_result, - .payload = shm_region.subspan(kShmPayloadOffset, payload_size)}; + .metadata = from_core_metadata(metadata_result->normalized_metadata), + .payload = metadata_result->left_plane}; } std::expected read_coherent_snapshot( diff --git a/src/ipc/help.cpp b/src/ipc/help.cpp index a8e5658..ad665a4 100644 --- a/src/ipc/help.cpp +++ b/src/ipc/help.cpp @@ -18,7 +18,7 @@ namespace { "", "Examples:", " cvmmap_streamer --help", - " cvmmap_streamer --run-mode pipeline --input-mode dummy --help", + " cvmmap_streamer --run-mode pipeline --shm-name cvmmap://default --help", " rtp_receiver_tester --help"}; } diff --git a/src/pipeline/pipeline_runtime.cpp b/src/pipeline/pipeline_runtime.cpp index 8393fb5..c82747f 100644 --- a/src/pipeline/pipeline_runtime.cpp +++ b/src/pipeline/pipeline_runtime.cpp @@ -1,7 +1,7 @@ #include "cvmmap_streamer/config/runtime_config.hpp" #include "cvmmap_streamer/ipc/contracts.hpp" -#include "include/app/cvmmap/cvmmap_client.hpp" +#include #include #include @@ -63,12 +63,12 @@ namespace { .zmq_endpoint = config.input.zmq_endpoint, }; - if (config.input_mode != InputMode::Real || !config.input.shm_name.starts_with("cvmmap://")) { + if (!config.input.shm_name.starts_with("cvmmap://")) { return resolved; } try { - auto target = app::cvmmap::resolve_cvmmap_target_or_throw(config.input.shm_name); + auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.shm_name); resolved.shm_name = target.shm_name; resolved.zmq_endpoint = target.zmq_addr; spdlog::info(