Created ros wrapper for 2D pose estimation.

This commit is contained in:
Daniel
2025-01-15 16:08:24 +01:00
parent 6b3fe5761b
commit 1bd58deede
21 changed files with 734 additions and 0 deletions

View File

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>pose2D_visualizer</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="root@todo.todo">root</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,152 @@
import json
import os
import sys
import threading
import time
import cv2
from matplotlib import pyplot as plt
import numpy as np
import rclpy
from cv_bridge import CvBridge
from rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from sensor_msgs.msg import Image
from std_msgs.msg import String
filepath = os.path.dirname(os.path.realpath(__file__)) + "/"
sys.path.append(filepath + "../../../scripts/")
import test_triangulate
from skelda import utils_view
# ==================================================================================================
bridge = CvBridge()
node = None
publisher_img = None
cam_id = "camera01"
img_input_topic = "/" + cam_id + "/pylon_ros2_camera_node/image_raw"
pose_input_topic = "/poses/" + cam_id
img_output_topic = "/" + cam_id + "/img_with_pose"
last_input_image = None
lock = threading.Lock()
# ==================================================================================================
def callback_images(image_data):
global last_input_image, lock
# Convert into cv images from image string
if image_data.encoding == "bayer_rggb8":
bayer_image = bridge.imgmsg_to_cv2(image_data, "bayer_rggb8")
color_image = test_triangulate.bayer2rgb(bayer_image)
elif image_data.encoding == "mono8":
gray_image = bridge.imgmsg_to_cv2(image_data, "mono8")
color_image = cv2.cvtColor(gray_image, cv2.COLOR_GRAY2RGB)
elif image_data.encoding == "rgb8":
color_image = bridge.imgmsg_to_cv2(image_data, "rgb8")
else:
raise ValueError("Unknown image encoding:", image_data.encoding)
time_stamp = image_data.header.stamp.sec + image_data.header.stamp.nanosec / 1.0e9
with lock:
last_input_image = (color_image, time_stamp)
# ==================================================================================================
def callback_poses(pose_data):
global last_input_image, lock
ptime = time.time()
if last_input_image is None:
return
# Convert pose data from json string
poses = json.loads(pose_data.data)
# Collect inputs
images_2d = []
timestamps = []
with lock:
img = np.copy(last_input_image[0])
ts = last_input_image[1]
images_2d.append(img)
timestamps.append(ts)
# Visualize 2D poses
bodies2D = poses["bodies2D"]
colors = plt.cm.hsv(np.linspace(0, 1, len(bodies2D), endpoint=False)).tolist()
colors = [[int(c[0] * 255), int(c[1] * 255), int(c[2] * 255)] for c in colors]
for i, body in enumerate(bodies2D):
color = list(reversed(colors[i]))
img = utils_view.draw_body_in_image(img, body, poses["joints"], color)
# Publish image with poses
publish(img)
msg = "Visualization time: {:.3f}s"
print(msg.format(time.time() - ptime))
# ==================================================================================================
def publish(img):
# Publish image data
msg = bridge.cv2_to_imgmsg(img, "rgb8")
publisher_img.publish(msg)
# ==================================================================================================
def main():
global node, publisher_img
# Start node
rclpy.init(args=sys.argv)
node = rclpy.create_node("pose2D_visualizer")
# Quality of service settings
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
)
# Create subscribers
_ = node.create_subscription(
Image,
img_input_topic,
callback_images,
qos_profile,
)
_ = node.create_subscription(
String,
pose_input_topic,
callback_poses,
qos_profile,
)
# Create publishers
publisher_img = node.create_publisher(Image, img_output_topic, qos_profile)
node.get_logger().info("Finished initialization of pose visualizer")
# Run ros update thread
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
# ==================================================================================================
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/pose2D_visualizer
[install]
install_scripts=$base/lib/pose2D_visualizer

View File

@ -0,0 +1,23 @@
from setuptools import setup
package_name = "pose2D_visualizer"
setup(
name=package_name,
version="0.0.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="root",
maintainer_email="root@todo.todo",
description="TODO: Package description",
license="TODO: License declaration",
tests_require=["pytest"],
entry_points={
"console_scripts": ["pose2D_visualizer = pose2D_visualizer.pose2D_visualizer:main"],
},
)

View File

@ -0,0 +1,27 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from ament_copyright.main import main
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(
reason="No copyright header has been placed in the generated source file."
)
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=[".", "test"])
assert rc == 0, "Found errors"

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from ament_flake8.main import main_with_errors
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, "Found %d code style errors / warnings:\n" % len(
errors
) + "\n".join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from ament_pep257.main import main
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=[".", "test"])
assert rc == 0, "Found code style errors / warnings"