diff --git a/README.md b/README.md index b9a8c67..3cc6691 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,8 @@ A standalone C++ downstream project that reads frames from cv-mmap IPC, encodes This project consumes video frames from the cv-mmap shared memory interface and publishes them as encoded streams. It operates as a downstream consumer only, never writing to the cv-mmap shared memory. **Key Features:** -- Reads cv-mmap IPC frames via POSIX shared memory + ZeroMQ synchronization +- Reads cv-mmap IPC frames via POSIX shared memory + ZeroMQ frame sync +- Consumes cv-mmap control/status/body over NATS - NVENC H.264/H.265 encoding with deterministic software fallback - RTP UDP-unicast publisher with automatic SDP generation - RTMP publisher with dual H.265 modes (Enhanced-RTMP + domestic extension) @@ -22,6 +23,7 @@ This project consumes video frames from the cv-mmap shared memory interface and - CMake 3.20+ - GStreamer 1.20+ with development headers - ZeroMQ (cppzmq) with development headers +- NATS server reachable at runtime - spdlog - NVIDIA GPU with NVENC support (optional, falls back to software encoding) @@ -156,7 +158,8 @@ Run the fault injection and latency validation suite. | Flag | Description | Default | |------|-------------|---------| | `--shm-name NAME` | POSIX shared memory segment name | required | -| `--zmq-endpoint URI` | ZeroMQ PUB endpoint for sync/status | required | +| `--zmq-endpoint URI` | ZeroMQ PUB endpoint for frame sync | required | +| `--nats-url URL` | NATS server for control/status/body | `nats://localhost:4222` | | `--queue-size N` | Ingest queue capacity (1 = latest-frame) | 1 | ### Codec Options diff --git a/include/cvmmap_streamer/config/runtime_config.hpp b/include/cvmmap_streamer/config/runtime_config.hpp index 9bcd94a..63cd8d2 100644 --- a/include/cvmmap_streamer/config/runtime_config.hpp +++ b/include/cvmmap_streamer/config/runtime_config.hpp @@ -48,6 +48,7 @@ enum class McapCompression { struct InputConfig { std::string uri{"cvmmap://default"}; + std::string nats_url{"nats://localhost:4222"}; }; struct EncoderConfig { diff --git a/src/config/runtime_config.cpp b/src/config/runtime_config.cpp index 64f327a..1608b60 100644 --- a/src/config/runtime_config.cpp +++ b/src/config/runtime_config.cpp @@ -260,6 +260,9 @@ std::expected apply_toml_file(RuntimeConfig &config, const st if (auto value = toml_value(table, "input.uri")) { config.input.uri = *value; } + if (auto value = toml_value(table, "input.nats_url")) { + config.input.nats_url = *value; + } if (auto value = toml_value(table, "run_mode")) { auto parsed = parse_run_mode(*value); if (!parsed) { @@ -551,6 +554,7 @@ std::expected parse_runtime_config(int argc, char ** std::string config_path_raw{}; std::string input_uri_raw{}; + std::string input_nats_url_raw{}; std::string run_mode_raw{}; std::string codec_raw{}; std::string encoder_backend_raw{}; @@ -588,6 +592,7 @@ std::expected parse_runtime_config(int argc, char ** app.add_option("--config", config_path_raw); app.add_option("--input-uri", input_uri_raw); + app.add_option("--nats-url", input_nats_url_raw); app.add_option("--run-mode", run_mode_raw); app.add_option("--codec", codec_raw); app.add_option("--encoder-backend", encoder_backend_raw); @@ -640,6 +645,9 @@ std::expected parse_runtime_config(int argc, char ** if (!input_uri_raw.empty()) { config.input.uri = input_uri_raw; } + if (!input_nats_url_raw.empty()) { + config.input.nats_url = input_nats_url_raw; + } if (!run_mode_raw.empty()) { auto parsed = parse_run_mode(run_mode_raw); if (!parsed) { @@ -814,6 +822,9 @@ std::expected validate_runtime_config(const RuntimeConfig &co if (config.input.uri.empty()) { return std::unexpected("invalid input config: input.uri must not be empty"); } + if (config.input.nats_url.empty()) { + return std::unexpected("invalid input config: input.nats_url must not be empty"); + } if (config.outputs.rtmp.enabled && config.outputs.rtmp.urls.empty()) { return std::unexpected("invalid RTMP config: enabled RTMP output requires at least one URL"); @@ -882,6 +893,7 @@ std::expected validate_runtime_config(const RuntimeConfig &co std::string summarize_runtime_config(const RuntimeConfig &config) { std::ostringstream ss; ss << "input.uri=" << config.input.uri; + ss << ", input.nats_url=" << config.input.nats_url; ss << ", run_mode=" << to_string(config.run_mode); ss << ", encoder.backend=" << to_string(config.encoder.backend); ss << ", encoder.device=" << to_string(config.encoder.device); diff --git a/src/core/frame_source.cpp b/src/core/frame_source.cpp index 839d7e9..385eed0 100644 --- a/src/core/frame_source.cpp +++ b/src/core/frame_source.cpp @@ -37,7 +37,8 @@ std::string_view to_string(cvmmap::ModuleStatus status) { class RealFrameSource final : public FrameSource { public: explicit RealFrameSource(const RuntimeConfig &config) - : client_target_(resolve_client_target(config)) {} + : client_target_(resolve_client_target(config)), + nats_url_(config.input.nats_url) {} ~RealFrameSource() override { if (client_ != nullptr) { @@ -62,7 +63,10 @@ public: } try { - auto client = std::make_unique(client_target_); + auto client = std::make_unique(cvmmap::ClientConfig{ + .instance_name = client_target_, + .nats_url = nats_url_, + }); client->SetEventCallback([this](cvmmap::ModuleStatus status) { const auto events = observed_events_.fetch_add(1, std::memory_order_relaxed) + 1; spdlog::info( @@ -96,6 +100,7 @@ public: private: std::string client_target_; + std::string nats_url_; mutable std::unique_ptr client_{nullptr}; mutable std::atomic_bool prepared_{false}; mutable std::atomic observed_frames_{0}; diff --git a/src/pipeline/pipeline_runtime.cpp b/src/pipeline/pipeline_runtime.cpp index abd6d9d..437e748 100644 --- a/src/pipeline/pipeline_runtime.cpp +++ b/src/pipeline/pipeline_runtime.cpp @@ -8,14 +8,17 @@ #include "cvmmap_streamer/record/mcap_record_sink.hpp" #include +#include #include #include #include #include #include +#include #include #include +#include #include #include #include @@ -60,7 +63,7 @@ constexpr int exit_code(PipelineExitCode code) { struct ResolvedInputEndpoints { std::string shm_name; std::string zmq_endpoint; - std::string body_zmq_endpoint; + std::string nats_target_key; }; [[nodiscard]] @@ -68,14 +71,15 @@ std::expected resolve_input_endpoints(const try { auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.uri); spdlog::info( - "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}'", + "pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'", config.input.uri, target.shm_name, - target.zmq_addr); + target.zmq_addr, + target.nats_target_key); return ResolvedInputEndpoints{ .shm_name = target.shm_name, .zmq_endpoint = target.zmq_addr, - .body_zmq_endpoint = target.zmq_body_addr, + .nats_target_key = target.nats_target_key, }; } catch (const std::exception &e) { return std::unexpected(std::string("invalid cvmmap input URI: ") + e.what()); @@ -371,21 +375,31 @@ int run_pipeline(const RuntimeConfig &config) { return exit_code(PipelineExitCode::SubscriberError); } - std::optional body_subscriber{}; - if (config.record.mcap.enabled) { - try { - body_subscriber.emplace(zmq_ctx, zmq::socket_type::sub); - body_subscriber->set(zmq::sockopt::subscribe, ""); - body_subscriber->connect(input_endpoints->body_zmq_endpoint); - } catch (const zmq::error_t &e) { - spdlog::error("pipeline body subscribe failed on '{}': {}", input_endpoints->body_zmq_endpoint, e.what()); - return exit_code(PipelineExitCode::SubscriberError); - } - } - std::optional rtp_publisher{}; std::optional rtmp_output{}; std::optional mcap_sink{}; + cvmmap::NatsControlClient nats_client( + input_endpoints->nats_target_key, + config.input.nats_url); + std::mutex nats_event_mutex{}; + std::deque> pending_body_packets{}; + std::deque pending_status_codes{}; + + nats_client.SetModuleStatusCallback([&nats_event_mutex, &pending_status_codes](int32_t status_code) { + std::lock_guard lock(nats_event_mutex); + pending_status_codes.push_back(status_code); + }); + if (config.record.mcap.enabled) { + nats_client.SetBodyTrackingRawCallback( + [&nats_event_mutex, &pending_body_packets](std::span bytes) { + std::lock_guard lock(nats_event_mutex); + pending_body_packets.emplace_back(bytes.begin(), bytes.end()); + }); + } + if (!nats_client.Start()) { + spdlog::error("pipeline NATS subscribe failed on '{}'", config.input.nats_url); + return exit_code(PipelineExitCode::SubscriberError); + } if (config.outputs.rtp.enabled) { auto created = protocol::UdpRtpPublisher::create(config); @@ -476,35 +490,94 @@ int run_pipeline(const RuntimeConfig &config) { return {}; }; - const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); - auto last_event = std::chrono::steady_clock::now(); + const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms); + auto last_event = std::chrono::steady_clock::now(); - while (true) { - auto poll = (*backend)->poll(); - if (!poll) { - const auto reason = format_error(poll.error()); - restart_backend(reason, active_info); - } + while (true) { + std::deque> body_packets; + std::deque status_codes; + { + std::lock_guard lock(nats_event_mutex); + body_packets.swap(pending_body_packets); + status_codes.swap(pending_status_codes); + } + const bool had_nats_events = !body_packets.empty() || !status_codes.empty(); + if (had_nats_events) { + last_event = std::chrono::steady_clock::now(); + } + for (const auto status_code : status_codes) { + stats.status_messages += 1; + switch (static_cast(status_code)) { + case cvmmap::ModuleStatus::Online: + spdlog::info("pipeline status event status=online"); + producer_offline = false; + break; + case cvmmap::ModuleStatus::Offline: + spdlog::info("pipeline status event status=offline"); + producer_offline = true; + break; + case cvmmap::ModuleStatus::StreamReset: + spdlog::info("pipeline status event status=stream_reset"); + stats.resets += 1; + (*backend)->shutdown(); + started = false; + restart_pending = false; + restart_target_info.reset(); + active_info.reset(); + rtmp_output.reset(); + break; + } + } + for (const auto &body_bytes_vec : body_packets) { + if (!mcap_sink) { + continue; + } - std::array poll_items{{ - {subscriber.handle(), 0, ZMQ_POLLIN, 0}, - {body_subscriber ? body_subscriber->handle() : nullptr, 0, ZMQ_POLLIN, 0}, - }}; - try { - zmq::poll(poll_items, std::chrono::milliseconds{20}); - } catch (const zmq::error_t &e) { - spdlog::error("pipeline poll failed: {}", e.what()); - return exit_code(PipelineExitCode::SubscriberError); - } + const auto body_bytes = std::span( + body_bytes_vec.data(), + body_bytes_vec.size()); + if (body_bytes.empty()) { + continue; + } - const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; - const bool body_socket_ready = - body_subscriber.has_value() && - (poll_items[1].revents & ZMQ_POLLIN) != 0; - if (!frame_socket_ready && !body_socket_ready) { - const auto now = std::chrono::steady_clock::now(); - if (restart_pending && restart_target_info) { - auto start_result = attempt_backend_start(*restart_target_info); + auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes); + if (!parsed_body) { + spdlog::warn("pipeline body packet parse error: {}", parsed_body.error()); + continue; + } + + auto write_body = mcap_sink->write_body_tracking_message(record::RawBodyTrackingMessageView{ + .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), + .bytes = body_bytes, + }); + if (!write_body) { + const auto reason = "pipeline body MCAP write failed: " + write_body.error(); + restart_backend(reason, active_info); + break; + } + } + + auto poll = (*backend)->poll(); + if (!poll) { + const auto reason = format_error(poll.error()); + restart_backend(reason, active_info); + } + + std::array poll_items{{ + {subscriber.handle(), 0, ZMQ_POLLIN, 0}, + }}; + try { + zmq::poll(poll_items, std::chrono::milliseconds{20}); + } catch (const zmq::error_t &e) { + spdlog::error("pipeline poll failed: {}", e.what()); + return exit_code(PipelineExitCode::SubscriberError); + } + + const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0; + if (!frame_socket_ready) { + const auto now = std::chrono::steady_clock::now(); + if (restart_pending && restart_target_info) { + auto start_result = attempt_backend_start(*restart_target_info); if (!start_result) { spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error())); return exit_code(PipelineExitCode::RuntimeError); @@ -532,49 +605,8 @@ int run_pipeline(const RuntimeConfig &config) { continue; } - if (body_socket_ready && body_subscriber) { - while (true) { - zmq::message_t body_message; - const auto recv_body = body_subscriber->recv(body_message, zmq::recv_flags::dontwait); - if (!recv_body) { - break; - } - - last_event = std::chrono::steady_clock::now(); - auto body_bytes = std::span( - static_cast(body_message.data()), - body_message.size()); - if (body_bytes.empty()) { - continue; - } - if (!mcap_sink) { - continue; - } - - auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes); - if (!parsed_body) { - spdlog::warn("pipeline body packet parse error: {}", parsed_body.error()); - continue; - } - - auto write_body = mcap_sink->write_body_tracking_message(record::RawBodyTrackingMessageView{ - .timestamp_ns = body_tracking_timestamp_ns(*parsed_body), - .bytes = body_bytes, - }); - if (!write_body) { - const auto reason = "pipeline body MCAP write failed: " + write_body.error(); - restart_backend(reason, active_info); - break; - } - } - } - - if (!frame_socket_ready) { - continue; - } - - zmq::message_t message; - const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); + zmq::message_t message; + const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait); if (!recv_result) { continue; } @@ -685,36 +717,11 @@ int run_pipeline(const RuntimeConfig &config) { continue; } - if (bytes[0] == ipc::kModuleStatusMagic) { - stats.status_messages += 1; - auto status = ipc::parse_module_status_message(bytes); - if (!status) { - spdlog::warn("pipeline status parse error: {}", ipc::to_string(status.error())); - continue; - } - - spdlog::info("pipeline status event label='{}' status={}", status->label(), status_to_string(status->module_status)); - if (status->module_status == ipc::ModuleStatus::Online) { - producer_offline = false; - } - if (status->module_status == ipc::ModuleStatus::Offline) { - producer_offline = true; - } - if (status->module_status == ipc::ModuleStatus::StreamReset) { - stats.resets += 1; - (*backend)->shutdown(); - started = false; - restart_pending = false; - restart_target_info.reset(); - active_info.reset(); - rtmp_output.reset(); - } - continue; - } - spdlog::warn("pipeline unknown message type: magic=0x{:02x} size={}", bytes[0], bytes.size()); } + nats_client.Stop(); + if (started) { auto drain = drain_encoder( config,