413 lines
16 KiB
Python

from dobot_api import DobotApiFeedBack,DobotApiDashboard
import threading
from time import sleep,time
import re
import numpy as np
from scipy.spatial.transform import Rotation as R
class dobot_nova5:
def __init__(self,ip):
self.ip = ip
self.dashboardPort = 29999 ## 一发一收-设置-控制及运动端口号
self.feedFourPort = 30004 ## 第四实时反馈(8ms)反馈数据端口号
self.dashboard = None ## 初始化控制及运动端口实例
self.feedFour = None ## 初始化数据反馈端口实例
self.feedInfo = []
self.__globalLockValue = threading.Lock() # 多线程安全
class feedbackItem:
def __init__(self):
self.robotMode = -1
self.robotCurrentCommandID = 0
self.MessageSize = -1
self.DigitalInputs = -1
self.DigitalOutputs = -1
# ... 自定义反馈数据
self.feedbackData = feedbackItem() # 自定义反馈数据结构体
def parseResultId(self,valueRecv):
# 解析返回值
if "Not Tcp" in valueRecv:
print("Control Mode is not TCP")
print("机械臂不在二次开发模式下")
return [1]
return [int(num) for num in re.findall(r'-?\d+',valueRecv)] or [2]
def GetFeedback(self):
while True:
feedInfo = self.feedFour.feedBackData()
with self.__globalLockValue:
if feedInfo is not None:
if hex((feedInfo['TestValue'][0]))=='0x123456789abcdef': ## DOBOT官方的数据魔数验证
# 基础字段
self.feedbackData.MessageSize = feedInfo['len'][0]
self.feedbackData.robotMode = feedInfo['RobotMode'][0]
self.feedbackData.DigitalInputs = feedInfo['DigitalInputs'][0]
self.feedbackData.DigitalOutputs = feedInfo['DigitalOutputs'][0]
self.feedbackData.robotCurrentCommandID = feedInfo['CurrentCommandId'][0]
# 自定义添加所需反馈数据
'''
self.feedData.DigitalOutputs = int(feedInfo['DigitalOutputs'][0])
self.feedData.RobotMode = int(feedInfo['RobotMode'][0])
self.feedData.TimeStamp = int(feedInfo['TimeStamp'][0])
'''
def start(self):
self.dashboard = DobotApiDashboard(self.ip,self.dashboardPort)
self.feedFour = DobotApiFeedBack(self.ip,self.feedFourPort)
self.clearError() # 清除报警
if self.parseResultId(self.dashboard.EnableRobot())[0] != 0:
print("使能失败:检查29999端口是否被占用")
return
print("机械臂使能成功")
# 状态反馈线程
feedback_thread = threading.Thread(
target=self.GetFeedback # 机器状态反馈线程
)
feedback_thread.daemon = True
feedback_thread.start()
def RunPoint_P_inPose(self,poses_list):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
'''
# 走点
recvmovemess = self.dashboard.MovJ(*poses_list,0)
print("MovJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
# sleep(0.02)
while True: # 完成判断循环
########## 调试注释 #############
# print("robot mode",self.feedbackData.robotMode)
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
# print("指令 ID:", currentCommandID)
########## 调试注释 #############
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
def RunPoint_P_inJoint(self,joints_list):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: deg
'''
# 走点
recvmovemess = self.dashboard.MovJ(*joints_list,1)
print("MovJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
sleep(0.02)
start_time = time() # 记录开始时间
while True: # 完成判断循环
########## 调试注释 #############
# print("robot mode",self.feedbackData.robotMode)
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
# print("指令 ID:", currentCommandID)
# print("30004",self.dashboard.GetCurrentCommandID())
########## 调试注释 #############
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop()
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = int(res[0])
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1) # 避免 CPU 占用过高
def RunPoint_L_inPose(self,poses_list):
'''
RUNPOINT_L 以直线运动方式运动到目标点
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
'''
# 走点
recvmovemess = self.dashboard.MovL(*poses_list,0)
print("MovL:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
def RunPoint_L_inJoint(self,joints_list):
'''
RUNPOINT_L 以直线运动方式运动到目标点
MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg
'''
# 走点
recvmovemess = self.dashboard.MovL(*joints_list,0)
print("MovL:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
def ServoJoint(self,joints_list,t=-1,aheadtime=-1,gain=-1):
# 关节空间伺服
recvmovemess = self.dashboard.ServoJ(*joints_list,t,aheadtime,gain)
print("ServoJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop()
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = int(res[0])
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("完成一次伺服动作")
break
def ServoPose(self,poses_list,t=-1,aheadtime=-1,gain=-1):
# 位姿伺服
recvmovemess = self.dashboard.ServoP(*poses_list,t,aheadtime,gain)
print("ServoP:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("完成一次伺服动作")
break
def setEndEffector(self,i,tool_i):
'''
设置工具坐标系在索引i. i!=0
tool_i 需要以字符串形式输入 "{x,y,z,rx,ry,rz}"
'''
if i == 0:
print("不可修改工具0(初始法兰坐标系)")
return
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("工具坐标系序号超出范围")
return
res = self.dashboard.SetTool(i,tool_i)
code = int(res[0])
if code == 0:
print(f"设置工具坐标系{i}的偏移量成功")
else:
print(f"设置工具坐标系{i}失败")
return
def chooseEndEffector(self,i):
'''
切换工具坐标系, 0~9
'''
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("工具坐标系序号超出范围")
return
res = self.dashboard.Tool(i)
code = int(res[0])
if code == 0:
print(f"切换为工具坐标系{i}成功")
else:
print("切换工具坐标系失败")
return
def setBaseFrame(self,i,base_i):
'''
设置工具坐标系在索引i. i!=0
base_i 修改后的用户坐标系(非偏移量!) 需要以字符串形式输入 "{x,y,z,rx,ry,rz}"
'''
if i == 0:
print("不可修改用户坐标系0(初始基坐标系)")
return
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("用户坐标系序号超出范围")
return
res = self.dashboard.SetUser(i,base_i)
code = int(res[0])
if code == 0:
print(f"设置用户坐标系{i}成功")
else:
print(f"设置用户坐标系{i}失败")
return
def chooseBaseFrame(self,i):
'''
切换用户基坐标系, 0~9
'''
if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}:
print("工具坐标系序号超出范围")
return
res = self.dashboard.User(i)
code = int(res[0])
if code == 0:
print(f"切换为用户基坐标系{i}成功")
else:
print("切换用户基坐标系失败")
return
def getPose(self,user=-1,tool=-1):
if (user == -1) != (tool == -1): ## XOR 必须同时设置或者同时不设置
print("必须同时设置或同时不设置坐标系参数")
res = self.dashboard.GetPose(user,tool) # 返回字符串
code = int(res[0]) # 拿到的是字符串
if code == 0:
pass
# print("success")
else:
print("获取位姿失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
pose_str = res[start:end]
pose = [float(x.strip()) for x in pose_str.split(",")]
return pose
def getAngel(self):
res = self.dashboard.GetAngle()
code = int(res[0])
if code == 0:
pass
# print("success")
else:
print("获取关节角度失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
angle_str = res[start:end]
angle = [float(x.strip()) for x in angle_str.split(",")]
return angle
def getForwardKin(self,joints_list,user=-1,tool=-1):
res = self.dashboard.PositiveKin(*joints_list,user,tool)
code = int(res[0])
if code == 0:
pass
# print("success")
else:
print("获取正向运动学解失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
pose_str = res[start:end]
pose = [float(x.strip()) for x in pose_str.split(",")]
return pose
def getInverseKin(self,poses_list,user=-1,tool=-1,useJointNear=-1,JointNear=''):
'''
poses_list X Y Z RX RY RZ
useJointNear 0 或不携带 表示 JointNear无效
1 表示根据JointNear就近选解
jointNear string "{j1,j2,j3,j4,j5,j6}"
'''
res = self.dashboard.InverseKin(*poses_list,user,tool,useJointNear,JointNear)
code = int(res[0])
if code == 0:
pass
# print("success")
else:
print("获取逆向运动学解失败")
return
# 处理字符串
start = res.find("{")+1
end = res.find("}")
joint_str = res[start:end]
joint = [float(x.strip()) for x in joint_str.split(",")]
return joint
def disableRobot(self):
res = self.dashboard.DisableRobot()
code = int(res[0])
if code == 0:
print("下使能机械臂成功")
else:
print("下使能机械臂失败")
return
def clearError(self):
res = self.dashboard.ClearError()
code = int(res[0])
if code == 0:
print("清楚报警成功")
else:
print("清楚报警失败")
return
def start_drag(self):
res = self.dashboard.StartDrag()
code = int(res[0])
if code == 0:
print("拖拽模式开启成功")
else:
print("拖拽模式开启失败")
return
def stop_drag(self):
res = self.dashboard.StopDrag()
code = int(res[0])
if code == 0:
print("拖拽模式关闭成功")
else:
print("拖拽模式关闭失败")
return
def __del__(self):
del self.dashboard
del self.feedFour
#########
def get_arm_position(self):
pose = self.getPose()
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
pos = np.array([x,y,z])
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
return pos,quat_rot
#########
if __name__ == "__main__":
dobot = dobot_nova5("192.168.5.1")
dobot.start()
posJ = [0,30,-120,0,90,0]
dobot.RunPoint_P_inJoint(posJ)
sleep(1)
# for i in range(200):
# posJ[2] += 0.04
# dobot.ServoJoint(joints_list=posJ,t=0.008,aheadtime=20,gain=200)
# sleep(0.008)
dobot.disableRobot()