feat: add streamer-owned recording control service

Introduce a dedicated streamer-side recording control plane instead of sharing the producer recorder API.

- register streamer-owned recorder endpoints as a NATS micro service
- add explicit MP4 and MCAP recorder control protobufs and subject helpers
- wire recorder lifecycle handling into the pipeline runtime
- add MP4 writer and depth-alignment support files used by the new recording flow
This commit is contained in:
2026-04-12 20:21:33 +08:00
parent 4f016d9cef
commit 213adee887
11 changed files with 2474 additions and 400 deletions
+263
View File
@@ -0,0 +1,263 @@
#include "cvmmap_streamer/protocol/nats_request_reply_server.hpp"
#include "cvmmap_streamer/protocol/streamer_subjects.hpp"
#include <nats.h>
#include <spdlog/spdlog.h>
#include <string_view>
#include <utility>
#include <vector>
namespace cvmmap_streamer::protocol {
namespace {
constexpr std::string_view kStreamerServiceName = "cvmmap_streamer";
constexpr std::string_view kStreamerServiceVersion = "0.1.0";
constexpr std::string_view kStreamerServiceDescription =
"cvmmap-streamer recorder service";
[[nodiscard]]
std::string nats_status_text(const natsStatus status) {
const char *text = natsStatus_GetText(status);
return text != nullptr ? std::string(text) : std::string("unknown NATS error");
}
[[nodiscard]]
std::string micro_error_to_string(microError *error) {
if (error == nullptr) {
return {};
}
char buffer[512];
return std::string(microError_String(error, buffer, sizeof(buffer)));
}
} // namespace
struct NatsRequestReplyServer::Endpoint {
std::string name{};
std::string subject{};
std::function<std::string(std::span<const std::uint8_t>)> handler{};
};
struct NatsRequestReplyServer::Impl {
explicit Impl(NatsRequestReplyServerOptions options_arg)
: options(std::move(options_arg)) {}
NatsRequestReplyServerOptions options{};
natsConnection *connection{nullptr};
microService *service{nullptr};
std::vector<std::unique_ptr<Endpoint>> endpoints{};
std::vector<std::string> metadata_storage{};
bool started{false};
void build_metadata_storage() {
metadata_storage.clear();
metadata_storage.reserve(18);
auto append = [this](std::string key, std::string value) {
metadata_storage.push_back(std::move(key));
metadata_storage.push_back(std::move(value));
};
append("instance_name", options.instance_name);
append("namespace", options.namespace_name);
append("ipc_prefix", options.ipc_prefix);
append("base_name", options.base_name);
append("nats_target_key", options.nats_target_key);
append(
"streamer_subject_prefix",
streamer_subject_prefix(options.nats_target_key));
append("backend", options.backend);
append("recording_formats", options.recording_formats);
}
};
NatsRequestReplyServer::NatsRequestReplyServer(NatsRequestReplyServerOptions options)
: impl_(std::make_unique<Impl>(std::move(options))) {}
NatsRequestReplyServer::~NatsRequestReplyServer() {
stop();
}
void NatsRequestReplyServer::register_raw_endpoint(
std::string endpoint_name,
std::string subject,
std::function<std::string(std::span<const std::uint8_t>)> handler) {
auto endpoint = std::make_unique<Endpoint>();
endpoint->name = std::move(endpoint_name);
endpoint->subject = std::move(subject);
endpoint->handler = std::move(handler);
impl_->endpoints.push_back(std::move(endpoint));
}
cvmmap_streamer::proto::RpcCode NatsRequestReplyServer::to_proto_rpc_code(
const RpcErrorCode code) {
switch (code) {
case RpcErrorCode::InvalidRequest:
return cvmmap_streamer::proto::RPC_CODE_INVALID_REQUEST;
case RpcErrorCode::Unsupported:
return cvmmap_streamer::proto::RPC_CODE_UNSUPPORTED;
case RpcErrorCode::Busy:
return cvmmap_streamer::proto::RPC_CODE_BUSY;
case RpcErrorCode::Internal:
return cvmmap_streamer::proto::RPC_CODE_INTERNAL;
}
return cvmmap_streamer::proto::RPC_CODE_INTERNAL;
}
bool NatsRequestReplyServer::start() {
if (impl_->started) {
return true;
}
if (impl_->endpoints.empty()) {
spdlog::error("streamer service start requested without any endpoints");
return false;
}
natsOptions *options = nullptr;
auto status = natsOptions_Create(&options);
if (status != NATS_OK) {
spdlog::error("failed to create NATS options: {}", nats_status_text(status));
return false;
}
status = natsOptions_SetURL(options, impl_->options.nats_url.c_str());
if (status != NATS_OK) {
spdlog::error(
"failed to set NATS url '{}': {}",
impl_->options.nats_url,
nats_status_text(status));
natsOptions_Destroy(options);
return false;
}
status = natsConnection_Connect(&impl_->connection, options);
natsOptions_Destroy(options);
if (status != NATS_OK) {
spdlog::error(
"failed to connect streamer service to '{}': {}",
impl_->options.nats_url,
nats_status_text(status));
return false;
}
impl_->build_metadata_storage();
std::vector<const char *> metadata_list{};
metadata_list.reserve(impl_->metadata_storage.size());
for (const auto &entry : impl_->metadata_storage) {
metadata_list.push_back(entry.c_str());
}
const auto &default_endpoint_ref = impl_->endpoints.front();
microEndpointConfig default_endpoint{};
default_endpoint.Name = default_endpoint_ref->name.c_str();
default_endpoint.Subject = default_endpoint_ref->subject.c_str();
default_endpoint.Handler =
[](microRequest *request) -> microError * {
auto *endpoint = static_cast<Endpoint *>(microRequest_GetEndpointState(request));
if (endpoint == nullptr) {
return micro_Errorf("streamer endpoint state is missing");
}
auto *wire_message = microRequest_GetMsg(request);
std::span<const std::uint8_t> payload{};
if (wire_message != nullptr && natsMsg_GetDataLength(wire_message) > 0) {
payload = std::span<const std::uint8_t>(
reinterpret_cast<const std::uint8_t *>(natsMsg_GetData(wire_message)),
static_cast<std::size_t>(natsMsg_GetDataLength(wire_message)));
}
const auto response_payload = endpoint->handler(payload);
return microRequest_Respond(
request,
response_payload.data(),
response_payload.size());
};
default_endpoint.State = default_endpoint_ref.get();
microServiceConfig service_config{};
service_config.Name = kStreamerServiceName.data();
service_config.Version = kStreamerServiceVersion.data();
service_config.Description = kStreamerServiceDescription.data();
service_config.Metadata = natsMetadata{
.List = metadata_list.data(),
.Count = static_cast<int>(metadata_list.size() / 2),
};
service_config.Endpoint = &default_endpoint;
service_config.State = impl_.get();
service_config.ErrHandler =
[](microService *, microEndpoint *, natsStatus internal_status) {
spdlog::error(
"streamer micro service internal error: {}",
natsStatus_GetText(internal_status));
};
service_config.DoneHandler =
[](microService *service) {
auto *self = static_cast<Impl *>(microService_GetState(service));
if (self == nullptr) {
return;
}
spdlog::info(
"streamer micro service stopped for target '{}'",
self->options.nats_target_key);
};
if (auto *error = micro_AddService(&impl_->service, impl_->connection, &service_config)) {
spdlog::error(
"failed to start streamer micro service '{}': {}",
kStreamerServiceName,
micro_error_to_string(error));
microError_Destroy(error);
natsConnection_Close(impl_->connection);
natsConnection_Destroy(impl_->connection);
impl_->connection = nullptr;
return false;
}
for (std::size_t index = 1; index < impl_->endpoints.size(); ++index) {
const auto &endpoint_ref = impl_->endpoints[index];
microEndpointConfig endpoint_config{};
endpoint_config.Name = endpoint_ref->name.c_str();
endpoint_config.Subject = endpoint_ref->subject.c_str();
endpoint_config.Handler = default_endpoint.Handler;
endpoint_config.State = endpoint_ref.get();
if (auto *error = microService_AddEndpoint(impl_->service, &endpoint_config)) {
spdlog::error(
"failed to add streamer endpoint '{}' on '{}': {}",
endpoint_ref->name,
endpoint_ref->subject,
micro_error_to_string(error));
microError_Destroy(error);
stop();
return false;
}
}
impl_->started = true;
spdlog::info(
"streamer micro service '{}' started for target '{}' with {} endpoints",
kStreamerServiceName,
impl_->options.nats_target_key,
impl_->endpoints.size());
return true;
}
void NatsRequestReplyServer::stop() {
if (impl_->service != nullptr) {
if (auto *error = microService_Destroy(impl_->service)) {
spdlog::error(
"failed to stop streamer micro service '{}': {}",
kStreamerServiceName,
micro_error_to_string(error));
microError_Destroy(error);
}
impl_->service = nullptr;
}
if (impl_->connection != nullptr) {
natsConnection_Close(impl_->connection);
natsConnection_Destroy(impl_->connection);
impl_->connection = nullptr;
}
impl_->started = false;
}
} // namespace cvmmap_streamer::protocol