refactor(streamer): consume control and body over NATS

This commit is contained in:
2026-03-16 17:07:45 +08:00
parent ee8ff747ea
commit ee53d1958e
5 changed files with 144 additions and 116 deletions
+119 -112
View File
@@ -8,14 +8,17 @@
#include "cvmmap_streamer/record/mcap_record_sink.hpp"
#include <cvmmap/client.hpp>
#include <cvmmap/nats_client.hpp>
#include <cvmmap/parser.hpp>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <deque>
#include <exception>
#include <expected>
#include <mutex>
#include <optional>
#include <span>
#include <string>
@@ -60,7 +63,7 @@ constexpr int exit_code(PipelineExitCode code) {
struct ResolvedInputEndpoints {
std::string shm_name;
std::string zmq_endpoint;
std::string body_zmq_endpoint;
std::string nats_target_key;
};
[[nodiscard]]
@@ -68,14 +71,15 @@ std::expected<ResolvedInputEndpoints, std::string> resolve_input_endpoints(const
try {
auto target = cvmmap::resolve_cvmmap_target_or_throw(config.input.uri);
spdlog::info(
"pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}'",
"pipeline input resolved: uri='{}' shm_name='{}' zmq_endpoint='{}' nats_target_key='{}'",
config.input.uri,
target.shm_name,
target.zmq_addr);
target.zmq_addr,
target.nats_target_key);
return ResolvedInputEndpoints{
.shm_name = target.shm_name,
.zmq_endpoint = target.zmq_addr,
.body_zmq_endpoint = target.zmq_body_addr,
.nats_target_key = target.nats_target_key,
};
} catch (const std::exception &e) {
return std::unexpected(std::string("invalid cvmmap input URI: ") + e.what());
@@ -371,21 +375,31 @@ int run_pipeline(const RuntimeConfig &config) {
return exit_code(PipelineExitCode::SubscriberError);
}
std::optional<zmq::socket_t> body_subscriber{};
if (config.record.mcap.enabled) {
try {
body_subscriber.emplace(zmq_ctx, zmq::socket_type::sub);
body_subscriber->set(zmq::sockopt::subscribe, "");
body_subscriber->connect(input_endpoints->body_zmq_endpoint);
} catch (const zmq::error_t &e) {
spdlog::error("pipeline body subscribe failed on '{}': {}", input_endpoints->body_zmq_endpoint, e.what());
return exit_code(PipelineExitCode::SubscriberError);
}
}
std::optional<protocol::UdpRtpPublisher> rtp_publisher{};
std::optional<protocol::RtmpOutput> rtmp_output{};
std::optional<record::McapRecordSink> mcap_sink{};
cvmmap::NatsControlClient nats_client(
input_endpoints->nats_target_key,
config.input.nats_url);
std::mutex nats_event_mutex{};
std::deque<std::vector<std::uint8_t>> pending_body_packets{};
std::deque<int32_t> pending_status_codes{};
nats_client.SetModuleStatusCallback([&nats_event_mutex, &pending_status_codes](int32_t status_code) {
std::lock_guard lock(nats_event_mutex);
pending_status_codes.push_back(status_code);
});
if (config.record.mcap.enabled) {
nats_client.SetBodyTrackingRawCallback(
[&nats_event_mutex, &pending_body_packets](std::span<const std::uint8_t> bytes) {
std::lock_guard lock(nats_event_mutex);
pending_body_packets.emplace_back(bytes.begin(), bytes.end());
});
}
if (!nats_client.Start()) {
spdlog::error("pipeline NATS subscribe failed on '{}'", config.input.nats_url);
return exit_code(PipelineExitCode::SubscriberError);
}
if (config.outputs.rtp.enabled) {
auto created = protocol::UdpRtpPublisher::create(config);
@@ -476,35 +490,94 @@ int run_pipeline(const RuntimeConfig &config) {
return {};
};
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
auto last_event = std::chrono::steady_clock::now();
const auto idle_timeout = std::chrono::milliseconds(config.latency.ingest_idle_timeout_ms);
auto last_event = std::chrono::steady_clock::now();
while (true) {
auto poll = (*backend)->poll();
if (!poll) {
const auto reason = format_error(poll.error());
restart_backend(reason, active_info);
}
while (true) {
std::deque<std::vector<std::uint8_t>> body_packets;
std::deque<int32_t> status_codes;
{
std::lock_guard lock(nats_event_mutex);
body_packets.swap(pending_body_packets);
status_codes.swap(pending_status_codes);
}
const bool had_nats_events = !body_packets.empty() || !status_codes.empty();
if (had_nats_events) {
last_event = std::chrono::steady_clock::now();
}
for (const auto status_code : status_codes) {
stats.status_messages += 1;
switch (static_cast<cvmmap::ModuleStatus>(status_code)) {
case cvmmap::ModuleStatus::Online:
spdlog::info("pipeline status event status=online");
producer_offline = false;
break;
case cvmmap::ModuleStatus::Offline:
spdlog::info("pipeline status event status=offline");
producer_offline = true;
break;
case cvmmap::ModuleStatus::StreamReset:
spdlog::info("pipeline status event status=stream_reset");
stats.resets += 1;
(*backend)->shutdown();
started = false;
restart_pending = false;
restart_target_info.reset();
active_info.reset();
rtmp_output.reset();
break;
}
}
for (const auto &body_bytes_vec : body_packets) {
if (!mcap_sink) {
continue;
}
std::array<zmq::pollitem_t, 2> poll_items{{
{subscriber.handle(), 0, ZMQ_POLLIN, 0},
{body_subscriber ? body_subscriber->handle() : nullptr, 0, ZMQ_POLLIN, 0},
}};
try {
zmq::poll(poll_items, std::chrono::milliseconds{20});
} catch (const zmq::error_t &e) {
spdlog::error("pipeline poll failed: {}", e.what());
return exit_code(PipelineExitCode::SubscriberError);
}
const auto body_bytes = std::span<const std::uint8_t>(
body_bytes_vec.data(),
body_bytes_vec.size());
if (body_bytes.empty()) {
continue;
}
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
const bool body_socket_ready =
body_subscriber.has_value() &&
(poll_items[1].revents & ZMQ_POLLIN) != 0;
if (!frame_socket_ready && !body_socket_ready) {
const auto now = std::chrono::steady_clock::now();
if (restart_pending && restart_target_info) {
auto start_result = attempt_backend_start(*restart_target_info);
auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes);
if (!parsed_body) {
spdlog::warn("pipeline body packet parse error: {}", parsed_body.error());
continue;
}
auto write_body = mcap_sink->write_body_tracking_message(record::RawBodyTrackingMessageView{
.timestamp_ns = body_tracking_timestamp_ns(*parsed_body),
.bytes = body_bytes,
});
if (!write_body) {
const auto reason = "pipeline body MCAP write failed: " + write_body.error();
restart_backend(reason, active_info);
break;
}
}
auto poll = (*backend)->poll();
if (!poll) {
const auto reason = format_error(poll.error());
restart_backend(reason, active_info);
}
std::array<zmq::pollitem_t, 1> poll_items{{
{subscriber.handle(), 0, ZMQ_POLLIN, 0},
}};
try {
zmq::poll(poll_items, std::chrono::milliseconds{20});
} catch (const zmq::error_t &e) {
spdlog::error("pipeline poll failed: {}", e.what());
return exit_code(PipelineExitCode::SubscriberError);
}
const bool frame_socket_ready = (poll_items[0].revents & ZMQ_POLLIN) != 0;
if (!frame_socket_ready) {
const auto now = std::chrono::steady_clock::now();
if (restart_pending && restart_target_info) {
auto start_result = attempt_backend_start(*restart_target_info);
if (!start_result) {
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::RuntimeError);
@@ -532,49 +605,8 @@ int run_pipeline(const RuntimeConfig &config) {
continue;
}
if (body_socket_ready && body_subscriber) {
while (true) {
zmq::message_t body_message;
const auto recv_body = body_subscriber->recv(body_message, zmq::recv_flags::dontwait);
if (!recv_body) {
break;
}
last_event = std::chrono::steady_clock::now();
auto body_bytes = std::span<const std::uint8_t>(
static_cast<const std::uint8_t *>(body_message.data()),
body_message.size());
if (body_bytes.empty()) {
continue;
}
if (!mcap_sink) {
continue;
}
auto parsed_body = cvmmap::parse_body_tracking_message(body_bytes);
if (!parsed_body) {
spdlog::warn("pipeline body packet parse error: {}", parsed_body.error());
continue;
}
auto write_body = mcap_sink->write_body_tracking_message(record::RawBodyTrackingMessageView{
.timestamp_ns = body_tracking_timestamp_ns(*parsed_body),
.bytes = body_bytes,
});
if (!write_body) {
const auto reason = "pipeline body MCAP write failed: " + write_body.error();
restart_backend(reason, active_info);
break;
}
}
}
if (!frame_socket_ready) {
continue;
}
zmq::message_t message;
const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait);
zmq::message_t message;
const auto recv_result = subscriber.recv(message, zmq::recv_flags::dontwait);
if (!recv_result) {
continue;
}
@@ -685,36 +717,11 @@ int run_pipeline(const RuntimeConfig &config) {
continue;
}
if (bytes[0] == ipc::kModuleStatusMagic) {
stats.status_messages += 1;
auto status = ipc::parse_module_status_message(bytes);
if (!status) {
spdlog::warn("pipeline status parse error: {}", ipc::to_string(status.error()));
continue;
}
spdlog::info("pipeline status event label='{}' status={}", status->label(), status_to_string(status->module_status));
if (status->module_status == ipc::ModuleStatus::Online) {
producer_offline = false;
}
if (status->module_status == ipc::ModuleStatus::Offline) {
producer_offline = true;
}
if (status->module_status == ipc::ModuleStatus::StreamReset) {
stats.resets += 1;
(*backend)->shutdown();
started = false;
restart_pending = false;
restart_target_info.reset();
active_info.reset();
rtmp_output.reset();
}
continue;
}
spdlog::warn("pipeline unknown message type: magic=0x{:02x} size={}", bytes[0], bytes.size());
}
nats_client.Stop();
if (started) {
auto drain = drain_encoder(
config,