2025-05-29 21:40:02 +08:00

896 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
current_file_path = os.path.abspath(__file__)
dobot_nova5_path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path)))
print(dobot_nova5_path)
# 添加目标文件夹到系统路径
sys.path.append(dobot_nova5_path)
try:
from dobot_api import DobotApiFeedBack,DobotApiDashboard
except:
from hardware.dobot_api import DobotApiFeedBack,DobotApiDashboard
import threading
from time import sleep,time
import re
from tools.log import CustomLogger
# from tools.log
import copy
import numpy as np
from scipy.spatial.transform import Rotation as R
import math
'''
MODE DESCRIPTION CN
1 INIT 初始化
2 BREAK_OPEN 抱闸松开
3 POWER_OFF 本体下电
4 DISABLED 未使能(抱闸未松)
5 ENABLE 使能(空闲)
6 BACKDRIVE 拖拽
7 RUNNING 运行状态(脚本/TCP)
8 SINGLE_MOVE 单次运动(点动)
9 ERROR 错误状态
10 PAUSE 暂停状态
11 COLLISION 碰撞状态
MODE RANK
报错 I
下电 II
碰撞 III
开抱闸 IV
'''
class dobot_nova5:
def __init__(self,arm_ip = '192.168.5.1'):
self.ip = arm_ip ## 机械臂IP地址
self.dashboardPort = 29999 ## 一发一收-设置-控制及运动端口号
self.feedFourPort = 30004 ## 第四实时反馈(8ms)反馈数据端口号
self.dashboard = None ## 初始化控制及运动端口实例
self.feedFour = None ## 初始化数据反馈端口实例
self.feedInfo = []
self.__globalLockValue = threading.Lock() # 多线程安全
class feedbackItem:
def __init__(self):
self.robotMode = -1
self.robotCurrentCommandID = 0
self.MessageSize = -1
self.DigitalInputs = -1
self.DigitalOutputs = -1
self.QActual = -1
self.ActualQuaternion = -1
self.ToolVectorActual = -1
# ... 自定义反馈数据
'''
self.UserValue = -1
self.ToolValue = -1
'''
self.feedbackData = feedbackItem() # 自定义反馈数据结构体
self.logger = CustomLogger('Dobot_nova5')
def parseResultId(self,valueRecv):
# 解析返回值
if "Not Tcp" in valueRecv:
print("Control Mode is not TCP")
print("机械臂不在二次开发模式下")
return [1]
return [int(num) for num in re.findall(r'-?\d+',valueRecv)] or [2]
def GetFeedback(self):
while True:
feedInfo = self.feedFour.feedBackData()
with self.__globalLockValue:
if feedInfo is not None:
if hex((feedInfo['TestValue'][0]))=='0x123456789abcdef': ## DOBOT官方的数据魔数验证
# 基础字段
self.feedbackData.MessageSize = feedInfo['len'][0]
self.feedbackData.robotMode = feedInfo['RobotMode'][0]
self.feedbackData.DigitalInputs = feedInfo['DigitalInputs'][0]
self.feedbackData.DigitalOutputs = feedInfo['DigitalOutputs'][0]
self.feedbackData.robotCurrentCommandID = feedInfo['CurrentCommandId'][0]
self.feedbackData.QActual = feedInfo['QActual'][0]
self.feedbackData.ActualQuaternion = feedInfo['ActualQuaternion'][0]
self.feedbackData.ToolVectorActual = feedInfo['ToolVectorActual'][0]
# 自定义添加所需反馈数据
'''
self.feedData.DigitalOutputs = int(feedInfo['DigitalOutputs'][0])
self.feedData.RobotMode = int(feedInfo['RobotMode'][0])
self.feedData.TimeStamp = int(feedInfo['TimeStamp'][0])
self.feedbackData.UserValue = feedInfo['UserValue[6]'][0]
self.feedbackData.ToolValue = feedInfo['ToolValue[6]'][0]
'''
def start(self):
self.dashboard = DobotApiDashboard(self.ip,self.dashboardPort)
self.feedFour = DobotApiFeedBack(self.ip,self.feedFourPort)
# 状态反馈线程
feedback_thread = threading.Thread(
target=self.GetFeedback # 机器状态反馈线程
)
feedback_thread.daemon = True
feedback_thread.start()
self.dashboard.RequestControl()
self.dashboard.EmergencyStop(mode=0) # 松开急停
self.clearError() # 清除报警
# 关闭碰撞检测
self.dashboard.SetCollisionLevel(level=0)
if self.parseResultId(self.dashboard.EnableRobot())[0] != 0:
print("使能失败:检查29999端口是否被占用")
return
print("机械臂使能成功")
def RunPoint_P_inPose(self,poses_list):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
'''
# 走点
recvmovemess = self.dashboard.MovJ(*poses_list,0)
print("MovJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
sleep(0.02)
while True: # 完成判断循环
########## 调试注释 #############
# print("robot mode",self.feedbackData.robotMode)
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
# print("指令 ID:", currentCommandID)
########## 调试注释 #############
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
def RunPoint_P_inPose_M_RAD(self,poses_list):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: M/RAD
'''
# 单位 m/rad -> mm/deg
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
# 走点
recvmovemess = self.dashboard.MovJ(*poses_list_mm_deg,0)
print("MovJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
sleep(0.02)
while True: # 完成判断循环
########## 调试注释 #############
# print("robot mode",self.feedbackData.robotMode)
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
# print("指令 ID:", currentCommandID)
########## 调试注释 #############
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
return -1
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
return -1
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
def RunPoint_P_inJoint(self,joints_list):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: deg
'''
# 走点
recvmovemess = self.dashboard.MovJ(*joints_list,1)
print("MovJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
sleep(0.02)
while True: # 完成判断循环
########## 调试注释 #############
# print("robot mode",self.feedbackData.robotMode)
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
# print("指令 ID:", currentCommandID)
# print("30004",self.dashboard.GetCurrentCommandID())
########## 调试注释 #############
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1) # 避免 CPU 占用过高
return 0
def RunPoint_P_inJoint_RAD(self,joints_list):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: rad
'''
# 单位 rad -> deg
joints_list_deg = self.__transform_rad_2_deg(joints_list)
# 走点
recvmovemess = self.dashboard.MovJ(*joints_list_deg,1)
print("MovJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
sleep(0.02)
while True: # 完成判断循环
########## 调试注释 #############
# print("robot mode",self.feedbackData.robotMode)
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
# print("指令 ID:", currentCommandID)
# print("30004",self.dashboard.GetCurrentCommandID())
########## 调试注释 #############
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1) # 避免 CPU 占用过高
return 0
def RunPoint_L_inPose(self,poses_list):
'''
RUNPOINT_L 以直线运动方式运动到目标点
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
'''
# 走点
recvmovemess = self.dashboard.MovL(*poses_list,0)
print("MovL:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
def RunPoint_L_inPose_M_RAD(self,poses_list):
'''
RUNPOINT_L 以直线运动方式运动到目标点
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: m/rad
'''
# 单位 m/rad -> mm/deg
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
# 走点
recvmovemess = self.dashboard.MovL(*poses_list_mm_deg,0)
print("MovL:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
def RunPoint_L_inJoint(self,joints_list):
'''
RUNPOINT_L 以直线运动方式运动到目标点
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
'''
# 走点
recvmovemess = self.dashboard.MovL(*joints_list,0)
print("MovL:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
def RunPoint_L_inJoint_RAD(self,joints_list):
'''
RUNPOINT_L 以直线运动方式运动到目标点
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: rad
'''
# 单位 rad -> deg
joints_list_deg = self.__transform_rad_2_deg(joints_list)
# 走点
recvmovemess = self.dashboard.MovL(*joints_list_deg,0)
print("MovL:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
def ServoJoint(self,joints_list):
'''
关节角度伺服
输入为[J1 J2 J3 J4 J5 J6] unit: deg
'''
tcp_command = f"servoj({joints_list[0]},{joints_list[1]},{joints_list[2]},{joints_list[3]},{joints_list[4]},{joints_list[5]}, t=0.01, aheadtime=20, gain=200)"
self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
print("ServoJ:",tcp_command)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
return -1
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
return 0
def ServoJoint_RAD(self,joints_list):
'''
关节角度伺服
输入为[J1 J2 J3 J4 J5 J6] unit: rad
'''
# 单位转换 rad -> deg
joints_list_deg = self.__transform_rad_2_deg(joints_list)
# 构造字符串
tcp_command = f"servoj({joints_list_deg[0]},{joints_list_deg[1]},{joints_list_deg[2]},{joints_list_deg[3]},{joints_list_deg[4]},{joints_list_deg[5]}, t=0.01, aheadtime=20, gain=200)"
self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
print("ServoJ:",tcp_command)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
return -1
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
return 0
def ServoPose(self,poses_list):
'''
位姿伺服
输入为[X Y Z RX RY RZ] unit: mm/deg
'''
poses_list = list(map(float, poses_list))
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=300)"
# res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
self.dashboard.send_data(tcp_command)
# print("ServoP msg:",res)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
def ServoPose_M_RAD(self,poses_list):
'''
位姿伺服
输入为[X Y Z RX RY RZ] unit: M/RAD
'''
# 单位 m/rad -> mm/deg
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
# 确保数据类型正确
poses_list_mm_deg = list(map(float, poses_list_mm_deg))
# 构造字符串
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=200)"
res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
print("ServoP msg:",res)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
def setEndEffector(self,i,tool_i):
'''
设置工具坐标系在索引i. i!=0
tool_i 需要以字符串形式输入 "{x,y,z,rx,ry,rz}"
'''
if i == 0:
print("不可修改工具0(初始法兰坐标系)")
return
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("工具坐标系序号超出范围")
return
res = self.dashboard.SetTool(i,tool_i)
print(res)
code = self.parseResultId(res)[0]
if code == 0:
print(f"设置工具坐标系{i}的偏移量成功")
else:
print(f"设置工具坐标系{i}失败")
return
def chooseEndEffector(self,i):
'''
切换工具坐标系, 0~9
'''
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("工具坐标系序号超出范围")
return
res = self.dashboard.Tool(i)
print(res)
code = self.parseResultId(res)[0]
if code == 0:
print(f"切换为工具坐标系{i}成功")
else:
print("切换工具坐标系失败")
return
def setBaseFrame(self,i,base_i):
'''
设置工具坐标系在索引i. i!=0
base_i 修改后的用户坐标系(非偏移量!) 需要以字符串形式输入 "{x,y,z,rx,ry,rz}"
'''
if i == 0:
print("不可修改用户坐标系0(初始基坐标系)")
return
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("用户坐标系序号超出范围")
return
res = self.dashboard.SetUser(i,base_i)
code = self.parseResultId(res)[0]
if code == 0:
print(f"设置用户坐标系{i}成功")
else:
print(f"设置用户坐标系{i}失败")
return
def chooseBaseFrame(self,i):
'''
切换用户基坐标系, 0~9
'''
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("工具坐标系序号超出范围")
return
res = self.dashboard.User(i)
code = self.parseResultId(res)[0]
if code == 0:
print(f"切换为用户基坐标系{i}成功")
else:
print("切换用户基坐标系失败")
return
def getPose(self,user=-1,tool=-1):
if (user == -1) != (tool == -1): ## XOR 必须同时设置或者同时不设置
print("必须同时设置或同时不设置坐标系参数")
res = self.dashboard.GetPose(user,tool) # 返回字符串
print("get pose",res)
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
else:
print("获取位姿失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
pose_str = res[start:end]
pose = [float(x.strip()) for x in pose_str.split(",")]
return pose
def getAngel(self):
res = self.dashboard.GetAngle()
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
else:
print("获取关节角度失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
angle_str = res[start:end]
print("res:",res)
angle = [float(x.strip()) for x in angle_str.split(",")]
return angle
def getForwardKin(self,joints_list,user=-1,tool=-1):
res = self.dashboard.PositiveKin(*joints_list,user,tool)
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
else:
print("获取正向运动学解失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
pose_str = res[start:end]
pose = [float(x.strip()) for x in pose_str.split(",")]
return pose
def getInverseKin(self,poses_list,user=-1,tool=-1,useJointNear=-1,JointNear=''):
'''
poses_list X Y Z RX RY RZ
useJointNear 0 或不携带 表示 JointNear无效
1 表示根据JointNear就近选解
jointNear string "jointNear={j1,j2,j3,j4,j5,j6}"
'''
res = self.dashboard.InverseKin(*poses_list,user,tool,useJointNear,JointNear)
print(res)
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
else:
print("获取逆向运动学解失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
joint_str = res[start:end]
joint = [float(x.strip()) for x in joint_str.split(",")]
return joint
def disableRobot(self):
res = self.dashboard.DisableRobot()
code = self.parseResultId(res)[0]
if code == 0:
print("下使能机械臂成功")
else:
print("下使能机械臂失败")
return
def clearError(self):
res = self.dashboard.ClearError()
code = self.parseResultId(res)[0]
if code == 0:
print("清楚报警成功")
else:
print("清楚报警失败")
return
def start_drag(self):
res = self.dashboard.StartDrag()
code = self.parseResultId(res)[0]
if code == 0:
print("拖拽模式开启成功")
else:
print("拖拽模式开启失败")
return
def stop_drag(self):
res = self.dashboard.StopDrag()
code = self.parseResultId(res)[0]
if code == 0:
print("拖拽模式关闭成功")
else:
print("拖拽模式关闭失败")
return
def stop_motion(self):
res = self.dashboard.Stop()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂停止下发运动指令队列")
else:
print("停止下发运动失败,需立即拍下急停")
return
def __del__(self):
del self.dashboard
del self.feedFour
## 内部工具函数
def __transform_m_rad_2_mm_deg(self,input_list):
'''
接受列表 [X Y Z RX RY RZ]
将单位 m/rad 转为 mm/deg
'''
if len(input_list) != 6:
raise ValueError("Input list must contain exactly 6 elements [X, Y, Z, RX, RY, RZ]")
# 转换位置单位m -> mm (乘以 1000)
x_mm = input_list[0] * 1000.0
y_mm = input_list[1] * 1000.0
z_mm = input_list[2] * 1000.0
# 转换角度单位rad -> deg (乘以 180/pi)
rx_deg = math.degrees(input_list[3])
ry_deg = math.degrees(input_list[4])
rz_deg = math.degrees(input_list[5])
return [x_mm, y_mm, z_mm, rx_deg, ry_deg, rz_deg]
def __transform_rad_2_deg(self,input_list):
'''
接受列表 [J1 J2 J3 J4 J5 J6]
将单位rad转为deg
'''
if len(input_list) != 6:
raise ValueError("Input list must contain exactly 6 elements [J1, J2, J3, J4, J5, J6]")
# 转换角度单位: rad -> deg
J_deg = []
for i in range(6):
J_deg.append(math.degrees(input_list[i]))
return J_deg
## 为反馈线程封装
def Get_feedInfo_angle(self):
# print("反馈当前角度",self.feedbackData.QActual)
return self.feedbackData.QActual
def Get_feedInfo_pose(self):
# print("反馈当前TCP位置",self.feedbackData.ToolVectorActual)
return self.feedbackData.ToolVectorActual
## 适配中间层的再封装接口
# 为状态管理而封装的初始化函数
def init(self):
self.is_exit = False
self.traj_list = None
self.last_pose_command = np.zeros(6)
self.last_input_command = None
self.tcp_offset = None
self.init_J = [-45.0009079,55.5785141,-120.68821716,5.11103201,90.00195312,-90.00085449]
self.init_P = [654.3467,-134.9880,305.3604,-180.0000,-30.0000,0.0042]
self.filter_matirx = np.zeros((1,7)) # 位姿伺服用
self.filter_matrix = np.zeros((1,6)) # 角度伺服用
sleep(1)
self.is_standby = False
# code = self.RunPoint_P_inJoint(self.init_J)
code = self.RunPoint_P_inPose(self.init_P)
if code == 0:
print("机械臂初始化成功且到达初始位置")
else:
print("机械臂初始化失败")
def get_pose_at_flange(self):
'''
输出pos,quat
pos: [x,y,z] in m
quat: [qx,qy,qz,qw] 四元数
'''
pose = self.getPose(user=0, tool=0) # 获取初始法兰坐标系下的位姿
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
x /= 1000
y /= 1000
z /= 1000
pos = np.array([x,y,z])
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
return pos,quat_rot
def get_pose_at_tool(self,tool=1):
'''
工具默认为1
输出pos,quat
pos: [x,y,z] in m
quat: [qx,qy,qz,qw] 四元数
'''
pose = self.getPose(user=0, tool=tool)
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
x /= 1000
y /= 1000
z /= 1000
pos = np.array([x,y,z])
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
return pos,quat_rot
def get_arm_position(self):
'''
直接获得当前user/tool坐标系下的机械臂位置
'''
'''
输出pos,quat
pos: [x,y,z] in m
quat: [qx,qy,qz,qw] 四元数
'''
pose = self.Get_feedInfo_pose()
# print("pose",pose)
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
x /= 1000
y /= 1000
z /= 1000
pos = np.array([x,y,z])
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
return pos,quat_rot
def __circular_mean(self,angles):
# 将角度转换为复数单位圆上的点,再计算均值
# 为处理周期性问题
angles_rad = np.deg2rad(angles)
x = np.mean(np.cos(angles_rad))
y = np.mean(np.sin(angles_rad))
return np.rad2deg(np.arctan2(y, x))
def process_command_angle(self,arm_joint_command):
if (not self.is_exit) and (not self.is_standby):
current_joint = np.asarray(arm_joint_command) # degree
# FIFO
self.filter_matrix = np.append(self.filter_matrix,[current_joint],axis=0)
if len(self.filter_matrix)<7:
return 0,current_joint
self.filter_matrix = np.delete(self.filter_matrix,0,axis = 0)
avg_joint = np.array([self.__circular_mean(self.filter_matrix[:, i]) for i in range(len(current_joint))])
return 0,avg_joint
return -1,None
def process_command(self, arm_position_command, arm_orientation_command):
if (not self.is_exit) and (not self.is_standby):
# 构造当前位姿(位置 + 四元数)
current_pose = np.concatenate([
arm_position_command, # [x, y, z]
arm_orientation_command # [qx, qy, qz, qw]
])
# 更新滑动窗口FIFO
self.filter_matirx = np.append(self.filter_matirx, [current_pose], axis=0)
# 如果窗口未填满,直接返回当前值(不滤波)
if len(self.filter_matirx) < 7:
rot_euler = R.from_quat(arm_orientation_command).as_euler('xyz', degrees=True)
result = np.concatenate([arm_position_command, rot_euler])
return 0, result
# 窗口已填满,删除最旧数据
self.filter_matirx = np.delete(self.filter_matirx, 0, axis=0)
# --- 位置分量:滑动平均(仅对有效数据)---
avg_position = np.mean(self.filter_matirx[:, :3], axis=0)
# --- 四元数分量:加权平均(处理符号歧义)---
quats = self.filter_matirx[:, 3:7]
for i in range(1, len(quats)):
if np.dot(quats[0], quats[i]) < 0:
quats[i] = -quats[i] # 确保所有四元数在同一半球
avg_quat = np.mean(quats, axis=0)
avg_quat = avg_quat / np.linalg.norm(avg_quat) # 归一化
# 转换回欧拉角
rot_euler = R.from_quat(avg_quat).as_euler('xyz', degrees=True)
# 合并结果 [x, y, z, roll, pitch, yaw]
result = np.concatenate([avg_position, rot_euler])
return 0, result
return -1, None
def chooseRightFrame(self):
self.chooseBaseFrame(i=1)
# self.chooseEndEffector(i=0)
return
if __name__ == "__main__":
# sleep(5)
dobot = dobot_nova5("192.168.5.1")
dobot.start()
# posJ_home = [-45,87,-147,0,90,-90]
posJ_ready = [-45,55.5785,-120.6882,5.111,90,-90]
dobot.RunPoint_P_inJoint(posJ_ready)
# # sleep(1)
dobot.chooseRightFrame()
# print("Arm pose1:",dobot.getPose())
# # print("Arm pose:",dobot.getPose(user=1,tool=0))
dobot.setEndEffector(i=8,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
dobot.chooseEndEffector(i=8)
print("Arm pose2:",dobot.getPose(user=1,tool=8))
# print(dobot.get_arm_position())
dobot.RunPoint_L_inPose([179.087702,-136.993776,425.209977,179.418397,-28.9718402,0.06])
# cur_pose = dobot.getPose(user=1,tool=8)
# cur_pose[0] += 30
# target_pose = cur_pose
# print("target",target_pose)
# dobot.RunPoint_P_inPose(target_pose)
# dobot.ServoPose(dobot.getPose(user=1,tool=8))
# print("new ready joint",dobot.feedbackData.QActual)
# # dobot.start_drag()
# dobot.stop_drag()
dobot.disableRobot()