Files
cvmmap-streamer/src/protocol/rtp_publisher.cpp
T

503 lines
16 KiB
C++

#include "cvmmap_streamer/protocol/rtp_publisher.hpp"
#include <algorithm>
#include <cerrno>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <optional>
#include <span>
#include <string>
#include <utility>
#include <vector>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <spdlog/spdlog.h>
namespace cvmmap_streamer {
}
namespace cvmmap_streamer::protocol {
namespace {
constexpr std::size_t kRtpHeaderBytes = 12;
constexpr std::size_t kRtpPayloadBytesMax = 1200;
constexpr std::uint32_t kRtpVideoClockRate = 90'000;
constexpr std::uint64_t kNanosPerSecond = 1'000'000'000ull;
constexpr std::uint64_t kErrorLogFirstPackets = 8;
constexpr std::uint64_t kErrorLogEveryNPackets = 120;
[[nodiscard]]
std::uint32_t compute_ssrc(std::string_view host, std::uint16_t port, std::uint8_t payload_type, CodecType codec) {
std::uint32_t hash = 2166136261u;
const auto mix_byte = [&](std::uint8_t b) {
hash ^= static_cast<std::uint32_t>(b);
hash *= 16777619u;
};
for (const auto ch : host) {
mix_byte(static_cast<std::uint8_t>(ch));
}
mix_byte(static_cast<std::uint8_t>(port >> 8));
mix_byte(static_cast<std::uint8_t>(port & 0xffu));
mix_byte(payload_type);
mix_byte(codec == CodecType::H265 ? 0x65u : 0x64u);
if (hash == 0u) {
hash = 1u;
}
return hash;
}
[[nodiscard]]
std::uint16_t compute_initial_sequence() {
const auto now = std::chrono::steady_clock::now().time_since_epoch();
return static_cast<std::uint16_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(now).count() & 0xffffu);
}
[[nodiscard]]
std::uint32_t to_rtp_timestamp(std::uint64_t pts_ns) {
const auto ticks = (pts_ns * kRtpVideoClockRate) / kNanosPerSecond;
return static_cast<std::uint32_t>(ticks & 0xffffffffu);
}
[[nodiscard]]
std::string rtp_encoding_name(CodecType codec) {
return codec == CodecType::H265 ? "H265" : "H264";
}
[[nodiscard]]
std::string rtp_fmtp_line(CodecType codec, std::uint8_t payload_type) {
if (codec == CodecType::H265) {
return "a=fmtp:" + std::to_string(payload_type) + " sprop-max-don-diff=0";
}
return "a=fmtp:" + std::to_string(payload_type) + " packetization-mode=1;profile-level-id=42e01f";
}
[[nodiscard]]
std::optional<std::pair<std::size_t, std::size_t>> next_start_code(std::span<const std::uint8_t> bytes, std::size_t offset) {
for (std::size_t i = offset; i + 3 <= bytes.size(); ++i) {
if (bytes[i] == 0 && bytes[i + 1] == 0 && bytes[i + 2] == 1) {
return std::pair{i, static_cast<std::size_t>(3)};
}
if (i + 4 <= bytes.size() && bytes[i] == 0 && bytes[i + 1] == 0 && bytes[i + 2] == 0 && bytes[i + 3] == 1) {
return std::pair{i, static_cast<std::size_t>(4)};
}
}
return std::nullopt;
}
[[nodiscard]]
std::vector<std::span<const std::uint8_t>> split_annexb_nalus(std::span<const std::uint8_t> access_unit) {
std::vector<std::span<const std::uint8_t>> nalus{};
auto first_sc = next_start_code(access_unit, 0);
if (!first_sc) {
if (!access_unit.empty()) {
nalus.push_back(access_unit);
}
return nalus;
}
std::size_t cursor = first_sc->first;
while (true) {
auto current_sc = next_start_code(access_unit, cursor);
if (!current_sc) {
break;
}
const std::size_t payload_begin = current_sc->first + current_sc->second;
auto next_sc = next_start_code(access_unit, payload_begin);
const std::size_t payload_end = next_sc ? next_sc->first : access_unit.size();
if (payload_begin < payload_end) {
nalus.push_back(access_unit.subspan(payload_begin, payload_end - payload_begin));
}
if (!next_sc) {
break;
}
cursor = next_sc->first;
}
if (nalus.empty() && !access_unit.empty()) {
nalus.push_back(access_unit);
}
return nalus;
}
}
UdpRtpPublisher::~UdpRtpPublisher() {
if (socket_fd_ >= 0) {
close(socket_fd_);
socket_fd_ = -1;
}
}
UdpRtpPublisher::UdpRtpPublisher(UdpRtpPublisher &&other) noexcept {
socket_fd_ = std::exchange(other.socket_fd_, -1);
destination_host_ = std::move(other.destination_host_);
destination_ip_ = std::move(other.destination_ip_);
destination_port_ = std::exchange(other.destination_port_, 0);
payload_type_ = std::exchange(other.payload_type_, 96);
codec_ = std::exchange(other.codec_, CodecType::H264);
sequence_ = std::exchange(other.sequence_, 0);
ssrc_ = std::exchange(other.ssrc_, 0);
sdp_path_ = std::move(other.sdp_path_);
stats_ = other.stats_;
endpoint_addr_ = other.endpoint_addr_;
endpoint_addr_len_ = std::exchange(other.endpoint_addr_len_, 0);
}
UdpRtpPublisher &UdpRtpPublisher::operator=(UdpRtpPublisher &&other) noexcept {
if (this == &other) {
return *this;
}
if (socket_fd_ >= 0) {
close(socket_fd_);
}
socket_fd_ = std::exchange(other.socket_fd_, -1);
destination_host_ = std::move(other.destination_host_);
destination_ip_ = std::move(other.destination_ip_);
destination_port_ = std::exchange(other.destination_port_, 0);
payload_type_ = std::exchange(other.payload_type_, 96);
codec_ = std::exchange(other.codec_, CodecType::H264);
sequence_ = std::exchange(other.sequence_, 0);
ssrc_ = std::exchange(other.ssrc_, 0);
sdp_path_ = std::move(other.sdp_path_);
stats_ = other.stats_;
endpoint_addr_ = other.endpoint_addr_;
endpoint_addr_len_ = std::exchange(other.endpoint_addr_len_, 0);
return *this;
}
std::expected<UdpRtpPublisher, std::string> UdpRtpPublisher::create(const RuntimeConfig &config) {
if (!config.outputs.rtp.enabled) {
return std::unexpected("invalid RTP publisher init: RTP output disabled");
}
if (!config.outputs.rtp.host || !config.outputs.rtp.port) {
return std::unexpected("invalid RTP publisher init: host/port not configured");
}
UdpRtpPublisher publisher{};
publisher.destination_host_ = *config.outputs.rtp.host;
publisher.destination_port_ = *config.outputs.rtp.port;
publisher.payload_type_ = config.outputs.rtp.payload_type;
publisher.codec_ = config.encoder.codec;
publisher.sequence_ = compute_initial_sequence();
publisher.ssrc_ = compute_ssrc(
publisher.destination_host_,
publisher.destination_port_,
publisher.payload_type_,
publisher.codec_);
addrinfo hints{};
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
addrinfo *result = nullptr;
const auto port = std::to_string(publisher.destination_port_);
const int gai = getaddrinfo(
publisher.destination_host_.c_str(),
port.c_str(),
&hints,
&result);
if (gai != 0) {
return std::unexpected("RTP getaddrinfo failed for host '" + publisher.destination_host_ + "': " + std::string(gai_strerror(gai)));
}
for (auto *it = result; it != nullptr; it = it->ai_next) {
if (it->ai_addrlen > sizeof(sockaddr_storage)) {
continue;
}
std::memcpy(&publisher.endpoint_addr_, it->ai_addr, it->ai_addrlen);
publisher.endpoint_addr_len_ = static_cast<socklen_t>(it->ai_addrlen);
char ip_text[INET6_ADDRSTRLEN]{};
if (getnameinfo(
it->ai_addr,
it->ai_addrlen,
ip_text,
sizeof(ip_text),
nullptr,
0,
NI_NUMERICHOST) == 0) {
publisher.destination_ip_ = ip_text;
}
break;
}
freeaddrinfo(result);
if (publisher.endpoint_addr_len_ == 0) {
return std::unexpected("RTP endpoint resolution failed for host '" + publisher.destination_host_ + "'");
}
publisher.socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
if (publisher.socket_fd_ < 0) {
return std::unexpected("RTP socket create failed: " + std::string(std::strerror(errno)));
}
const int current_flags = fcntl(publisher.socket_fd_, F_GETFL, 0);
if (current_flags < 0 || fcntl(publisher.socket_fd_, F_SETFL, current_flags | O_NONBLOCK) < 0) {
return std::unexpected("RTP socket non-blocking setup failed: " + std::string(std::strerror(errno)));
}
const std::string codec_name = config.encoder.codec == CodecType::H265 ? "h265" : "h264";
if (config.outputs.rtp.sdp_path && !config.outputs.rtp.sdp_path->empty()) {
publisher.sdp_path_ = *config.outputs.rtp.sdp_path;
} else {
publisher.sdp_path_ =
"/tmp/cvmmap_streamer_" +
codec_name +
"_" +
std::to_string(publisher.destination_port_) +
".sdp";
}
std::filesystem::path sdp_path{publisher.sdp_path_};
if (sdp_path.has_parent_path() && !sdp_path.parent_path().empty()) {
std::error_code ec;
std::filesystem::create_directories(sdp_path.parent_path(), ec);
if (ec) {
return std::unexpected("RTP SDP directory create failed: " + ec.message());
}
}
std::ofstream sdp(publisher.sdp_path_, std::ios::trunc);
if (!sdp.is_open()) {
return std::unexpected("RTP SDP open failed: " + publisher.sdp_path_);
}
const auto endpoint_ip = publisher.destination_ip_.empty() ? publisher.destination_host_ : publisher.destination_ip_;
sdp << "v=0\n";
sdp << "o=- 0 0 IN IP4 " << endpoint_ip << "\n";
sdp << "s=cvmmap-streamer\n";
sdp << "c=IN IP4 " << endpoint_ip << "\n";
sdp << "t=0 0\n";
sdp << "m=video " << publisher.destination_port_ << " RTP/AVP " << static_cast<unsigned>(publisher.payload_type_) << "\n";
sdp << "a=rtpmap:" << static_cast<unsigned>(publisher.payload_type_) << " " << rtp_encoding_name(publisher.codec_) << "/" << kRtpVideoClockRate << "\n";
sdp << rtp_fmtp_line(publisher.codec_, publisher.payload_type_) << "\n";
sdp << "a=sendonly\n";
sdp << "a=control:streamid=0\n";
if (!sdp.good()) {
return std::unexpected("RTP SDP write failed: " + publisher.sdp_path_);
}
spdlog::info(
"RTP_SDP_WRITTEN codec={} payload_type={} destination={}:{} path={}",
to_string(publisher.codec_),
static_cast<unsigned>(publisher.payload_type_),
endpoint_ip,
publisher.destination_port_,
publisher.sdp_path_);
return publisher;
}
void UdpRtpPublisher::publish_access_unit(std::span<const std::uint8_t> access_unit, std::uint64_t pts_ns) {
stats_.access_units += 1;
stats_.access_unit_bytes += access_unit.size();
if (socket_fd_ < 0 || endpoint_addr_len_ == 0 || access_unit.empty()) {
if (!access_unit.empty()) {
stats_.packets_dropped += 1;
}
return;
}
const auto send_packet = [&](std::span<const std::uint8_t> payload, bool marker) {
std::vector<std::uint8_t> packet{};
packet.resize(kRtpHeaderBytes + payload.size());
packet[0] = 0x80;
packet[1] = static_cast<std::uint8_t>((marker ? 0x80u : 0x00u) | (payload_type_ & 0x7fu));
const auto seq = sequence_++;
packet[2] = static_cast<std::uint8_t>((seq >> 8) & 0xffu);
packet[3] = static_cast<std::uint8_t>(seq & 0xffu);
const auto timestamp = to_rtp_timestamp(pts_ns);
packet[4] = static_cast<std::uint8_t>((timestamp >> 24) & 0xffu);
packet[5] = static_cast<std::uint8_t>((timestamp >> 16) & 0xffu);
packet[6] = static_cast<std::uint8_t>((timestamp >> 8) & 0xffu);
packet[7] = static_cast<std::uint8_t>(timestamp & 0xffu);
packet[8] = static_cast<std::uint8_t>((ssrc_ >> 24) & 0xffu);
packet[9] = static_cast<std::uint8_t>((ssrc_ >> 16) & 0xffu);
packet[10] = static_cast<std::uint8_t>((ssrc_ >> 8) & 0xffu);
packet[11] = static_cast<std::uint8_t>(ssrc_ & 0xffu);
if (!payload.empty()) {
std::memcpy(packet.data() + kRtpHeaderBytes, payload.data(), payload.size());
}
const auto sent = sendto(
socket_fd_,
reinterpret_cast<const void *>(packet.data()),
packet.size(),
MSG_DONTWAIT,
reinterpret_cast<const sockaddr *>(&endpoint_addr_),
endpoint_addr_len_);
if (sent < 0) {
stats_.send_errors += 1;
stats_.packets_dropped += 1;
if (stats_.send_errors <= kErrorLogFirstPackets || (stats_.send_errors % kErrorLogEveryNPackets) == 0) {
spdlog::warn(
"RTP_SEND_ERROR codec={} payload_type={} destination={} errno={} detail='{}' send_errors={}",
to_string(codec_),
static_cast<unsigned>(payload_type_),
destination(),
errno,
std::strerror(errno),
stats_.send_errors);
}
return false;
}
if (static_cast<std::size_t>(sent) != packet.size()) {
stats_.packets_dropped += 1;
stats_.send_errors += 1;
return false;
}
stats_.packets_sent += 1;
stats_.bytes_sent += static_cast<std::uint64_t>(sent);
return true;
};
auto nalus = split_annexb_nalus(access_unit);
if (nalus.empty()) {
nalus.push_back(access_unit);
}
for (std::size_t nal_index = 0; nal_index < nalus.size(); ++nal_index) {
const auto nal = nalus[nal_index];
const bool is_last_nal = (nal_index + 1) == nalus.size();
const bool use_single_ru = nal.size() <= kRtpPayloadBytesMax;
if (use_single_ru) {
(void)send_packet(nal, is_last_nal);
continue;
}
if (codec_ == CodecType::H264) {
if (nal.size() < 2) {
stats_.packets_dropped += 1;
continue;
}
const std::uint8_t nal_hdr = nal[0];
const std::uint8_t fu_indicator = static_cast<std::uint8_t>((nal_hdr & 0xe0u) | 28u);
const std::uint8_t nal_type = static_cast<std::uint8_t>(nal_hdr & 0x1fu);
auto remaining = nal.subspan(1);
bool first = true;
while (!remaining.empty()) {
const auto chunk_size = std::min<std::size_t>(remaining.size(), kRtpPayloadBytesMax - 2);
const bool last_chunk = chunk_size == remaining.size();
std::vector<std::uint8_t> fu_payload{};
fu_payload.reserve(2 + chunk_size);
fu_payload.push_back(fu_indicator);
std::uint8_t fu_header = nal_type;
if (first) {
fu_header = static_cast<std::uint8_t>(fu_header | 0x80u);
}
if (last_chunk) {
fu_header = static_cast<std::uint8_t>(fu_header | 0x40u);
}
fu_payload.push_back(fu_header);
fu_payload.insert(fu_payload.end(), remaining.begin(), remaining.begin() + static_cast<std::ptrdiff_t>(chunk_size));
const bool marker = is_last_nal && last_chunk;
(void)send_packet(std::span<const std::uint8_t>(fu_payload.data(), fu_payload.size()), marker);
remaining = remaining.subspan(chunk_size);
first = false;
}
continue;
}
if (nal.size() < 3) {
stats_.packets_dropped += 1;
continue;
}
const std::uint8_t hdr0 = nal[0];
const std::uint8_t hdr1 = nal[1];
const std::uint8_t nal_type = static_cast<std::uint8_t>((hdr0 >> 1) & 0x3fu);
const std::uint8_t fu_indicator0 = static_cast<std::uint8_t>((hdr0 & 0x81u) | (49u << 1));
const std::uint8_t fu_indicator1 = hdr1;
auto remaining = nal.subspan(2);
bool first = true;
while (!remaining.empty()) {
const auto chunk_size = std::min<std::size_t>(remaining.size(), kRtpPayloadBytesMax - 3);
const bool last_chunk = chunk_size == remaining.size();
std::vector<std::uint8_t> fu_payload{};
fu_payload.reserve(3 + chunk_size);
fu_payload.push_back(fu_indicator0);
fu_payload.push_back(fu_indicator1);
std::uint8_t fu_header = nal_type;
if (first) {
fu_header = static_cast<std::uint8_t>(fu_header | 0x80u);
}
if (last_chunk) {
fu_header = static_cast<std::uint8_t>(fu_header | 0x40u);
}
fu_payload.push_back(fu_header);
fu_payload.insert(fu_payload.end(), remaining.begin(), remaining.begin() + static_cast<std::ptrdiff_t>(chunk_size));
const bool marker = is_last_nal && last_chunk;
(void)send_packet(std::span<const std::uint8_t>(fu_payload.data(), fu_payload.size()), marker);
remaining = remaining.subspan(chunk_size);
first = false;
}
}
}
const RtpPublisherStats &UdpRtpPublisher::stats() const {
return stats_;
}
std::string_view UdpRtpPublisher::sdp_path() const {
return sdp_path_;
}
std::string_view UdpRtpPublisher::destination() const {
static thread_local std::string destination_text{};
const auto host_or_ip = destination_ip_.empty() ? destination_host_ : destination_ip_;
destination_text = host_or_ip + ":" + std::to_string(destination_port_);
return destination_text;
}
void UdpRtpPublisher::log_metrics() const {
spdlog::info(
"RTP_METRICS codec={} payload_type={} destination={} sdp={} access_units={} access_unit_bytes={} packets_sent={} packets_dropped={} bytes_sent={} send_errors={}",
to_string(codec_),
static_cast<unsigned>(payload_type_),
destination(),
sdp_path_,
stats_.access_units,
stats_.access_unit_bytes,
stats_.packets_sent,
stats_.packets_dropped,
stats_.bytes_sent,
stats_.send_errors);
}
}