feat(downstream): add cvmmap downstream runtime implementation

This commit introduces the full downstream runtime implementation needed to ingest, transform, and publish streams.

It preserves the original upstream request boundary by packaging the entire cvmmap-streamer module (build config, public API, protocol and IPC glue, and simulator/tester entrypoints) in one coherent core unit.

Keeping this group isolated enables reviewers to validate runtime behavior and correctness without mixing test evidence or process documentation changes.

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
2026-03-05 20:31:58 +08:00
commit 56e874ab6d
27 changed files with 8483 additions and 0 deletions
+388
View File
@@ -0,0 +1,388 @@
#include "cvmmap_streamer/config/runtime_config.hpp"
#include "cvmmap_streamer/ipc/contracts.hpp"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <span>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <fcntl.h>
#include <spdlog/spdlog.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <zmq.hpp>
namespace cvmmap_streamer::core {
namespace {
using namespace std::chrono_literals;
namespace ipc = cvmmap_streamer::ipc;
struct SharedMemoryView {
SharedMemoryView() = default;
int fd{-1};
std::uint8_t *ptr{nullptr};
std::size_t bytes{0};
~SharedMemoryView() {
if (ptr != nullptr && ptr != MAP_FAILED && bytes > 0) {
munmap(ptr, bytes);
}
if (fd >= 0) {
close(fd);
}
}
SharedMemoryView(const SharedMemoryView &) = delete;
SharedMemoryView &operator=(const SharedMemoryView &) = delete;
SharedMemoryView(SharedMemoryView &&other) noexcept {
fd = std::exchange(other.fd, -1);
ptr = std::exchange(other.ptr, nullptr);
bytes = std::exchange(other.bytes, 0);
}
SharedMemoryView &operator=(SharedMemoryView &&other) noexcept {
if (this == &other) {
return *this;
}
if (ptr != nullptr && ptr != MAP_FAILED && bytes > 0) {
munmap(ptr, bytes);
}
if (fd >= 0) {
close(fd);
}
fd = std::exchange(other.fd, -1);
ptr = std::exchange(other.ptr, nullptr);
bytes = std::exchange(other.bytes, 0);
return *this;
}
[[nodiscard]]
std::span<const std::uint8_t> region() const {
return std::span<const std::uint8_t>(ptr, bytes);
}
[[nodiscard]]
static std::expected<SharedMemoryView, std::string> open_readonly(const std::string &raw_name) {
const auto shm_name = raw_name.starts_with('/') ? raw_name : "/" + raw_name;
const int fd = shm_open(shm_name.c_str(), O_RDONLY, 0);
if (fd < 0) {
return std::unexpected("shm_open failed for '" + shm_name + "'");
}
struct stat statbuf {
};
if (fstat(fd, &statbuf) != 0) {
close(fd);
return std::unexpected("fstat failed for '" + shm_name + "'");
}
if (statbuf.st_size <= 0) {
close(fd);
return std::unexpected("shared memory size is zero for '" + shm_name + "'");
}
const auto bytes = static_cast<std::size_t>(statbuf.st_size);
auto *mapped = static_cast<std::uint8_t *>(mmap(nullptr, bytes, PROT_READ, MAP_SHARED, fd, 0));
if (mapped == MAP_FAILED) {
close(fd);
return std::unexpected("mmap failed for '" + shm_name + "'");
}
SharedMemoryView view;
view.fd = fd;
view.ptr = mapped;
view.bytes = bytes;
return view;
}
};
struct IngestFrame {
std::uint32_t frame_count;
std::uint64_t timestamp_ns;
std::size_t payload_bytes;
};
struct IngestStats {
std::uint64_t dropped_frames{0};
std::uint64_t torn_frames{0};
std::uint64_t resets{0};
std::uint64_t decode_reconfigs{0};
std::uint64_t sync_messages{0};
std::uint64_t status_messages{0};
std::atomic<std::uint64_t> consumed_frames{0};
};
[[nodiscard]]
bool frame_info_equal(const ipc::FrameInfo &lhs, const ipc::FrameInfo &rhs) {
return lhs.width == rhs.width &&
lhs.height == rhs.height &&
lhs.channels == rhs.channels &&
lhs.depth == rhs.depth &&
lhs.pixel_format == rhs.pixel_format &&
lhs.buffer_size == rhs.buffer_size;
}
[[nodiscard]]
std::string status_to_string(ipc::ModuleStatus status) {
switch (status) {
case ipc::ModuleStatus::Online:
return "online";
case ipc::ModuleStatus::Offline:
return "offline";
case ipc::ModuleStatus::StreamReset:
return "stream_reset";
}
return "unknown";
}
}
int run_ingest_loop(const RuntimeConfig &config) {
auto shm = SharedMemoryView::open_readonly(config.input.shm_name);
if (!shm) {
spdlog::error("ingest open shared memory failed: {}", shm.error());
return 3;
}
if (shm->bytes <= ipc::kShmPayloadOffset) {
spdlog::error("ingest invalid shared memory size: {}", shm->bytes);
return 3;
}
const std::size_t queue_capacity = std::max<std::size_t>(1, config.latency.queue_size);
std::vector<std::uint8_t> snapshot_buffer(shm->bytes - ipc::kShmPayloadOffset, static_cast<std::uint8_t>(0));
zmq::context_t zmq_ctx{1};
zmq::socket_t subscriber(zmq_ctx, zmq::socket_type::sub);
try {
subscriber.set(zmq::sockopt::subscribe, "");
subscriber.set(zmq::sockopt::rcvtimeo, 20);
subscriber.connect(config.input.zmq_endpoint);
} catch (const zmq::error_t &e) {
spdlog::error("ingest subscribe failed on '{}': {}", config.input.zmq_endpoint, e.what());
return 4;
}
std::mutex queue_mutex;
std::condition_variable queue_cv;
std::deque<IngestFrame> queue;
std::size_t queue_depth_peak{0};
std::optional<ipc::FrameInfo> last_frame_info{};
std::atomic_bool stop_requested{false};
std::atomic_bool producer_offline{false};
IngestStats stats{};
std::thread consumer([&]() {
while (true) {
std::optional<IngestFrame> next{};
{
std::unique_lock lock(queue_mutex);
queue_cv.wait_for(lock, 25ms, [&]() {
return stop_requested.load(std::memory_order_relaxed) || !queue.empty();
});
if (stop_requested.load(std::memory_order_relaxed) && queue.empty()) {
break;
}
if (!queue.empty()) {
next.emplace(std::move(queue.front()));
queue.pop_front();
}
}
if (!next) {
continue;
}
if (config.latency.ingest_consumer_delay_ms > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(config.latency.ingest_consumer_delay_ms));
}
const auto consumed = stats.consumed_frames.fetch_add(1, std::memory_order_relaxed) + 1;
spdlog::debug(
"consume frame_count={} timestamp_ns={} payload_bytes={} consumed_frames={}",
next->frame_count,
next->timestamp_ns,
next->payload_bytes,
consumed);
if (config.latency.ingest_max_frames > 0 && consumed >= config.latency.ingest_max_frames) {
stop_requested.store(true, std::memory_order_relaxed);
queue_cv.notify_all();
}
}
});
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
auto last_event = std::chrono::steady_clock::now();
while (!stop_requested.load(std::memory_order_relaxed)) {
zmq::message_t message;
const auto recv_result = subscriber.recv(message, zmq::recv_flags::none);
if (!recv_result) {
const auto now = std::chrono::steady_clock::now();
if (now - last_event >= idle_timeout) {
spdlog::info(
"ingest idle timeout reached ({} ms), stopping",
config.latency.ingest_idle_timeout_ms);
break;
}
if (producer_offline.load(std::memory_order_relaxed)) {
std::lock_guard lock(queue_mutex);
if (queue.empty()) {
spdlog::info("producer offline and queue drained, stopping ingest loop");
break;
}
}
continue;
}
last_event = std::chrono::steady_clock::now();
const auto bytes = std::span<const std::uint8_t>(
static_cast<const std::uint8_t *>(message.data()),
message.size());
if (bytes.empty()) {
continue;
}
if (bytes[0] == ipc::kFrameTopicMagic) {
stats.sync_messages += 1;
auto sync = ipc::parse_sync_message(bytes);
if (!sync) {
spdlog::warn("sync parse error: {}", ipc::to_string(sync.error()));
continue;
}
auto snapshot = ipc::read_coherent_snapshot(shm->region(), snapshot_buffer);
if (!snapshot) {
if (snapshot.error() == ipc::SnapshotError::TornRead) {
stats.torn_frames += 1;
}
spdlog::warn("snapshot rejected: {}", ipc::to_string(snapshot.error()));
continue;
}
if (last_frame_info && !frame_info_equal(*last_frame_info, snapshot->metadata.info)) {
stats.decode_reconfigs += 1;
spdlog::info(
"decode reconfig detected old={}x{}x{} new={}x{}x{}",
last_frame_info->width,
last_frame_info->height,
static_cast<unsigned>(last_frame_info->channels),
snapshot->metadata.info.width,
snapshot->metadata.info.height,
static_cast<unsigned>(snapshot->metadata.info.channels));
}
last_frame_info = snapshot->metadata.info;
IngestFrame frame{
.frame_count = snapshot->metadata.frame_count,
.timestamp_ns = snapshot->metadata.timestamp_ns,
.payload_bytes = snapshot->bytes_copied};
std::size_t depth_after_push{0};
{
std::lock_guard lock(queue_mutex);
while (queue.size() >= queue_capacity) {
queue.pop_front();
stats.dropped_frames += 1;
}
queue.push_back(std::move(frame));
depth_after_push = queue.size();
queue_depth_peak = std::max(queue_depth_peak, depth_after_push);
}
queue_cv.notify_one();
spdlog::debug(
"ingest sync={} snapshot={} queue_depth={} dropped_frames={}",
sync->frame_count,
snapshot->metadata.frame_count,
depth_after_push,
stats.dropped_frames);
continue;
}
if (bytes[0] == ipc::kModuleStatusMagic) {
stats.status_messages += 1;
auto status = ipc::parse_module_status_message(bytes);
if (!status) {
spdlog::warn("status parse error: {}", ipc::to_string(status.error()));
continue;
}
spdlog::info(
"status event label='{}' status={}",
status->label(),
status_to_string(status->module_status));
if (status->module_status == ipc::ModuleStatus::Online) {
producer_offline.store(false, std::memory_order_relaxed);
}
if (status->module_status == ipc::ModuleStatus::Offline) {
producer_offline.store(true, std::memory_order_relaxed);
}
if (status->module_status == ipc::ModuleStatus::StreamReset) {
stats.resets += 1;
std::size_t cleared{0};
{
std::lock_guard lock(queue_mutex);
cleared = queue.size();
queue.clear();
}
last_frame_info.reset();
spdlog::info(
"ingest state reset applied queue_cleared={} resets={}",
cleared,
stats.resets);
}
continue;
}
spdlog::warn("unknown message type: magic=0x{:02x} size={}", bytes[0], bytes.size());
}
stop_requested.store(true, std::memory_order_relaxed);
queue_cv.notify_all();
consumer.join();
std::size_t final_depth{0};
{
std::lock_guard lock(queue_mutex);
final_depth = queue.size();
}
spdlog::info(
"INGEST_METRICS queue_capacity={} queue_depth={} queue_depth_peak={} dropped_frames={} torn_frames={} resets={} decode_reconfigs={} consumed_frames={} sync_messages={} status_messages={}",
queue_capacity,
final_depth,
queue_depth_peak,
stats.dropped_frames,
stats.torn_frames,
stats.resets,
stats.decode_reconfigs,
stats.consumed_frames.load(std::memory_order_relaxed),
stats.sync_messages,
stats.status_messages);
return 0;
}
}