796 lines
32 KiB
Python
796 lines
32 KiB
Python
try:
|
||
from dobot_api import DobotApiFeedBack,DobotApiDashboard
|
||
except:
|
||
from hardware.dobot_api import DobotApiFeedBack,DobotApiDashboard
|
||
import threading
|
||
from time import sleep,time
|
||
import re
|
||
import copy
|
||
import numpy as np
|
||
from scipy.spatial.transform import Rotation as R
|
||
import math
|
||
'''
|
||
|
||
MODE DESCRIPTION CN
|
||
|
||
1 INIT 初始化
|
||
2 BREAK_OPEN 抱闸松开
|
||
3 POWER_OFF 本体下电
|
||
4 DISABLED 未使能(抱闸未松)
|
||
5 ENABLE 使能(空闲)
|
||
6 BACKDRIVE 拖拽
|
||
7 RUNNING 运行状态(脚本/TCP)
|
||
8 SINGLE_MOVE 单次运动(点动)
|
||
9 ERROR 错误状态
|
||
10 PAUSE 暂停状态
|
||
11 COLLISION 碰撞状态
|
||
|
||
MODE RANK
|
||
|
||
报错 I
|
||
下电 II
|
||
碰撞 III
|
||
开抱闸 IV
|
||
|
||
'''
|
||
|
||
class dobot_nova5:
|
||
def __init__(self,ip):
|
||
self.ip = ip
|
||
self.dashboardPort = 29999 ## 一发一收-设置-控制及运动端口号
|
||
self.feedFourPort = 30004 ## 第四实时反馈(8ms)反馈数据端口号
|
||
self.dashboard = None ## 初始化控制及运动端口实例
|
||
self.feedFour = None ## 初始化数据反馈端口实例
|
||
self.feedInfo = []
|
||
self.__globalLockValue = threading.Lock() # 多线程安全
|
||
|
||
class feedbackItem:
|
||
def __init__(self):
|
||
self.robotMode = -1
|
||
self.robotCurrentCommandID = 0
|
||
self.MessageSize = -1
|
||
self.DigitalInputs = -1
|
||
self.DigitalOutputs = -1
|
||
self.QActual = -1
|
||
self.ActualQuaternion = -1
|
||
self.ToolVectorActual = -1
|
||
# ... 自定义反馈数据
|
||
'''
|
||
self.UserValue = -1
|
||
self.ToolValue = -1
|
||
'''
|
||
self.feedbackData = feedbackItem() # 自定义反馈数据结构体
|
||
|
||
def parseResultId(self,valueRecv):
|
||
# 解析返回值
|
||
if "Not Tcp" in valueRecv:
|
||
print("Control Mode is not TCP")
|
||
print("机械臂不在二次开发模式下")
|
||
return [1]
|
||
return [int(num) for num in re.findall(r'-?\d+',valueRecv)] or [2]
|
||
|
||
def GetFeedback(self):
|
||
while True:
|
||
feedInfo = self.feedFour.feedBackData()
|
||
with self.__globalLockValue:
|
||
if feedInfo is not None:
|
||
if hex((feedInfo['TestValue'][0]))=='0x123456789abcdef': ## DOBOT官方的数据魔数验证
|
||
# 基础字段
|
||
self.feedbackData.MessageSize = feedInfo['len'][0]
|
||
self.feedbackData.robotMode = feedInfo['RobotMode'][0]
|
||
self.feedbackData.DigitalInputs = feedInfo['DigitalInputs'][0]
|
||
self.feedbackData.DigitalOutputs = feedInfo['DigitalOutputs'][0]
|
||
self.feedbackData.robotCurrentCommandID = feedInfo['CurrentCommandId'][0]
|
||
self.feedbackData.QActual = feedInfo['QActual'][0]
|
||
self.feedbackData.ActualQuaternion = feedInfo['ActualQuaternion'][0]
|
||
self.feedbackData.ToolVectorActual = feedInfo['ToolVectorActual'][0]
|
||
# 自定义添加所需反馈数据
|
||
'''
|
||
self.feedData.DigitalOutputs = int(feedInfo['DigitalOutputs'][0])
|
||
self.feedData.RobotMode = int(feedInfo['RobotMode'][0])
|
||
self.feedData.TimeStamp = int(feedInfo['TimeStamp'][0])
|
||
self.feedbackData.UserValue = feedInfo['UserValue[6]'][0]
|
||
self.feedbackData.ToolValue = feedInfo['ToolValue[6]'][0]
|
||
'''
|
||
|
||
def start(self):
|
||
self.dashboard = DobotApiDashboard(self.ip,self.dashboardPort)
|
||
self.feedFour = DobotApiFeedBack(self.ip,self.feedFourPort)
|
||
self.dashboard.EmergencyStop(mode=0) # 松开急停
|
||
self.clearError() # 清除报警
|
||
if self.parseResultId(self.dashboard.EnableRobot())[0] != 0:
|
||
print("使能失败:检查29999端口是否被占用")
|
||
return
|
||
print("机械臂使能成功")
|
||
|
||
# 状态反馈线程
|
||
feedback_thread = threading.Thread(
|
||
target=self.GetFeedback # 机器状态反馈线程
|
||
)
|
||
feedback_thread.daemon = True
|
||
feedback_thread.start()
|
||
|
||
def RunPoint_P_inPose(self,poses_list):
|
||
'''
|
||
RUNPOINT_P 以关节运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
|
||
'''
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovJ(*poses_list,0)
|
||
print("MovJ:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
sleep(0.02)
|
||
while True: # 完成判断循环
|
||
########## 调试注释 #############
|
||
# print("robot mode",self.feedbackData.robotMode)
|
||
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
|
||
# print("指令 ID:", currentCommandID)
|
||
########## 调试注释 #############
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def RunPoint_P_inPose_M_RAD(self,poses_list):
|
||
'''
|
||
RUNPOINT_P 以关节运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: M/RAD
|
||
'''
|
||
# 单位 m/rad -> mm/deg
|
||
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovJ(*poses_list_mm_deg,0)
|
||
print("MovJ:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
sleep(0.02)
|
||
while True: # 完成判断循环
|
||
########## 调试注释 #############
|
||
# print("robot mode",self.feedbackData.robotMode)
|
||
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
|
||
# print("指令 ID:", currentCommandID)
|
||
########## 调试注释 #############
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def RunPoint_P_inJoint(self,joints_list):
|
||
'''
|
||
RUNPOINT_P 以关节运动方式运动到目标点
|
||
MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: deg
|
||
'''
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovJ(*joints_list,1)
|
||
print("MovJ:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
sleep(0.02)
|
||
while True: # 完成判断循环
|
||
########## 调试注释 #############
|
||
# print("robot mode",self.feedbackData.robotMode)
|
||
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
|
||
# print("指令 ID:", currentCommandID)
|
||
# print("30004",self.dashboard.GetCurrentCommandID())
|
||
########## 调试注释 #############
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1) # 避免 CPU 占用过高
|
||
return 0
|
||
|
||
def RunPoint_P_inJoint_RAD(self,joints_list):
|
||
'''
|
||
RUNPOINT_P 以关节运动方式运动到目标点
|
||
MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: rad
|
||
'''
|
||
# 单位 rad -> deg
|
||
joints_list_deg = self.__transform_rad_2_deg(joints_list)
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovJ(*joints_list_deg,1)
|
||
print("MovJ:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
sleep(0.02)
|
||
while True: # 完成判断循环
|
||
########## 调试注释 #############
|
||
# print("robot mode",self.feedbackData.robotMode)
|
||
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
|
||
# print("指令 ID:", currentCommandID)
|
||
# print("30004",self.dashboard.GetCurrentCommandID())
|
||
########## 调试注释 #############
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1) # 避免 CPU 占用过高
|
||
return 0
|
||
|
||
def RunPoint_L_inPose(self,poses_list):
|
||
'''
|
||
RUNPOINT_L 以直线运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
|
||
'''
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovL(*poses_list,0)
|
||
print("MovL:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
while True:
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def RunPoint_L_inPose_M_RAD(self,poses_list):
|
||
'''
|
||
RUNPOINT_L 以直线运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: m/rad
|
||
'''
|
||
# 单位 m/rad -> mm/deg
|
||
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovL(*poses_list_mm_deg,0)
|
||
print("MovL:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
while True:
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def RunPoint_L_inJoint(self,joints_list):
|
||
'''
|
||
RUNPOINT_L 以直线运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
|
||
'''
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovL(*joints_list,0)
|
||
print("MovL:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
while True:
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def RunPoint_L_inJoint_RAD(self,joints_list):
|
||
'''
|
||
RUNPOINT_L 以直线运动方式运动到目标点
|
||
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: rad
|
||
'''
|
||
# 单位 rad -> deg
|
||
joints_list_deg = self.__transform_rad_2_deg(joints_list)
|
||
# 走点
|
||
recvmovemess = self.dashboard.MovL(*joints_list_deg,0)
|
||
print("MovL:",recvmovemess)
|
||
print(self.parseResultId(recvmovemess))
|
||
currentCommandID = self.parseResultId(recvmovemess)[1]
|
||
print("指令 ID:", currentCommandID)
|
||
while True:
|
||
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
|
||
print("该次运动完成")
|
||
break
|
||
sleep(0.1)
|
||
return 0
|
||
|
||
def ServoJoint(self,joints_list):
|
||
'''
|
||
关节角度伺服
|
||
输入为[J1 J2 J3 J4 J5 J6] unit: deg
|
||
'''
|
||
tcp_command = f"servoj({joints_list[0]},{joints_list[1]},{joints_list[2]},{joints_list[3]},{joints_list[4]},{joints_list[5]}, t=0.01, aheadtime=20, gain=200)"
|
||
self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
|
||
print("ServoJ:",tcp_command)
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
return -1
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
return 0
|
||
|
||
def ServoJoint_RAD(self,joints_list):
|
||
'''
|
||
关节角度伺服
|
||
输入为[J1 J2 J3 J4 J5 J6] unit: rad
|
||
'''
|
||
# 单位转换 rad -> deg
|
||
joints_list_deg = self.__transform_rad_2_deg(joints_list)
|
||
# 构造字符串
|
||
tcp_command = f"servoj({joints_list_deg[0]},{joints_list_deg[1]},{joints_list_deg[2]},{joints_list_deg[3]},{joints_list_deg[4]},{joints_list_deg[5]}, t=0.01, aheadtime=20, gain=200)"
|
||
self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
|
||
print("ServoJ:",tcp_command)
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
return -1
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
return 0
|
||
|
||
def ServoPose(self,poses_list):
|
||
'''
|
||
位姿伺服
|
||
输入为[X Y Z RX RY RZ] unit: mm/deg
|
||
'''
|
||
poses_list = list(map(float, poses_list))
|
||
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=200)"
|
||
res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
|
||
print("ServoP msg:",res)
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
|
||
def ServoPose_M_RAD(self,poses_list):
|
||
'''
|
||
位姿伺服
|
||
输入为[X Y Z RX RY RZ] unit: M/RAD
|
||
'''
|
||
# 单位 m/rad -> mm/deg
|
||
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
|
||
# 确保数据类型正确
|
||
poses_list_mm_deg = list(map(float, poses_list_mm_deg))
|
||
# 构造字符串
|
||
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=200)"
|
||
res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
|
||
print("ServoP msg:",res)
|
||
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
|
||
self.dashboard.Stop()
|
||
self.dashboard.EmergencyStop(mode=1)
|
||
print("当前工作停止,机械臂处于急停状态")
|
||
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.dashboard.Continue()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
|
||
def setEndEffector(self,i,tool_i):
|
||
'''
|
||
设置工具坐标系在索引i. i!=0
|
||
tool_i 需要以字符串形式输入 "{x,y,z,rx,ry,rz}"
|
||
'''
|
||
if i == 0:
|
||
print("不可修改工具0(初始法兰坐标系)")
|
||
return
|
||
|
||
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
|
||
print("工具坐标系序号超出范围")
|
||
return
|
||
|
||
res = self.dashboard.SetTool(i,tool_i)
|
||
print(res)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print(f"设置工具坐标系{i}的偏移量成功")
|
||
else:
|
||
print(f"设置工具坐标系{i}失败")
|
||
return
|
||
|
||
def chooseEndEffector(self,i):
|
||
'''
|
||
切换工具坐标系, 0~9
|
||
'''
|
||
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
|
||
print("工具坐标系序号超出范围")
|
||
return
|
||
res = self.dashboard.Tool(i)
|
||
print(res)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print(f"切换为工具坐标系{i}成功")
|
||
else:
|
||
print("切换工具坐标系失败")
|
||
return
|
||
|
||
def setBaseFrame(self,i,base_i):
|
||
'''
|
||
设置工具坐标系在索引i. i!=0
|
||
base_i 修改后的用户坐标系(非偏移量!) 需要以字符串形式输入 "{x,y,z,rx,ry,rz}"
|
||
'''
|
||
if i == 0:
|
||
print("不可修改用户坐标系0(初始基坐标系)")
|
||
return
|
||
|
||
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
|
||
print("用户坐标系序号超出范围")
|
||
return
|
||
|
||
res = self.dashboard.SetUser(i,base_i)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print(f"设置用户坐标系{i}成功")
|
||
else:
|
||
print(f"设置用户坐标系{i}失败")
|
||
return
|
||
|
||
def chooseBaseFrame(self,i):
|
||
'''
|
||
切换用户基坐标系, 0~9
|
||
'''
|
||
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
|
||
print("工具坐标系序号超出范围")
|
||
return
|
||
res = self.dashboard.User(i)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print(f"切换为用户基坐标系{i}成功")
|
||
else:
|
||
print("切换用户基坐标系失败")
|
||
return
|
||
|
||
def getPose(self,user=-1,tool=-1):
|
||
if (user == -1) != (tool == -1): ## XOR 必须同时设置或者同时不设置
|
||
print("必须同时设置或同时不设置坐标系参数")
|
||
res = self.dashboard.GetPose(user,tool) # 返回字符串
|
||
print("get pose",res)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
pass
|
||
# print("success")
|
||
else:
|
||
print("获取位姿失败")
|
||
return
|
||
# 处理字符串
|
||
start = res.find("{")+1
|
||
end = res.find("}")
|
||
pose_str = res[start:end]
|
||
pose = [float(x.strip()) for x in pose_str.split(",")]
|
||
return pose
|
||
|
||
def getAngel(self):
|
||
res = self.dashboard.GetAngle()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
pass
|
||
# print("success")
|
||
else:
|
||
print("获取关节角度失败")
|
||
return
|
||
# 处理字符串
|
||
start = res.find("{")+1
|
||
end = res.find("}")
|
||
angle_str = res[start:end]
|
||
print("res:",res)
|
||
angle = [float(x.strip()) for x in angle_str.split(",")]
|
||
return angle
|
||
|
||
def getForwardKin(self,joints_list,user=-1,tool=-1):
|
||
res = self.dashboard.PositiveKin(*joints_list,user,tool)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
pass
|
||
# print("success")
|
||
else:
|
||
print("获取正向运动学解失败")
|
||
return
|
||
# 处理字符串
|
||
start = res.find("{")+1
|
||
end = res.find("}")
|
||
pose_str = res[start:end]
|
||
pose = [float(x.strip()) for x in pose_str.split(",")]
|
||
return pose
|
||
|
||
def getInverseKin(self,poses_list,user=-1,tool=-1,useJointNear=-1,JointNear=''):
|
||
'''
|
||
poses_list X Y Z RX RY RZ
|
||
useJointNear 0 或不携带 表示 JointNear无效
|
||
1 表示根据JointNear就近选解
|
||
jointNear string "jointNear={j1,j2,j3,j4,j5,j6}"
|
||
'''
|
||
res = self.dashboard.InverseKin(*poses_list,user,tool,useJointNear,JointNear)
|
||
print(res)
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
pass
|
||
# print("success")
|
||
else:
|
||
print("获取逆向运动学解失败")
|
||
return
|
||
# 处理字符串
|
||
start = res.find("{")+1
|
||
end = res.find("}")
|
||
joint_str = res[start:end]
|
||
joint = [float(x.strip()) for x in joint_str.split(",")]
|
||
return joint
|
||
|
||
def disableRobot(self):
|
||
res = self.dashboard.DisableRobot()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("下使能机械臂成功")
|
||
else:
|
||
print("下使能机械臂失败")
|
||
return
|
||
|
||
def clearError(self):
|
||
res = self.dashboard.ClearError()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("清楚报警成功")
|
||
else:
|
||
print("清楚报警失败")
|
||
return
|
||
|
||
def start_drag(self):
|
||
res = self.dashboard.StartDrag()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("拖拽模式开启成功")
|
||
else:
|
||
print("拖拽模式开启失败")
|
||
return
|
||
|
||
def stop_drag(self):
|
||
res = self.dashboard.StopDrag()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("拖拽模式关闭成功")
|
||
else:
|
||
print("拖拽模式关闭失败")
|
||
return
|
||
|
||
def stop_motion(self):
|
||
res = self.dashboard.Stop()
|
||
code = self.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂停止下发运动指令队列")
|
||
else:
|
||
print("停止下发运动失败,需立即拍下急停")
|
||
return
|
||
|
||
def __del__(self):
|
||
del self.dashboard
|
||
del self.feedFour
|
||
## 内部工具函数
|
||
|
||
def __transform_m_rad_2_mm_deg(self,input_list):
|
||
'''
|
||
接受列表 [X Y Z RX RY RZ]
|
||
将单位 m/rad 转为 mm/deg
|
||
'''
|
||
if len(input_list) != 6:
|
||
raise ValueError("Input list must contain exactly 6 elements [X, Y, Z, RX, RY, RZ]")
|
||
|
||
# 转换位置单位:m -> mm (乘以 1000)
|
||
x_mm = input_list[0] * 1000.0
|
||
y_mm = input_list[1] * 1000.0
|
||
z_mm = input_list[2] * 1000.0
|
||
|
||
# 转换角度单位:rad -> deg (乘以 180/pi)
|
||
rx_deg = math.degrees(input_list[3])
|
||
ry_deg = math.degrees(input_list[4])
|
||
rz_deg = math.degrees(input_list[5])
|
||
|
||
return [x_mm, y_mm, z_mm, rx_deg, ry_deg, rz_deg]
|
||
|
||
def __transform_rad_2_deg(self,input_list):
|
||
'''
|
||
接受列表 [J1 J2 J3 J4 J5 J6]
|
||
将单位rad转为deg
|
||
'''
|
||
if len(input_list) != 6:
|
||
raise ValueError("Input list must contain exactly 6 elements [J1, J2, J3, J4, J5, J6]")
|
||
# 转换角度单位: rad -> deg
|
||
J_deg = []
|
||
for i in range(6):
|
||
J_deg.append(math.degrees(input_list[i]))
|
||
return J_deg
|
||
|
||
|
||
## 为反馈线程封装
|
||
|
||
def Get_feedInfo_angle(self):
|
||
# print("反馈当前角度",self.feedbackData.QActual)
|
||
return self.feedbackData.QActual
|
||
|
||
def Get_feedInfo_pose(self):
|
||
# print("反馈当前TCP位置",self.feedbackData.ToolVectorActual)
|
||
return self.feedbackData.ToolVectorActual
|
||
|
||
## 适配中间层的再封装接口
|
||
|
||
# 为状态管理而封装的初始化函数
|
||
def init(self):
|
||
self.is_exit = False
|
||
self.traj_list = None
|
||
self.last_pose_command = np.zeros(6)
|
||
self.last_input_command = None
|
||
self.tcp_offset = None
|
||
self.init_J = [0,30,-120,0,90,0]
|
||
self.filter_matirx = np.zeros((8,7))
|
||
sleep(1)
|
||
self.is_standby = False
|
||
code = self.RunPoint_P_inJoint(self.init_J)
|
||
if code == 0:
|
||
print("机械臂初始化成功且到达初始位置")
|
||
else:
|
||
print("机械臂初始化失败")
|
||
|
||
|
||
|
||
def get_arm_position(self):
|
||
pose = self.Get_feedInfo_pose()
|
||
# print("pose",pose)
|
||
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
|
||
x /= 1000
|
||
y /= 1000
|
||
z /= 1000
|
||
pos = np.array([x,y,z])
|
||
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
|
||
return pos,quat_rot
|
||
|
||
def process_command(self,arm_position_command, arm_orientation_command):
|
||
rot_euler = R.from_quat(arm_orientation_command).as_euler('xyz',degrees=True)
|
||
rot_euler = np.clip(rot_euler,-180,180)
|
||
arm_position_command *= 1000
|
||
pose_command = np.concatenate([arm_position_command, rot_euler])
|
||
print("pose_command",pose_command)
|
||
if (not self.is_exit) and (not self.is_standby):
|
||
cur = self.getAngel() # 参考关节角度
|
||
target = self.getInverseKin(pose_command,-1,-1,1,"{" + ",".join(map(str, cur)) + "}")
|
||
# print("target",target)
|
||
self.joint_current = cur
|
||
self.joint_target = target
|
||
ress = np.array(target)
|
||
ress = np.append(ress,[1])
|
||
self.filter_matirx = np.append(self.filter_matirx,[ress],axis=0)
|
||
self.filter_matirx = np.delete(self.filter_matirx,0,axis=0)
|
||
sum_joint = np.sum(self.filter_matirx,axis=0)
|
||
sum_joint[:6] = sum_joint[:6]/sum_joint[6]
|
||
result = sum_joint[:6]
|
||
return 0, result
|
||
return -1, None
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# sleep(5)
|
||
dobot = dobot_nova5("192.168.5.1")
|
||
dobot.start()
|
||
posJ = [0,30,-120,0,90,0]
|
||
dobot.RunPoint_P_inJoint(posJ)
|
||
sleep(1)
|
||
# dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
|
||
# dobot.chooseEndEffector(i=9)
|
||
# print(dobot.getPose())
|
||
# print(dobot.get_arm_position())
|
||
# pos,rot_quat = dobot.get_arm_position()
|
||
# print("pos",pos)
|
||
# print("rot_quat",rot_quat)
|
||
|
||
# while(True):
|
||
# print("反馈角度",dobot.feedbackData.QActual)
|
||
# print("反馈四元数",dobot.feedbackData.ActualQuaternion)
|
||
# print("反馈TCP笛卡尔实际坐标值",dobot.feedbackData.ToolVectorActual)
|
||
# sleep(0.008)
|
||
|
||
dobot.Get_feedInfo_angle()
|
||
sleep(0.008)
|
||
dobot.Get_feedInfo_pose()
|
||
sleep(0.5)
|
||
# for i in range(400):
|
||
# posJ[2] += 0.04
|
||
# dobot.ServoJoint(posJ,t=0.008,aheadtime=20,gain=200)
|
||
# sleep(0.007)
|
||
|
||
dobot.disableRobot() |