import os import sys current_file_path = os.path.abspath(__file__) dobot_nova5_path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path))) print(dobot_nova5_path) # 添加目标文件夹到系统路径 sys.path.append(dobot_nova5_path) try: from dobot_api import DobotApiFeedBack,DobotApiDashboard except: from hardware.dobot_api import DobotApiFeedBack,DobotApiDashboard try: from .dobot_message import * except: from dobot_message import * import threading from time import sleep,time import re from tools.log import CustomLogger import copy import numpy as np from scipy.spatial.transform import Rotation as R import math import time import requests import atexit ''' MODE DESCRIPTION CN 1 INIT 初始化 2 BREAK_OPEN 抱闸松开 3 POWER_OFF 本体下电 4 DISABLED 未使能(抱闸未松) 5 ENABLE 使能(空闲) 6 BACKDRIVE 拖拽 7 RUNNING 运行状态(脚本/TCP) 8 SINGLE_MOVE 单次运动(点动) 9 ERROR 错误状态 10 PAUSE 暂停状态 11 COLLISION 碰撞状态 MODE RANK 报错 I 下电 II 碰撞 III 开抱闸 IV ''' class dobot_nova5: def __init__(self,arm_ip = '192.168.5.1',dashboardPort = 29999,feedFourPort = 30004): self.ip = arm_ip ## 机械臂IP地址 self.tcp_offset = [0, 0, 0, 0, 0, 0] # 工具坐标系偏移 self.tcp_euler_inv = np.linalg.inv(R.from_euler('xyz', self.tcp_offset[3:], degrees=False).as_matrix()) self.dashboardPort = dashboardPort ## 一发一收-设置-控制及运动端口号 self.feedFourPort = feedFourPort ## 第四实时反馈(8ms)反馈数据端口号 self.dashboard = None ## 初始化控制及运动端口实例 self.feedFour = None ## 初始化数据反馈端口实例 self.feedInfo = [] self.__globalLockValue = threading.Lock() # 多线程安全 class feedbackItem: def __init__(self): self.robotMode = -1 self.robotCurrentCommandID = 0 self.MessageSize = -1 # self.DigitalInputs = -1 # self.DigitalOutputs = -1 self.EnableState = False self.QActual = -1 self.ActualQuaternion = -1 self.ToolVectorActual = -1 # ... 自定义反馈数据 ''' self.UserValue = -1 self.ToolValue = -1 ''' self.feedbackData = feedbackItem() # 自定义反馈数据结构体 self.logger = CustomLogger('Dobot_nova5') self.stop_feedback = threading.Event() self.stop_feedback.clear() # 状态反馈线程 self.feedback_thread = threading.Thread( target=self.GetFeedback # 机器状态反馈线程 ) try: self.dashboard = DobotApiDashboard(self.ip, self.dashboardPort) self.feedFour = DobotApiFeedBack(self.ip, self.feedFourPort) if self.dashboard.is_connected() and self.feedFour.is_connected(): print("连接成功") self.feedback_thread.daemon = True self.feedback_thread.start() time.sleep(1) self.dashboard.RequestControl() max_retry_num = 5 #尝试连接次数 cnt_num = 0 while cnt_num < max_retry_num: if self.feedbackData.robotMode != -1: self.dashboard.EmergencyStop(mode=0) # 松开急停 self.clearError() # 清除报警 # stopId = self.dashboard.Stop() time.sleep(0.5) print("self.self.feedbackData.robotMode:",self.feedbackData.robotMode) if (self.feedbackData.robotMode == 3 or self.feedbackData.robotMode == 4): #and self.parseResultprocess(stopId)[0] == 0: # self.dashboard.RequestControl() if self.feedbackData.robotMode == 3: self.dashboard.PowerOn() # 复位 # self.dashboard.Stop() # 停止 if self.start_up(): # 启动 self.logger.log_info(f"连接机械臂成功") break else: self.logger.log_error(f"启动失败,尝试第{cnt_num+1}次连接") cnt_num += 1 time.sleep(1) else: if self.feedbackData.robotMode == 9:# and self.parseResultprocess(stopId)[0] == -3: print(robot_mode_types.get(self.feedbackData.robotMode, "未知模式")) requests.post( "http://127.0.0.1:5000/on_message", data={"message": "机械臂处于锁定状态
请解除锁定后再使用"} ) instruction = { "is_massaging": False, "massage_service_started": False } requests.post( "http://127.0.0.1:5000/update_massage_status", data=instruction ) sys.exit(0) self.disableRobot() self.clearError() # 清除报警 cnt_num += 1 time.sleep(1) else: self.logger.log_error(f"连接机械臂失败,尝试第{cnt_num+1}次连接") cnt_num += 1 time.sleep(1) else: print("[ERROR]未连接成功") self.dashboard = None self.feedFour = None except Exception as e: print(f"[ERROR] Dashboard 连接失败: {e}") self.dashboard = None self.feedFour = None atexit.register(self.exit_task) self.chooseRightFrame() self.init_pos = [-45.0009079*np.pi/180,55.5785141*np.pi/180,-120.68821716*np.pi/180,5.11103201*np.pi/180,90.00195312*np.pi/180,-90.00085449*np.pi/180] self.standby_pos = [-35.0009079*np.pi/180,45.5785141*np.pi/180,-110.68821716*np.pi/180,2.11103201*np.pi/180,80.00195312*np.pi/180,-80.00085449*np.pi/180] self.off_pos = [-44.996*np.pi/180,87.0092*np.pi/180,-147.0087*np.pi/180,-0.0048*np.pi/180,90.0010*np.pi/180,-90.0021*np.pi/180] self.cam_pose = [-45.0050*np.pi/180,30.0682*np.pi/180,-66.1887*np.pi/180,-21.8836*np.pi/180,-90.3064*np.pi/180,-89.9977*np.pi/180] self.cam_length = 1.75 self.level_base = [-180.0000*np.pi/180,-30.0000*np.pi/180,0.0042*np.pi/180] self.is_exit = False self.is_standby = False self.filter_matirx = np.zeros((1,7)) # 位姿伺服用 self.filter_matrix = np.zeros((1,6)) # 角度伺服用 # 为状态管理而封装的初始化函数 def init(self): self.is_exit = False self.traj_list = None self.last_pose_command = np.zeros(6) self.last_input_command = None self.tcp_offset = None self.filter_matirx = np.zeros((1,7)) # 位姿伺服用 self.filter_matrix = np.zeros((1,6)) # 角度伺服用 sleep(1) self.is_standby = False self.move_joint_jog_range(q=np.zeros(6)) time.sleep(0.5) code = self.move_joint(self.init_pos) if code == 0: if self.parseResultprocess(self.dashboard.SetCollisionLevel(level=1))[0] == 0: self.logger.log_info("机械臂初始化") else: time.sleep(0.2) max_retries = 5 retry_count = 0 while retry_count < max_retries: if self.parseResultprocess(self.dashboard.SetCollisionLevel(level=1))[0] == 0: print("retry_count",retry_count) break else: time.sleep(0.1) if retry_count == max_retries: self.logger.log_error("机械臂初始化失败") requests.post( "http://127.0.0.1:5000/on_message", data={"message": "机械臂初始化失败"} ) self.is_exit = True sys.exit(0) else: self.logger.log_error("机械臂初始化失败") requests.post( "http://127.0.0.1:5000/on_message", data={"message": "机械臂初始化失败"} ) self.is_exit = True sys.exit(0) self.last_print_time=0 self.last_reocrd_time =0 def change_end_effector(self, end_effector_config): # 设置tcp偏移 self.tcp_offset = end_effector_config["tcp_offset"] self.tcp_euler_inv = np.linalg.inv(R.from_euler('xyz', self.tcp_offset[3:], degrees=False).as_matrix()) def exit_task(self): self.logger.log_yellow("退出任务") # self.disable() instruction = { "is_massaging": False, "massage_service_started": False } requests.post( "http://127.0.0.1:5000/update_massage_status", data=instruction ) time.sleep(1) self.move_joint(self.off_pos) self.stop_motion() self.disableRobot() self.stop_feedback.set() self.feedback_thread.join() self.dashboard.Disconnect() self.feedFour.Disconnect() def parseResultprocess(self,valueRecv): if valueRecv is None: print("Empty response or None received") return [1] # 解析返回值 if "Not Tcp" in valueRecv: print("Control Mode is not TCP") print("机械臂不在二次开发模式下") return [1] return [int(num) for num in re.findall(r'-?\d+',valueRecv)] or [2] def GetFeedback(self): while self.stop_feedback.is_set() == False: feedInfo = self.feedFour.feedBackData() with self.__globalLockValue: if feedInfo is not None: if hex((feedInfo['TestValue'][0]))=='0x123456789abcdef': ## DOBOT官方的数据魔数验证 # 基础字段 self.feedbackData.MessageSize = feedInfo['len'][0] self.feedbackData.robotMode = feedInfo['RobotMode'][0] # self.feedbackData.DigitalInputs = feedInfo['DigitalInputs'][0] # self.feedbackData.DigitalOutputs = feedInfo['DigitalOutputs'][0] self.feedbackData.EnableState = feedInfo['EnableStatus'][0] self.feedbackData.robotCurrentCommandID = feedInfo['CurrentCommandId'][0] self.feedbackData.QActual = feedInfo['QActual'][0] self.feedbackData.ActualQuaternion = feedInfo['ActualQuaternion'][0] self.feedbackData.ToolVectorActual = feedInfo['ToolVectorActual'][0] # 自定义添加所需反馈数据 ''' self.feedData.DigitalOutputs = int(feedInfo['DigitalOutputs'][0]) self.feedData.RobotMode = int(feedInfo['RobotMode'][0]) self.feedData.TimeStamp = int(feedInfo['TimeStamp'][0]) self.feedbackData.UserValue = feedInfo['UserValue[6]'][0] self.feedbackData.ToolValue = feedInfo['ToolValue[6]'][0] ''' def start_up(self): max_try = 3 # 尝试次数 cnt = 0 # 尝试次数计数 while cnt < max_try: cnt += 1 # 关闭碰撞检测 # if self.parseResultprocess(self.dashboard.SetCollisionLevel(level=0))[0] == 0: time.sleep(0.1) self.logger.log_info("关闭碰撞检测成功") code = self.parseResultprocess(self.dashboard.EnableRobot())[0] if code == 0: while 1: if self.feedbackData.robotMode in [3, 6, 7, 8, 9, 10, 11]: self.logger.log_error(f"使能失败,机械臂异常:{self.feedbackData.robotMode}") return False elif self.feedbackData.robotMode == 5: self.logger.log_info("机械臂使能成功") return True time.sleep(0.1) def disable_servo(self): # time.sleep(10) pass def enable_servo(self): pass def waitArrival(self, command): sendCommandID = self.parseResultprocess(command)[1] # print("self.feedbackData.robotMode6:",self.feedbackData.robotMode) # print("self.parseResultprocess(command)",self.parseResultprocess(command)) while True: if self.feedbackData.robotMode in [3, 4, 6, 9, 10, 11]: self.logger.log_error(f"机械臂异常:{self.feedbackData.robotMode}") print(robot_mode_types.get(self.feedbackData.robotMode, "未知模式")) self.dashboard.Stop() self.dashboard.EmergencyStop(mode=1) return -1 if self.feedbackData.EnableState: # print("self.feedbackData.robotCurrentCommandID",self.feedbackData.robotCurrentCommandID) if self.feedbackData.robotCurrentCommandID > sendCommandID: break else: isFinsh = (self.feedbackData.robotMode == 5) if self.feedbackData.robotCurrentCommandID == sendCommandID and isFinsh: break sleep(0.1) return 0 def move_joint(self,q,**params): ''' RUNPOINT_P 以关节运动方式运动到目标点 MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: rad ''' # 单位 rad -> deg max_retry_count = params.get('max_retry_count', 3) cnt = 0 joints_list_deg = self.__transform_rad_2_deg(q) while cnt < max_retry_count: cnt += 1 command = self.dashboard.MovJ(*joints_list_deg,1) if command == None: return -1 is_arrived = self.waitArrival(command) if is_arrived == 0: # self.dashboard.MovJ() print(f'Arrived at the joint position: {q}') return 0 else: print(f'Failed to arrive at the joint position: {q}') return -1 print(f'After {cnt} retries, still failed to arrive at the joint position: {q}') return -1 # def move_joint_jog(self,q,**params): # ''' # move_joint_jog 以JOINT jOG运动方式运动到目标点 # MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: rad # ''' # threshold = params.get('threshold', 10) # currentAngle = self.getAngel() # targetAngle = self.__transform_rad_2_deg(q) # currentAngle = np.array(currentAngle) # targetAngle = np.array(targetAngle) # self.dashboard.SpeedFactor(50) # for i in range(len(targetAngle)): # while np.abs(currentAngle[i] - targetAngle[i]) >threshold: # if currentAngle[i] < targetAngle[i]: # self.dashboard.MoveJog(f"J{i+1}+") # else: # self.dashboard.MoveJog(f"J{i+1}-") # time.sleep(0.5) # currentAngle = self.getAngel() # currentAngle = np.array(currentAngle) # self.dashboard.MoveJog("") # # self.dashboard.SpeedFactor(95) def move_joint_jog_range(self,q,**params): ''' move_joint_jog 以JOINT jOG运动方式运动到目标点 MovJ flag = 1 通过关节角度走点,输入为[J1 J2 J3 J4 J5 J6] unit: rad ''' threshold = params.get('threshold', np.array([350,170,155,350,350,350])) currentAngle = self.getAngel() targetAngle = self.__transform_rad_2_deg(q) currentAngle = np.array(currentAngle) targetAngle = np.array(targetAngle) self.dashboard.SpeedFactor(50) for i in range(len(targetAngle)): while np.abs(np.abs(currentAngle[i]) - targetAngle[i]) >threshold[i]: if currentAngle[i] < targetAngle[i]: self.dashboard.MoveJog(f"J{i+1}+") else: self.dashboard.MoveJog(f"J{i+1}-") time.sleep(0.5) currentAngle = self.getAngel() currentAngle = np.array(currentAngle) self.dashboard.MoveJog("") def RunPoint_P_inPose_M_RAD(self,pose,**params): ''' RUNPOINT_P 以关节运动方式运动到目标点 MovJ flag = 0 通过位姿走点,输入为[X Y Z RX RY RZ] unit: M/RAD ''' # 单位 m/rad -> mm/deg max_retry_count = params.get('max_retry_count', 3) cnt = 0 print("self.robotmode3:",self.feedbackData.robotMode) poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(pose) while cnt < max_retry_count: cnt += 1 print("self.robotmode4:",self.feedbackData.robotMode) command = self.dashboard.MovJ(*poses_list_mm_deg,0) if command == None: return -1 print("self.robotmode5:",self.feedbackData.robotMode) is_arrived = self.waitArrival(command = command) if is_arrived == 0: print(f'Arrived at the joint position: {pose}') return 0 else: print(f'Failed to arrive at the joint position: {pose}') return -1 print(f'After {cnt} retries, still failed to arrive at the joint position: {pose}') return -1 def ServoJoint(self,joints_list): ''' 关节角度伺服 输入为[J1 J2 J3 J4 J5 J6] unit: deg ''' tcp_command = f"servoj({joints_list[0]},{joints_list[1]},{joints_list[2]},{joints_list[3]},{joints_list[4]},{joints_list[5]}, t=0.01, aheadtime=20, gain=200)" self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收 print("ServoJ:",tcp_command) if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11: self.dashboard.Stop() self.dashboard.EmergencyStop(mode=1) print("当前工作停止,机械臂处于急停状态") return -1 if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = self.parseResultprocess(res)[0] if code == 0: print("机械臂继续已暂停的运动指令") return 0 def ServoJoint_RAD(self,joints_list): ''' 关节角度伺服 输入为[J1 J2 J3 J4 J5 J6] unit: rad ''' # 单位转换 rad -> deg joints_list_deg = self.__transform_rad_2_deg(joints_list) # 构造字符串 tcp_command = f"servoj({joints_list_deg[0]},{joints_list_deg[1]},{joints_list_deg[2]},{joints_list_deg[3]},{joints_list_deg[4]},{joints_list_deg[5]}, t=0.01, aheadtime=20, gain=200)" self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收 print("ServoJ:",tcp_command) if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11: self.dashboard.Stop() self.dashboard.EmergencyStop(mode=1) print("当前工作停止,机械臂处于急停状态") return -1 if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = self.parseResultprocess(res)[0] if code == 0: print("机械臂继续已暂停的运动指令") return 0 def ServoPose(self,poses_list): ''' 位姿伺服 输入为[X Y Z RX RY RZ] unit: mm/deg ''' poses_list = list(map(float, poses_list)) tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=300)" # res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收 self.dashboard.send_data(tcp_command) # print("ServoP msg:",res) if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11: self.dashboard.Stop() self.dashboard.EmergencyStop(mode=1) print("当前工作停止,机械臂处于急停状态") if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = self.parseResultprocess(res)[0] if code == 0: print("机械臂继续已暂停的运动指令") def ServoPose_M_RAD(self,poses_list): ''' 位姿伺服 输入为[X Y Z RX RY RZ] unit: M/RAD ''' # 单位 m/rad -> mm/deg poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list) # 确保数据类型正确 poses_list_mm_deg = list(map(float, poses_list_mm_deg)) # 构造字符串 tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=200)" res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收 print("ServoP msg:",res) if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11: self.dashboard.Stop() self.dashboard.EmergencyStop(mode=1) print("当前工作停止,机械臂处于急停状态") if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = self.parseResultprocess(res)[0] if code == 0: print("机械臂继续已暂停的运动指令") def setEndEffector(self,i,tool_i): ''' 设置工具坐标系在索引i. i!=0 tool_i 需要以字符串形式输入 "{x,y,z,rx,ry,rz}" ''' if i == 0: print("不可修改工具0(初始法兰坐标系)") return if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}: print("工具坐标系序号超出范围") return res = self.dashboard.SetTool(i,tool_i) print(res) code = self.parseResultprocess(res)[0] if code == 0: print(f"设置工具坐标系{i}的偏移量成功") else: print(f"设置工具坐标系{i}失败") return def chooseEndEffector(self,i): ''' 切换工具坐标系, 0~9 ''' if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}: print("工具坐标系序号超出范围") return res = self.dashboard.Tool(i) print(res) code = self.parseResultprocess(res)[0] if code == 0: print(f"切换为工具坐标系{i}成功") else: print("切换工具坐标系失败") return def setBaseFrame(self,i,base_i): ''' 设置工具坐标系在索引i. i!=0 base_i 修改后的用户坐标系(非偏移量!) 需要以字符串形式输入 "{x,y,z,rx,ry,rz}" ''' if i == 0: print("不可修改用户坐标系0(初始基坐标系)") return if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}: print("用户坐标系序号超出范围") return res = self.dashboard.SetUser(i,base_i) code = self.parseResultprocess(res)[0] if code == 0: print(f"设置用户坐标系{i}成功") else: print(f"设置用户坐标系{i}失败") return def chooseBaseFrame(self,i): ''' 切换用户基坐标系, 0~9 ''' if i not in {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}: print("工具坐标系序号超出范围") return res = self.dashboard.User(i) code = self.parseResultprocess(res)[0] if code == 0: print(f"切换为用户基坐标系{i}成功") else: print("切换用户基坐标系失败") return def getPose(self,user=-1,tool=-1): if (user == -1) != (tool == -1): ## XOR 必须同时设置或者同时不设置 print("必须同时设置或同时不设置坐标系参数") res = self.dashboard.GetPose(user,tool) # 返回字符串 print("get pose",res) code = self.parseResultprocess(res)[0] if code == 0: pass # print("success") else: print("获取位姿失败") return # 处理字符串 start = res.find("{")+1 end = res.find("}") pose_str = res[start:end] pose = [float(x.strip()) for x in pose_str.split(",")] return pose def getAngel(self): res = self.dashboard.GetAngle() code = self.parseResultprocess(res)[0] if code == 0: pass # print("success") else: print("获取关节角度失败") return # 处理字符串 start = res.find("{")+1 end = res.find("}") angle_str = res[start:end] # print("res:",res) angle = [float(x.strip()) for x in angle_str.split(",")] return angle def getForwardKin(self,joints_list,user=-1,tool=-1): res = self.dashboard.PositiveKin(*joints_list,user,tool) code = self.parseResultprocess(res)[0] if code == 0: pass # print("success") else: print("获取正向运动学解失败") return # 处理字符串 start = res.find("{")+1 end = res.find("}") pose_str = res[start:end] pose = [float(x.strip()) for x in pose_str.split(",")] return pose def getInverseKin(self,poses_list,user=1,tool=-1,useJointNear=1,JointNear=''): ''' poses_list X Y Z RX RY RZ useJointNear 0 或不携带 表示 JointNear无效 1 表示根据JointNear就近选解 jointNear string "jointNear={j1,j2,j3,j4,j5,j6}" ''' res = self.dashboard.InverseKin(*poses_list,user,tool,useJointNear,JointNear) print(res) code = self.parseResultprocess(res)[0] if code == 0: pass # print("success") else: print("获取逆向运动学解失败") return # 处理字符串 start = res.find("{")+1 end = res.find("}") joint_str = res[start:end] joint = [float(x.strip()) for x in joint_str.split(",")] return joint def disableRobot(self): res = self.dashboard.DisableRobot() code = self.parseResultprocess(res)[0] if code == 0: print("下使能机械臂成功") else: print("下使能机械臂失败") return def clearError(self): res = self.dashboard.ClearError() code = self.parseResultprocess(res)[0] if code == 0: print("清楚报警成功") else: print("清楚报警失败") return def start_drag(self): res = self.dashboard.StartDrag() code = self.parseResultprocess(res)[0] if code == 0: print("拖拽模式开启成功") else: print("拖拽模式开启失败") return def stop_drag(self): res = self.dashboard.StopDrag() code = self.parseResultprocess(res)[0] if code == 0: print("拖拽模式关闭成功") else: print("拖拽模式关闭失败") return def stop_motion(self): res = self.dashboard.Stop() code = self.parseResultprocess(res)[0] if code == 0: print("机械臂停止下发运动指令队列") else: print("停止下发运动失败,需立即拍下急停") return def __del__(self): del self.dashboard del self.feedFour ## 内部工具函数 def __transform_m_rad_2_mm_deg(self,input_list): ''' 接受列表 [X Y Z RX RY RZ] 将单位 m/rad 转为 mm/deg ''' if len(input_list) != 6: raise ValueError("Input list must contain exactly 6 elements [X, Y, Z, RX, RY, RZ]") # 转换位置单位:m -> mm (乘以 1000) x_mm = input_list[0] * 1000.0 y_mm = input_list[1] * 1000.0 z_mm = input_list[2] * 1000.0 # 转换角度单位:rad -> deg (乘以 180/pi) rx_deg = math.degrees(input_list[3]) ry_deg = math.degrees(input_list[4]) rz_deg = math.degrees(input_list[5]) return [x_mm, y_mm, z_mm, rx_deg, ry_deg, rz_deg] def __transform_rad_2_deg(self,input_list): ''' 接受列表 [J1 J2 J3 J4 J5 J6] 将单位rad转为deg ''' if len(input_list) != 6: raise ValueError("Input list must contain exactly 6 elements [J1, J2, J3, J4, J5, J6]") # 转换角度单位: rad -> deg J_deg = [] for i in range(6): J_deg.append(math.degrees(input_list[i])) return J_deg ## 为反馈线程封装 def Get_feedInfo_angle(self): # print("反馈当前角度",self.feedbackData.QActual) return self.feedbackData.QActual def Get_feedInfo_pose(self): # print("反馈当前TCP位置",self.feedbackData.ToolVectorActual) return self.feedbackData.ToolVectorActual ## 适配中间层的再封装接口 # # 为状态管理而封装的初始化函数 # def init(self): # self.is_exit = False # self.traj_list = None # self.last_pose_command = np.zeros(6) # self.last_input_command = None # self.tcp_offset = None # self.init_J = [-45.0009079,55.5785141,-120.68821716,5.11103201,90.00195312,-90.00085449] # self.filter_matirx = np.zeros((1,7)) # 位姿伺服用 # self.filter_matrix = np.zeros((1,6)) # 角度伺服用 # sleep(1) # self.is_standby = False # code = self.RunPoint_P_inJoint(self.init_J) # if code == 0: # print("机械臂初始化成功且到达初始位置") # else: # print("机械臂初始化失败") def get_pose_at_flange(self): ''' 输出pos,quat pos: [x,y,z] in m quat: [qx,qy,qz,qw] 四元数 ''' pose = self.getPose(user=0, tool=0) # 获取初始法兰坐标系下的位姿 x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree x /= 1000 y /= 1000 z /= 1000 pos = np.array([x,y,z]) quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat() return pos,quat_rot def get_end_position(self,tool=0): ''' 工具默认为1 输出pos,quat pos: [x,y,z] in m quat: [qx,qy,qz,qw] 四元数 ''' # pose = self.getPose(user=1, tool=tool) pose = self.Get_feedInfo_pose() x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree x /= 1000 y /= 1000 z /= 1000 pos = np.array([x,y,z]) quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat() return pos,quat_rot def get_arm_position(self): ''' 直接获得当前user/tool坐标系下的机械臂位置 ''' ''' 输出pos,quat pos: [x,y,z] in m quat: [qx,qy,qz,qw] 四元数 ''' tool = 0 pose = self.Get_feedInfo_pose() # pose = self.getPose(user=1, tool=tool) # print("pose",pose) x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree x /= 1000 y /= 1000 z /= 1000 pos = np.array([x,y,z]) ori = np.array([rx, ry, rz])*np.pi/180 try: if self.tcp_offset == None: self.tcp_offset = [0, 0, 0, 0, 0, 0] # 工具坐标系偏移 # 添加tcp偏移 target_pos = pos + R.from_euler("xyz",ori,degrees=False).as_matrix() @ np.array(self.tcp_offset[:3]) target_matrix = R.from_euler('xyz', ori, degrees=False).as_matrix() @ R.from_euler('xyz', self.tcp_offset[3:], degrees=False).as_matrix() target_euler = R.from_matrix(target_matrix).as_euler('xyz', degrees=False) tool_position = target_pos tool_quat_rot = R.from_euler('xyz', target_euler, degrees=False).as_quat() return tool_position, tool_quat_rot except Exception as e: self.logger.log_error(f"计算工具位置时发生错误:{str(e)}") return None, None def __circular_mean(self,angles): # 将角度转换为复数单位圆上的点,再计算均值 # 为处理周期性问题 angles_rad = np.deg2rad(angles) x = np.mean(np.cos(angles_rad)) y = np.mean(np.sin(angles_rad)) return np.rad2deg(np.arctan2(y, x)) def process_command_angle(self,arm_joint_command): if (not self.is_exit) and (not self.is_standby): current_joint = np.asarray(arm_joint_command) # degree # FIFO self.filter_matrix = np.append(self.filter_matrix,[current_joint],axis=0) if len(self.filter_matrix)<7: return 0,current_joint self.filter_matrix = np.delete(self.filter_matrix,0,axis = 0) avg_joint = np.array([self.__circular_mean(self.filter_matrix[:, i]) for i in range(len(current_joint))]) return 0,avg_joint return -1,None def process_command(self, arm_position_command, arm_orientation_command): if (not self.is_exit) and (not self.is_standby): # 构造当前位姿(位置 + 四元数) current_pose = np.concatenate([ arm_position_command, # [x, y, z] arm_orientation_command # [qx, qy, qz, qw] ]) # 更新滑动窗口(FIFO) self.filter_matirx = np.append(self.filter_matirx, [current_pose], axis=0) # 如果窗口未填满,直接返回当前值(不滤波) if len(self.filter_matirx) < 7: rot_euler = R.from_quat(arm_orientation_command).as_euler('xyz', degrees=True) result = np.concatenate([arm_position_command, rot_euler]) return 0, result # 窗口已填满,删除最旧数据 self.filter_matirx = np.delete(self.filter_matirx, 0, axis=0) # --- 位置分量:滑动平均(仅对有效数据)--- avg_position = np.mean(self.filter_matirx[:, :3], axis=0) # --- 四元数分量:加权平均(处理符号歧义)--- quats = self.filter_matirx[:, 3:7] for i in range(1, len(quats)): if np.dot(quats[0], quats[i]) < 0: quats[i] = -quats[i] # 确保所有四元数在同一半球 avg_quat = np.mean(quats, axis=0) avg_quat = avg_quat / np.linalg.norm(avg_quat) # 归一化 # 转换回欧拉角 rot_euler = R.from_quat(avg_quat).as_euler('xyz', degrees=True) # 合并结果 [x, y, z, roll, pitch, yaw] result = np.concatenate([avg_position, rot_euler]) return 0, result return -1, None def chooseRightFrame(self): self.chooseBaseFrame(i=1) # self.chooseEndEffector(i=0) return if __name__ == "__main__": # print(R.from_euler("xyz", np.array([180, 0, 0]), degrees=True).as_matrix()) # sleep(5) dobot = dobot_nova5("192.168.5.1") dobot.init() dobot.dashboard.LogExportUSB(0) # # dobot.move_joint_jog(q=dobot.init_pos) # # dobot.dashboard.MoveJog("J1+") # # time.sleep(1) # # dobot.dashboard.MoveJog("") # print("dobot.getangle::",dobot.getAngel()) # # # while True: # # # time.sleep(1) # # # dobot.start() # # posJ = [0,30,-120,0,90,0] time.sleep(1) print('successs!') # joint = [-45.0050*np.pi/180,30.0682*np.pi/180,-66.1887*np.pi/180,-21.8836*np.pi/180,-90.3064*np.pi/180,-89.9977*np.pi/180] # # dobot.move_joint(q = joint) # time.sleep(1) # joint1 = [-44.8416*np.pi/180, 39.5626*np.pi/180, -80.5683*np.pi/180, -18.0396*np.pi/180, -89.4168*np.pi/180, -90.0026*np.pi/180] # try: dobot.start_drag() # # dobot.move_joint(q = joint) # # dobot.move_joint(q = joint1) # # pose1 = [0.353467,-0.1349880,0.403604,-180.0000*np.pi/180,-30.0000*np.pi/180,0.0042*np.pi/180] # # dobot.RunPoint_P_inPose_M_RAD(pose=pose1) try: while True: time.sleep(1) except KeyboardInterrupt: # robot_thread.join() dobot.stop_drag() print("用户中断操作。") except Exception as e: # robot_thread.join() dobot.stop_drag() print("Exception occurred at line:", e.__traceback__.tb_lineno) print("发生异常:", e) # # dobot.RunPoint_P_inJoint(posJ) # # sleep(1) # # dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}") # # dobot.chooseEndEffector(i=9) # # print("Arm pose:",dobot.getPose()) # # dobot.RunPoint_P_inPose([300, -135, 224.34, -180, 0, -90]) # # dobot.start_drag() # dobot.disableRobot()