refactor(streamer): adopt proxy backends and typed statuses

This commit is contained in:
2026-03-10 23:29:59 +08:00
parent 6af97ee5d3
commit 0ad6887095
22 changed files with 1686 additions and 275 deletions
+78 -57
View File
@@ -3,7 +3,7 @@
#include "cvmmap_streamer/encode/encoder_backend.hpp"
#include "cvmmap_streamer/ipc/contracts.hpp"
#include "cvmmap_streamer/metrics/latency_tracker.hpp"
#include "cvmmap_streamer/protocol/rtmp_publisher.hpp"
#include "cvmmap_streamer/protocol/rtmp_output.hpp"
#include "cvmmap_streamer/protocol/rtp_publisher.hpp"
#include "cvmmap_streamer/record/mcap_record_sink.hpp"
@@ -42,6 +42,20 @@ namespace {
namespace ipc = cvmmap_streamer::ipc;
enum class PipelineExitCode : int {
Success = 0,
InputError = 2,
SharedMemoryError = 3,
SubscriberError = 4,
InitializationError = 5,
RuntimeError = 6,
};
[[nodiscard]]
constexpr int exit_code(PipelineExitCode code) {
return static_cast<int>(code);
}
struct ResolvedInputEndpoints {
std::string shm_name;
std::string zmq_endpoint;
@@ -179,12 +193,12 @@ bool frame_info_equal(const ipc::FrameInfo &lhs, const ipc::FrameInfo &rhs) {
}
[[nodiscard]]
std::expected<void, std::string> publish_access_units(
Status publish_access_units(
const RuntimeConfig &config,
std::vector<encode::EncodedAccessUnit> &&access_units,
PipelineStats &stats,
protocol::UdpRtpPublisher *rtp_publisher,
protocol::RtmpPublisher *rtmp_publisher,
protocol::RtmpOutput *rtmp_output,
record::McapRecordSink *mcap_sink,
metrics::IngestEmitLatencyTracker &latency_tracker) {
for (auto &access_unit : access_units) {
@@ -200,8 +214,8 @@ std::expected<void, std::string> publish_access_units(
if (rtp_publisher != nullptr) {
rtp_publisher->publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns);
}
if (rtmp_publisher != nullptr) {
auto publish = rtmp_publisher->publish_access_unit(access_unit.annexb_bytes, access_unit.stream_pts_ns);
if (rtmp_output != nullptr) {
auto publish = (*rtmp_output)->publish_access_unit(access_unit);
if (!publish) {
return std::unexpected(publish.error());
}
@@ -209,7 +223,7 @@ std::expected<void, std::string> publish_access_units(
if (mcap_sink != nullptr) {
auto write = mcap_sink->write_access_unit(access_unit);
if (!write) {
return std::unexpected(write.error());
return unexpected_error(ERR_SERIALIZATION, write.error());
}
}
@@ -229,16 +243,16 @@ std::expected<void, std::string> publish_access_units(
}
[[nodiscard]]
std::expected<void, std::string> drain_encoder(
Status drain_encoder(
const RuntimeConfig &config,
encode::EncoderBackend &backend,
bool flushing,
PipelineStats &stats,
protocol::UdpRtpPublisher *rtp_publisher,
protocol::RtmpPublisher *rtmp_publisher,
protocol::RtmpOutput *rtmp_output,
record::McapRecordSink *mcap_sink,
metrics::IngestEmitLatencyTracker &latency_tracker) {
auto drained = flushing ? backend.flush() : backend.drain();
auto drained = flushing ? backend->flush() : backend->drain();
if (!drained) {
return std::unexpected(drained.error());
}
@@ -247,7 +261,7 @@ std::expected<void, std::string> drain_encoder(
std::move(*drained),
stats,
rtp_publisher,
rtmp_publisher,
rtmp_output,
mcap_sink,
latency_tracker);
}
@@ -258,35 +272,35 @@ int run_pipeline(const RuntimeConfig &config) {
auto input_endpoints = resolve_input_endpoints(config);
if (!input_endpoints) {
spdlog::error("{}", input_endpoints.error());
return 2;
return exit_code(PipelineExitCode::InputError);
}
auto source = make_frame_source(config);
if (!source) {
spdlog::error("pipeline input source selection failed: {}", source.error());
return 2;
return exit_code(PipelineExitCode::InputError);
}
auto source_prepare = (*source)->prepare_runtime();
if (!source_prepare) {
spdlog::error("pipeline source backend '{}' setup failed: {}", (*source)->backend_name(), source_prepare.error());
return 2;
return exit_code(PipelineExitCode::InputError);
}
auto backend = encode::make_encoder_backend(config);
if (!backend) {
spdlog::error("pipeline encoder backend selection failed: {}", backend.error());
return 5;
spdlog::error("pipeline encoder backend selection failed: {}", format_error(backend.error()));
return exit_code(PipelineExitCode::InitializationError);
}
auto shm = SharedMemoryView::open_readonly(input_endpoints->shm_name);
if (!shm) {
spdlog::error("pipeline open shared memory failed: {}", shm.error());
return 3;
return exit_code(PipelineExitCode::SharedMemoryError);
}
if (shm->bytes <= ipc::kShmPayloadOffset) {
spdlog::error("pipeline invalid shared memory size: {}", shm->bytes);
return 3;
return exit_code(PipelineExitCode::SharedMemoryError);
}
std::vector<std::uint8_t> snapshot_buffer(
@@ -301,36 +315,27 @@ int run_pipeline(const RuntimeConfig &config) {
subscriber.connect(input_endpoints->zmq_endpoint);
} catch (const zmq::error_t &e) {
spdlog::error("pipeline subscribe failed on '{}': {}", input_endpoints->zmq_endpoint, e.what());
return 4;
return exit_code(PipelineExitCode::SubscriberError);
}
std::optional<protocol::UdpRtpPublisher> rtp_publisher{};
std::optional<protocol::RtmpPublisher> rtmp_publisher{};
std::optional<protocol::RtmpOutput> rtmp_output{};
std::optional<record::McapRecordSink> mcap_sink{};
if (config.outputs.rtp.enabled) {
auto created = protocol::UdpRtpPublisher::create(config);
if (!created) {
spdlog::error("pipeline RTP publisher init failed: {}", created.error());
return 5;
return exit_code(PipelineExitCode::InitializationError);
}
rtp_publisher.emplace(std::move(*created));
}
if (config.outputs.rtmp.enabled) {
auto created = protocol::RtmpPublisher::create(config);
if (!created) {
spdlog::error("pipeline RTMP publisher init failed: {}", created.error());
return 5;
}
rtmp_publisher.emplace(std::move(*created));
}
if (config.record.mcap.enabled) {
auto created = record::McapRecordSink::create(config);
if (!created) {
spdlog::error("pipeline MCAP sink init failed: {}", created.error());
return 5;
return exit_code(PipelineExitCode::InitializationError);
}
mcap_sink.emplace(std::move(*created));
}
@@ -356,17 +361,31 @@ int run_pipeline(const RuntimeConfig &config) {
started = false;
restart_pending = true;
restart_target_info = target_info;
if (rtmp_publisher) {
rtmp_publisher->on_stream_reset();
}
rtmp_output.reset();
};
const auto attempt_backend_start = [&](const ipc::FrameInfo &target_info) -> std::expected<void, std::string> {
const auto attempt_backend_start = [&](const ipc::FrameInfo &target_info) -> Status {
(*backend)->shutdown();
rtmp_output.reset();
auto init = (*backend)->init(config, target_info);
if (!init) {
return std::unexpected(init.error());
}
if (config.outputs.rtmp.enabled) {
auto stream_info = (*backend)->stream_info();
if (!stream_info) {
return unexpected_error(
stream_info.error().code,
"pipeline RTMP output stream info unavailable: " + format_error(stream_info.error()));
}
auto created = protocol::make_rtmp_output(config, *stream_info);
if (!created) {
return unexpected_error(
created.error().code,
"pipeline RTMP output init failed: " + format_error(created.error()));
}
rtmp_output.emplace(std::move(*created));
}
started = true;
restart_pending = false;
restart_target_info.reset();
@@ -380,7 +399,8 @@ int run_pipeline(const RuntimeConfig &config) {
while (true) {
auto poll = (*backend)->poll();
if (!poll) {
restart_backend(poll.error(), active_info);
const auto reason = format_error(poll.error());
restart_backend(reason, active_info);
}
zmq::message_t message;
@@ -390,8 +410,8 @@ int run_pipeline(const RuntimeConfig &config) {
if (restart_pending && restart_target_info) {
auto start_result = attempt_backend_start(*restart_target_info);
if (!start_result) {
spdlog::error("pipeline backend restart failed: {}", start_result.error());
return 6;
spdlog::error("pipeline backend restart failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::RuntimeError);
}
}
if (now - last_event >= idle_timeout) {
@@ -401,15 +421,16 @@ int run_pipeline(const RuntimeConfig &config) {
if (!producer_offline && started) {
auto drain = drain_encoder(
config,
**backend,
*backend,
false,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_publisher ? &*rtmp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
mcap_sink ? &*mcap_sink : nullptr,
latency_tracker);
if (!drain) {
restart_backend(drain.error(), active_info);
const auto reason = format_error(drain.error());
restart_backend(reason, active_info);
}
}
continue;
@@ -456,8 +477,8 @@ int run_pipeline(const RuntimeConfig &config) {
const auto target_info = restart_target_info.value_or(snapshot->metadata.info);
auto start_result = attempt_backend_start(target_info);
if (!start_result) {
spdlog::error("pipeline backend init failed: {}", start_result.error());
return 5;
spdlog::error("pipeline backend init failed: {}", format_error(start_result.error()));
return exit_code(PipelineExitCode::InitializationError);
}
}
@@ -469,22 +490,24 @@ int run_pipeline(const RuntimeConfig &config) {
.bytes = std::span<const std::uint8_t>(snapshot_buffer.data(), snapshot->bytes_copied),
});
if (!push) {
restart_backend(push.error(), active_info);
const auto reason = format_error(push.error());
restart_backend(reason, active_info);
continue;
}
stats.pushed_frames += 1;
auto drain = drain_encoder(
config,
**backend,
*backend,
false,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_publisher ? &*rtmp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
mcap_sink ? &*mcap_sink : nullptr,
latency_tracker);
if (!drain) {
restart_backend(drain.error(), active_info);
const auto reason = format_error(drain.error());
restart_backend(reason, active_info);
continue;
}
@@ -517,9 +540,7 @@ int run_pipeline(const RuntimeConfig &config) {
restart_pending = false;
restart_target_info.reset();
active_info.reset();
if (rtmp_publisher) {
rtmp_publisher->on_stream_reset();
}
rtmp_output.reset();
}
continue;
}
@@ -530,16 +551,16 @@ int run_pipeline(const RuntimeConfig &config) {
if (started) {
auto drain = drain_encoder(
config,
**backend,
*backend,
true,
stats,
rtp_publisher ? &*rtp_publisher : nullptr,
rtmp_publisher ? &*rtmp_publisher : nullptr,
rtmp_output ? &*rtmp_output : nullptr,
mcap_sink ? &*mcap_sink : nullptr,
latency_tracker);
if (!drain) {
spdlog::error("pipeline publish failed during flush: {}", drain.error());
return 6;
if (!drain) {
spdlog::error("pipeline publish failed during flush: {}", format_error(drain.error()));
return exit_code(PipelineExitCode::RuntimeError);
}
}
@@ -591,11 +612,11 @@ int run_pipeline(const RuntimeConfig &config) {
if (rtp_publisher) {
rtp_publisher->log_metrics();
}
if (rtmp_publisher) {
rtmp_publisher->log_metrics();
if (rtmp_output) {
(*rtmp_output)->log_metrics();
}
return 0;
return exit_code(PipelineExitCode::Success);
}
}