Restructuring some code.

This commit is contained in:
Daniel
2025-01-20 18:00:37 +01:00
parent a866485c8e
commit d77fee7103
34 changed files with 660 additions and 608 deletions

File diff suppressed because it is too large Load Diff

20
extras/ros/README.md Normal file
View File

@ -0,0 +1,20 @@
# ROS-Wrapper
Run pose estimator with ros topics as inputs and publish detected poses.
<br>
- Build container:
```bash
docker build --progress=plain -t rapidposetriangulation_ros -f extras/ros/dockerfile .
```
- Update or remove the `ROS_DOMAIN_ID` in `docker-compose.yml` to fit your environment
- Run and test:
```bash
xhost +; docker compose -f extras/ros/docker-compose.yml up
docker exec -it ros-test_node-1 bash
export ROS_DOMAIN_ID=18
```

View File

@ -0,0 +1,71 @@
version: "2.3"
# runtime: nvidia needs version 2 else change standard runtime at host pc
services:
test_node:
image: rapidposetriangulation_ros
network_mode: "host"
ipc: "host"
runtime: nvidia
privileged: true
volumes:
- ../../:/RapidPoseTriangulation/
- ../../skelda/:/skelda/
- /tmp/.X11-unix:/tmp/.X11-unix
- /dev/shm:/dev/shm
environment:
- DISPLAY
- QT_X11_NO_MITSHM=1
- "PYTHONUNBUFFERED=1"
command: /bin/bash -i -c 'sleep infinity'
estimator:
image: rapidposetriangulation_ros
network_mode: "host"
ipc: "host"
runtime: nvidia
privileged: true
volumes:
- ../../:/RapidPoseTriangulation/
- ../../skelda/:/skelda/
- /tmp/.X11-unix:/tmp/.X11-unix
- /dev/shm:/dev/shm
environment:
- DISPLAY
- QT_X11_NO_MITSHM=1
- "PYTHONUNBUFFERED=1"
# command: /bin/bash -i -c 'export ROS_DOMAIN_ID=18 && ros2 run rpt2D_wrapper_py rpt2D_wrapper'
command: /bin/bash -i -c 'export ROS_DOMAIN_ID=18 && ros2 run rpt2D_wrapper_cpp rpt2D_wrapper'
pose_visualizer:
image: rapidposetriangulation_ros
network_mode: "host"
ipc: "host"
runtime: nvidia
privileged: true
volumes:
- ../../:/RapidPoseTriangulation/
- ../../skelda/:/skelda/
- /tmp/.X11-unix:/tmp/.X11-unix
- /dev/shm:/dev/shm
environment:
- DISPLAY
- QT_X11_NO_MITSHM=1
- "PYTHONUNBUFFERED=1"
command: /bin/bash -i -c 'sleep 2 && export ROS_DOMAIN_ID=18 && ros2 run pose2D_visualizer pose2D_visualizer'
pose_viewer:
image: rapidposetriangulation_ros
network_mode: "host"
ipc: "host"
runtime: nvidia
privileged: true
volumes:
- /tmp/.X11-unix:/tmp/.X11-unix
- /dev/shm:/dev/shm
environment:
- DISPLAY
- QT_X11_NO_MITSHM=1
- "PYTHONUNBUFFERED=1"
command: /bin/bash -i -c 'sleep 2 && export ROS_DOMAIN_ID=18 && ros2 run image_view image_view --ros-args --remap image:=/camera01/img_with_pose -p autosize:=True -p window_name:=MyImage'

84
extras/ros/dockerfile Normal file
View File

@ -0,0 +1,84 @@
FROM rapidposetriangulation
WORKDIR /
# Install ONNX runtime
# See: https://github.com/microsoft/onnxruntime/blob/main/dockerfiles/Dockerfile.tensorrt
RUN pip3 uninstall -y onnxruntime-gpu
RUN git clone --recursive --depth=1 --branch=v1.20.1 https://github.com/Microsoft/onnxruntime.git
ENV PATH=/usr/local/nvidia/bin:/usr/local/cuda/bin:/cmake-3.30.1-linux-x86_64/bin:${PATH}
ARG CMAKE_CUDA_ARCHITECTURES=75;80;90
ENV TRT_VERSION=10.5.0.18
RUN /bin/sh onnxruntime/dockerfiles/scripts/install_common_deps.sh
RUN /bin/sh onnxruntime/dockerfiles/scripts/checkout_submodules.sh ${trt_version}
RUN ls
RUN cd onnxruntime && \
/bin/sh build.sh --allow_running_as_root --parallel --build_shared_lib \
--cuda_home /usr/local/cuda --cudnn_home /usr/lib/x86_64-linux-gnu/ --use_tensorrt \
--tensorrt_home /usr/lib/x86_64-linux-gnu/ --config Release --build_wheel --skip_tests \
--skip_submodule_sync --cmake_extra_defines '"CMAKE_CUDA_ARCHITECTURES='${CMAKE_CUDA_ARCHITECTURES}'"'
RUN cd onnxruntime && pip install build/Linux/Release/dist/*.whl
# Install ROS2
# https://docs.ros.org/en/humble/Installation/Ubuntu-Install-Debians.html
RUN apt-get update && apt-get install -y --no-install-recommends locales
RUN locale-gen en_US en_US.UTF-8 && update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
RUN apt-get update && apt-get install -y --no-install-recommends software-properties-common
RUN add-apt-repository universe
RUN apt-get update && apt-get install -y --no-install-recommends curl
RUN curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
RUN echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) main" > /etc/apt/sources.list.d/ros2.list
RUN apt-get update && apt-get install -y --no-install-recommends ros-humble-ros-base python3-argcomplete
RUN apt-get update && apt-get install -y --no-install-recommends ros-dev-tools
RUN echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
# Create ROS2 workspace for basic packages
RUN mkdir -p /project/base/src/
RUN cd /project/base/; colcon build
RUN echo "source /project/base/install/setup.bash" >> ~/.bashrc
# Install opencv and cv_bridge
RUN apt-get update && apt-get install -y --no-install-recommends libboost-dev
RUN apt-get update && apt-get install -y --no-install-recommends libboost-python-dev
RUN apt-get update && apt-get install -y --no-install-recommends libopencv-dev
RUN cd /project/base/src/; git clone --branch humble --depth=1 https://github.com/ros-perception/vision_opencv.git
RUN /bin/bash -i -c 'cd /project/base/; colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=Release'
# Install ROS2 image viewer
RUN cd /project/base/src/; git clone --branch=humble --depth=1 https://github.com/ros-perception/image_pipeline.git
RUN cd /project/base/src/; git clone --branch=humble --depth=1 https://github.com/ros-perception/image_common.git
RUN /bin/bash -i -c 'cd /project/base/; colcon build --symlink-install --packages-select camera_calibration_parsers image_transport image_view --cmake-args -DCMAKE_BUILD_TYPE=Release'
# Fix module not found error when displaying images
RUN apt-get update && apt-get install -y --no-install-recommends libcanberra-gtk-module libcanberra-gtk3-module
# Create ROS2 workspace for project packages
RUN mkdir -p /project/dev_ws/src/
RUN cd /project/dev_ws/; colcon build
RUN echo "source /project/dev_ws/install/setup.bash" >> ~/.bashrc
# Fix ros package building error
RUN pip3 install --no-cache-dir "setuptools<=58.2.0"
# Copy modules
COPY ./extras/include/ /RapidPoseTriangulation/extras/include/
COPY ./scripts/ /RapidPoseTriangulation/scripts/
COPY ./extras/ros/pose2D_visualizer/ /RapidPoseTriangulation/extras/ros/pose2D_visualizer/
COPY ./extras/ros/rpt2D_wrapper_py/ /RapidPoseTriangulation/extras/ros/rpt2D_wrapper_py/
COPY ./extras/ros/rpt2D_wrapper_cpp/ /RapidPoseTriangulation/extras/ros/rpt2D_wrapper_cpp/
# Link and build as ros package
RUN ln -s /RapidPoseTriangulation/extras/ros/pose2D_visualizer/ /project/dev_ws/src/pose2D_visualizer
RUN ln -s /RapidPoseTriangulation/extras/ros/rpt2D_wrapper_py/ /project/dev_ws/src/rpt2D_wrapper_py
RUN ln -s /RapidPoseTriangulation/extras/ros/rpt2D_wrapper_cpp/ /project/dev_ws/src/rpt2D_wrapper_cpp
RUN /bin/bash -i -c 'cd /project/dev_ws/; colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=Release'
# Update ros packages -> autocompletion and check
RUN /bin/bash -i -c 'ros2 pkg list'
# Clear cache to save space, only has an effect if image is squashed
RUN apt-get autoremove -y \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /RapidPoseTriangulation/
CMD ["/bin/bash"]

View File

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>pose2D_visualizer</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="root@todo.todo">root</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,152 @@
import json
import os
import sys
import threading
import time
import cv2
from matplotlib import pyplot as plt
import numpy as np
import rclpy
from cv_bridge import CvBridge
from rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from sensor_msgs.msg import Image
from std_msgs.msg import String
filepath = os.path.dirname(os.path.realpath(__file__)) + "/"
sys.path.append(filepath + "../../../../scripts/")
import utils_pipeline
from skelda import utils_view
# ==================================================================================================
bridge = CvBridge()
node = None
publisher_img = None
cam_id = "camera01"
img_input_topic = "/" + cam_id + "/pylon_ros2_camera_node/image_raw"
pose_input_topic = "/poses/" + cam_id
img_output_topic = "/" + cam_id + "/img_with_pose"
last_input_image = None
lock = threading.Lock()
# ==================================================================================================
def callback_images(image_data):
global last_input_image, lock
# Convert into cv images from image string
if image_data.encoding == "bayer_rggb8":
bayer_image = bridge.imgmsg_to_cv2(image_data, "bayer_rggb8")
color_image = utils_pipeline.bayer2rgb(bayer_image)
elif image_data.encoding == "mono8":
gray_image = bridge.imgmsg_to_cv2(image_data, "mono8")
color_image = cv2.cvtColor(gray_image, cv2.COLOR_GRAY2RGB)
elif image_data.encoding == "rgb8":
color_image = bridge.imgmsg_to_cv2(image_data, "rgb8")
else:
raise ValueError("Unknown image encoding:", image_data.encoding)
time_stamp = image_data.header.stamp.sec + image_data.header.stamp.nanosec / 1.0e9
with lock:
last_input_image = (color_image, time_stamp)
# ==================================================================================================
def callback_poses(pose_data):
global last_input_image, lock
ptime = time.time()
if last_input_image is None:
return
# Convert pose data from json string
poses = json.loads(pose_data.data)
# Collect inputs
images_2d = []
timestamps = []
with lock:
img = np.copy(last_input_image[0])
ts = last_input_image[1]
images_2d.append(img)
timestamps.append(ts)
# Visualize 2D poses
bodies2D = poses["bodies2D"]
colors = plt.cm.hsv(np.linspace(0, 1, len(bodies2D), endpoint=False)).tolist()
colors = [[int(c[0] * 255), int(c[1] * 255), int(c[2] * 255)] for c in colors]
for i, body in enumerate(bodies2D):
color = list(reversed(colors[i]))
img = utils_view.draw_body_in_image(img, body, poses["joints"], color)
# Publish image with poses
publish(img)
msg = "Visualization time: {:.3f}s"
print(msg.format(time.time() - ptime))
# ==================================================================================================
def publish(img):
# Publish image data
msg = bridge.cv2_to_imgmsg(img, "rgb8")
publisher_img.publish(msg)
# ==================================================================================================
def main():
global node, publisher_img
# Start node
rclpy.init(args=sys.argv)
node = rclpy.create_node("pose2D_visualizer")
# Quality of service settings
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
)
# Create subscribers
_ = node.create_subscription(
Image,
img_input_topic,
callback_images,
qos_profile,
)
_ = node.create_subscription(
String,
pose_input_topic,
callback_poses,
qos_profile,
)
# Create publishers
publisher_img = node.create_publisher(Image, img_output_topic, qos_profile)
node.get_logger().info("Finished initialization of pose visualizer")
# Run ros update thread
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
# ==================================================================================================
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/pose2D_visualizer
[install]
install_scripts=$base/lib/pose2D_visualizer

View File

@ -0,0 +1,23 @@
from setuptools import setup
package_name = "pose2D_visualizer"
setup(
name=package_name,
version="0.0.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="root",
maintainer_email="root@todo.todo",
description="TODO: Package description",
license="TODO: License declaration",
tests_require=["pytest"],
entry_points={
"console_scripts": ["pose2D_visualizer = pose2D_visualizer.pose2D_visualizer:main"],
},
)

View File

@ -0,0 +1,27 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from ament_copyright.main import main
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(
reason="No copyright header has been placed in the generated source file."
)
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=[".", "test"])
assert rc == 0, "Found errors"

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from ament_flake8.main import main_with_errors
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, "Found %d code style errors / warnings:\n" % len(
errors
) + "\n".join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from ament_pep257.main import main
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=[".", "test"])
assert rc == 0, "Found code style errors / warnings"

View File

@ -0,0 +1,65 @@
cmake_minimum_required(VERSION 3.5)
project(rpt2D_wrapper_cpp)
# Default to C99
if(NOT CMAKE_C_STANDARD)
set(CMAKE_C_STANDARD 99)
endif()
# Default to C++17
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif()
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(cv_bridge REQUIRED)
find_package(OpenCV REQUIRED)
### 3) ONNX Runtime
include_directories(/onnxruntime/include/
/onnxruntime/include/onnxruntime/core/session/
/onnxruntime/include/onnxruntime/core/providers/tensorrt/)
link_directories(/onnxruntime/build/Linux/Release/)
add_executable(rpt2D_wrapper src/rpt2D_wrapper.cpp)
ament_target_dependencies(rpt2D_wrapper rclcpp std_msgs sensor_msgs cv_bridge)
target_include_directories(rpt2D_wrapper PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
target_link_libraries(rpt2D_wrapper
${OpenCV_LIBS}
onnxruntime_providers_tensorrt
onnxruntime_providers_shared
onnxruntime_providers_cuda
onnxruntime
)
set_target_properties(rpt2D_wrapper PROPERTIES
BUILD_WITH_INSTALL_RPATH TRUE
INSTALL_RPATH "/onnxruntime/build/Linux/Release"
)
install(TARGETS rpt2D_wrapper
DESTINATION lib/${PROJECT_NAME})
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# uncomment the line when a copyright and license is not present in all source files
#set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# uncomment the line when this package is not in a git repo
#set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
ament_package()

View File

@ -0,0 +1,25 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>rpt2D_wrapper_cpp</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="root@todo.todo">root</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>rclcpp</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>cv_bridge</depend>
<depend>OpenCV</depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,268 @@
#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <mutex>
#include <atomic>
#include <cmath>
#include <iostream>
// ROS2
#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/image.hpp>
#include <std_msgs/msg/string.hpp>
// OpenCV / cv_bridge
#include <opencv2/opencv.hpp>
#include <cv_bridge/cv_bridge.h>
// JSON library
#include "/RapidPoseTriangulation/extras/include/nlohmann/json.hpp"
using json = nlohmann::json;
#include "/RapidPoseTriangulation/scripts/utils_pipeline.hpp"
#include "/RapidPoseTriangulation/scripts/utils_2d_pose.hpp"
// =================================================================================================
static const std::string cam_id = "camera01";
static const std::string img_input_topic = "/" + cam_id + "/pylon_ros2_camera_node/image_raw";
static const std::string pose_out_topic = "/poses/" + cam_id;
static const float min_bbox_score = 0.4;
static const float min_bbox_area = 0.1 * 0.1;
static const bool batch_poses = true;
static const std::map<std::string, bool> whole_body = {
{"foots", true},
{"face", true},
{"hands", true},
};
// =================================================================================================
// =================================================================================================
class Rpt2DWrapperNode : public rclcpp::Node
{
public:
Rpt2DWrapperNode(const std::string &node_name)
: Node(node_name)
{
this->stop_flag = false;
this->last_input_image = cv::Mat();
this->last_input_time = 0.0;
// QoS
rclcpp::QoS qos_profile(1);
qos_profile.reliable();
qos_profile.keep_last(1);
// Setup subscriber
image_sub_ = this->create_subscription<sensor_msgs::msg::Image>(
img_input_topic, qos_profile,
std::bind(&Rpt2DWrapperNode::callbackImages, this, std::placeholders::_1));
// Setup publisher
pose_pub_ = this->create_publisher<std_msgs::msg::String>(pose_out_topic, qos_profile);
// Load model
bool use_wb = utils_pipeline::use_whole_body(whole_body);
this->kpt_model = std::make_unique<utils_2d_pose::PosePredictor>(
use_wb, min_bbox_score, min_bbox_area, batch_poses);
RCLCPP_INFO(this->get_logger(), "Finished initialization of pose estimator.");
// Start background prediction thread
model_thread = std::thread(&Rpt2DWrapperNode::callbackWrapper, this);
}
~Rpt2DWrapperNode()
{
stop_flag = true;
if (model_thread.joinable())
{
model_thread.join();
}
}
private:
rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr image_sub_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pose_pub_;
// Pose model pointer
std::unique_ptr<utils_2d_pose::PosePredictor> kpt_model;
const std::vector<std::string> joint_names_2d = utils_pipeline::get_joint_names(whole_body);
// Threading
std::thread model_thread;
std::mutex mutex;
std::atomic<bool> stop_flag;
cv::Mat last_input_image;
double last_input_time;
void callbackImages(const sensor_msgs::msg::Image::SharedPtr msg);
void callbackModel();
void callbackWrapper()
{
using namespace std::chrono_literals;
while (!stop_flag)
{
callbackModel();
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
void publish(const json &data)
{
std_msgs::msg::String msg;
msg.data = data.dump();
pose_pub_->publish(msg);
}
};
// =================================================================================================
void Rpt2DWrapperNode::callbackImages(const sensor_msgs::msg::Image::SharedPtr msg)
{
// Load or convert image to Bayer format
cv::Mat bayer_image;
try
{
if (msg->encoding == "mono8")
{
cv_bridge::CvImageConstPtr cv_ptr = cv_bridge::toCvShare(msg, msg->encoding);
bayer_image = cv_ptr->image;
}
else if (msg->encoding == "bayer_rggb8")
{
cv_bridge::CvImageConstPtr cv_ptr = cv_bridge::toCvShare(msg, msg->encoding);
bayer_image = cv_ptr->image;
}
else if (msg->encoding == "rgb8")
{
cv_bridge::CvImageConstPtr cv_ptr = cv_bridge::toCvShare(msg, "rgb8");
cv::Mat color_image = cv_ptr->image;
bayer_image = utils_pipeline::rgb2bayer(color_image);
}
else
{
throw std::runtime_error("Unknown image encoding: " + msg->encoding);
}
}
catch (const std::exception &e)
{
RCLCPP_ERROR(this->get_logger(), "cv_bridge exception: %s", e.what());
return;
}
// Get timestamp
double time_stamp = msg->header.stamp.sec + msg->header.stamp.nanosec / 1.0e9;
// Store in member variables with lock
{
std::lock_guard<std::mutex> lk(mutex);
this->last_input_image = std::move(bayer_image);
this->last_input_time = time_stamp;
}
}
// =================================================================================================
void Rpt2DWrapperNode::callbackModel()
{
auto ptime = std::chrono::high_resolution_clock::now();
// Check if we have an image
cv::Mat local_image;
double local_timestamp = 0.0;
{
std::lock_guard<std::mutex> lk(mutex);
if (last_input_time == 0.0)
{
return;
}
local_image = std::move(last_input_image);
local_timestamp = last_input_time;
last_input_image = cv::Mat();
last_input_time = 0.0;
}
// Create image vector
cv::Mat rgb_image = utils_pipeline::bayer2rgb(local_image);
std::vector<cv::Mat> images_2d;
images_2d.push_back(rgb_image);
// Predict 2D poses
auto poses_2d_all = kpt_model->predict(images_2d);
auto poses_2d_upd = utils_pipeline::update_keypoints(
poses_2d_all, joint_names_2d, whole_body);
auto &poses_2d = poses_2d_upd[0];
// Drop persons with no joints
std::vector<std::vector<std::array<float, 3>>> valid_poses;
for (auto &person : poses_2d)
{
float sum_conf = 0.0;
for (auto &kp : person)
{
sum_conf += kp[2];
}
if (sum_conf > 0.0)
{
valid_poses.push_back(person);
}
}
// Round poses to 3 decimal places
for (auto &person : valid_poses)
{
for (auto &kp : person)
{
kp[0] = std::round(kp[0] * 1000.0) / 1000.0;
kp[1] = std::round(kp[1] * 1000.0) / 1000.0;
kp[2] = std::round(kp[2] * 1000.0) / 1000.0;
}
}
// Build JSON
auto ts_pose = std::chrono::high_resolution_clock::now();
double ts_pose_sec = std::chrono::duration<double>(ts_pose.time_since_epoch()).count();
double z_images_pose = ts_pose_sec - local_timestamp;
json poses_msg;
poses_msg["bodies2D"] = valid_poses; // shape: persons x keypoints x 3
poses_msg["joints"] = joint_names_2d;
poses_msg["num_persons"] = valid_poses.size();
poses_msg["timestamps"] = {
{"image", local_timestamp},
{"pose", ts_pose_sec},
{"z-images-pose", z_images_pose}};
// Publish
publish(poses_msg);
// Print info
double elapsed_time = std::chrono::duration<double>(
std::chrono::high_resolution_clock::now() - ptime)
.count();
std::cout << "Detected persons: " << valid_poses.size()
<< " - Prediction time: " << elapsed_time << "s" << std::endl;
}
// =================================================================================================
// =================================================================================================
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = std::make_shared<Rpt2DWrapperNode>("rpt2D_wrapper");
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}

View File

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>rpt2D_wrapper_py</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="root@todo.todo">root</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,196 @@
import json
import os
import sys
import threading
import time
import numpy as np
import rclpy
from cv_bridge import CvBridge
from rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from sensor_msgs.msg import Image
from std_msgs.msg import String
filepath = os.path.dirname(os.path.realpath(__file__)) + "/"
sys.path.append(filepath + "../../../scripts/")
import test_triangulate
import utils_2d_pose
# ==================================================================================================
bridge = CvBridge()
node = None
publisher_pose = None
cam_id = "camera01"
img_input_topic = "/" + cam_id + "/pylon_ros2_camera_node/image_raw"
pose_out_topic = "/poses/" + cam_id
last_input_image = None
last_input_time = 0.0
kpt_model = None
joint_names_2d = test_triangulate.joint_names_2d
lock = threading.Lock()
stop_flag = False
# Model config
min_bbox_score = 0.4
min_bbox_area = 0.1 * 0.1
batch_poses = True
# ==================================================================================================
def callback_images(image_data):
global last_input_image, last_input_time, lock
# Convert into cv images from image string
if image_data.encoding == "bayer_rggb8":
bayer_image = bridge.imgmsg_to_cv2(image_data, "bayer_rggb8")
elif image_data.encoding == "mono8":
bayer_image = bridge.imgmsg_to_cv2(image_data, "mono8")
elif image_data.encoding == "rgb8":
color_image = bridge.imgmsg_to_cv2(image_data, "rgb8")
bayer_image = test_triangulate.rgb2bayer(color_image)
else:
raise ValueError("Unknown image encoding:", image_data.encoding)
time_stamp = image_data.header.stamp.sec + image_data.header.stamp.nanosec / 1.0e9
with lock:
last_input_image = bayer_image
last_input_time = time_stamp
# ==================================================================================================
def callback_model():
global last_input_image, last_input_time, kpt_model, lock
ptime = time.time()
if last_input_time == 0.0:
time.sleep(0.0001)
return
# Collect inputs
images_2d = []
timestamps = []
with lock:
img = last_input_image
ts = last_input_time
images_2d.append(img)
timestamps.append(ts)
last_input_image = None
last_input_time = 0.0
# Predict 2D poses
images_2d = [test_triangulate.bayer2rgb(img) for img in images_2d]
poses_2d = utils_2d_pose.get_2d_pose(kpt_model, images_2d)
poses_2d = test_triangulate.update_keypoints(poses_2d, joint_names_2d)
poses_2d = poses_2d[0]
# Drop persons with no joints
poses_2d = np.asarray(poses_2d)
mask = np.sum(poses_2d[..., 2], axis=1) > 0
poses_2d = poses_2d[mask]
# Round poses
poses2D = [np.array(p).round(3).tolist() for p in poses_2d]
# Publish poses
ts_pose = time.time()
poses = {
"bodies2D": poses2D,
"joints": joint_names_2d,
"num_persons": len(poses2D),
"timestamps": {
"image": timestamps[0],
"pose": ts_pose,
"z-images-pose": ts_pose - timestamps[0],
},
}
publish(poses)
msg = "Detected persons: {} - Prediction time: {:.4f}s"
print(msg.format(poses["num_persons"], time.time() - ptime))
# ==================================================================================================
def callback_wrapper():
global stop_flag
while not stop_flag:
callback_model()
time.sleep(0.001)
# ==================================================================================================
def publish(data):
# Publish json data
msg = String()
msg.data = json.dumps(data)
publisher_pose.publish(msg)
# ==================================================================================================
def main():
global node, publisher_pose, kpt_model, stop_flag
# Start node
rclpy.init(args=sys.argv)
node = rclpy.create_node("rpt2D_wrapper")
# Quality of service settings
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
)
# Create subscribers
_ = node.create_subscription(
Image,
img_input_topic,
callback_images,
qos_profile,
)
# Create publishers
publisher_pose = node.create_publisher(String, pose_out_topic, qos_profile)
# Load 2D pose model
whole_body = test_triangulate.whole_body
if any((whole_body[k] for k in whole_body)):
kpt_model = utils_2d_pose.load_wb_model(
min_bbox_score, min_bbox_area, batch_poses
)
else:
kpt_model = utils_2d_pose.load_model(min_bbox_score, min_bbox_area, batch_poses)
node.get_logger().info("Finished initialization of pose estimator")
# Start prediction thread
p1 = threading.Thread(target=callback_wrapper)
p1.start()
# Run ros update thread
rclpy.spin(node)
stop_flag = True
p1.join()
node.destroy_node()
rclpy.shutdown()
# ==================================================================================================
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/rpt2D_wrapper_py
[install]
install_scripts=$base/lib/rpt2D_wrapper_py

View File

@ -0,0 +1,23 @@
from setuptools import setup
package_name = "rpt2D_wrapper_py"
setup(
name=package_name,
version="0.0.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="root",
maintainer_email="root@todo.todo",
description="TODO: Package description",
license="TODO: License declaration",
tests_require=["pytest"],
entry_points={
"console_scripts": ["rpt2D_wrapper = rpt2D_wrapper_py.rpt2D_wrapper:main"],
},
)

View File

@ -0,0 +1,27 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from ament_copyright.main import main
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(
reason="No copyright header has been placed in the generated source file."
)
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=[".", "test"])
assert rc == 0, "Found errors"

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from ament_flake8.main import main_with_errors
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, "Found %d code style errors / warnings:\n" % len(
errors
) + "\n".join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from ament_pep257.main import main
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=[".", "test"])
assert rc == 0, "Found code style errors / warnings"