// RapidPoseTriangulation/scripts/utils_2d_pose.hpp
#pragma once
#include <algorithm>
#include <array>
#include <cmath>
#include <cstring>
#include <filesystem>
#include <iostream>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <tuple>
#include <vector>
#include <onnxruntime_cxx_api.h>
#include <onnxruntime_c_api.h>
#include <opencv2/opencv.hpp>
#include <tensorrt_provider_options.h>
// =================================================================================================
// =================================================================================================
namespace utils_2d_pose
{
class BaseModel
{
public:
explicit BaseModel(const std::string &model_path, int warmup_iterations);
virtual ~BaseModel() = default;
std::vector<std::vector<std::vector<float>>> call_by_image(
const std::vector<cv::Mat> &images);
protected:
static Ort::Env &get_env()
{
// Static method to create or retrieve a single global Ort::Env
static Ort::Env env(ORT_LOGGING_LEVEL_WARNING, "tensorrt_model_env");
// static Ort::Env env(ORT_LOGGING_LEVEL_VERBOSE, "tensorrt_model_env");
return env;
}
// Initialize ONNX Runtime session with TensorRT provider
void init_onnx_runtime(const std::string &model_path);
// Generate random input data, run forward pass
void warmup(int epoch);
// Actually run the session with given input tensors
std::vector<Ort::Value> call_model(const std::vector<Ort::Value> &inputs);
// Utility to check if a string ends with a given suffix
static bool ends_with(const std::string &str, const std::string &suffix)
{
if (str.size() < suffix.size())
return false;
return (str.compare(str.size() - suffix.size(), suffix.size(), suffix) == 0);
}
protected:
Ort::SessionOptions session_options;
std::unique_ptr<Ort::Session> session;
Ort::AllocatorWithDefaultOptions allocator;
std::vector<std::string> input_names;
std::vector<std::vector<int64_t>> input_shapes;
std::vector<ONNXTensorElementDataType> input_types;
std::vector<std::string> output_names;
std::vector<const char *> input_name_ptrs;
std::vector<const char *> output_name_ptrs;
};
// =============================================================================================
BaseModel::BaseModel(const std::string &model_path, int warmup_iterations)
{
if (!std::filesystem::exists(model_path))
{
throw std::runtime_error("File not found: " + model_path);
}
if (!ends_with(model_path, ".onnx"))
{
throw std::runtime_error("Only .onnx models are supported: " + model_path);
}
std::cout << "Loading model: " << model_path << std::endl;
init_onnx_runtime(model_path);
if (warmup_iterations > 0)
{
std::cout << "Running warmup ...\n";
// Two half-length passes (odd counts lose one iteration to integer division)
warmup(warmup_iterations / 2);
warmup(warmup_iterations / 2);
}
}
// =============================================================================================
void BaseModel::init_onnx_runtime(const std::string &model_path)
{
// 0) Create env
Ort::Env &env = get_env();
session_options = Ort::SessionOptions();
session_options.SetGraphOptimizationLevel(GraphOptimizationLevel::ORT_ENABLE_EXTENDED);
// 1) Create TensorRT provider options
OrtTensorRTProviderOptionsV2 *tensorrt_options = nullptr;
Ort::ThrowOnError(Ort::GetApi().CreateTensorRTProviderOptions(&tensorrt_options));
// 2) Configure runtime options
tensorrt_options->trt_max_workspace_size = 2147483647;
tensorrt_options->trt_engine_cache_enable = 1;
tensorrt_options->trt_engine_cache_path = "/RapidPoseTriangulation/data/trt_cache/";
if (model_path.find("_fp16") != std::string::npos)
{
tensorrt_options->trt_fp16_enable = 1;
}
else
{
tensorrt_options->trt_fp16_enable = 0;
}
if (model_path.find("_Bx") != std::string::npos)
{
tensorrt_options->trt_profile_min_shapes = "image_input:1x384x288x3";
tensorrt_options->trt_profile_max_shapes = "image_input:10x384x288x3";
tensorrt_options->trt_profile_opt_shapes = "image_input:3x384x288x3";
}
// 3) Append to session options
Ort::ThrowOnError(
Ort::GetApi().SessionOptionsAppendExecutionProvider_TensorRT_V2(
static_cast<OrtSessionOptions *>(session_options),
tensorrt_options));
// The struct is copied into the session options, so the V2 options can be released here
Ort::GetApi().ReleaseTensorRTProviderOptions(tensorrt_options);
// 4) Create the session
session = std::make_unique<Ort::Session>(env, model_path.c_str(), session_options);
// 5) Gather input info
size_t num_inputs = session->GetInputCount();
for (size_t i = 0; i < num_inputs; i++)
{
Ort::AllocatedStringPtr iname = session->GetInputNameAllocated(i, allocator);
input_names.push_back(iname.get());
Ort::TypeInfo type_info = session->GetInputTypeInfo(i);
auto tensor_info = type_info.GetTensorTypeAndShapeInfo();
input_shapes.push_back(tensor_info.GetShape());
input_types.push_back(tensor_info.GetElementType());
}
// Gather output info to avoid "At least one output should be requested."
size_t num_outputs = session->GetOutputCount();
for (size_t i = 0; i < num_outputs; i++)
{
Ort::AllocatedStringPtr oname = session->GetOutputNameAllocated(i, allocator);
output_names.push_back(oname.get());
}
// Build array of input name C-strings
input_name_ptrs.resize(input_names.size());
for (size_t i = 0; i < input_names.size(); i++)
{
input_name_ptrs[i] = input_names[i].c_str();
}
// Build array of output name C-strings
output_name_ptrs.resize(output_names.size());
for (size_t i = 0; i < output_names.size(); i++)
{
output_name_ptrs[i] = output_names[i].c_str();
}
}
// =============================================================================================
void BaseModel::warmup(int epoch)
{
// Spans the 1..10 batch range declared in the TensorRT optimization profile
std::vector<int> use_batch_sizes = {4, 10, 5, 1, 9, 3, 7, 2, 8, 4, 6, 1, 3, 7, 2};
std::cout << "Warmup: ";
for (int e = 0; e < epoch; e++)
{
// Extract the 4D NHWC input shape
auto shape = input_shapes[0];
int batch_size = static_cast<int>(shape[0]);
int height = static_cast<int>(shape[1]);
int width = static_cast<int>(shape[2]);
int channels = static_cast<int>(shape[3]);
// A dynamic batch dimension is reported as -1; in that case cycle through varied
// batch sizes so the TensorRT optimization profile is fully exercised
if (batch_size != 1)
{
batch_size = use_batch_sizes[e % use_batch_sizes.size()];
}
std::vector<cv::Mat> images;
for (int i = 0; i < batch_size; i++)
{
// Create a multi-channel Mat of shape (Height, Width, Channels)
cv::Mat img(height, width, CV_8UC(channels));
if (channels == 1)
{
cv::randu(img, 0, 256);
}
else if (channels == 3)
{
cv::randu(img, cv::Scalar(0, 0, 0), cv::Scalar(256, 256, 256));
}
else
{
throw std::runtime_error("Unsupported number of channels for warmup.");
}
images.push_back(std::move(img));
}
// Call the model
auto outputs = call_by_image(images);
std::cout << "#";
}
std::cout << std::endl;
}
// =============================================================================================
std::vector<std::vector<std::vector<float>>> BaseModel::call_by_image(
const std::vector<cv::Mat> &images)
{
if (images.empty())
{
throw std::runtime_error("call_by_image requires at least one image.");
}
size_t batch_size = images.size();
int height = images[0].rows;
int width = images[0].cols;
int channels = images[0].channels();
// Flatten all images into one contiguous NHWC buffer
size_t single_img_size = (size_t)height * width * channels;
size_t total_elems = batch_size * single_img_size;
std::vector<uint8_t> batch_data(total_elems);
for (size_t b = 0; b < batch_size; b++)
{
auto &img = images[b];
if (img.rows != height || img.cols != width || img.channels() != channels)
{
throw std::runtime_error("All images must have the same dimensions!");
}
// memcpy assumes contiguous pixel data; clone non-continuous Mats (e.g. ROI views)
cv::Mat cont = img.isContinuous() ? img : img.clone();
std::memcpy(batch_data.data() + b * single_img_size, cont.data, single_img_size);
}
// Create an onnx tensor
auto mem_info = Ort::MemoryInfo::CreateCpu(OrtArenaAllocator, OrtMemTypeDefault);
std::vector<int64_t> shape = {
static_cast<int64_t>(batch_size),
static_cast<int64_t>(height),
static_cast<int64_t>(width),
static_cast<int64_t>(channels),
};
Ort::Value input_tensor = Ort::Value::CreateTensor<uint8_t>(
mem_info,
batch_data.data(),
total_elems,
shape.data(),
shape.size());
// Call model
std::vector<Ort::Value> input_tensors;
input_tensors.push_back(std::move(input_tensor));
auto outputs = call_model(input_tensors);
// Get pointer to output tensor; expected shape [B, N, C]
const float *tensor_data = outputs[0].GetTensorData<float>();
auto data_info = outputs[0].GetTensorTypeAndShapeInfo();
auto shape0 = data_info.GetShape();
size_t B = (size_t)shape0[0];
size_t N = (size_t)shape0[1];
size_t C = (size_t)shape0[2];
// Convert to vector of values
std::vector<std::vector<std::vector<float>>> data;
data.reserve(B);
for (size_t i = 0; i < B; i++)
{
std::vector<std::vector<float>> item;
item.reserve(N);
for (size_t j = 0; j < N; j++)
{
std::vector<float> values;
values.reserve(C);
for (size_t k = 0; k < C; k++)
{
values.push_back(tensor_data[i * N * C + j * C + k]);
}
item.push_back(values);
}
data.push_back(item);
}
return data;
}
// =============================================================================================
std::vector<Ort::Value> BaseModel::call_model(const std::vector<Ort::Value> &inputs)
{
if (inputs.size() != input_names.size())
{
throw std::runtime_error("Number of input tensors does not match model's input count.");
}
// Actually run the model, requesting all known outputs
return session->Run(
Ort::RunOptions{nullptr},
input_name_ptrs.data(),
inputs.data(),
inputs.size(),
output_name_ptrs.data(),
output_name_ptrs.size());
}
// =============================================================================================
// =============================================================================================
class LetterBox
{
public:
LetterBox(const std::array<size_t, 2> &target_size, int fill_value = 0)
{
this->target_size = target_size;
this->fill_value = fill_value;
}
std::tuple<std::array<int, 4>, double, std::array<size_t, 2>> calc_params(
const cv::Mat &image) const;
cv::Mat resize_image(const cv::Mat &image) const;
private:
std::array<size_t, 2> target_size;
int fill_value;
};
// =============================================================================================
std::tuple<std::array<int, 4>, double, std::array<size_t, 2>> LetterBox::calc_params(
const cv::Mat &image) const
{
// Original dimensions
int img_h = image.rows;
int img_w = image.cols;
// Target dimensions
int target_h = static_cast<int>(target_size[0]);
int target_w = static_cast<int>(target_size[1]);
// Scale factor
double scale = std::min(
static_cast<double>(target_w) / static_cast<double>(img_w),
static_cast<double>(target_h) / static_cast<double>(img_h));
// Compute new width/height (round to nearest int)
int new_w = static_cast<int>(std::round(img_w * scale));
int new_h = static_cast<int>(std::round(img_h * scale));
// Calculate padding
int pad_w = target_w - new_w;
int pad_h = target_h - new_h;
int pad_left = pad_w / 2;
int pad_top = pad_h / 2;
int pad_right = pad_w - pad_left;
int pad_bottom = pad_h - pad_top;
std::array<int, 4> paddings = {pad_left, pad_right, pad_top, pad_bottom};
std::array<size_t, 2> new_size = {(size_t)new_w, (size_t)new_h};
return {paddings, scale, new_size};
}
// =============================================================================================
cv::Mat LetterBox::resize_image(const cv::Mat &image) const
{
// 1) Compute letterbox params
auto [paddings, scale, new_sz] = calc_params(image);
int pad_left = paddings[0];
int pad_right = paddings[1];
int pad_top = paddings[2];
int pad_bottom = paddings[3];
// 2) Resize using nearest-neighbor interpolation
cv::Mat resized_img;
cv::Size cv_new_sz = cv::Size(new_sz[0], new_sz[1]);
cv::resize(image, resized_img, cv_new_sz, /*fx=*/0, /*fy=*/0, cv::INTER_NEAREST);
// 3) Pad if needed
if (pad_left == 0 && pad_right == 0 && pad_top == 0 && pad_bottom == 0)
{
// No padding required
return resized_img;
}
else
{
cv::Mat final_img;
cv::Scalar fill_color(fill_value, fill_value, fill_value);
cv::copyMakeBorder(
resized_img, final_img,
pad_top, pad_bottom,
pad_left, pad_right,
cv::BORDER_CONSTANT,
fill_color);
return final_img;
}
}
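// =============================================================================================
// Illustrative sketch (not used by the pipeline; the helper name is hypothetical): maps a
// point from letterboxed coordinates back to the original image, mirroring the
// shift-then-rescale logic that RTMDet::clip_boxes applies further below.
inline cv::Point2f letterboxed_to_original(const LetterBox &letterbox,
const cv::Mat &image,
cv::Point2f pt)
{
auto [paddings, scale, new_size] = letterbox.calc_params(image);
(void)new_size;
// Undo the left/top padding, then undo the resize scale
pt.x = (pt.x - paddings[0]) / static_cast<float>(scale);
pt.y = (pt.y - paddings[2]) / static_cast<float>(scale);
return pt;
}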
// =============================================================================================
// =============================================================================================
class BoxCrop
{
public:
BoxCrop(const std::array<size_t, 2> &target_size,
float padding_scale = 1.0f,
int fill_value = 0)
{
this->target_size = target_size;
this->padding_scale = padding_scale;
this->fill_value = fill_value;
}
std::tuple<std::array<int, 4>, float, std::array<int, 4>, std::array<size_t, 2>>
calc_params(const cv::Mat &image, const std::array<float, 6> &bbox) const;
cv::Mat crop_resize_box(const cv::Mat &image,
const std::array<float, 6> &bbox) const;
private:
std::array<size_t, 2> target_size;
float padding_scale;
int fill_value;
};
// =============================================================================================
std::tuple<std::array<int, 4>, float, std::array<int, 4>, std::array<size_t, 2>>
BoxCrop::calc_params(const cv::Mat &image, const std::array<float, 6> &bbox) const
{
// Extract some params
int img_h = image.rows;
int img_w = image.cols;
int target_h = target_size[0];
int target_w = target_size[1];
// Expand the bounding box to integer pixel bounds (floor the start, ceil the end)
float start_x = std::floor(bbox[0]);
float start_y = std::floor(bbox[1]);
float end_x = std::ceil(bbox[2]);
float end_y = std::ceil(bbox[3]);
// Calculate original bounding box center
float center_x = 0.5f * (start_x + end_x);
float center_y = 0.5f * (start_y + end_y);
// Scale the bounding box by the padding_scale
float bbox_w = end_x - start_x;
float bbox_h = end_y - start_y;
float scaled_w = bbox_w * padding_scale;
float scaled_h = bbox_h * padding_scale;
// Calculate the aspect ratios
float bbox_aspect = scaled_w / scaled_h;
float target_aspect = static_cast<float>(target_w) / static_cast<float>(target_h);
// Adjust the scaled bounding box to match the target aspect ratio
float adjusted_w, adjusted_h;
if (bbox_aspect > target_aspect)
{
adjusted_w = scaled_w;
adjusted_h = scaled_w / target_aspect;
}
else
{
adjusted_h = scaled_h;
adjusted_w = scaled_h * target_aspect;
}
// Calculate scaled bounding box coordinates
float new_start_xf = center_x - 0.5f * adjusted_w;
float new_start_yf = center_y - 0.5f * adjusted_h;
float new_end_xf = center_x + 0.5f * adjusted_w;
float new_end_yf = center_y + 0.5f * adjusted_h;
// Expand the adjusted box to integer pixel bounds
int start_xi = static_cast<int>(std::floor(new_start_xf));
int start_yi = static_cast<int>(std::floor(new_start_yf));
int end_xi = static_cast<int>(std::ceil(new_end_xf));
int end_yi = static_cast<int>(std::ceil(new_end_yf));
// Define the new box coordinates
int new_start_x = std::max(0, start_xi);
int new_start_y = std::max(0, start_yi);
int new_end_x = std::min(img_w - 1, end_xi);
int new_end_y = std::min(img_h - 1, end_yi);
std::array<int, 4> new_box{new_start_x, new_start_y, new_end_x, new_end_y};
// Calculate resized crop size
int clipped_w = new_box[2] - new_box[0];
int clipped_h = new_box[3] - new_box[1];
float scale_w = static_cast<float>(target_w) / static_cast<float>(clipped_w);
float scale_h = static_cast<float>(target_h) / static_cast<float>(clipped_h);
float scale = std::min(scale_w, scale_h);
int new_w = static_cast<int>(std::round(clipped_w * scale));
int new_h = static_cast<int>(std::round(clipped_h * scale));
// Calculate paddings
int pad_w = target_w - new_w;
int pad_h = target_h - new_h;
int pad_left = 0;
int pad_right = 0;
int pad_top = 0;
int pad_bottom = 0;
if (pad_w > 0)
{
if (start_xi < 0)
{
pad_left = pad_w;
pad_right = 0;
}
else if (end_xi > img_w)
{
pad_left = 0;
pad_right = pad_w;
}
else
{
// Can be caused by bbox rounding
pad_left = pad_w / 2;
pad_right = pad_w - pad_left;
}
}
if (pad_h > 0)
{
if (start_yi < 0)
{
pad_top = pad_h;
pad_bottom = 0;
}
else if (end_yi > img_h)
{
pad_top = 0;
pad_bottom = pad_h;
}
else
{
// Can be caused by bbox rounding
pad_top = pad_h / 2;
pad_bottom = pad_h - pad_top;
}
}
std::array<int, 4> paddings{pad_left, pad_right, pad_top, pad_bottom};
return {paddings, scale, new_box, {(size_t)new_w, (size_t)new_h}};
}
// =============================================================================================
cv::Mat BoxCrop::crop_resize_box(const cv::Mat &image,
const std::array<float, 6> &bbox) const
{
auto [paddings, _, new_box, new_size] = calc_params(image, bbox);
// Extract the bounding box
int x1 = new_box[0];
int y1 = new_box[1];
int x2 = new_box[2];
int y2 = new_box[3];
cv::Rect roi(x1, y1, x2 - x1, y2 - y1);
cv::Mat cropped_img = image(roi);
// Resize the image
cv::Size new_size_cv = cv::Size(new_size[0], new_size[1]);
cv::Mat resized_img;
cv::resize(cropped_img, resized_img, new_size_cv, 0, 0, cv::INTER_NEAREST);
// Optionally pad the image
int pad_left = paddings[0];
int pad_right = paddings[1];
int pad_top = paddings[2];
int pad_bottom = paddings[3];
if (pad_left == 0 && pad_right == 0 && pad_top == 0 && pad_bottom == 0)
{
// No padding
return resized_img;
}
else
{
cv::Mat final_img;
cv::Scalar fill_color(fill_value, fill_value, fill_value);
cv::copyMakeBorder(
resized_img,
final_img,
pad_top,
pad_bottom,
pad_left,
pad_right,
cv::BORDER_CONSTANT,
fill_color);
return final_img;
}
}
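// =============================================================================================
// Illustrative usage (the bbox values are made up): crops one detection into the 288x384
// patch expected by the pose model below, preserving aspect ratio and padding with gray.
inline cv::Mat example_box_crop(const cv::Mat &frame)
{
BoxCrop boxcrop({384, 288}, /*padding_scale=*/1.25f, /*fill_value=*/114);
// Detection in (x1, y1, x2, y2, score, class) order
std::array<float, 6> bbox = {100.0f, 50.0f, 300.0f, 400.0f, 0.9f, 0.0f};
return boxcrop.crop_resize_box(frame, bbox); // 288 wide, 384 tall
}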
// =============================================================================================
// =============================================================================================
class RTMDet : public BaseModel
{
public:
RTMDet(const std::string &model_path,
float conf_threshold,
float min_area_fraction,
int warmup = 30)
: BaseModel(model_path, warmup)
{
this->target_size = {320, 320};
this->conf_threshold = conf_threshold;
// min_area is measured in the 320x320 letterboxed input space, where boxes are filtered
float img_area = target_size[0] * target_size[1];
this->min_area = img_area * min_area_fraction;
this->letterbox = std::make_unique<LetterBox>(target_size, 114);
}
std::vector<std::array<float, 6>> call(const cv::Mat &image);
private:
std::array<size_t, 2> target_size;
float conf_threshold;
float min_area;
std::unique_ptr<LetterBox> letterbox;
cv::Mat preprocess(const cv::Mat &image);
std::vector<std::array<float, 6>> postprocess(
const std::vector<std::vector<std::vector<float>>> &result,
const cv::Mat &image);
void clip_boxes(std::vector<std::array<float, 6>> &boxes,
const cv::Mat &image) const;
};
// =============================================================================================
std::vector<std::array<float, 6>> RTMDet::call(const cv::Mat &image)
{
cv::Mat preprocessed = preprocess(image);
std::vector<cv::Mat> inputs = {preprocessed};
auto results = call_by_image(inputs);
auto outputs = postprocess(results, image);
return outputs;
}
// =============================================================================================
cv::Mat RTMDet::preprocess(const cv::Mat &image)
{
cv::Mat resized = letterbox->resize_image(image);
return resized;
}
// =============================================================================================
std::vector<std::array<float, 6>> RTMDet::postprocess(
const std::vector<std::vector<std::vector<float>>> &result,
const cv::Mat &image)
{
// Expected result shape: [B, N, 6] => (x1,y1,x2,y2,score,class)
// Convert to vector of boxes
std::vector<std::array<float, 6>> boxes;
for (auto &item : result[0])
{
float x1 = item[0];
float y1 = item[1];
float x2 = item[2];
float y2 = item[3];
float score = item[4];
float cls = item[5];
boxes.push_back({x1, y1, x2, y2, score, cls});
}
if (boxes.empty())
{
return {};
}
// Filter by confidence + area
for (int i = boxes.size() - 1; i >= 0; i--)
{
auto &b = boxes[i];
if (b[4] < conf_threshold)
{
boxes.erase(boxes.begin() + i);
continue;
}
float area = (b[2] - b[0]) * (b[3] - b[1]);
if (area < min_area)
{
boxes.erase(boxes.begin() + i);
}
}
if (boxes.empty())
{
return {};
}
// Shift by letterbox padding and scale back
clip_boxes(boxes, image);
return boxes;
}
// =============================================================================================
void RTMDet::clip_boxes(std::vector<std::array<float, 6>> &boxes,
const cv::Mat &image) const
{
// Get paddings, scale from letterbox
auto [paddings, scale, _] = letterbox->calc_params(image);
int pad_left = paddings[0];
int pad_right = paddings[1];
int pad_top = paddings[2];
int pad_bottom = paddings[3];
// The effective region in the letterboxed image
float letter_w = static_cast<float>(target_size[1] - (pad_left + pad_right));
float letter_h = static_cast<float>(target_size[0] - (pad_top + pad_bottom));
for (auto &b : boxes)
{
float &x1 = b[0];
float &y1 = b[1];
float &x2 = b[2];
float &y2 = b[3];
// Shift by left/top
x1 -= pad_left;
x2 -= pad_left;
y1 -= pad_top;
y2 -= pad_top;
// Clamp to letterbox region
x1 = std::max(0.f, std::min(x1, letter_w - 1.0f));
x2 = std::max(0.f, std::min(x2, letter_w - 1.0f));
y1 = std::max(0.f, std::min(y1, letter_h - 1.0f));
y2 = std::max(0.f, std::min(y2, letter_h - 1.0f));
// Scale back to original resolution
x1 /= scale;
x2 /= scale;
y1 /= scale;
y2 /= scale;
}
}
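// =============================================================================================
// Illustrative usage (the model path and thresholds are placeholders, not the values the
// pipeline uses): detection returns boxes in original-image coordinates.
inline std::vector<std::array<float, 6>> example_detect(const cv::Mat &frame)
{
// static so the model is loaded and warmed up once, not on every call
static RTMDet detector("/path/to/rtmdet.onnx",
/*conf_threshold=*/0.5f,
/*min_area_fraction=*/0.01f);
return detector.call(frame);
}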
// =============================================================================================
// =============================================================================================
class RTMPose : public BaseModel
{
public:
RTMPose(const std::string &model_path,
int warmup = 30)
: BaseModel(model_path, warmup)
{
this->target_size = {384, 288};
this->boxcrop = std::make_unique<BoxCrop>(target_size, 1.25f, 114);
}
std::vector<std::vector<std::array<float, 3>>> call(const cv::Mat &image,
const std::vector<std::array<float, 6>> &bboxes);
private:
std::array<size_t, 2> target_size;
std::unique_ptr<BoxCrop> boxcrop;
std::vector<cv::Mat> preprocess(
const cv::Mat &image,
const std::vector<std::array<float, 6>> &bboxes);
std::vector<std::vector<std::array<float, 3>>> postprocess(
const std::vector<std::vector<std::vector<float>>> &result,
const cv::Mat &image,
const std::vector<std::array<float, 6>> &bboxes);
void clip_keypoints(std::vector<std::array<float, 3>> &kpts,
const cv::Mat &image,
const std::array<float, 6> &bbox) const;
};
// =============================================================================================
std::vector<std::vector<std::array<float, 3>>> RTMPose::call(
const cv::Mat &image, const std::vector<std::array<float, 6>> &bboxes)
{
std::vector<cv::Mat> inputs = preprocess(image, bboxes);
auto results = call_by_image(inputs);
auto outputs = postprocess(results, image, bboxes);
return outputs;
}
// =============================================================================================
std::vector<cv::Mat> RTMPose::preprocess(
const cv::Mat &image, const std::vector<std::array<float, 6>> &bboxes)
{
std::vector<cv::Mat> crops;
for (auto &bbox : bboxes)
{
cv::Mat cropped = boxcrop->crop_resize_box(image, bbox);
crops.push_back(std::move(cropped));
}
return crops;
}
// =============================================================================================
std::vector<std::vector<std::array<float, 3>>> RTMPose::postprocess(
const std::vector<std::vector<std::vector<float>>> &result,
const cv::Mat &image,
const std::vector<std::array<float, 6>> &bboxes)
{
// Expected result shape: [B, N, 3] => (x,y,score)
std::vector<std::vector<std::array<float, 3>>> keypoints;
for (size_t i = 0; i < result.size(); i++)
{
// Convert to vector of keypoints
std::vector<std::array<float, 3>> kpts;
for (auto &item : result[i])
{
float x = item[0];
float y = item[1];
float score = item[2];
kpts.push_back({x, y, score});
}
if (!kpts.empty())
{
// Shift by boxcrop padding and scale back
clip_keypoints(kpts, image, bboxes[i]);
keypoints.push_back(std::move(kpts));
}
}
return keypoints;
}
// =============================================================================================
void RTMPose::clip_keypoints(std::vector<std::array<float, 3>> &kpts,
const cv::Mat &image,
const std::array<float, 6> &bbox) const
{
// Get paddings, scale from boxcrop
auto [paddings, scale, box, _] = boxcrop->calc_params(image, bbox);
int pad_left = paddings[0];
int pad_top = paddings[2];
int box_left = box[0];
int box_top = box[1];
for (auto &kp : kpts)
{
float &x = kp[0];
float &y = kp[1];
// Shift by left/top
x -= pad_left;
y -= pad_top;
x /= scale;
y /= scale;
x += box_left;
y += box_top;
}
}
// =============================================================================================
// =============================================================================================
class TopDown
{
public:
TopDown(const std::string &det_model_path,
const std::string &pose_model_path,
float box_conf_threshold,
float box_min_area,
bool batch_poses = false,
int warmup = 30)
{
this->batch_poses = batch_poses;
this->det_model = std::make_unique<RTMDet>(
det_model_path, box_conf_threshold, box_min_area, warmup);
this->pose_model = std::make_unique<RTMPose>(pose_model_path, warmup);
}
std::vector<std::vector<std::array<float, 3>>> predict(const cv::Mat &image);
private:
std::unique_ptr<RTMDet> det_model;
std::unique_ptr<RTMPose> pose_model;
bool batch_poses;
void merge_close_poses(
std::vector<std::vector<std::array<float, 3>>> &poses,
std::array<size_t, 2> image_size);
};
// =============================================================================================
std::vector<std::vector<std::array<float, 3>>> TopDown::predict(const cv::Mat &image)
{
auto bboxes = det_model->call(image);
// Drop non-person detections (COCO class id 0 is person)
for (int i = bboxes.size() - 1; i >= 0; i--)
{
if (static_cast<int>(bboxes[i][5]) != 0)
{
bboxes.erase(bboxes.begin() + i);
}
}
std::vector<std::vector<std::array<float, 3>>> poses;
if (this->batch_poses)
{
if (!bboxes.empty())
{
poses = pose_model->call(image, bboxes);
}
}
else
{
for (size_t i = 0; i < bboxes.size(); i++)
{
auto kpts = pose_model->call(image, {bboxes[i]});
// Guard against an empty result before indexing
if (!kpts.empty())
{
poses.push_back(std::move(kpts[0]));
}
}
}
// Sometimes the detection model predicts multiple boxes with different shapes for the same
// person. They then result in strongly overlapping poses, which are merged here.
merge_close_poses(poses, {(size_t)image.cols, (size_t)image.rows});
// Clamp keypoints far outside the image and mark them with near-zero confidence
float mask_offset = (image.cols + image.rows) / 10.0f;
for (auto &pose : poses)
{
for (auto &kp : pose)
{
if (kp[0] < -mask_offset)
{
kp[0] = -mask_offset;
kp[2] = 0.001f;
}
if (kp[1] < -mask_offset)
{
kp[1] = -mask_offset;
kp[2] = 0.001f;
}
if (kp[0] >= image.cols + mask_offset)
{
kp[0] = image.cols + mask_offset;
kp[2] = 0.001f;
}
if (kp[1] >= image.rows + mask_offset)
{
kp[1] = image.rows + mask_offset;
kp[2] = 0.001f;
}
}
}
return poses;
}
// =============================================================================================
void TopDown::merge_close_poses(
std::vector<std::vector<std::array<float, 3>>> &poses,
std::array<size_t, 2> image_size)
{
// Reference joints in COCO order; two poses merge when at least num_overlaps of them coincide
const size_t num_overlaps = 5;
const std::map<std::string, size_t> joint_indices = {
{"nose", 0},
{"left_shoulder", 5},
{"right_shoulder", 6},
{"left_hip", 11},
{"right_hip", 12},
{"left_elbow", 7},
{"right_elbow", 8},
};
if (poses.size() < 2)
{
return;
}
// Pixel threshold scales with image size, e.g. 0.025 * 1080 = 27 px on a 1080p frame
size_t min_dim = std::min(image_size[0], image_size[1]);
float pixel_threshold = 0.025f * min_dim;
// Merge poses if enough joints are close
std::vector<std::vector<std::array<float, 3>>> merged_poses;
merged_poses.reserve(poses.size());
for (auto &opose : poses)
{
bool merged = false;
for (auto &mpose : merged_poses)
{
size_t close_count = 0;
for (auto &kv : joint_indices)
{
size_t joint_id = kv.second;
float x1 = opose[joint_id][0];
float y1 = opose[joint_id][1];
float c1 = opose[joint_id][2];
float x2 = mpose[joint_id][0];
float y2 = mpose[joint_id][1];
float c2 = mpose[joint_id][2];
if (c1 > 0.0f && c2 > 0.0f)
{
float dx = x1 - x2;
float dy = y1 - y2;
float dist_sq = dx * dx + dy * dy;
if (dist_sq <= pixel_threshold * pixel_threshold)
{
++close_count;
}
}
}
if (close_count >= num_overlaps)
{
// Merge into the existing pose, keeping the higher-confidence joint estimates
for (size_t j = 0; j < mpose.size(); ++j)
{
if (opose[j][2] > mpose[j][2])
{
mpose[j] = std::move(opose[j]);
}
}
merged = true;
break;
}
}
if (!merged)
{
// Not mergeable, add as new pose
merged_poses.push_back(std::move(opose));
}
}
// Replace original poses with merged ones
poses = std::move(merged_poses);
}
// =============================================================================================
// =============================================================================================
class PosePredictor
{
public:
PosePredictor(bool whole_body,
float min_bbox_score,
float min_bbox_area,
bool batch_poses = false)
{
std::string base_path = "/RapidPoseTriangulation/extras/mmdeploy/exports/";
std::string path_det_n1 = base_path + "rtmdet-nano_1x320x320x3_fp16_extra-steps.onnx";
std::string path_det_m1 = base_path + "rtmdet-m_1x320x320x3_fp16_extra-steps.onnx";
std::string path_pose_m1 = base_path + "rtmpose-m_1x384x288x3_fp16_extra-steps.onnx";
std::string path_pose_mb = base_path + "rtmpose-m_Bx384x288x3_fp16_extra-steps.onnx";
std::string path_pose_w1 = base_path + "rtmpose-l_wb_1x384x288x3_fp16_extra-steps.onnx";
std::string path_pose_wb = base_path + "rtmpose-l_wb_Bx384x288x3_fp16_extra-steps.onnx";
this->num_joints = whole_body ? 133 : 17;
std::string path_det;
std::string path_pose;
if (!whole_body)
{
path_det = path_det_n1;
if (!batch_poses)
{
path_pose = path_pose_m1;
}
else
{
path_pose = path_pose_mb;
}
}
else
{
path_det = path_det_m1;
if (!batch_poses)
{
path_pose = path_pose_w1;
}
else
{
path_pose = path_pose_wb;
}
}
std::cout << "Loading 2D" + std::string(whole_body ? "-WB" : "") + " models ..."
<< std::endl;
this->topdown_model = std::make_unique<TopDown>(
path_det, path_pose, min_bbox_score, min_bbox_area, batch_poses, 30);
std::cout << "Loaded models \n"
<< std::endl;
}
std::vector<std::vector<std::vector<std::array<float, 3>>>> predict(
const std::vector<cv::Mat> &images);
private:
std::unique_ptr<TopDown> topdown_model;
size_t num_joints;
};
// =============================================================================================
std::vector<std::vector<std::vector<std::array<float, 3>>>> PosePredictor::predict(
const std::vector<cv::Mat> &images)
{
std::vector<std::vector<std::vector<std::array<float, 3>>>> keypoints;
for (auto &img : images)
{
auto kpts = topdown_model->predict(img);
if (!kpts.empty())
{
keypoints.push_back(std::move(kpts));
}
else
{
// No detections: emit a single all-zero pose as a placeholder
std::vector<std::vector<std::array<float, 3>>> zero_keypoints;
zero_keypoints.resize(1);
zero_keypoints[0].resize(num_joints, {0.0f, 0.0f, 0.0f});
keypoints.push_back(std::move(zero_keypoints));
}
}
return keypoints;
}
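// =============================================================================================
// Illustrative end-to-end usage (image files and thresholds are placeholders):
inline void example_pipeline()
{
PosePredictor predictor(/*whole_body=*/false,
/*min_bbox_score=*/0.5f,
/*min_bbox_area=*/0.01f,
/*batch_poses=*/true);
std::vector<cv::Mat> frames = {cv::imread("cam0.png"), cv::imread("cam1.png")};
// Result layout: [camera][person][joint] -> (x, y, score)
auto poses = predictor.predict(frames);
(void)poses;
}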
} // namespace utils_2d_pose