from dobot_api import DobotApiFeedBack,DobotApiDashboard import threading from time import sleep,time import re import numpy as np from scipy.spatial.transform import Rotation as R ''' MODE DESCRIPTION CN 1 INIT 初始化 2 BREAK_OPEN 抱闸松开 3 POWER_OFF 本体下电 4 DISABLED 未使能(抱闸未松) 5 ENABLE 使能(空闲) 6 BACKDRIVE 拖拽 7 RUNNING 运行状态(脚本/TCP) 8 SINGLE_MOVE 单次运动(点动) 9 ERROR 错误状态 10 PAUSE 暂停状态 11 COLLISION 碰撞状态 MODE RANK 报错 I 下电 II 碰撞 III 开抱闸 IV ''' class dobot_nova5: def __init__(self,ip): self.ip = ip self.dashboardPort = 29999 ## 一发一收-设置-控制及运动端口号 self.feedFourPort = 30004 ## 第四实时反馈(8ms)反馈数据端口号 self.dashboard = None ## 初始化控制及运动端口实例 self.feedFour = None ## 初始化数据反馈端口实例 self.feedInfo = [] self.__globalLockValue = threading.Lock() # 多线程安全 class feedbackItem: def __init__(self): self.robotMode = -1 self.robotCurrentCommandID = 0 self.MessageSize = -1 self.DigitalInputs = -1 self.DigitalOutputs = -1 # ... 自定义反馈数据 self.feedbackData = feedbackItem() # 自定义反馈数据结构体 def parseResultId(self,valueRecv): # 解析返回值 if "Not Tcp" in valueRecv: print("Control Mode is not TCP") print("机械臂不在二次开发模式下") return [1] return [int(num) for num in re.findall(r'-?\d+',valueRecv)] or [2] def GetFeedback(self): while True: feedInfo = self.feedFour.feedBackData() with self.__globalLockValue: if feedInfo is not None: if hex((feedInfo['TestValue'][0]))=='0x123456789abcdef': ## DOBOT官方的数据魔数验证 # 基础字段 self.feedbackData.MessageSize = feedInfo['len'][0] self.feedbackData.robotMode = feedInfo['RobotMode'][0] self.feedbackData.DigitalInputs = feedInfo['DigitalInputs'][0] self.feedbackData.DigitalOutputs = feedInfo['DigitalOutputs'][0] self.feedbackData.robotCurrentCommandID = feedInfo['CurrentCommandId'][0] # 自定义添加所需反馈数据 ''' self.feedData.DigitalOutputs = int(feedInfo['DigitalOutputs'][0]) self.feedData.RobotMode = int(feedInfo['RobotMode'][0]) self.feedData.TimeStamp = int(feedInfo['TimeStamp'][0]) ''' def start(self): self.dashboard = DobotApiDashboard(self.ip,self.dashboardPort) self.feedFour = DobotApiFeedBack(self.ip,self.feedFourPort) self.clearError() # 清除报警 if self.parseResultId(self.dashboard.EnableRobot())[0] != 0: print("使能失败:检查29999端口是否被占用") return print("机械臂使能成功") # 状态反馈线程 feedback_thread = threading.Thread( target=self.GetFeedback # 机器状态反馈线程 ) feedback_thread.daemon = True feedback_thread.start() def RunPoint_P_inPose(self,poses_list): ''' RUNPOINT_P 以关节运动方式运动到目标点 MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg ''' # 走点 recvmovemess = self.dashboard.MovJ(*poses_list,0) print("MovJ:",recvmovemess) print(self.parseResultId(recvmovemess)) currentCommandID = self.parseResultId(recvmovemess)[1] print("指令 ID:", currentCommandID) sleep(0.02) while True: # 完成判断循环 ########## 调试注释 ############# # print("robot mode",self.feedbackData.robotMode) # print("feedback current ID", self.feedbackData.robotCurrentCommandID) # print("指令 ID:", currentCommandID) ########## 调试注释 ############# currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1] if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11: self.dashboard.Stop() self.dashboard.EmergencyStop() print("当前工作停止,机械臂处于急停状态") if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = int(res[0]) if code == 0: print("机械臂继续已暂停的运动指令") if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID: print("该次运动完成") break sleep(0.1) return 0 def RunPoint_P_inJoint(self,joints_list): ''' RUNPOINT_P 以关节运动方式运动到目标点 MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: deg ''' # 走点 recvmovemess = self.dashboard.MovJ(*joints_list,1) print("MovJ:",recvmovemess) print(self.parseResultId(recvmovemess)) currentCommandID = self.parseResultId(recvmovemess)[1] print("指令 ID:", currentCommandID) sleep(0.02) while True: # 完成判断循环 ########## 调试注释 ############# # print("robot mode",self.feedbackData.robotMode) # print("feedback current ID", self.feedbackData.robotCurrentCommandID) # print("指令 ID:", currentCommandID) # print("30004",self.dashboard.GetCurrentCommandID()) ########## 调试注释 ############# currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1] if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11: self.dashboard.Stop() self.dashboard.EmergencyStop() print("当前工作停止,机械臂处于急停状态") if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = int(res[0]) if code == 0: print("机械臂继续已暂停的运动指令") if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID: print("该次运动完成") break sleep(0.1) # 避免 CPU 占用过高 return 0 def RunPoint_L_inPose(self,poses_list): ''' RUNPOINT_L 以直线运动方式运动到目标点 MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg ''' # 走点 recvmovemess = self.dashboard.MovL(*poses_list,0) print("MovL:",recvmovemess) print(self.parseResultId(recvmovemess)) currentCommandID = self.parseResultId(recvmovemess)[1] print("指令 ID:", currentCommandID) while True: currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1] if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11: self.dashboard.Stop() self.dashboard.EmergencyStop() print("当前工作停止,机械臂处于急停状态") if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = int(res[0]) if code == 0: print("机械臂继续已暂停的运动指令") if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID: print("该次运动完成") break sleep(0.1) return 0 def RunPoint_L_inJoint(self,joints_list): ''' RUNPOINT_L 以直线运动方式运动到目标点 MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: mm/deg ''' # 走点 recvmovemess = self.dashboard.MovL(*joints_list,0) print("MovL:",recvmovemess) print(self.parseResultId(recvmovemess)) currentCommandID = self.parseResultId(recvmovemess)[1] print("指令 ID:", currentCommandID) while True: currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1] if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11: self.dashboard.Stop() self.dashboard.EmergencyStop() print("当前工作停止,机械臂处于急停状态") if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = int(res[0]) if code == 0: print("机械臂继续已暂停的运动指令") if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID: print("该次运动完成") break sleep(0.1) return 0 def ServoJoint(self,joints_list,t=-1,aheadtime=-1,gain=-1): tcp_command = f"servoj({joints_list[0]},{joints_list[1]},{joints_list[2]},{joints_list[3]},{joints_list[4]},{joints_list[5]},t={t},aheadtime={aheadtime},gain={gain})" self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收 print("ServoJ:",tcp_command) if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11: self.dashboard.Stop() self.dashboard.EmergencyStop() print("当前工作停止,机械臂处于急停状态") return -1 if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = int(res[0]) if code == 0: print("机械臂继续已暂停的运动指令") return 0 def ServoPose(self,poses_list,t=-1,aheadtime=-1,gain=-1): poses_list = list(map(float, poses_list)) tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t={t}, aheadtime={aheadtime}, gain={gain})" self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收 print("ServoP:",tcp_command) if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11: self.dashboard.Stop() self.dashboard.EmergencyStop() print("当前工作停止,机械臂处于急停状态") if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = int(res[0]) if code == 0: print("机械臂继续已暂停的运动指令") def setEndEffector(self,i,tool_i): ''' 设置工具坐标系在索引i. i!=0 tool_i 需要以字符串形式输入 "{x,y,z,rx,ry,rz}" ''' if i == 0: print("不可修改工具0(初始法兰坐标系)") return if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}: print("工具坐标系序号超出范围") return res = self.dashboard.SetTool(i,tool_i) code = int(res[0]) if code == 0: print(f"设置工具坐标系{i}的偏移量成功") else: print(f"设置工具坐标系{i}失败") return def chooseEndEffector(self,i): ''' 切换工具坐标系, 0~9 ''' if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}: print("工具坐标系序号超出范围") return res = self.dashboard.Tool(i) code = int(res[0]) if code == 0: print(f"切换为工具坐标系{i}成功") else: print("切换工具坐标系失败") return def setBaseFrame(self,i,base_i): ''' 设置工具坐标系在索引i. i!=0 base_i 修改后的用户坐标系(非偏移量!) 需要以字符串形式输入 "{x,y,z,rx,ry,rz}" ''' if i == 0: print("不可修改用户坐标系0(初始基坐标系)") return if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}: print("用户坐标系序号超出范围") return res = self.dashboard.SetUser(i,base_i) code = int(res[0]) if code == 0: print(f"设置用户坐标系{i}成功") else: print(f"设置用户坐标系{i}失败") return def chooseBaseFrame(self,i): ''' 切换用户基坐标系, 0~9 ''' if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}: print("工具坐标系序号超出范围") return res = self.dashboard.User(i) code = int(res[0]) if code == 0: print(f"切换为用户基坐标系{i}成功") else: print("切换用户基坐标系失败") return def getPose(self,user=-1,tool=-1): if (user == -1) != (tool == -1): ## XOR 必须同时设置或者同时不设置 print("必须同时设置或同时不设置坐标系参数") res = self.dashboard.GetPose(user,tool) # 返回字符串 code = int(res[0]) # 拿到的是字符串 if code == 0: pass # print("success") else: print("获取位姿失败") return # 处理字符串 start = res.find("{")+1 end = res.find("}") pose_str = res[start:end] pose = [float(x.strip()) for x in pose_str.split(",")] return pose def getAngel(self): res = self.dashboard.GetAngle() code = int(res[0]) if code == 0: pass # print("success") else: print("获取关节角度失败") return # 处理字符串 start = res.find("{")+1 end = res.find("}") angle_str = res[start:end] angle = [float(x.strip()) for x in angle_str.split(",")] return angle def getForwardKin(self,joints_list,user=-1,tool=-1): res = self.dashboard.PositiveKin(*joints_list,user,tool) code = int(res[0]) if code == 0: pass # print("success") else: print("获取正向运动学解失败") return # 处理字符串 start = res.find("{")+1 end = res.find("}") pose_str = res[start:end] pose = [float(x.strip()) for x in pose_str.split(",")] return pose def getInverseKin(self,poses_list,user=-1,tool=-1,useJointNear=-1,JointNear=''): ''' poses_list X Y Z RX RY RZ useJointNear 0 或不携带 表示 JointNear无效 1 表示根据JointNear就近选解 jointNear string "{j1,j2,j3,j4,j5,j6}" ''' res = self.dashboard.InverseKin(*poses_list,user,tool,useJointNear,JointNear) code = int(res[0]) if code == 0: pass # print("success") else: print("获取逆向运动学解失败") return # 处理字符串 start = res.find("{")+1 end = res.find("}") joint_str = res[start:end] joint = [float(x.strip()) for x in joint_str.split(",")] return joint def disableRobot(self): res = self.dashboard.DisableRobot() code = int(res[0]) if code == 0: print("下使能机械臂成功") else: print("下使能机械臂失败") return def clearError(self): res = self.dashboard.ClearError() code = int(res[0]) if code == 0: print("清楚报警成功") else: print("清楚报警失败") return def start_drag(self): res = self.dashboard.StartDrag() code = int(res[0]) if code == 0: print("拖拽模式开启成功") else: print("拖拽模式开启失败") return def stop_drag(self): res = self.dashboard.StopDrag() code = int(res[0]) if code == 0: print("拖拽模式关闭成功") else: print("拖拽模式关闭失败") return def stop_motion(self): res = self.dashboard.Stop() code = int(res[0]) if code == 0: print("机械臂停止下发运动指令队列") else: print("停止下发运动失败,需立即拍下急停") return def __del__(self): del self.dashboard del self.feedFour ## 适配中间层的再封装接口 # 为状态管理而封装的初始化函数 def init(self): self.is_exit = False self.traj_list = None self.last_pose_command = np.zeros(6) self.last_input_command = None self.tcp_offset = None self.init_J = [0,30,-120,0,90,0] sleep(1) self.is_standby = False code = self.RunPoint_P_inJoint(self.init_J) if code == 0: print("机械臂初始化成功且到达初始位置") else: print("机械臂初始化失败") def get_arm_position(self): pose = self.getPose() x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree pos = np.array([x,y,z]) quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat() return pos,quat_rot if __name__ == "__main__": dobot = dobot_nova5("192.168.5.1") dobot.start() posJ = [0,30,-120,0,90,0] dobot.RunPoint_P_inJoint(posJ) sleep(1) for i in range(400): posJ[2] += 0.04 dobot.ServoJoint(posJ,t=0.008,aheadtime=20,gain=200) sleep(0.007) dobot.disableRobot()