延續前篇的偵測節點,根據 simple_move.py
修改控制節點,加入對齊的部份。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rospy
from cv_bridge import CvBridge, CvBridgeError
import cv2
import numpy as np
import math
import time
from geometry_msgs.msg import Twist
from std_msgs.msg import Float64, Bool, Empty
from sensor_msgs.msg import Image
from kslab_swarmsim.msg import ApriltagDetection # 自訂訊息
'''
ApriltagDetection.msg 內容:
Header header # 標準的 ROS 訊息標頭
int32 tag_id
float32 center_x
float32 center_y
float32[] corners # 依次為 x0, y0, x1, y1, x2, y2, x3, y3
float32 area
'''
class ApriltagMove:
def __init__(self):
self.image_width = 640
self.image_height = 360
self.target_area = 6000 # 目標標記面積,用於估計距離
self.position_threshold = 20 # 位置誤差的閾值
self.area_threshold = 1000 # 面積的容許誤差
# 控制增益參數
self.kp_x = 0.005
self.kp_y = 0.005
self.kp_z = 0.0001
self.kp_yaw = 0.5
self.rate = rospy.Rate(10)
self.state = "TAKEOFF"
'''
定義幾個狀態:
"TAKEOFF",
"SEARCHING_TAG",
"ALIGNING",
"LANDING"
'''
self.search_timeout = 20 # 設定搜索超時時間(秒)
# 訂閱 Apriltag 訊息, 發布速度訊息
self.takeoff_pub = rospy.Publisher('/ardrone/takeoff', Empty, queue_size=10)
self.land_pub = rospy.Publisher('/ardrone/land', Empty, queue_size=10)
self.tag_sub = rospy.Subscriber('/apriltag/detection', ApriltagDetection, self.tag_callback)
self.cmd_pub = rospy.Publisher('/ardrone/cmd_vel', Twist, queue_size=10)
self.takeoff_msg = Empty()
self.move_cmd_msg = Twist()
self.land_msg = Empty()
self.align_cmd_msg = Twist()
# 等待訂閱者和發布者完全連線
rospy.sleep(1)
self.last_detection_time = rospy.Time.now()
rospy.loginfo("Apriltag follower initialized.")
def control_loop(self):
while not rospy.is_shutdown():
if self.state == "TAKEOFF":
print(self.state)
self.takeoff()
self.height_adjust(5, 1)
self.state = "SEARCHING_TAG"
elif self.state == "SEARCHING_TAG":
print(self.state)
# 檢查是否超時
current_time = rospy.Time.now()
time_since_last_detection = (current_time - self.last_detection_time).to_sec()
if time_since_last_detection > self.search_timeout:
rospy.logwarn("No Apriltag detected for {:.1f} seconds. Stopping...".format(self.search_timeout))
self.cmd_pub.publish(Twist())
rospy.sleep(1)
rospy.signal_shutdown("Apriltag not found within timeout.")
else:
self.move_cmd_msg.linear.x = 1 # 前進速度
connections = self.cmd_pub.get_num_connections()
if connections > 0:
rospy.loginfo("Moving forward and searching for Apriltag...")
self.cmd_pub.publish(self.move_cmd_msg) # 持續前進
elif self.state == "ALIGNING":
print(self.state)
# 對齊的控制在 tag_callback 中處理
pass
elif self.state == "LANDING":
print(self.state)
self.height_adjust(5, -1)
self.land()
rospy.signal_shutdown("Landing completed.")
self.rate.sleep()
# 偵測到 Apriltag,停止前進,進入對齊狀態
def tag_callback(self, msg):
self.last_detection_time = rospy.Time.now()
if self.state == "SEARCHING_TAG":
rospy.loginfo("Apriltag detected, stopping forward motion and aligning...")
self.cmd_pub.publish(Twist())
self.state = "ALIGNING"
if self.state == "ALIGNING":
try:
# 計算位置誤差(影像中心與標記中心的偏差)
error_x = (msg.center_x - self.image_width / 2)
error_y = (msg.center_y - self.image_height / 2)
error_area = self.target_area - msg.area
yaw_angle = self.calculate_tag_yaw(msg.corners)
if (abs(error_x) < self.position_threshold and
abs(error_y) < self.position_threshold):
# 停止對齊xy, 開始對齊z
self.align_cmd_msg = Twist()
self.align_cmd_msg.linear.z = -self.kp_z * error_area
self.cmd_pub.publish(self.align_cmd_msg)
rospy.sleep(1)
rospy.loginfo("Aligning z axis.")
print(f"error_area: {error_area}, self.area_threshold:{self.area_threshold}, msg.area: {msg.area}")
if abs(error_area) < self.area_threshold and self.state != "LANDING":
# 停止對齊z
self.align_cmd_msg = Twist()
self.cmd_pub.publish(self.align_cmd_msg)
rospy.sleep(1)
rospy.loginfo("Alignment achieved.")
self.state = "LANDING"
else:
# 計算控制指令
self.align_cmd_msg.linear.x = -self.kp_x * error_y # 無人機的 x 軸正向是鏡頭畫面 y 軸負向,因此用 error_y
self.align_cmd_msg.linear.y = -self.kp_y * error_x # 無人機的 y 軸正向是鏡頭畫面 x 軸負向,因此用 error_x
self.align_cmd_msg.angular.z = -self.kp_yaw * yaw_angle # 不考慮 roll 和 pitch,只調整 yaw
# 發布控制指令
self.cmd_pub.publish(self.align_cmd_msg)
rospy.loginfo("Aligning x, y plane.")
print(f"error_x: {error_x}, error_y:{error_y}, yaw_angle: {yaw_angle}")
except Exception as e:
rospy.logerr("Error in detection_callback: {}".format(e))
def calculate_tag_yaw(self, corners):
# 根據標記的角落座標計算在影像中的旋轉角度, 取標記上邊緣來計算
# Apriltag 的角落順序為:[x0, y0, x1, y1, x2, y2, x3, y3] 分別對應於左上、右上、右下、左下
dx = corners[2] - corners[0] # x1 - x0
dy = corners[3] - corners[1] # y1 - y0
angle = math.atan2(dy, dx) # 弧度
return angle # 返回弧度值
def takeoff(self):
connections = self.takeoff_pub.get_num_connections()
if connections > 0:
rospy.loginfo("Taking off...")
self.takeoff_pub.publish(self.takeoff_msg)
rospy.sleep(2)
def land(self):
connections = self.land_pub.get_num_connections()
if connections > 0:
rospy.loginfo("Landing...")
self.land_pub.publish(self.land_msg)
rospy.sleep(2)
def height_adjust(self, duration, speed):
self.move_cmd_msg = Twist()
self.move_cmd_msg.linear.z = speed
connections = self.cmd_pub.get_num_connections()
if connections > 0:
rospy.loginfo("Height adjusting...")
self.cmd_pub.publish(self.move_cmd_msg)
rospy.sleep(duration)
# 停止無人機運動
self.move_cmd_msg.linear.z = 0.0
self.cmd_pub.publish(self.move_cmd_msg)
# 等待執行完成
rospy.sleep(2)
def run(self):
# 開始自動控制流程
self.control_loop()
rospy.spin()
if __name__ == "__main__":
rospy.init_node('apriltag_move', anonymous=True)
try:
ap_move = ApriltagMove()
ap_move.run()
except rospy.ROSInterruptException:
pass