Files
RapidPoseTriangulation/ros/pose2D_visualizer/pose2D_visualizer/pose2D_visualizer.py
2025-01-15 16:08:24 +01:00

153 lines
4.1 KiB
Python

import json
import os
import sys
import threading
import time
import cv2
from matplotlib import pyplot as plt
import numpy as np
import rclpy
from cv_bridge import CvBridge
from rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from sensor_msgs.msg import Image
from std_msgs.msg import String
filepath = os.path.dirname(os.path.realpath(__file__)) + "/"
sys.path.append(filepath + "../../../scripts/")
import test_triangulate
from skelda import utils_view
# ==================================================================================================
bridge = CvBridge()
node = None
publisher_img = None
cam_id = "camera01"
img_input_topic = "/" + cam_id + "/pylon_ros2_camera_node/image_raw"
pose_input_topic = "/poses/" + cam_id
img_output_topic = "/" + cam_id + "/img_with_pose"
last_input_image = None
lock = threading.Lock()
# ==================================================================================================
def callback_images(image_data):
global last_input_image, lock
# Convert into cv images from image string
if image_data.encoding == "bayer_rggb8":
bayer_image = bridge.imgmsg_to_cv2(image_data, "bayer_rggb8")
color_image = test_triangulate.bayer2rgb(bayer_image)
elif image_data.encoding == "mono8":
gray_image = bridge.imgmsg_to_cv2(image_data, "mono8")
color_image = cv2.cvtColor(gray_image, cv2.COLOR_GRAY2RGB)
elif image_data.encoding == "rgb8":
color_image = bridge.imgmsg_to_cv2(image_data, "rgb8")
else:
raise ValueError("Unknown image encoding:", image_data.encoding)
time_stamp = image_data.header.stamp.sec + image_data.header.stamp.nanosec / 1.0e9
with lock:
last_input_image = (color_image, time_stamp)
# ==================================================================================================
def callback_poses(pose_data):
global last_input_image, lock
ptime = time.time()
if last_input_image is None:
return
# Convert pose data from json string
poses = json.loads(pose_data.data)
# Collect inputs
images_2d = []
timestamps = []
with lock:
img = np.copy(last_input_image[0])
ts = last_input_image[1]
images_2d.append(img)
timestamps.append(ts)
# Visualize 2D poses
bodies2D = poses["bodies2D"]
colors = plt.cm.hsv(np.linspace(0, 1, len(bodies2D), endpoint=False)).tolist()
colors = [[int(c[0] * 255), int(c[1] * 255), int(c[2] * 255)] for c in colors]
for i, body in enumerate(bodies2D):
color = list(reversed(colors[i]))
img = utils_view.draw_body_in_image(img, body, poses["joints"], color)
# Publish image with poses
publish(img)
msg = "Visualization time: {:.3f}s"
print(msg.format(time.time() - ptime))
# ==================================================================================================
def publish(img):
# Publish image data
msg = bridge.cv2_to_imgmsg(img, "rgb8")
publisher_img.publish(msg)
# ==================================================================================================
def main():
global node, publisher_img
# Start node
rclpy.init(args=sys.argv)
node = rclpy.create_node("pose2D_visualizer")
# Quality of service settings
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
)
# Create subscribers
_ = node.create_subscription(
Image,
img_input_topic,
callback_images,
qos_profile,
)
_ = node.create_subscription(
String,
pose_input_topic,
callback_poses,
qos_profile,
)
# Create publishers
publisher_img = node.create_publisher(Image, img_output_topic, qos_profile)
node.get_logger().info("Finished initialization of pose visualizer")
# Run ros update thread
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
# ==================================================================================================
if __name__ == "__main__":
main()