Files
cvmmap-streamer/src/core/frame_source.cpp
T

431 lines
12 KiB
C++

#include "cvmmap_streamer/core/frame_source.hpp"
#include "cvmmap_streamer/ipc/contracts.hpp"
#include "cvmmap_streamer/sim/wire.hpp"
#include <app/cvmmap/cvmmap_client.hpp>
#include <algorithm>
#include <array>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <exception>
#include <expected>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <string_view>
#include <system_error>
#include <thread>
#include <utility>
#include <fcntl.h>
#include <spdlog/spdlog.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <zmq.hpp>
namespace cvmmap_streamer::core {
namespace {
namespace ipc = cvmmap_streamer::ipc;
/// Derives the target string handed to the cvmmap client from the configured
/// shared-memory name.
///
/// A name beginning with "cvmmap://" is returned untouched. Otherwise a single
/// leading '/' is stripped when present; every other name passes through as-is.
[[nodiscard]]
std::string resolve_client_target(const RuntimeConfig &config) {
    const auto &configured = config.input.shm_name;
    const bool is_uri = configured.starts_with("cvmmap://");
    const bool has_leading_slash = !configured.empty() && configured.front() == '/';
    if (!is_uri && has_leading_slash) {
        return configured.substr(1);
    }
    return configured;
}
/// Maps a cvmmap client module status to the lowercase label used in log
/// output. Any value without a dedicated case yields "unknown".
[[nodiscard]]
std::string_view to_string(app::cvmmap::ModuleStatus status) {
    using Status = app::cvmmap::ModuleStatus;
    std::string_view label{"unknown"};
    switch (status) {
    case Status::Online:
        label = "online";
        break;
    case Status::Offline:
        label = "offline";
        break;
    case Status::StreamReset:
        label = "stream_reset";
        break;
    }
    return label;
}
/// Returns `raw_name` with exactly one guaranteed leading '/'.
///
/// A name that already starts with '/' is copied unchanged; anything else
/// (including the empty string, which becomes "/") gets a '/' prepended.
[[nodiscard]]
std::string normalized_shm_name(std::string_view raw_name) {
    const bool already_rooted = !raw_name.empty() && raw_name.front() == '/';
    std::string normalized;
    normalized.reserve(raw_name.size() + 1);
    if (!already_rooted) {
        normalized.push_back('/');
    }
    normalized.append(raw_name.data(), raw_name.size());
    return normalized;
}
/// Best-effort removal of the filesystem entry behind an "ipc://" endpoint.
///
/// Endpoints that do not start with "ipc://", or whose path part is empty, are
/// left alone. The result of unlink() is deliberately ignored.
void cleanup_zmq_ipc_endpoint_path(std::string_view endpoint) {
    constexpr std::string_view kIpcScheme{"ipc://"};
    if (endpoint.starts_with(kIpcScheme)) {
        // unlink() needs a NUL-terminated path, so materialise the view.
        const std::string socket_path{endpoint.substr(kIpcScheme.size())};
        if (!socket_path.empty()) {
            unlink(socket_path.c_str());
        }
    }
}
/// Move-only owner of a POSIX shared-memory object together with its file
/// descriptor and read/write mapping.
///
/// Destroying a live region (or move-assigning over it) unmaps the memory,
/// closes the descriptor and shm_unlink()s the name. A moved-from or
/// default-constructed region owns nothing and its cleanup is a no-op.
class SharedMemoryRegion {
public:
    /// Opens (creating if needed) the shared-memory object `name`, resizes it
    /// to `bytes`, maps it MAP_SHARED read/write and zero-fills the mapping.
    ///
    /// @param name  Name passed verbatim to shm_open().
    /// @param bytes Total size of the object and of the mapping.
    /// @return The owning region, or an error string naming the failing call
    ///         ("shm_open failed", "ftruncate failed", "mmap failed") followed
    ///         by the system error text. On ftruncate/mmap failure the
    ///         descriptor is closed and the name is unlinked before returning.
    static std::expected<SharedMemoryRegion, std::string> create(const std::string &name, std::size_t bytes) {
        const int fd = shm_open(name.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
        if (fd < 0) {
            return std::unexpected(describe_failure("shm_open failed", errno));
        }
        if (ftruncate(fd, static_cast<off_t>(bytes)) != 0) {
            // Capture errno first: close()/shm_unlink() may overwrite it.
            const int saved_errno = errno;
            close(fd);
            shm_unlink(name.c_str());
            return std::unexpected(describe_failure("ftruncate failed", saved_errno));
        }
        void *mapped = mmap(nullptr, bytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        if (mapped == MAP_FAILED) {
            const int saved_errno = errno;
            close(fd);
            shm_unlink(name.c_str());
            return std::unexpected(describe_failure("mmap failed", saved_errno));
        }
        // O_CREAT without O_EXCL may have opened a pre-existing object, so
        // clear whatever contents it carried.
        std::memset(mapped, 0, bytes);
        return SharedMemoryRegion(name, fd, static_cast<std::uint8_t *>(mapped), bytes);
    }

    SharedMemoryRegion() = default;
    SharedMemoryRegion(const SharedMemoryRegion &) = delete;
    SharedMemoryRegion &operator=(const SharedMemoryRegion &) = delete;

    /// Takes over `other`'s resources. `other.name_` is explicitly reset to
    /// empty: a moved-from std::string is only "valid but unspecified", and a
    /// non-empty leftover would make `other`'s destructor unlink the object
    /// this region now owns.
    SharedMemoryRegion(SharedMemoryRegion &&other) noexcept
        : name_(std::exchange(other.name_, std::string{})),
          fd_(std::exchange(other.fd_, -1)),
          ptr_(std::exchange(other.ptr_, nullptr)),
          bytes_(std::exchange(other.bytes_, 0)) {}

    /// Releases whatever this region currently owns, then takes over
    /// `other`'s resources, leaving `other` empty. Self-assignment is a no-op.
    SharedMemoryRegion &operator=(SharedMemoryRegion &&other) noexcept {
        if (this == &other) {
            return *this;
        }
        cleanup();
        name_ = std::exchange(other.name_, std::string{});
        fd_ = std::exchange(other.fd_, -1);
        ptr_ = std::exchange(other.ptr_, nullptr);
        bytes_ = std::exchange(other.bytes_, 0);
        return *this;
    }

    ~SharedMemoryRegion() {
        cleanup();
    }

    /// View over the first ipc::kShmPayloadOffset bytes of the mapping.
    /// Precondition (not checked): the region is mapped and at least that large.
    [[nodiscard]]
    std::span<std::uint8_t> metadata() {
        return std::span<std::uint8_t>(ptr_, ipc::kShmPayloadOffset);
    }

    /// View over `bytes` bytes starting at ipc::kShmPayloadOffset.
    /// Precondition (not checked): offset + `bytes` fits inside the mapping.
    [[nodiscard]]
    std::span<std::uint8_t> payload(std::size_t bytes) {
        return std::span<std::uint8_t>(ptr_ + ipc::kShmPayloadOffset, bytes);
    }

private:
    SharedMemoryRegion(std::string name, int fd, std::uint8_t *ptr, std::size_t bytes)
        : name_(std::move(name)), fd_(fd), ptr_(ptr), bytes_(bytes) {}

    /// Formats "<what>: <system error text for err>".
    static std::string describe_failure(std::string_view what, int err) {
        std::string message(what);
        message += ": ";
        message += std::generic_category().message(err);
        return message;
    }

    /// Releases everything this region owns and resets every member, so a
    /// second call (or the destructor of an emptied object) does nothing.
    void cleanup() noexcept {
        if (ptr_ != nullptr && bytes_ > 0) {
            munmap(ptr_, bytes_);
        }
        ptr_ = nullptr;
        bytes_ = 0;
        if (fd_ >= 0) {
            close(fd_);
            fd_ = -1;
        }
        if (!name_.empty()) {
            shm_unlink(name_.c_str());
            name_.clear();
        }
    }

    std::string name_{};
    int fd_{-1};
    std::uint8_t *ptr_{nullptr};
    std::size_t bytes_{0};
};
/// FrameSource backend ("real") that attaches a CvMmapClient to the configured
/// target. The callbacks installed here only count and log what the client
/// reports; they do not forward frames anywhere.
class RealFrameSource final : public FrameSource {
public:
    explicit RealFrameSource(const RuntimeConfig &config)
        : client_target_(resolve_client_target(config)) {}

    /// Stops the client, if one was started, and logs the observed totals.
    ~RealFrameSource() override {
        if (client_ == nullptr) {
            return;
        }
        client_->Stop();
        spdlog::info(
            "real source backend stopped: target='{}' frames={} events={}",
            client_target_,
            observed_frames_.load(std::memory_order_relaxed),
            observed_events_.load(std::memory_order_relaxed));
    }

    [[nodiscard]]
    std::string_view backend_name() const override {
        return "real";
    }

    /// Creates, wires up and starts the client on the first call.
    ///
    /// Later calls return success immediately while the prepared flag is set.
    /// If construction, callback registration or Start() throws, the flag is
    /// cleared again and the exception text is returned as the error.
    [[nodiscard]]
    std::expected<void, std::string> prepare_runtime() const override {
        const bool already_prepared = prepared_.exchange(true, std::memory_order_acq_rel);
        if (already_prepared) {
            return {};
        }
        try {
            auto fresh_client = std::make_unique<app::cvmmap::CvMmapClient>(client_target_);
            install_callbacks(*fresh_client);
            fresh_client->Start();
            // Only publish the client once it has started successfully.
            client_ = std::move(fresh_client);
        } catch (const std::exception &error) {
            prepared_.store(false, std::memory_order_release);
            std::string message{"cvmmap-client-cpp init failed: "};
            message += error.what();
            return std::unexpected(std::move(message));
        }
        spdlog::info(
            "real source backend initialized via cvmmap-client-cpp: target='{}'",
            client_target_);
        return {};
    }

private:
    /// Registers the event and frame callbacks on `client`. Both capture
    /// `this` and update the atomic counters.
    void install_callbacks(app::cvmmap::CvMmapClient &client) const {
        client.SetEventCallback([this](app::cvmmap::ModuleStatus status) {
            const auto total_events = observed_events_.fetch_add(1, std::memory_order_relaxed) + 1;
            spdlog::info(
                "real source event status={} total_events={}",
                to_string(status),
                total_events);
        });
        client.SetFrameCallback([this](const app::cvmmap::frame_metadata_t &metadata, std::span<const std::uint8_t> payload) {
            const auto total_frames = observed_frames_.fetch_add(1, std::memory_order_relaxed) + 1;
            // Log the first frame and then every 60th to keep debug output bounded.
            const bool worth_logging = total_frames <= 1 || (total_frames % 60) == 0;
            if (!worth_logging) {
                return;
            }
            spdlog::debug(
                "real source frame frame_count={} timestamp_ns={} payload_bytes={} observed_frames={}",
                metadata.frame_count,
                metadata.timestamp_ns,
                payload.size(),
                total_frames);
        });
    }

    std::string client_target_;
    mutable std::unique_ptr<app::cvmmap::CvMmapClient> client_{nullptr};
    mutable std::atomic_bool prepared_{false};
    mutable std::atomic<std::uint64_t> observed_frames_{0};
    mutable std::atomic<std::uint64_t> observed_events_{0};
};
/// FrameSource backend ("dummy") that produces frames in-process.
///
/// prepare_runtime() creates a shared-memory region, binds a ZeroMQ PUB socket
/// and starts a producer thread. That thread writes a payload and metadata
/// into the region for each frame and publishes sync/status messages. The
/// destructor stops the thread and releases everything.
class DummyFrameSource final : public FrameSource {
public:
    explicit DummyFrameSource(const RuntimeConfig &config)
        : config_(config),
          shm_name_(normalized_shm_name(config.input.shm_name)),
          zmq_endpoint_(config.input.zmq_endpoint) {}

    ~DummyFrameSource() override {
        shutdown();
    }

    [[nodiscard]]
    std::string_view backend_name() const override {
        return "dummy";
    }

    /// Sets up shared memory, the publisher socket and the producer thread.
    ///
    /// Calls made while the prepared flag is already set return success
    /// without doing anything. On any failure all partially created resources
    /// are released, the flag is cleared and an error string is returned.
    [[nodiscard]]
    std::expected<void, std::string> prepare_runtime() const override {
        if (prepared_.exchange(true, std::memory_order_acq_rel)) {
            return {};
        }
        auto shm_region = SharedMemoryRegion::create(shm_name_, ipc::kShmPayloadOffset + frame_payload_bytes());
        if (!shm_region) {
            prepared_.store(false, std::memory_order_release);
            return std::unexpected("dummy shm setup failed: " + shm_region.error());
        }
        // Remove a stale ipc:// socket path before binding.
        cleanup_zmq_ipc_endpoint_path(zmq_endpoint_);
        try {
            zmq_context_.emplace(1);
            publisher_.emplace(*zmq_context_, zmq::socket_type::pub);
            publisher_->bind(zmq_endpoint_);
        } catch (const std::exception &e) {
            publisher_.reset();
            zmq_context_.reset();
            cleanup_zmq_ipc_endpoint_path(zmq_endpoint_);
            prepared_.store(false, std::memory_order_release);
            return std::unexpected(std::string("dummy zmq bind failed: ") + e.what());
        }
        shm_.emplace(std::move(*shm_region));
        stop_requested_.store(false, std::memory_order_release);
        try {
            producer_thread_ = std::thread([this]() {
                run_producer_loop();
            });
        } catch (const std::exception &e) {
            publisher_.reset();
            zmq_context_.reset();
            shm_.reset();
            cleanup_zmq_ipc_endpoint_path(zmq_endpoint_);
            prepared_.store(false, std::memory_order_release);
            return std::unexpected(std::string("dummy producer thread start failed: ") + e.what());
        }
        spdlog::info(
            "dummy source backend initialized in-process: shm='{}' zmq='{}' label='{}'",
            shm_name_,
            zmq_endpoint_,
            config_.dummy.label);
        return {};
    }

private:
    /// Payload size of one frame: width * height * channels from the dummy
    /// configuration, computed in std::size_t.
    [[nodiscard]]
    std::size_t frame_payload_bytes() const {
        return static_cast<std::size_t>(config_.dummy.width) *
               static_cast<std::size_t>(config_.dummy.height) *
               static_cast<std::size_t>(config_.dummy.channels);
    }

    /// Stops and joins the producer thread, then releases the socket, context,
    /// shared memory and ipc path. Does nothing unless the prepared flag is set.
    void shutdown() const {
        if (!prepared_.exchange(false, std::memory_order_acq_rel)) {
            return;
        }
        stop_requested_.store(true, std::memory_order_release);
        // Join before tearing down: the producer thread uses publisher_ and shm_.
        if (producer_thread_.joinable()) {
            producer_thread_.join();
        }
        publisher_.reset();
        zmq_context_.reset();
        shm_.reset();
        cleanup_zmq_ipc_endpoint_path(zmq_endpoint_);
    }

    /// Sleeps for `duration` in slices of at most 10 ms, returning early once
    /// a stop has been requested.
    void sleep_with_stop(std::chrono::nanoseconds duration) const {
        constexpr auto kSlice = std::chrono::milliseconds(10);
        auto remaining = duration;
        while (remaining.count() > 0 && !stop_requested_.load(std::memory_order_relaxed)) {
            auto step = std::min(remaining, std::chrono::duration_cast<std::chrono::nanoseconds>(kSlice));
            std::this_thread::sleep_for(step);
            remaining -= step;
        }
    }

    /// Serialises a module-status message into `buffer` and publishes it.
    /// Online/Offline transitions are also logged; `frame_count` is used only
    /// for that log line. Exceptions thrown by the publisher's send()
    /// propagate to the caller.
    void send_status(
        std::array<std::uint8_t, sim::kModuleStatusMessageBytes> &buffer,
        ipc::ModuleStatus status,
        std::uint32_t frame_count) const {
        sim::write_module_status_message(buffer, config_.dummy.label, status);
        publisher_->send(zmq::buffer(buffer), zmq::send_flags::none);
        if (status == ipc::ModuleStatus::Offline || status == ipc::ModuleStatus::Online) {
            spdlog::info("dummy source status={} frame_count={}", static_cast<std::int32_t>(status), frame_count);
        }
    }

    /// Producer thread body.
    ///
    /// After the optional startup delay it announces Online, then repeats
    /// until a stop is requested: write payload and metadata into shared
    /// memory, publish a sync message, emit any configured StreamReset
    /// statuses, advance the synthetic timestamp and sleep one tick when
    /// fps > 0. It announces Offline on the way out. No exception is allowed
    /// to leave this function, since an exception escaping a std::thread
    /// entry point calls std::terminate.
    void run_producer_loop() const {
        std::array<std::uint8_t, sim::kSyncMessageBytes> sync_buffer{};
        std::array<std::uint8_t, sim::kModuleStatusMessageBytes> status_buffer{};
        const auto payload_bytes = frame_payload_bytes();
        // Nanoseconds per frame; 0 means "no pacing" when fps is 0.
        const auto tick_ns =
            (config_.dummy.fps == 0)
                ? 0ull
                : std::max<std::uint64_t>(1ull, 1'000'000'000ull / static_cast<std::uint64_t>(config_.dummy.fps));
        std::uint64_t timestamp_ns = config_.dummy.start_timestamp_ns;
        std::uint32_t frame_count = 0;
        if (config_.dummy.startup_delay_ms > 0) {
            sleep_with_stop(std::chrono::milliseconds(config_.dummy.startup_delay_ms));
        }
        try {
            send_status(status_buffer, ipc::ModuleStatus::Online, 0);
            while (!stop_requested_.load(std::memory_order_relaxed)) {
                frame_count += 1;
                auto payload = shm_->payload(payload_bytes);
                sim::write_deterministic_payload(payload, frame_count, config_.dummy.width, config_.dummy.height, config_.dummy.channels);
                sim::write_frame_metadata(
                    shm_->metadata(),
                    ipc::FrameInfo{
                        .width = config_.dummy.width,
                        .height = config_.dummy.height,
                        .channels = config_.dummy.channels,
                        .depth = config_.dummy.depth,
                        .pixel_format = config_.dummy.pixel_format,
                        .buffer_size = static_cast<std::uint32_t>(payload_bytes)},
                    frame_count,
                    timestamp_ns);
                sim::write_sync_message(sync_buffer, config_.dummy.label, frame_count, timestamp_ns);
                publisher_->send(zmq::buffer(sync_buffer), zmq::send_flags::none);
                if (config_.dummy.emit_reset_at && *config_.dummy.emit_reset_at == frame_count) {
                    send_status(status_buffer, ipc::ModuleStatus::StreamReset, frame_count);
                }
                if (config_.dummy.emit_reset_every && *config_.dummy.emit_reset_every > 0 &&
                    (frame_count % *config_.dummy.emit_reset_every) == 0) {
                    send_status(status_buffer, ipc::ModuleStatus::StreamReset, frame_count);
                }
                timestamp_ns += tick_ns;
                if (config_.dummy.fps > 0) {
                    sleep_with_stop(std::chrono::nanoseconds(tick_ns));
                }
            }
        } catch (const std::exception &e) {
            spdlog::error("dummy source producer failed: {}", e.what());
        }
        if (publisher_) {
            // The Offline notice is best-effort. It may follow a send failure
            // above, so a second failure here must not escape the thread.
            try {
                send_status(status_buffer, ipc::ModuleStatus::Offline, frame_count);
            } catch (const std::exception &e) {
                spdlog::warn("dummy source offline notification failed: {}", e.what());
            }
        }
    }

    RuntimeConfig config_;
    std::string shm_name_;
    std::string zmq_endpoint_;
    mutable std::optional<SharedMemoryRegion> shm_{};
    mutable std::optional<zmq::context_t> zmq_context_{};
    mutable std::optional<zmq::socket_t> publisher_{};
    mutable std::thread producer_thread_{};
    mutable std::atomic_bool prepared_{false};
    mutable std::atomic_bool stop_requested_{false};
};
}
/// Builds the FrameSource backend selected by `config.input_mode`.
///
/// @return An owning pointer to a RealFrameSource or DummyFrameSource, or the
///         error "unsupported input mode" when the mode matches neither case.
///         The returned source is only constructed; its prepare_runtime() is
///         not called here.
std::expected<std::unique_ptr<FrameSource>, std::string> make_frame_source(const RuntimeConfig &config) {
    switch (config.input_mode) {
    case InputMode::Real:
        return std::unique_ptr<FrameSource>(std::make_unique<RealFrameSource>(config));
    case InputMode::Dummy:
        return std::unique_ptr<FrameSource>(std::make_unique<DummyFrameSource>(config));
    }
    return std::unexpected("unsupported input mode");
}
}