2025-05-29 23:06:51 +08:00

825 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
current_file_path = os.path.abspath(__file__)
dobot_nova5_path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path)))
print(dobot_nova5_path)
# 添加目标文件夹到系统路径
sys.path.append(dobot_nova5_path)
try:
from dobot_api import DobotApiFeedBack,DobotApiDashboard
except:
from hardware.dobot_api import DobotApiFeedBack,DobotApiDashboard
try:
from .dobot_message import *
except:
from dobot_message import *
import threading
from time import sleep,time
import re
from tools.log import CustomLogger
import copy
import numpy as np
from scipy.spatial.transform import Rotation as R
import math
import time
import requests
import atexit
'''
MODE DESCRIPTION CN
1 INIT 初始化
2 BREAK_OPEN 抱闸松开
3 POWER_OFF 本体下电
4 DISABLED 未使能(抱闸未松)
5 ENABLE 使能(空闲)
6 BACKDRIVE 拖拽
7 RUNNING 运行状态(脚本/TCP)
8 SINGLE_MOVE 单次运动(点动)
9 ERROR 错误状态
10 PAUSE 暂停状态
11 COLLISION 碰撞状态
MODE RANK
报错 I
下电 II
碰撞 III
开抱闸 IV
'''
class dobot_nova5:
def __init__(self,arm_ip = '192.168.5.1',dashboardPort = 29999,feedFourPort = 30004):
self.ip = arm_ip ## 机械臂IP地址
self.dashboardPort = dashboardPort ## 一发一收-设置-控制及运动端口号
self.feedFourPort = feedFourPort ## 第四实时反馈(8ms)反馈数据端口号
self.dashboard = None ## 初始化控制及运动端口实例
self.feedFour = None ## 初始化数据反馈端口实例
self.feedInfo = []
self.__globalLockValue = threading.Lock() # 多线程安全
class feedbackItem:
def __init__(self):
self.robotMode = -1
self.robotCurrentCommandID = 0
self.MessageSize = -1
# self.DigitalInputs = -1
# self.DigitalOutputs = -1
self.EnableState = False
self.QActual = -1
self.ActualQuaternion = -1
self.ToolVectorActual = -1
# ... 自定义反馈数据
'''
self.UserValue = -1
self.ToolValue = -1
'''
self.feedbackData = feedbackItem() # 自定义反馈数据结构体
self.logger = CustomLogger('Dobot_nova5')
# try:
# self.dashboard = DobotApiDashboard(self.ip, self.dashboardPort)
# if not self.dashboard.connect():
# print(f"[ERROR] 无法连接 Dashboard {self.ip}:{self.dashboardPort}")
# self.dashboard = None
# else:
# print("[INFO] Dashboard 连接成功")
# except Exception as e:
# print(f"[ERROR] Dashboard 初始化失败: {e}")
# self.dashboard = None
# 尝试连接 Dashboard 控制接口(默认 29999 端口)
try:
self.dashboard = DobotApiDashboard(self.ip, self.dashboardPort)
if self.dashboard.is_connected():
print("[INFO] Dashboard 连接成功")
else:
print("[ERROR] Dashboard 未连接成功")
self.dashboard = None
except Exception as e:
print(f"[ERROR] Dashboard 连接失败: {e}")
self.dashboard = None
# 尝试连接 Feedback 接口(默认 30003 端口)
try:
self.feedFour = DobotApiFeedBack(self.ip, self.feedFourPort)
if self.feedFour.is_connected():
print("[INFO] Feedback 连接成功")
else:
print("[ERROR] Feedback 未连接成功")
self.feedFour = None
except Exception as e:
print(f"[ERROR] Feedback 连接失败: {e}")
self.feedFour = None
# self.dashboard = DobotApiDashboard(self.ip,self.dashboardPort)
# self.feedFour = DobotApiFeedBack(self.ip,self.feedFourPort)
self.stop_feedback = threading.Event()
self.stop_feedback.clear()
# 状态反馈线程
self.feedback_thread = threading.Thread(
target=self.GetFeedback # 机器状态反馈线程
)
self.feedback_thread.daemon = True
self.feedback_thread.start()
time.sleep(1)
self.dashboard.RequestControl()
max_retry_num = 5 #尝试连接次数
cnt_num = 0
while cnt_num < max_retry_num:
if self.feedbackData.robotMode != -1:
self.dashboard.EmergencyStop(mode=0) # 松开急停
self.clearError() # 清除报警
# stopId = self.dashboard.Stop()
time.sleep(0.5)
print("self.self.feedbackData.robotMode:",self.feedbackData.robotMode)
if (self.feedbackData.robotMode == 3 or self.feedbackData.robotMode == 4): #and self.parseResultId(stopId)[0] == 0:
# self.dashboard.RequestControl()
if self.feedbackData.robotMode == 3:
self.dashboard.PowerOn() # 复位
# self.dashboard.Stop() # 停止
if self.start_up(): # 启动
self.logger.log_info(f"连接机械臂成功")
break
else:
self.logger.log_error(f"启动失败,尝试第{cnt_num+1}次连接")
cnt_num += 1
time.sleep(1)
else:
if self.feedbackData.robotMode == 9:# and self.parseResultId(stopId)[0] == -3:
print(robot_mode_types.get(self.feedbackData.robotMode, "未知模式"))
requests.post(
"http://127.0.0.1:5000/on_message", data={"message": "机械臂处于锁定状态<br>请解除锁定后再使用"}
)
instruction = {
"is_massaging": False,
"massage_service_started": False
}
requests.post(
"http://127.0.0.1:5000/update_massage_status", data=instruction
)
sys.exit(0)
self.disableRobot()
self.clearError() # 清除报警
cnt_num += 1
time.sleep(1)
else:
self.logger.log_error(f"连接机械臂失败,尝试第{cnt_num+1}次连接")
cnt_num += 1
time.sleep(1)
atexit.register(self.exit_task)
self.init_pos = [-45.0009079*np.pi/180,55.5785141*np.pi/180,-120.68821716*np.pi/180,5.11103201*np.pi/180,90.00195312*np.pi/180,-90.00085449*np.pi/180]
# 为状态管理而封装的初始化函数
def init(self):
self.is_exit = False
self.traj_list = None
self.last_pose_command = np.zeros(6)
self.last_input_command = None
self.tcp_offset = None
self.filter_matirx = np.zeros((1,7)) # 位姿伺服用
self.filter_matrix = np.zeros((1,6)) # 角度伺服用
sleep(1)
self.is_standby = False
code = self.move_joint(self.init_pos)
if code == 0:
if self.parseResultId(self.dashboard.SetCollisionLevel(level=1))[0] == 0:
self.logger.log_info("机械臂初始化")
else:
time.sleep(0.2)
max_retries = 5
retry_count = 0
while retry_count < max_retries:
if self.parseResultId(self.dashboard.SetCollisionLevel(level=1))[0] == 0:
print("retry_count",retry_count)
break
else:
time.sleep(0.1)
if retry_count == max_retries:
self.logger.log_error("机械臂初始化失败")
requests.post(
"http://127.0.0.1:5000/on_message", data={"message": "机械臂初始化失败"}
)
self.is_exit = True
sys.exit(0)
else:
self.logger.log_error("机械臂初始化失败")
requests.post(
"http://127.0.0.1:5000/on_message", data={"message": "机械臂初始化失败"}
)
self.is_exit = True
sys.exit(0)
self.last_print_time=0
self.last_reocrd_time =0
def exit_task(self):
self.logger.log_yellow("退出任务")
# self.disable()
instruction = {
"is_massaging": False,
"massage_service_started": False
}
requests.post(
"http://127.0.0.1:5000/update_massage_status", data=instruction
)
time.sleep(1)
# self.move_joint(self.off_pos)
def parseResultId(self,valueRecv):
# 解析返回值
if "Not Tcp" in valueRecv:
print("Control Mode is not TCP")
print("机械臂不在二次开发模式下")
return [1]
return [int(num) for num in re.findall(r'-?\d+',valueRecv)] or [2]
def GetFeedback(self):
while self.stop_feedback.is_set() == False:
feedInfo = self.feedFour.feedBackData()
with self.__globalLockValue:
if feedInfo is not None:
if hex((feedInfo['TestValue'][0]))=='0x123456789abcdef': ## DOBOT官方的数据魔数验证
# 基础字段
self.feedbackData.MessageSize = feedInfo['len'][0]
self.feedbackData.robotMode = feedInfo['RobotMode'][0]
# self.feedbackData.DigitalInputs = feedInfo['DigitalInputs'][0]
# self.feedbackData.DigitalOutputs = feedInfo['DigitalOutputs'][0]
self.feedbackData.EnableState = feedInfo['EnableStatus'][0]
self.feedbackData.robotCurrentCommandID = feedInfo['CurrentCommandId'][0]
self.feedbackData.QActual = feedInfo['QActual'][0]
self.feedbackData.ActualQuaternion = feedInfo['ActualQuaternion'][0]
self.feedbackData.ToolVectorActual = feedInfo['ToolVectorActual'][0]
# 自定义添加所需反馈数据
'''
self.feedData.DigitalOutputs = int(feedInfo['DigitalOutputs'][0])
self.feedData.RobotMode = int(feedInfo['RobotMode'][0])
self.feedData.TimeStamp = int(feedInfo['TimeStamp'][0])
self.feedbackData.UserValue = feedInfo['UserValue[6]'][0]
self.feedbackData.ToolValue = feedInfo['ToolValue[6]'][0]
'''
def start_up(self):
max_try = 3 # 尝试次数
cnt = 0 # 尝试次数计数
while cnt < max_try:
cnt += 1
# 关闭碰撞检测
# if self.parseResultId(self.dashboard.SetCollisionLevel(level=0))[0] == 0:
time.sleep(0.1)
self.logger.log_info("关闭碰撞检测成功")
code = self.parseResultId(self.dashboard.EnableRobot())[0]
if code == 0:
while 1:
if self.feedbackData.robotMode in [3, 6, 7, 8, 9, 10, 11]:
self.logger.log_error(f"使能失败,机械臂异常:{self.feedbackData.robotMode}")
return False
elif self.feedbackData.robotMode == 5:
self.logger.log_info("机械臂使能成功")
return True
time.sleep(0.1)
# def disable_servo(self):
# pass
def enable_servo(self):
pass
def waitArrival(self, command):
sendCommandID = self.parseResultId(command)[1]
while True:
if self.feedbackData.robotMode in [3, 4, 6, 8, 9, 10, 11]:
self.logger.log_error(f"机械臂异常:{self.feedbackData.robotMode}")
print(robot_mode_types.get(self.feedbackData.robotMode, "未知模式"))
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
return -1
if self.feedbackData.EnableState:
if self.feedbackData.robotCurrentCommandID > sendCommandID:
break
else:
isFinsh = (self.feedbackData.robotMode == 5)
if self.feedbackData.robotCurrentCommandID == sendCommandID and isFinsh:
break
sleep(0.1)
return 0
def move_joint(self,q,**params):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: rad
'''
# 单位 rad -> deg
max_retry_count = params.get('max_retry_count', 3)
cnt = 0
joints_list_deg = self.__transform_rad_2_deg(q)
while cnt < max_retry_count:
cnt += 1
command = self.dashboard.MovJ(*joints_list_deg,1)
is_arrived = self.waitArrival(command)
if is_arrived == 0:
print(f'Arrived at the joint position: {q}')
return 0
else:
print(f'Failed to arrive at the joint position: {q}')
return -1
print(f'After {cnt} retries, still failed to arrive at the joint position: {q}')
return -1
def ServoJoint(self,joints_list):
'''
关节角度伺服
输入为[J1 J2 J3 J4 J5 J6] unit: deg
'''
tcp_command = f"servoj({joints_list[0]},{joints_list[1]},{joints_list[2]},{joints_list[3]},{joints_list[4]},{joints_list[5]}, t=0.01, aheadtime=20, gain=200)"
self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
print("ServoJ:",tcp_command)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
return -1
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
return 0
def ServoJoint_RAD(self,joints_list):
'''
关节角度伺服
输入为[J1 J2 J3 J4 J5 J6] unit: rad
'''
# 单位转换 rad -> deg
joints_list_deg = self.__transform_rad_2_deg(joints_list)
# 构造字符串
tcp_command = f"servoj({joints_list_deg[0]},{joints_list_deg[1]},{joints_list_deg[2]},{joints_list_deg[3]},{joints_list_deg[4]},{joints_list_deg[5]}, t=0.01, aheadtime=20, gain=200)"
self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
print("ServoJ:",tcp_command)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
return -1
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
return 0
def ServoPose(self,poses_list):
'''
位姿伺服
输入为[X Y Z RX RY RZ] unit: mm/deg
'''
poses_list = list(map(float, poses_list))
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=300)"
# res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
self.dashboard.send_data(tcp_command)
# print("ServoP msg:",res)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
def ServoPose_M_RAD(self,poses_list):
'''
位姿伺服
输入为[X Y Z RX RY RZ] unit: M/RAD
'''
# 单位 m/rad -> mm/deg
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
# 确保数据类型正确
poses_list_mm_deg = list(map(float, poses_list_mm_deg))
# 构造字符串
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=200)"
res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
print("ServoP msg:",res)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
def setEndEffector(self,i,tool_i):
'''
设置工具坐标系在索引i. i!=0
tool_i 需要以字符串形式输入 "{x,y,z,rx,ry,rz}"
'''
if i == 0:
print("不可修改工具0(初始法兰坐标系)")
return
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("工具坐标系序号超出范围")
return
res = self.dashboard.SetTool(i,tool_i)
print(res)
code = self.parseResultId(res)[0]
if code == 0:
print(f"设置工具坐标系{i}的偏移量成功")
else:
print(f"设置工具坐标系{i}失败")
return
def chooseEndEffector(self,i):
'''
切换工具坐标系, 0~9
'''
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("工具坐标系序号超出范围")
return
res = self.dashboard.Tool(i)
print(res)
code = self.parseResultId(res)[0]
if code == 0:
print(f"切换为工具坐标系{i}成功")
else:
print("切换工具坐标系失败")
return
def setBaseFrame(self,i,base_i):
'''
设置工具坐标系在索引i. i!=0
base_i 修改后的用户坐标系(非偏移量!) 需要以字符串形式输入 "{x,y,z,rx,ry,rz}"
'''
if i == 0:
print("不可修改用户坐标系0(初始基坐标系)")
return
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("用户坐标系序号超出范围")
return
res = self.dashboard.SetUser(i,base_i)
code = self.parseResultId(res)[0]
if code == 0:
print(f"设置用户坐标系{i}成功")
else:
print(f"设置用户坐标系{i}失败")
return
def chooseBaseFrame(self,i):
'''
切换用户基坐标系, 0~9
'''
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("工具坐标系序号超出范围")
return
res = self.dashboard.User(i)
code = self.parseResultId(res)[0]
if code == 0:
print(f"切换为用户基坐标系{i}成功")
else:
print("切换用户基坐标系失败")
return
def getPose(self,user=-1,tool=-1):
if (user == -1) != (tool == -1): ## XOR 必须同时设置或者同时不设置
print("必须同时设置或同时不设置坐标系参数")
res = self.dashboard.GetPose(user,tool) # 返回字符串
print("get pose",res)
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
else:
print("获取位姿失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
pose_str = res[start:end]
pose = [float(x.strip()) for x in pose_str.split(",")]
return pose
def getAngel(self):
res = self.dashboard.GetAngle()
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
else:
print("获取关节角度失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
angle_str = res[start:end]
print("res:",res)
angle = [float(x.strip()) for x in angle_str.split(",")]
return angle
def getForwardKin(self,joints_list,user=-1,tool=-1):
res = self.dashboard.PositiveKin(*joints_list,user,tool)
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
else:
print("获取正向运动学解失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
pose_str = res[start:end]
pose = [float(x.strip()) for x in pose_str.split(",")]
return pose
def getInverseKin(self,poses_list,user=-1,tool=-1,useJointNear=-1,JointNear=''):
'''
poses_list X Y Z RX RY RZ
useJointNear 0 或不携带 表示 JointNear无效
1 表示根据JointNear就近选解
jointNear string "jointNear={j1,j2,j3,j4,j5,j6}"
'''
res = self.dashboard.InverseKin(*poses_list,user,tool,useJointNear,JointNear)
print(res)
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
else:
print("获取逆向运动学解失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
joint_str = res[start:end]
joint = [float(x.strip()) for x in joint_str.split(",")]
return joint
def disableRobot(self):
res = self.dashboard.DisableRobot()
code = self.parseResultId(res)[0]
if code == 0:
print("下使能机械臂成功")
else:
print("下使能机械臂失败")
return
def clearError(self):
res = self.dashboard.ClearError()
code = self.parseResultId(res)[0]
if code == 0:
print("清楚报警成功")
else:
print("清楚报警失败")
return
def start_drag(self):
res = self.dashboard.StartDrag()
code = self.parseResultId(res)[0]
if code == 0:
print("拖拽模式开启成功")
else:
print("拖拽模式开启失败")
return
def stop_drag(self):
res = self.dashboard.StopDrag()
code = self.parseResultId(res)[0]
if code == 0:
print("拖拽模式关闭成功")
else:
print("拖拽模式关闭失败")
return
def stop_motion(self):
res = self.dashboard.Stop()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂停止下发运动指令队列")
else:
print("停止下发运动失败,需立即拍下急停")
return
def __del__(self):
del self.dashboard
del self.feedFour
## 内部工具函数
def __transform_m_rad_2_mm_deg(self,input_list):
'''
接受列表 [X Y Z RX RY RZ]
将单位 m/rad 转为 mm/deg
'''
if len(input_list) != 6:
raise ValueError("Input list must contain exactly 6 elements [X, Y, Z, RX, RY, RZ]")
# 转换位置单位m -> mm (乘以 1000)
x_mm = input_list[0] * 1000.0
y_mm = input_list[1] * 1000.0
z_mm = input_list[2] * 1000.0
# 转换角度单位rad -> deg (乘以 180/pi)
rx_deg = math.degrees(input_list[3])
ry_deg = math.degrees(input_list[4])
rz_deg = math.degrees(input_list[5])
return [x_mm, y_mm, z_mm, rx_deg, ry_deg, rz_deg]
def __transform_rad_2_deg(self,input_list):
'''
接受列表 [J1 J2 J3 J4 J5 J6]
将单位rad转为deg
'''
if len(input_list) != 6:
raise ValueError("Input list must contain exactly 6 elements [J1, J2, J3, J4, J5, J6]")
# 转换角度单位: rad -> deg
J_deg = []
for i in range(6):
J_deg.append(math.degrees(input_list[i]))
return J_deg
## 为反馈线程封装
def Get_feedInfo_angle(self):
# print("反馈当前角度",self.feedbackData.QActual)
return self.feedbackData.QActual
def Get_feedInfo_pose(self):
# print("反馈当前TCP位置",self.feedbackData.ToolVectorActual)
return self.feedbackData.ToolVectorActual
## 适配中间层的再封装接口
# # 为状态管理而封装的初始化函数
# def init(self):
# self.is_exit = False
# self.traj_list = None
# self.last_pose_command = np.zeros(6)
# self.last_input_command = None
# self.tcp_offset = None
# self.init_J = [-45.0009079,55.5785141,-120.68821716,5.11103201,90.00195312,-90.00085449]
# self.filter_matirx = np.zeros((1,7)) # 位姿伺服用
# self.filter_matrix = np.zeros((1,6)) # 角度伺服用
# sleep(1)
# self.is_standby = False
# code = self.RunPoint_P_inJoint(self.init_J)
# if code == 0:
# print("机械臂初始化成功且到达初始位置")
# else:
# print("机械臂初始化失败")
def get_pose_at_flange(self):
'''
输出pos,quat
pos: [x,y,z] in m
quat: [qx,qy,qz,qw] 四元数
'''
pose = self.getPose(user=0, tool=0) # 获取初始法兰坐标系下的位姿
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
x /= 1000
y /= 1000
z /= 1000
pos = np.array([x,y,z])
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
return pos,quat_rot
def get_end_position(self,tool=1):
'''
工具默认为1
输出pos,quat
pos: [x,y,z] in m
quat: [qx,qy,qz,qw] 四元数
'''
pose = self.getPose(user=0, tool=tool)
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
x /= 1000
y /= 1000
z /= 1000
pos = np.array([x,y,z])
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
return pos,quat_rot
def get_arm_position(self):
'''
直接获得当前user/tool坐标系下的机械臂位置
'''
'''
输出pos,quat
pos: [x,y,z] in m
quat: [qx,qy,qz,qw] 四元数
'''
pose = self.Get_feedInfo_pose()
# print("pose",pose)
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
x /= 1000
y /= 1000
z /= 1000
pos = np.array([x,y,z])
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
return pos,quat_rot
def __circular_mean(self,angles):
# 将角度转换为复数单位圆上的点,再计算均值
# 为处理周期性问题
angles_rad = np.deg2rad(angles)
x = np.mean(np.cos(angles_rad))
y = np.mean(np.sin(angles_rad))
return np.rad2deg(np.arctan2(y, x))
def process_command_angle(self,arm_joint_command):
if (not self.is_exit) and (not self.is_standby):
current_joint = np.asarray(arm_joint_command) # degree
# FIFO
self.filter_matrix = np.append(self.filter_matrix,[current_joint],axis=0)
if len(self.filter_matrix)<7:
return 0,current_joint
self.filter_matrix = np.delete(self.filter_matrix,0,axis = 0)
avg_joint = np.array([self.__circular_mean(self.filter_matrix[:, i]) for i in range(len(current_joint))])
return 0,avg_joint
return -1,None
def process_command(self, arm_position_command, arm_orientation_command):
if (not self.is_exit) and (not self.is_standby):
# 构造当前位姿(位置 + 四元数)
current_pose = np.concatenate([
arm_position_command, # [x, y, z]
arm_orientation_command # [qx, qy, qz, qw]
])
# 更新滑动窗口FIFO
self.filter_matirx = np.append(self.filter_matirx, [current_pose], axis=0)
# 如果窗口未填满,直接返回当前值(不滤波)
if len(self.filter_matirx) < 7:
rot_euler = R.from_quat(arm_orientation_command).as_euler('xyz', degrees=True)
result = np.concatenate([arm_position_command, rot_euler])
return 0, result
# 窗口已填满,删除最旧数据
self.filter_matirx = np.delete(self.filter_matirx, 0, axis=0)
# --- 位置分量:滑动平均(仅对有效数据)---
avg_position = np.mean(self.filter_matirx[:, :3], axis=0)
# --- 四元数分量:加权平均(处理符号歧义)---
quats = self.filter_matirx[:, 3:7]
for i in range(1, len(quats)):
if np.dot(quats[0], quats[i]) < 0:
quats[i] = -quats[i] # 确保所有四元数在同一半球
avg_quat = np.mean(quats, axis=0)
avg_quat = avg_quat / np.linalg.norm(avg_quat) # 归一化
# 转换回欧拉角
rot_euler = R.from_quat(avg_quat).as_euler('xyz', degrees=True)
# 合并结果 [x, y, z, roll, pitch, yaw]
result = np.concatenate([avg_position, rot_euler])
return 0, result
return -1, None
def chooseRightFrame(self):
self.chooseBaseFrame(i=1)
# self.chooseEndEffector(i=0)
return
if __name__ == "__main__":
# sleep(5)
dobot = dobot_nova5("192.168.5.1")
dobot.start()
posJ = [0,30,-120,0,90,0]
dobot.RunPoint_P_inJoint(posJ)
sleep(1)
dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
dobot.chooseEndEffector(i=9)
print("Arm pose:",dobot.getPose())
# dobot.RunPoint_P_inPose([300, -135, 224.34, -180, 0, -90])
# dobot.start_drag()
dobot.disableRobot()