refactor(input): consume cvmmap-core package

This commit is contained in:
2026-03-06 14:42:07 +08:00
parent b9161ad8b6
commit ec492fe985
8 changed files with 64 additions and 571 deletions
+4 -6
View File
@@ -13,6 +13,7 @@ if (EXISTS "${CMAKE_CURRENT_LIST_DIR}/lib/CLI11/CMakeLists.txt")
endif()
find_package(cppzmq QUIET)
find_package(cvmmap-core CONFIG REQUIRED)
find_package(ZeroMQ QUIET)
find_package(spdlog REQUIRED)
find_package(PkgConfig REQUIRED)
@@ -37,8 +38,6 @@ add_library(cvmmap_streamer_common STATIC
src/core/ingest_runtime.cpp
src/ipc/contracts.cpp
src/protocol/wire_codec.cpp
lib/cvmmap-client-cpp/app_cvmmap_client.cpp
lib/cvmmap-client-cpp/app_cvmmap_parser.cpp
src/metrics/latency_tracker.cpp
src/pipeline/pipeline_runtime.cpp
src/protocol/rtmp_publisher.cpp
@@ -46,10 +45,7 @@ add_library(cvmmap_streamer_common STATIC
target_include_directories(cvmmap_streamer_common
PUBLIC
"${CMAKE_CURRENT_LIST_DIR}/include"
PRIVATE
"${CMAKE_CURRENT_LIST_DIR}/lib/cvmmap-client-cpp"
"${CMAKE_CURRENT_LIST_DIR}/lib/cvmmap-client-cpp/include")
"${CMAKE_CURRENT_LIST_DIR}/include")
set(CVMAP_STREAMER_LINK_DEPS Threads::Threads)
if (TARGET cppzmq::cppzmq)
@@ -90,6 +86,8 @@ if (TARGET CLI11::CLI11)
list(APPEND CVMAP_STREAMER_LINK_DEPS CLI11::CLI11)
endif()
list(APPEND CVMAP_STREAMER_LINK_DEPS cvmmap::client)
target_link_libraries(cvmmap_streamer_common PUBLIC ${CVMAP_STREAMER_LINK_DEPS})
function(add_cvmmap_binary target source)
@@ -8,8 +8,6 @@
#include <string_view>
#include <vector>
#include "cvmmap_streamer/ipc/contracts.hpp"
namespace cvmmap_streamer {
enum class CodecType {
@@ -22,11 +20,6 @@ enum class RunMode {
Ingest,
};
enum class InputMode {
Real,
Dummy,
};
enum class RtmpMode {
Enhanced,
Domestic,
@@ -37,21 +30,6 @@ struct InputConfig {
std::string zmq_endpoint{"ipc:///tmp/cvmmap_default"};
};
struct DummyInputConfig {
std::uint32_t frames{0};
std::uint32_t fps{60};
std::uint16_t width{64};
std::uint16_t height{48};
std::optional<std::uint32_t> emit_reset_at{};
std::optional<std::uint32_t> emit_reset_every{};
std::string label{"dummy"};
std::uint8_t channels{3};
ipc::Depth depth{ipc::Depth::U8};
ipc::PixelFormat pixel_format{ipc::PixelFormat::BGR};
std::uint64_t start_timestamp_ns{1'000'000'000ull};
std::uint32_t startup_delay_ms{100};
};
struct RtmpOutputConfig {
bool enabled{false};
std::vector<std::string> urls{};
@@ -87,8 +65,6 @@ struct LatencyConfig {
struct RuntimeConfig {
InputConfig input{};
DummyInputConfig dummy{};
InputMode input_mode{InputMode::Real};
RunMode run_mode{RunMode::Pipeline};
CodecType codec{CodecType::H264};
OutputsConfig outputs{};
@@ -99,7 +75,6 @@ struct RuntimeConfig {
std::string_view to_string(CodecType codec);
std::string_view to_string(RunMode mode);
std::string_view to_string(InputMode mode);
std::string_view to_string(RtmpMode mode);
std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc, char **argv);
+1 -199
View File
@@ -42,20 +42,10 @@ std::string normalize_cli_error(std::string raw_message) {
return "unknown argument";
}
constexpr std::array<std::string_view, 27> kFlags{"--codec",
constexpr std::array<std::string_view, 17> kFlags{"--codec",
"--run-mode",
"--input-mode",
"--shm-name",
"--zmq-endpoint",
"--dummy-frames",
"--dummy-fps",
"--dummy-width",
"--dummy-height",
"--dummy-reset-at",
"--dummy-reset-every",
"--dummy-label",
"--dummy-start-timestamp-ns",
"--dummy-startup-delay-ms",
"--rtmp-url",
"--rtmp-mode",
"--rtp-endpoint",
@@ -95,19 +85,6 @@ parse_u32(std::string_view raw, std::string_view flag_name) {
return value;
}
std::expected<std::uint64_t, std::string>
parse_u64(std::string_view raw, std::string_view flag_name) {
std::uint64_t value{0};
const auto *begin = raw.data();
const auto *end = raw.data() + raw.size();
const auto result = std::from_chars(begin, end, value, 10);
if (result.ec != std::errc{} || result.ptr != end) {
return std::unexpected("invalid value for " + std::string(flag_name) +
": '" + std::string(raw) + "'");
}
return value;
}
std::expected<std::uint16_t, std::string>
parse_u16(std::string_view raw, std::string_view flag_name) {
std::uint16_t value{0};
@@ -173,17 +150,6 @@ std::expected<RunMode, std::string> parse_run_mode(std::string_view raw) {
"' (expected: pipeline|ingest)");
}
std::expected<InputMode, std::string> parse_input_mode(std::string_view raw) {
if (raw == "real") {
return InputMode::Real;
}
if (raw == "dummy") {
return InputMode::Dummy;
}
return std::unexpected("invalid input mode: '" + std::string(raw) +
"' (expected: real|dummy)");
}
std::expected<RtmpMode, std::string> parse_rtmp_mode(std::string_view raw) {
if (raw == "enhanced") {
return RtmpMode::Enhanced;
@@ -254,17 +220,6 @@ std::string_view to_string(RunMode mode) {
}
}
std::string_view to_string(InputMode mode) {
switch (mode) {
case InputMode::Real:
return "real";
case InputMode::Dummy:
return "dummy";
default:
return "unknown";
}
}
std::string_view to_string(RtmpMode mode) {
switch (mode) {
case RtmpMode::Enhanced:
@@ -282,7 +237,6 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc,
std::string codec_raw;
std::string run_mode_raw;
std::string input_mode_raw;
std::string shm_name_raw;
std::string zmq_endpoint_raw;
std::vector<std::string> rtmp_urls_raw;
@@ -300,16 +254,6 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc,
std::string ingest_consumer_delay_raw;
std::string snapshot_copy_delay_raw;
std::string emit_stall_raw;
std::string dummy_frames_raw;
std::string dummy_fps_raw;
std::string dummy_width_raw;
std::string dummy_height_raw;
std::string dummy_reset_at_raw;
std::string dummy_reset_every_raw;
std::string dummy_label_raw;
std::string dummy_start_timestamp_ns_raw;
std::string dummy_startup_delay_ms_raw;
bool rtmp_enabled{false};
bool rtp_enabled{false};
bool version_requested{false};
@@ -320,18 +264,8 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc,
app.add_option("--codec", codec_raw);
app.add_option("--run-mode", run_mode_raw);
app.add_option("--input-mode", input_mode_raw);
app.add_option("--shm-name", shm_name_raw);
app.add_option("--zmq-endpoint", zmq_endpoint_raw);
app.add_option("--dummy-frames", dummy_frames_raw);
app.add_option("--dummy-fps", dummy_fps_raw);
app.add_option("--dummy-width", dummy_width_raw);
app.add_option("--dummy-height", dummy_height_raw);
app.add_option("--dummy-reset-at", dummy_reset_at_raw);
app.add_option("--dummy-reset-every", dummy_reset_every_raw);
app.add_option("--dummy-label", dummy_label_raw);
app.add_option("--dummy-start-timestamp-ns", dummy_start_timestamp_ns_raw);
app.add_option("--dummy-startup-delay-ms", dummy_startup_delay_ms_raw);
app.add_flag("--rtmp", rtmp_enabled);
app.add_option("--rtmp-url", rtmp_urls_raw);
app.add_option("--rtmp-mode", rtmp_mode_raw);
@@ -378,14 +312,6 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc,
config.run_mode = *run_mode;
}
if (!input_mode_raw.empty()) {
auto input_mode = parse_input_mode(input_mode_raw);
if (!input_mode) {
return std::unexpected(input_mode.error());
}
config.input_mode = *input_mode;
}
if (!shm_name_raw.empty()) {
config.input.shm_name = shm_name_raw;
}
@@ -394,84 +320,6 @@ std::expected<RuntimeConfig, std::string> parse_runtime_config(int argc,
config.input.zmq_endpoint = zmq_endpoint_raw;
}
if (!dummy_frames_raw.empty()) {
auto parsed = parse_u32(dummy_frames_raw, "--dummy-frames");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.dummy.frames = *parsed;
}
if (!dummy_fps_raw.empty()) {
auto parsed = parse_u32(dummy_fps_raw, "--dummy-fps");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.dummy.fps = *parsed;
}
if (!dummy_width_raw.empty()) {
auto parsed = parse_u32(dummy_width_raw, "--dummy-width");
if (!parsed) {
return std::unexpected(parsed.error());
}
if (*parsed > std::numeric_limits<std::uint16_t>::max()) {
return std::unexpected("value out of range for --dummy-width: '" +
dummy_width_raw + "'");
}
config.dummy.width = static_cast<std::uint16_t>(*parsed);
}
if (!dummy_height_raw.empty()) {
auto parsed = parse_u32(dummy_height_raw, "--dummy-height");
if (!parsed) {
return std::unexpected(parsed.error());
}
if (*parsed > std::numeric_limits<std::uint16_t>::max()) {
return std::unexpected("value out of range for --dummy-height: '" +
dummy_height_raw + "'");
}
config.dummy.height = static_cast<std::uint16_t>(*parsed);
}
if (!dummy_reset_at_raw.empty()) {
auto parsed = parse_u32(dummy_reset_at_raw, "--dummy-reset-at");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.dummy.emit_reset_at = *parsed;
}
if (!dummy_reset_every_raw.empty()) {
auto parsed = parse_u32(dummy_reset_every_raw, "--dummy-reset-every");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.dummy.emit_reset_every = *parsed;
}
if (!dummy_label_raw.empty()) {
config.dummy.label = dummy_label_raw;
}
if (!dummy_start_timestamp_ns_raw.empty()) {
auto parsed =
parse_u64(dummy_start_timestamp_ns_raw, "--dummy-start-timestamp-ns");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.dummy.start_timestamp_ns = *parsed;
}
if (!dummy_startup_delay_ms_raw.empty()) {
auto parsed =
parse_u32(dummy_startup_delay_ms_raw, "--dummy-startup-delay-ms");
if (!parsed) {
return std::unexpected(parsed.error());
}
config.dummy.startup_delay_ms = *parsed;
}
config.outputs.rtmp.enabled = rtmp_enabled;
if (!rtmp_urls_raw.empty()) {
config.outputs.rtmp.enabled = true;
@@ -609,13 +457,6 @@ validate_runtime_config(const RuntimeConfig &config) {
"invalid input config: --shm-name must not be empty");
}
if (config.input_mode == InputMode::Dummy &&
config.input.shm_name.starts_with("cvmmap://")) {
return std::unexpected(
"invalid input config: --input-mode dummy requires POSIX --shm-name, "
"not cvmmap:// URI");
}
if (config.input.zmq_endpoint.empty()) {
return std::unexpected(
"invalid input config: --zmq-endpoint must not be empty");
@@ -680,39 +521,6 @@ validate_runtime_config(const RuntimeConfig &config) {
"invalid ingest config: --ingest-idle-timeout-ms must be >= 1");
}
if (config.dummy.width == 0 || config.dummy.height == 0) {
return std::unexpected(
"invalid dummy config: --dummy-width and --dummy-height must be >= 1");
}
if (config.dummy.channels == 0) {
return std::unexpected("invalid dummy config: channels must be >= 1");
}
if (config.dummy.label.empty()) {
return std::unexpected("invalid dummy config: --dummy-label must not be empty");
}
if (config.dummy.label.size() > ipc::kLabelLenMax) {
return std::unexpected("invalid dummy config: --dummy-label exceeds 24 bytes");
}
if (config.dummy.emit_reset_at && *config.dummy.emit_reset_at == 0) {
return std::unexpected(
"invalid dummy config: --dummy-reset-at must be >= 1");
}
if (config.dummy.frames > 0 && config.dummy.emit_reset_at &&
*config.dummy.emit_reset_at > config.dummy.frames) {
return std::unexpected("invalid dummy config: --dummy-reset-at must be <= "
"--dummy-frames when --dummy-frames > 0");
}
if (config.dummy.emit_reset_every && *config.dummy.emit_reset_every == 0) {
return std::unexpected(
"invalid dummy config: --dummy-reset-every must be >= 1");
}
return {};
}
@@ -721,12 +529,6 @@ std::string summarize_runtime_config(const RuntimeConfig &config) {
ss << "input.shm=" << config.input.shm_name;
ss << ", input.zmq=" << config.input.zmq_endpoint;
ss << ", run_mode=" << to_string(config.run_mode);
ss << ", input_mode=" << to_string(config.input_mode);
ss << ", dummy.frames=" << config.dummy.frames;
ss << ", dummy.fps=" << config.dummy.fps;
ss << ", dummy.size=" << config.dummy.width << "x" << config.dummy.height;
ss << ", dummy.label=" << config.dummy.label;
ss << ", dummy.start_timestamp_ns=" << config.dummy.start_timestamp_ns;
ss << ", codec=" << to_string(config.codec);
ss << ", rtmp.enabled=" << (config.outputs.rtmp.enabled ? "true" : "false");
ss << ", rtmp.mode=" << to_string(config.outputs.rtmp.mode);
+10 -322
View File
@@ -1,39 +1,21 @@
#include "cvmmap_streamer/core/frame_source.hpp"
#include "cvmmap_streamer/ipc/contracts.hpp"
#include "cvmmap_streamer/protocol/wire_codec.hpp"
#include <cvmmap/client.hpp>
#include <app/cvmmap/cvmmap_client.hpp>
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <exception>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <fcntl.h>
#include <spdlog/spdlog.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <zmq.hpp>
namespace cvmmap_streamer::core {
namespace {
namespace ipc = cvmmap_streamer::ipc;
namespace wire_codec = cvmmap_streamer::protocol::wire_codec;
[[nodiscard]]
std::string resolve_client_target(const RuntimeConfig &config) {
if (config.input.shm_name.starts_with("cvmmap://")) {
@@ -48,123 +30,18 @@ std::string resolve_client_target(const RuntimeConfig &config) {
}
[[nodiscard]]
std::string_view to_string(app::cvmmap::ModuleStatus status) {
std::string_view to_string(cvmmap::ModuleStatus status) {
switch (status) {
case app::cvmmap::ModuleStatus::Online:
case cvmmap::ModuleStatus::Online:
return "online";
case app::cvmmap::ModuleStatus::Offline:
case cvmmap::ModuleStatus::Offline:
return "offline";
case app::cvmmap::ModuleStatus::StreamReset:
case cvmmap::ModuleStatus::StreamReset:
return "stream_reset";
}
return "unknown";
}
[[nodiscard]]
std::string normalized_shm_name(std::string_view raw_name) {
if (!raw_name.empty() && raw_name.front() == '/') {
return std::string(raw_name);
}
return "/" + std::string(raw_name);
}
void cleanup_zmq_ipc_endpoint_path(std::string_view endpoint) {
constexpr std::string_view kPrefix{"ipc://"};
if (!endpoint.starts_with(kPrefix)) {
return;
}
const auto path = endpoint.substr(kPrefix.size());
if (!path.empty()) {
unlink(std::string(path).c_str());
}
}
class SharedMemoryRegion {
public:
static std::expected<SharedMemoryRegion, std::string> create(const std::string &name, std::size_t bytes) {
const int fd = shm_open(name.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
if (fd < 0) {
return std::unexpected("shm_open failed");
}
if (ftruncate(fd, static_cast<off_t>(bytes)) != 0) {
close(fd);
shm_unlink(name.c_str());
return std::unexpected("ftruncate failed");
}
auto *mapped = static_cast<std::uint8_t *>(mmap(nullptr, bytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
if (mapped == MAP_FAILED) {
close(fd);
shm_unlink(name.c_str());
return std::unexpected("mmap failed");
}
std::memset(mapped, 0, bytes);
return SharedMemoryRegion(name, fd, mapped, bytes);
}
SharedMemoryRegion() = default;
SharedMemoryRegion(const SharedMemoryRegion &) = delete;
SharedMemoryRegion &operator=(const SharedMemoryRegion &) = delete;
SharedMemoryRegion(SharedMemoryRegion &&other) noexcept
: name_(std::move(other.name_)),
fd_(std::exchange(other.fd_, -1)),
ptr_(std::exchange(other.ptr_, nullptr)),
bytes_(std::exchange(other.bytes_, 0)) {}
SharedMemoryRegion &operator=(SharedMemoryRegion &&other) noexcept {
if (this == &other) {
return *this;
}
cleanup();
name_ = std::move(other.name_);
fd_ = std::exchange(other.fd_, -1);
ptr_ = std::exchange(other.ptr_, nullptr);
bytes_ = std::exchange(other.bytes_, 0);
return *this;
}
~SharedMemoryRegion() {
cleanup();
}
[[nodiscard]]
std::span<std::uint8_t> metadata() {
return std::span<std::uint8_t>(ptr_, ipc::kShmPayloadOffset);
}
[[nodiscard]]
std::span<std::uint8_t> payload(std::size_t bytes) {
return std::span<std::uint8_t>(ptr_ + ipc::kShmPayloadOffset, bytes);
}
private:
SharedMemoryRegion(std::string name, int fd, std::uint8_t *ptr, std::size_t bytes)
: name_(std::move(name)), fd_(fd), ptr_(ptr), bytes_(bytes) {}
void cleanup() {
if (ptr_ != nullptr && bytes_ > 0) {
munmap(ptr_, bytes_);
ptr_ = nullptr;
}
if (fd_ >= 0) {
close(fd_);
fd_ = -1;
}
if (!name_.empty()) {
shm_unlink(name_.c_str());
}
}
std::string name_{};
int fd_{-1};
std::uint8_t *ptr_{nullptr};
std::size_t bytes_{0};
};
class RealFrameSource final : public FrameSource {
public:
explicit RealFrameSource(const RuntimeConfig &config)
@@ -193,15 +70,15 @@ public:
}
try {
auto client = std::make_unique<app::cvmmap::CvMmapClient>(client_target_);
client->SetEventCallback([this](app::cvmmap::ModuleStatus status) {
auto client = std::make_unique<cvmmap::CvMmapClient>(client_target_);
client->SetEventCallback([this](cvmmap::ModuleStatus status) {
const auto events = observed_events_.fetch_add(1, std::memory_order_relaxed) + 1;
spdlog::info(
"real source event status={} total_events={}",
to_string(status),
events);
});
client->SetFrameCallback([this](const app::cvmmap::frame_metadata_t &metadata, std::span<const std::uint8_t> payload) {
client->SetFrameCallback([this](const cvmmap::frame_metadata_t &metadata, std::span<const std::uint8_t> payload) {
const auto frames = observed_frames_.fetch_add(1, std::memory_order_relaxed) + 1;
if (frames <= 1 || (frames % 60) == 0) {
spdlog::debug(
@@ -227,205 +104,16 @@ public:
private:
std::string client_target_;
mutable std::unique_ptr<app::cvmmap::CvMmapClient> client_{nullptr};
mutable std::unique_ptr<cvmmap::CvMmapClient> client_{nullptr};
mutable std::atomic_bool prepared_{false};
mutable std::atomic<std::uint64_t> observed_frames_{0};
mutable std::atomic<std::uint64_t> observed_events_{0};
};
class DummyFrameSource final : public FrameSource {
public:
explicit DummyFrameSource(const RuntimeConfig &config)
: config_(config),
shm_name_(normalized_shm_name(config.input.shm_name)),
zmq_endpoint_(config.input.zmq_endpoint) {}
~DummyFrameSource() override {
shutdown();
}
[[nodiscard]]
std::string_view backend_name() const override {
return "dummy";
}
[[nodiscard]]
std::expected<void, std::string> prepare_runtime() const override {
if (prepared_.exchange(true, std::memory_order_acq_rel)) {
return {};
}
const auto payload_bytes = static_cast<std::size_t>(config_.dummy.width) *
static_cast<std::size_t>(config_.dummy.height) *
static_cast<std::size_t>(config_.dummy.channels);
auto shm_region = SharedMemoryRegion::create(shm_name_, ipc::kShmPayloadOffset + payload_bytes);
if (!shm_region) {
prepared_.store(false, std::memory_order_release);
return std::unexpected("dummy shm setup failed: " + shm_region.error());
}
cleanup_zmq_ipc_endpoint_path(zmq_endpoint_);
try {
zmq_context_.emplace(1);
publisher_.emplace(*zmq_context_, zmq::socket_type::pub);
publisher_->bind(zmq_endpoint_);
} catch (const std::exception &e) {
publisher_.reset();
zmq_context_.reset();
cleanup_zmq_ipc_endpoint_path(zmq_endpoint_);
prepared_.store(false, std::memory_order_release);
return std::unexpected(std::string("dummy zmq bind failed: ") + e.what());
}
shm_.emplace(std::move(*shm_region));
stop_requested_.store(false, std::memory_order_release);
try {
producer_thread_ = std::thread([this]() {
run_producer_loop();
});
} catch (const std::exception &e) {
publisher_.reset();
zmq_context_.reset();
shm_.reset();
cleanup_zmq_ipc_endpoint_path(zmq_endpoint_);
prepared_.store(false, std::memory_order_release);
return std::unexpected(std::string("dummy producer thread start failed: ") + e.what());
}
spdlog::info(
"dummy source backend initialized in-process: shm='{}' zmq='{}' label='{}'",
shm_name_,
zmq_endpoint_,
config_.dummy.label);
return {};
}
private:
void shutdown() const {
if (!prepared_.exchange(false, std::memory_order_acq_rel)) {
return;
}
stop_requested_.store(true, std::memory_order_release);
if (producer_thread_.joinable()) {
producer_thread_.join();
}
publisher_.reset();
zmq_context_.reset();
shm_.reset();
cleanup_zmq_ipc_endpoint_path(zmq_endpoint_);
}
void sleep_with_stop(std::chrono::nanoseconds duration) const {
constexpr auto kSlice = std::chrono::milliseconds(10);
auto remaining = duration;
while (remaining.count() > 0 && !stop_requested_.load(std::memory_order_relaxed)) {
auto step = std::min(remaining, std::chrono::duration_cast<std::chrono::nanoseconds>(kSlice));
std::this_thread::sleep_for(step);
remaining -= step;
}
}
void send_status(
std::array<std::uint8_t, wire_codec::kModuleStatusMessageBytes> &buffer,
ipc::ModuleStatus status,
std::uint32_t frame_count) const {
wire_codec::write_module_status_message(buffer, config_.dummy.label, status);
publisher_->send(zmq::buffer(buffer), zmq::send_flags::none);
if (status == ipc::ModuleStatus::Offline || status == ipc::ModuleStatus::Online) {
spdlog::info("dummy source status={} frame_count={}", static_cast<std::int32_t>(status), frame_count);
}
}
void run_producer_loop() const {
std::array<std::uint8_t, wire_codec::kSyncMessageBytes> sync_buffer{};
std::array<std::uint8_t, wire_codec::kModuleStatusMessageBytes> status_buffer{};
const auto payload_bytes = static_cast<std::size_t>(config_.dummy.width) *
static_cast<std::size_t>(config_.dummy.height) *
static_cast<std::size_t>(config_.dummy.channels);
const auto tick_ns =
(config_.dummy.fps == 0)
? 0ull
: std::max<std::uint64_t>(1ull, 1'000'000'000ull / static_cast<std::uint64_t>(config_.dummy.fps));
std::uint64_t timestamp_ns = config_.dummy.start_timestamp_ns;
std::uint32_t frame_count = 0;
if (config_.dummy.startup_delay_ms > 0) {
sleep_with_stop(std::chrono::milliseconds(config_.dummy.startup_delay_ms));
}
try {
send_status(status_buffer, ipc::ModuleStatus::Online, 0);
while (!stop_requested_.load(std::memory_order_relaxed)) {
frame_count += 1;
auto payload = shm_->payload(payload_bytes);
wire_codec::write_deterministic_payload(payload, frame_count, config_.dummy.width, config_.dummy.height, config_.dummy.channels);
wire_codec::write_frame_metadata(
shm_->metadata(),
ipc::FrameInfo{
.width = config_.dummy.width,
.height = config_.dummy.height,
.channels = config_.dummy.channels,
.depth = config_.dummy.depth,
.pixel_format = config_.dummy.pixel_format,
.buffer_size = static_cast<std::uint32_t>(payload_bytes)},
frame_count,
timestamp_ns);
wire_codec::write_sync_message(sync_buffer, config_.dummy.label, frame_count, timestamp_ns);
publisher_->send(zmq::buffer(sync_buffer), zmq::send_flags::none);
if (config_.dummy.emit_reset_at && *config_.dummy.emit_reset_at == frame_count) {
send_status(status_buffer, ipc::ModuleStatus::StreamReset, frame_count);
}
if (config_.dummy.emit_reset_every && *config_.dummy.emit_reset_every > 0 &&
(frame_count % *config_.dummy.emit_reset_every) == 0) {
send_status(status_buffer, ipc::ModuleStatus::StreamReset, frame_count);
}
timestamp_ns += tick_ns;
if (config_.dummy.fps > 0) {
sleep_with_stop(std::chrono::nanoseconds(tick_ns));
}
}
} catch (const std::exception &e) {
spdlog::error("dummy source producer failed: {}", e.what());
}
if (publisher_) {
send_status(status_buffer, ipc::ModuleStatus::Offline, frame_count);
}
}
RuntimeConfig config_;
std::string shm_name_;
std::string zmq_endpoint_;
mutable std::optional<SharedMemoryRegion> shm_{};
mutable std::optional<zmq::context_t> zmq_context_{};
mutable std::optional<zmq::socket_t> publisher_{};
mutable std::thread producer_thread_{};
mutable std::atomic_bool prepared_{false};
mutable std::atomic_bool stop_requested_{false};
};
}
std::expected<std::unique_ptr<FrameSource>, std::string> make_frame_source(const RuntimeConfig &config) {
switch (config.input_mode) {
case InputMode::Real:
return std::unique_ptr<FrameSource>(new RealFrameSource(config));
case InputMode::Dummy:
return std::unique_ptr<FrameSource>(new DummyFrameSource(config));
}
return std::unexpected("unsupported input mode");
return std::unique_ptr<FrameSource>(new RealFrameSource(config));
}
}
+3 -3
View File
@@ -2,7 +2,7 @@
#include "cvmmap_streamer/core/frame_source.hpp"
#include "cvmmap_streamer/ipc/contracts.hpp"
#include "include/app/cvmmap/cvmmap_client.hpp"
#include <cvmmap/client.hpp>
#include <algorithm>
#include <atomic>
@@ -47,12 +47,12 @@ namespace {
.zmq_endpoint = config.input.zmq_endpoint,
};
if (config.input_mode != InputMode::Real || !config.input.shm_name.starts_with("cvmmap://")) {
if (!config.input.shm_name.starts_with("cvmmap://")) {
return resolved;
}
try {
auto target = app::cvmmap::resolve_cvmmap_target_or_throw(config.input.shm_name);
auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.shm_name);
resolved.shm_name = target.shm_name;
resolved.zmq_endpoint = target.zmq_addr;
spdlog::info(
+42 -12
View File
@@ -1,5 +1,7 @@
#include "cvmmap_streamer/ipc/contracts.hpp"
#include <cvmmap/parser.hpp>
#include <algorithm>
#include <limits>
@@ -109,6 +111,40 @@ namespace {
}
}
[[nodiscard]]
ParseError map_core_parser_error(const std::string_view error) {
if (error.contains("magic")) {
return ParseError::InvalidMagic;
}
if (error.contains("version")) {
return ParseError::UnsupportedVersion;
}
if (error.contains("depth")) {
return ParseError::InvalidDepth;
}
if (error.contains("pixel_format")) {
return ParseError::InvalidPixelFormat;
}
return ParseError::InvalidSize;
}
[[nodiscard]]
FrameMetadata from_core_metadata(const cvmmap::frame_metadata_t &metadata) {
FrameMetadata converted{};
std::copy_n(metadata.magic, converted.magic.size(), converted.magic.begin());
converted.versions_major = metadata.versions_major;
converted.versions_minor = metadata.versions_minor;
converted.frame_count = metadata.frame_count;
converted.timestamp_ns = metadata.timestamp_ns;
converted.info.width = metadata.info.width;
converted.info.height = metadata.info.height;
converted.info.channels = metadata.info.channels;
converted.info.depth = static_cast<Depth>(metadata.info.depth);
converted.info.pixel_format = static_cast<PixelFormat>(metadata.info.pixel_format);
converted.info.buffer_size = metadata.info.buffer_size;
return converted;
}
}
std::string_view to_string(ParseError error) {
@@ -301,22 +337,16 @@ std::expected<ValidatedShmView, ParseError> validate_shm_region(std::span<const
return std::unexpected(ParseError::BufferTooSmall);
}
auto metadata_result = parse_frame_metadata(shm_region);
auto metadata_result = cvmmap::parse_frame_metadata_regions(
shm_region.first(kShmPayloadOffset),
shm_region.subspan(kShmPayloadOffset));
if (!metadata_result) {
return std::unexpected(metadata_result.error());
}
const auto payload_size = static_cast<std::size_t>(metadata_result->info.buffer_size);
if (payload_size > std::numeric_limits<std::size_t>::max() - kShmPayloadOffset) {
return std::unexpected(ParseError::InvalidSize);
}
if (payload_size > shm_region.size() - kShmPayloadOffset) {
return std::unexpected(ParseError::InvalidSize);
return std::unexpected(map_core_parser_error(metadata_result.error()));
}
return ValidatedShmView{
.metadata = *metadata_result,
.payload = shm_region.subspan(kShmPayloadOffset, payload_size)};
.metadata = from_core_metadata(metadata_result->normalized_metadata),
.payload = metadata_result->left_plane};
}
std::expected<CoherentSnapshot, SnapshotError> read_coherent_snapshot(
+1 -1
View File
@@ -18,7 +18,7 @@ namespace {
"",
"Examples:",
" cvmmap_streamer --help",
" cvmmap_streamer --run-mode pipeline --input-mode dummy --help",
" cvmmap_streamer --run-mode pipeline --shm-name cvmmap://default --help",
" rtp_receiver_tester --help"};
}
+3 -3
View File
@@ -1,7 +1,7 @@
#include "cvmmap_streamer/config/runtime_config.hpp"
#include "cvmmap_streamer/ipc/contracts.hpp"
#include "include/app/cvmmap/cvmmap_client.hpp"
#include <cvmmap/client.hpp>
#include <cstddef>
#include <cstdint>
@@ -63,12 +63,12 @@ namespace {
.zmq_endpoint = config.input.zmq_endpoint,
};
if (config.input_mode != InputMode::Real || !config.input.shm_name.starts_with("cvmmap://")) {
if (!config.input.shm_name.starts_with("cvmmap://")) {
return resolved;
}
try {
auto target = app::cvmmap::resolve_cvmmap_target_or_throw(config.input.shm_name);
auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.shm_name);
resolved.shm_name = target.shm_name;
resolved.zmq_endpoint = target.zmq_addr;
spdlog::info(