863 lines
35 KiB
Python
863 lines
35 KiB
Python
try:
|
||
from dobot_api import DobotApiFeedBack,DobotApiDashboard
|
||
except:
|
||
from hardware.dobot_api import DobotApiFeedBack,DobotApiDashboard
|
||
import threading
|
||
from time import sleep,time
|
||
import re
|
||
from tools.log import CustomLogger
|
||
import copy
|
||
import numpy as np
|
||
from scipy.spatial.transform import Rotation as R
|
||
import math
|
||
'''
|
||
|
||
MODE DESCRIPTION CN
|
||
|
||
1 INIT 初始化
|
||
2 BREAK_OPEN 抱闸松开
|
||
3 POWER_OFF 本体下电
|
||
4 DISABLED 未使能(抱闸未松)
|
||
5 ENABLE 使能(空闲)
|
||
6 BACKDRIVE 拖拽
|
||
7 RUNNING 运行状态(脚本/TCP)
|
||
8 SINGLE_MOVE 单次运动(点动)
|
||
9 ERROR 错误状态
|
||
10 PAUSE 暂停状态
|
||
11 COLLISION 碰撞状态
|
||
|
||
MODE RANK
|
||
|
||
报错 I
|
||
下电 II
|
||
碰撞 III
|
||
开抱闸 IV
|
||
|
||
'''
|
||
|
||
class dobot_nova5:
|
||
def __init__(self,arm_ip = '192.168.5.1'):
|
||
self.ip = arm_ip ## 机械臂IP地址
|
||
self.dashboardPort = 29999 ## 一发一收-设置-控制及运动端口号
|
||
self.feedFourPort = 30004 ## 第四实时反馈(8ms)反馈数据端口号
|
||
self.dashboard = None ## 初始化控制及运动端口实例
|
||
self.feedFour = None ## 初始化数据反馈端口实例
|
||
self.feedInfo = []
|
||
self.__globalLockValue = threading.Lock() # 多线程安全
|
||
|
||
class feedbackItem:
|
||
def __init__(self):
|
||
self.robotMode = -1
|
||
self.robotCurrentCommandID = 0
|
||
self.MessageSize = -1
|
||
self.DigitalInputs = -1
|
||
self.DigitalOutputs = -1
|
||
self.QActual = -1
|
||
self.ActualQuaternion = -1
|
||
self.ToolVectorActual = -1
|
||
# ... 自定义反馈数据
|
||
'''
|
||
self.UserValue = -1
|
||
self.ToolValue = -1
|
||
'''
|
||
self.feedbackData = feedbackItem() # 自定义反馈数据结构体
|
||
self.logger = CustomLogger('Dobot_nova5',True)
|
||
|
||
|
||
def parseResultId(self,valueRecv):
|
||
# 解析返回值
|
||
if "Not Tcp" in valueRecv:
|
||
print("Control Mode is not TCP")
|
||
print("机械臂不在二次开发模式下")
|
||
return [1]
|
||
return [int(num) for num in re.findall(r'-?\d+',valueRecv)] or [2]
|
||
|
||
def GetFeedback(self):
|
||
while True:
|
||
feedInfo = self.feedFour.feedBackData()
|
||
with self.__globalLockValue:
|
||
if feedInfo is not None:
|
||
if hex((feedInfo['TestValue'][0]))=='0x123456789abcdef': ## DOBOT官方的数据魔数验证
|
||
# 基础字段
|
||
self.feedbackData.MessageSize = feedInfo['len'][0]
|
||
self.feedbackData.robotMode = feedInfo['RobotMode'][0]
|
||
self.feedbackData.DigitalInputs = feedInfo['DigitalInputs'][0]
|
||
self.feedbackData.DigitalOutputs = feedInfo['DigitalOutputs'][0]
|
||
self.feedbackData.robotCurrentCommandID = feedInfo['CurrentCommandId'][0]
|
||
self.feedbackData.QActual = feedInfo['QActual'][0]
|
||
self.feedbackData.ActualQuaternion = feedInfo['ActualQuaternion'][0]
|
||
self.feedbackData.ToolVectorActual = feedInfo['ToolVectorActual'][0]
|
||
# 自定义添加所需反馈数据
|
||
'''
|
||
self.feedData.DigitalOutputs = int(feedInfo['DigitalOutputs'][0])
|
||
self.feedData.RobotMode = int(feedInfo['RobotMode'][0])
|
||
self.feedData.TimeStamp = int(feedInfo['TimeStamp'][0])
|
||
self.feedbackData.UserValue = feedInfo['UserValue[6]'][0]
|
||
self.feedbackData.ToolValue = feedInfo['ToolValue[6]'][0]
|
||
'''
|
||
|
||
def start(self):
|
||
self.dashboard = DobotApiDashboard(self.ip,self.dashboardPort)
|
||
self.feedFour = DobotApiFeedBack(self.ip,self.feedFourPort)
|
||
self.dashboard.EmergencyStop(mode=0) # 松开急停
|
||
self.clearError() # 清除报警
|
||
if self.parseResultId(self.dashboard.EnableRobot())[0] != 0:
|
||
print("使能失败:检查29999端口是否被占用")
|
||
return
|
||
print("机械臂使能成功")
|
||
|
||
# 状态反馈线程
|
||
feedback_thread = threading.Thread(
|
||
target=self.GetFeedback # 机器状态反馈线程
|
||
)
|
||
feedback_thread.daemon = True
|
||
feedback_thread.start()
|
||
|
||
def RunPoint_P_inPose(self,poses_list):
|
||
'''
|
||
RUNPOINT_P 以关节运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
|
||
'''
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovJ(*poses_list,0)
|
||
print("MovJ:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
sleep(0.02)
|
||
while True: # 完成判断循环
|
||
########## 调试注释 #############
|
||
# print("robot mode",self.feedbackData.robotMode)
|
||
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
|
||
# print("指令 ID:", currentCommandID)
|
||
########## 调试注释 #############
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def RunPoint_P_inPose_M_RAD(self,poses_list):
|
||
'''
|
||
RUNPOINT_P 以关节运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: M/RAD
|
||
'''
|
||
# 单位 m/rad -> mm/deg
|
||
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovJ(*poses_list_mm_deg,0)
|
||
print("MovJ:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
sleep(0.02)
|
||
while True: # 完成判断循环
|
||
########## 调试注释 #############
|
||
# print("robot mode",self.feedbackData.robotMode)
|
||
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
|
||
# print("指令 ID:", currentCommandID)
|
||
########## 调试注释 #############
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
return -1
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
return -1
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def RunPoint_P_inJoint(self,joints_list):
|
||
'''
|
||
RUNPOINT_P 以关节运动方式运动到目标点
|
||
MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: deg
|
||
'''
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovJ(*joints_list,1)
|
||
print("MovJ:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
sleep(0.02)
|
||
while True: # 完成判断循环
|
||
########## 调试注释 #############
|
||
# print("robot mode",self.feedbackData.robotMode)
|
||
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
|
||
# print("指令 ID:", currentCommandID)
|
||
# print("30004",self.dashboard.GetCurrentCommandID())
|
||
########## 调试注释 #############
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1) # 避免 CPU 占用过高
|
||
return 0
|
||
|
||
def RunPoint_P_inJoint_RAD(self,joints_list):
|
||
'''
|
||
RUNPOINT_P 以关节运动方式运动到目标点
|
||
MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: rad
|
||
'''
|
||
# 单位 rad -> deg
|
||
joints_list_deg = self.__transform_rad_2_deg(joints_list)
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovJ(*joints_list_deg,1)
|
||
print("MovJ:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
sleep(0.02)
|
||
while True: # 完成判断循环
|
||
########## 调试注释 #############
|
||
# print("robot mode",self.feedbackData.robotMode)
|
||
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
|
||
# print("指令 ID:", currentCommandID)
|
||
# print("30004",self.dashboard.GetCurrentCommandID())
|
||
########## 调试注释 #############
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1) # 避免 CPU 占用过高
|
||
return 0
|
||
|
||
def RunPoint_L_inPose(self,poses_list):
|
||
'''
|
||
RUNPOINT_L 以直线运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
|
||
'''
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovL(*poses_list,0)
|
||
print("MovL:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
while True:
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def RunPoint_L_inPose_M_RAD(self,poses_list):
|
||
'''
|
||
RUNPOINT_L 以直线运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: m/rad
|
||
'''
|
||
# 单位 m/rad -> mm/deg
|
||
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovL(*poses_list_mm_deg,0)
|
||
print("MovL:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
while True:
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def RunPoint_L_inJoint(self,joints_list):
|
||
'''
|
||
RUNPOINT_L 以直线运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
|
||
'''
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovL(*joints_list,0)
|
||
print("MovL:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
while True:
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def RunPoint_L_inJoint_RAD(self,joints_list):
|
||
'''
|
||
RUNPOINT_L 以直线运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: rad
|
||
'''
|
||
# 单位 rad -> deg
|
||
joints_list_deg = self.__transform_rad_2_deg(joints_list)
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovL(*joints_list_deg,0)
|
||
print("MovL:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
while True:
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def ServoJoint(self,joints_list):
|
||
'''
|
||
关节角度伺服
|
||
输入为[J1 J2 J3 J4 J5 J6] unit: deg
|
||
'''
|
||
tcp_command = f"servoj({joints_list[0]},{joints_list[1]},{joints_list[2]},{joints_list[3]},{joints_list[4]},{joints_list[5]}, t=0.01, aheadtime=20, gain=200)"
|
||
self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
|
||
print("ServoJ:",tcp_command)
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
return -1
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
return 0
|
||
|
||
def ServoJoint_RAD(self,joints_list):
|
||
'''
|
||
关节角度伺服
|
||
输入为[J1 J2 J3 J4 J5 J6] unit: rad
|
||
'''
|
||
# 单位转换 rad -> deg
|
||
joints_list_deg = self.__transform_rad_2_deg(joints_list)
|
||
# 构造字符串
|
||
tcp_command = f"servoj({joints_list_deg[0]},{joints_list_deg[1]},{joints_list_deg[2]},{joints_list_deg[3]},{joints_list_deg[4]},{joints_list_deg[5]}, t=0.01, aheadtime=20, gain=200)"
|
||
self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
|
||
print("ServoJ:",tcp_command)
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
return -1
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
return 0
|
||
|
||
def ServoPose(self,poses_list):
|
||
'''
|
||
位姿伺服
|
||
输入为[X Y Z RX RY RZ] unit: mm/deg
|
||
'''
|
||
poses_list = list(map(float, poses_list))
|
||
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=300)"
|
||
# res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
|
||
self.dashboard.send_data(tcp_command)
|
||
# print("ServoP msg:",res)
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
|
||
def ServoPose_M_RAD(self,poses_list):
|
||
'''
|
||
位姿伺服
|
||
输入为[X Y Z RX RY RZ] unit: M/RAD
|
||
'''
|
||
# 单位 m/rad -> mm/deg
|
||
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
|
||
# 确保数据类型正确
|
||
poses_list_mm_deg = list(map(float, poses_list_mm_deg))
|
||
# 构造字符串
|
||
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=200)"
|
||
res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
|
||
print("ServoP msg:",res)
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
|
||
def setEndEffector(self,i,tool_i):
|
||
'''
|
||
设置工具坐标系在索引i. i!=0
|
||
tool_i 需要以字符串形式输入 "{x,y,z,rx,ry,rz}"
|
||
'''
|
||
if i == 0:
|
||
print("不可修改工具0(初始法兰坐标系)")
|
||
return
|
||
|
||
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
|
||
print("工具坐标系序号超出范围")
|
||
return
|
||
|
||
res = self.dashboard.SetTool(i,tool_i)
|
||
print(res)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print(f"设置工具坐标系{i}的偏移量成功")
|
||
else:
|
||
print(f"设置工具坐标系{i}失败")
|
||
return
|
||
|
||
def chooseEndEffector(self,i):
|
||
'''
|
||
切换工具坐标系, 0~9
|
||
'''
|
||
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
|
||
print("工具坐标系序号超出范围")
|
||
return
|
||
res = self.dashboard.Tool(i)
|
||
print(res)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print(f"切换为工具坐标系{i}成功")
|
||
else:
|
||
print("切换工具坐标系失败")
|
||
return
|
||
|
||
def setBaseFrame(self,i,base_i):
|
||
'''
|
||
设置工具坐标系在索引i. i!=0
|
||
base_i 修改后的用户坐标系(非偏移量!) 需要以字符串形式输入 "{x,y,z,rx,ry,rz}"
|
||
'''
|
||
if i == 0:
|
||
print("不可修改用户坐标系0(初始基坐标系)")
|
||
return
|
||
|
||
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
|
||
print("用户坐标系序号超出范围")
|
||
return
|
||
|
||
res = self.dashboard.SetUser(i,base_i)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print(f"设置用户坐标系{i}成功")
|
||
else:
|
||
print(f"设置用户坐标系{i}失败")
|
||
return
|
||
|
||
def chooseBaseFrame(self,i):
|
||
'''
|
||
切换用户基坐标系, 0~9
|
||
'''
|
||
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
|
||
print("工具坐标系序号超出范围")
|
||
return
|
||
res = self.dashboard.User(i)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print(f"切换为用户基坐标系{i}成功")
|
||
else:
|
||
print("切换用户基坐标系失败")
|
||
return
|
||
|
||
def getPose(self,user=-1,tool=-1):
|
||
if (user == -1) != (tool == -1): ## XOR 必须同时设置或者同时不设置
|
||
print("必须同时设置或同时不设置坐标系参数")
|
||
res = self.dashboard.GetPose(user,tool) # 返回字符串
|
||
print("get pose",res)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
pass
|
||
# print("success")
|
||
else:
|
||
print("获取位姿失败")
|
||
return
|
||
# 处理字符串
|
||
start = res.find("{")+1
|
||
end = res.find("}")
|
||
pose_str = res[start:end]
|
||
pose = [float(x.strip()) for x in pose_str.split(",")]
|
||
return pose
|
||
|
||
def getAngel(self):
|
||
res = self.dashboard.GetAngle()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
pass
|
||
# print("success")
|
||
else:
|
||
print("获取关节角度失败")
|
||
return
|
||
# 处理字符串
|
||
start = res.find("{")+1
|
||
end = res.find("}")
|
||
angle_str = res[start:end]
|
||
print("res:",res)
|
||
angle = [float(x.strip()) for x in angle_str.split(",")]
|
||
return angle
|
||
|
||
def getForwardKin(self,joints_list,user=-1,tool=-1):
|
||
res = self.dashboard.PositiveKin(*joints_list,user,tool)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
pass
|
||
# print("success")
|
||
else:
|
||
print("获取正向运动学解失败")
|
||
return
|
||
# 处理字符串
|
||
start = res.find("{")+1
|
||
end = res.find("}")
|
||
pose_str = res[start:end]
|
||
pose = [float(x.strip()) for x in pose_str.split(",")]
|
||
return pose
|
||
|
||
def getInverseKin(self,poses_list,user=-1,tool=-1,useJointNear=-1,JointNear=''):
|
||
'''
|
||
poses_list X Y Z RX RY RZ
|
||
useJointNear 0 或不携带 表示 JointNear无效
|
||
1 表示根据JointNear就近选解
|
||
jointNear string "jointNear={j1,j2,j3,j4,j5,j6}"
|
||
'''
|
||
res = self.dashboard.InverseKin(*poses_list,user,tool,useJointNear,JointNear)
|
||
print(res)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
pass
|
||
# print("success")
|
||
else:
|
||
print("获取逆向运动学解失败")
|
||
return
|
||
# 处理字符串
|
||
start = res.find("{")+1
|
||
end = res.find("}")
|
||
joint_str = res[start:end]
|
||
joint = [float(x.strip()) for x in joint_str.split(",")]
|
||
return joint
|
||
|
||
def disableRobot(self):
|
||
res = self.dashboard.DisableRobot()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("下使能机械臂成功")
|
||
else:
|
||
print("下使能机械臂失败")
|
||
return
|
||
|
||
def clearError(self):
|
||
res = self.dashboard.ClearError()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("清楚报警成功")
|
||
else:
|
||
print("清楚报警失败")
|
||
return
|
||
|
||
def start_drag(self):
|
||
res = self.dashboard.StartDrag()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("拖拽模式开启成功")
|
||
else:
|
||
print("拖拽模式开启失败")
|
||
return
|
||
|
||
def stop_drag(self):
|
||
res = self.dashboard.StopDrag()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("拖拽模式关闭成功")
|
||
else:
|
||
print("拖拽模式关闭失败")
|
||
return
|
||
|
||
def stop_motion(self):
|
||
res = self.dashboard.Stop()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂停止下发运动指令队列")
|
||
else:
|
||
print("停止下发运动失败,需立即拍下急停")
|
||
return
|
||
|
||
def __del__(self):
|
||
del self.dashboard
|
||
del self.feedFour
|
||
## 内部工具函数
|
||
|
||
def __transform_m_rad_2_mm_deg(self,input_list):
|
||
'''
|
||
接受列表 [X Y Z RX RY RZ]
|
||
将单位 m/rad 转为 mm/deg
|
||
'''
|
||
if len(input_list) != 6:
|
||
raise ValueError("Input list must contain exactly 6 elements [X, Y, Z, RX, RY, RZ]")
|
||
|
||
# 转换位置单位:m -> mm (乘以 1000)
|
||
x_mm = input_list[0] * 1000.0
|
||
y_mm = input_list[1] * 1000.0
|
||
z_mm = input_list[2] * 1000.0
|
||
|
||
# 转换角度单位:rad -> deg (乘以 180/pi)
|
||
rx_deg = math.degrees(input_list[3])
|
||
ry_deg = math.degrees(input_list[4])
|
||
rz_deg = math.degrees(input_list[5])
|
||
|
||
return [x_mm, y_mm, z_mm, rx_deg, ry_deg, rz_deg]
|
||
|
||
def __transform_rad_2_deg(self,input_list):
|
||
'''
|
||
接受列表 [J1 J2 J3 J4 J5 J6]
|
||
将单位rad转为deg
|
||
'''
|
||
if len(input_list) != 6:
|
||
raise ValueError("Input list must contain exactly 6 elements [J1, J2, J3, J4, J5, J6]")
|
||
# 转换角度单位: rad -> deg
|
||
J_deg = []
|
||
for i in range(6):
|
||
J_deg.append(math.degrees(input_list[i]))
|
||
return J_deg
|
||
|
||
|
||
## 为反馈线程封装
|
||
|
||
def Get_feedInfo_angle(self):
|
||
# print("反馈当前角度",self.feedbackData.QActual)
|
||
return self.feedbackData.QActual
|
||
|
||
def Get_feedInfo_pose(self):
|
||
# print("反馈当前TCP位置",self.feedbackData.ToolVectorActual)
|
||
return self.feedbackData.ToolVectorActual
|
||
|
||
## 适配中间层的再封装接口
|
||
|
||
# 为状态管理而封装的初始化函数
|
||
def init(self):
|
||
self.is_exit = False
|
||
self.traj_list = None
|
||
self.last_pose_command = np.zeros(6)
|
||
self.last_input_command = None
|
||
self.tcp_offset = None
|
||
self.init_J = [-45,60,-120,0,-90,0]
|
||
self.filter_matirx = np.zeros((1,7)) # 位姿伺服用
|
||
self.filter_matrix = np.zeros((1,6)) # 角度伺服用
|
||
sleep(1)
|
||
self.is_standby = False
|
||
code = self.RunPoint_P_inJoint(self.init_J)
|
||
if code == 0:
|
||
print("机械臂初始化成功且到达初始位置")
|
||
else:
|
||
print("机械臂初始化失败")
|
||
|
||
def get_pose_at_flange(self):
|
||
'''
|
||
输出pos,quat
|
||
pos: [x,y,z] in m
|
||
quat: [qx,qy,qz,qw] 四元数
|
||
'''
|
||
pose = self.getPose(user=0, tool=0) # 获取初始法兰坐标系下的位姿
|
||
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
|
||
x /= 1000
|
||
y /= 1000
|
||
z /= 1000
|
||
pos = np.array([x,y,z])
|
||
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
|
||
return pos,quat_rot
|
||
|
||
def get_pose_at_tool(self,tool=1):
|
||
'''
|
||
工具默认为1
|
||
输出pos,quat
|
||
pos: [x,y,z] in m
|
||
quat: [qx,qy,qz,qw] 四元数
|
||
'''
|
||
pose = self.getPose(user=0, tool=tool)
|
||
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
|
||
x /= 1000
|
||
y /= 1000
|
||
z /= 1000
|
||
pos = np.array([x,y,z])
|
||
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
|
||
return pos,quat_rot
|
||
|
||
def get_arm_position(self):
|
||
'''
|
||
直接获得当前user/tool坐标系下的机械臂位置
|
||
'''
|
||
'''
|
||
输出pos,quat
|
||
pos: [x,y,z] in m
|
||
quat: [qx,qy,qz,qw] 四元数
|
||
'''
|
||
pose = self.Get_feedInfo_pose()
|
||
# print("pose",pose)
|
||
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
|
||
x /= 1000
|
||
y /= 1000
|
||
z /= 1000
|
||
pos = np.array([x,y,z])
|
||
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
|
||
return pos,quat_rot
|
||
|
||
def __circular_mean(self,angles):
|
||
# 将角度转换为复数单位圆上的点,再计算均值
|
||
# 为处理周期性问题
|
||
angles_rad = np.deg2rad(angles)
|
||
x = np.mean(np.cos(angles_rad))
|
||
y = np.mean(np.sin(angles_rad))
|
||
return np.rad2deg(np.arctan2(y, x))
|
||
|
||
def process_command_angle(self,arm_joint_command):
|
||
if (not self.is_exit) and (not self.is_standby):
|
||
current_joint = np.asarray(arm_joint_command) # degree
|
||
# FIFO
|
||
self.filter_matrix = np.append(self.filter_matrix,[current_joint],axis=0)
|
||
if len(self.filter_matrix)<7:
|
||
return 0,current_joint
|
||
self.filter_matrix = np.delete(self.filter_matrix,0,axis = 0)
|
||
avg_joint = np.array([self.__circular_mean(self.filter_matrix[:, i]) for i in range(len(current_joint))])
|
||
return 0,avg_joint
|
||
return -1,None
|
||
|
||
def process_command(self, arm_position_command, arm_orientation_command):
|
||
if (not self.is_exit) and (not self.is_standby):
|
||
# 构造当前位姿(位置 + 四元数)
|
||
current_pose = np.concatenate([
|
||
arm_position_command, # [x, y, z]
|
||
arm_orientation_command # [qx, qy, qz, qw]
|
||
])
|
||
|
||
# 更新滑动窗口(FIFO)
|
||
self.filter_matirx = np.append(self.filter_matirx, [current_pose], axis=0)
|
||
|
||
# 如果窗口未填满,直接返回当前值(不滤波)
|
||
if len(self.filter_matirx) < 7:
|
||
rot_euler = R.from_quat(arm_orientation_command).as_euler('xyz', degrees=True)
|
||
result = np.concatenate([arm_position_command, rot_euler])
|
||
return 0, result
|
||
|
||
# 窗口已填满,删除最旧数据
|
||
self.filter_matirx = np.delete(self.filter_matirx, 0, axis=0)
|
||
|
||
# --- 位置分量:滑动平均(仅对有效数据)---
|
||
avg_position = np.mean(self.filter_matirx[:, :3], axis=0)
|
||
|
||
# --- 四元数分量:加权平均(处理符号歧义)---
|
||
quats = self.filter_matirx[:, 3:7]
|
||
for i in range(1, len(quats)):
|
||
if np.dot(quats[0], quats[i]) < 0:
|
||
quats[i] = -quats[i] # 确保所有四元数在同一半球
|
||
avg_quat = np.mean(quats, axis=0)
|
||
avg_quat = avg_quat / np.linalg.norm(avg_quat) # 归一化
|
||
|
||
# 转换回欧拉角
|
||
rot_euler = R.from_quat(avg_quat).as_euler('xyz', degrees=True)
|
||
|
||
# 合并结果 [x, y, z, roll, pitch, yaw]
|
||
result = np.concatenate([avg_position, rot_euler])
|
||
return 0, result
|
||
|
||
return -1, None
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# sleep(5)
|
||
dobot = dobot_nova5("192.168.5.1")
|
||
dobot.start()
|
||
posJ = [0,30,-120,0,90,0]
|
||
pos_end=[]
|
||
dobot.RunPoint_P_inJoint(posJ)
|
||
sleep(1)
|
||
dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
|
||
dobot.chooseEndEffector(i=9)
|
||
print("Arm pose:",dobot.getPose())
|
||
|
||
# dobot.RunPoint_P_inPose([300, -135, 224.34, -180, 0, -90])
|
||
|
||
# dobot.start_drag()
|
||
dobot.disableRobot() |