提供一個範例程式。
前往套件的 src
目錄裡建立偵測 AprilTag 的程式,我們可以根據之前的 get_video.py
來修改。
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import rospy
from cv_bridge import CvBridge, CvBridgeError
import cv2
import os
import time
import numpy as np
from pupil_apriltags import Detector
from std_msgs.msg import Float32
from sensor_msgs.msg import Image
from kslab_swarmsim.msg import ApriltagDetection # 自訂訊息
'''
ApriltagDetection.msg 內容:
Header header # 標準的 ROS 訊息標頭
int32 tag_id
float32 center_x
float32 center_y
float32[] corners # 依次為 x0, y0, x1, y1, x2, y2, x3, y3
float32 area
'''
class ApriltagDetector:
def __init__(self):
# 初始化橋接工具和偵測器
self.bridge = CvBridge()
self.at_detector = Detector(families='tag36h11')
# OpenCV 視訊編碼器與設定
self.video_writer = None
self.fourcc = cv2.VideoWriter_fourcc('X', 'V', 'I', 'D')
timestamp = time.strftime("%y%m%d-%H%M%S")
self.dir_path = '~/catkin_ws/src/my_simulation/videos/' # 要改成套件的完整路徑
self.file_name = os.path.join(self.dir_path, f'output_video_{timestamp}.avi')
self.frame_size = None # 將在接收到第一幀時初始化
self.frame_area = None
# 第一次錄影時建立 videos 目錄
if not os.path.exists(self.dir_path):
os.makedirs(self.dir_path)
rospy.sleep(1)
rospy.loginfo("Apriltag detector initialized.")
rospy.loginfo("Video writer initialized.")
rospy.on_shutdown(self.clean_up)
self.image_sub = rospy.Subscriber('/ardrone/down_camera/image_raw', Image, self.image_callback)
self.tag_pub = rospy.Publisher('/apriltag/detection', ApriltagDetection, queue_size=10)
# 等待訂閱者和發布者完全連線
rospy.sleep(1)
rospy.loginfo("Pub, sub node initialized.")
# 處理影像的回呼函數
def image_callback(self, msg):
try:
# 取得 ROS 訊息並轉換成 OpenCV 影像
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
print("Accepted camera image sucessfully.")
# 初始化 video_writer
if self.video_writer is None:
height, width, channel = cv_image.shape
self.frame_size = (width, height)
self.frame_area = width * height
self.video_writer = cv2.VideoWriter(self.file_name, self.fourcc, 30.0, self.frame_size)
print("Start recording.")
if not self.video_writer.isOpened():
rospy.logerr("Fail to open video writer")
rospy.signal_shutdown("Fail to open video writer")
# 轉換影像為灰階格式才能讓 Apriltag 偵測
gray_image = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
tags = self.at_detector.detect(gray_image)
# 在影像上偵測第一個出現的 Apriltag
if tags:
tag = tags[0]
# 儲存自訂訊息
detection_msg = ApriltagDetection()
detection_msg.header.stamp = rospy.Time.now()
detection_msg.tag_id = tag.tag_id
detection_msg.center_x = float(tag.center[0])
detection_msg.center_y = float(tag.center[1])
detection_msg.corners = [float(corner) for point in tag.corners for corner in point]
detection_msg.area = cv2.contourArea(np.array(tag.corners, dtype=np.float32))
# 發布偵測結果
self.tag_pub.publish(detection_msg)
# 在鏡頭畫面上標記的中心座標和角落座標繪製四個邊線
cv2.circle(cv_image, (int(tag.center[0]), int(tag.center[1])), 10, (0, 255, 0), -1)
for idx in range(4):
cv2.line(cv_image,
(int(tag.corners[idx][0]), int(tag.corners[idx][1])),
(int(tag.corners[(idx + 1) % 4][0]), int(tag.corners[(idx + 1) % 4][1])),
(255, 0, 0), 2)
self.video_writer.write(cv_image)
cv2.imshow("ARDrone Down Camera with Apriltag Detection", cv_image)
key = cv2.waitKey(1)
# 在影像視窗按下 q 鍵時關閉 ROS 節點
if key == ord('q'):
rospy.signal_shutdown("User requested shutdown")
except Exception as e:
rospy.logerr("Exception: {0}".format(e))
def clean_up(self):
# 釋放影片資源
if self.video_writer is not None:
self.video_writer.release()
rospy.loginfo(f"Video saved as {self.file_name}")
# 釋放視窗資源
cv2.destroyAllWindows()
rospy.loginfo("Shutting down video capture node")
def run(self):
rospy.spin()
if __name__ == "__main__":
try:
rospy.init_node('apriltag_detector', anonymous=True)
apriltag_detector = ApriltagDetector()
apriltag_detector.run()
except rospy.ROSInterruptException:
pass