from functools import wraps import sys import numpy as np import threading import time from typing import List, Literal, Union from scipy.interpolate import interp1d import requests import os from scipy.spatial.transform import Rotation as R import ctypes libc = ctypes.CDLL("libc.so.6") import atexit import copy from scipy.spatial.transform import Slerp try: # 导入算法 from .algorithms.arm_state import ArmState from .algorithms.controller_manager import ControllerManager from .algorithms.admittance_controller import AdmittanceController from .algorithms.hybrid_controller import HybridController from .algorithms.admithybrid_controller import AdmitHybridController from .algorithms.position_controller import PositionController from .algorithms.hybridPid_controller import HybridPidController from .algorithms.hybridAdmit_controller import HybridAdmitController from .algorithms.positionerSensor_controller import PositionerSensorController from .algorithms.admittanceZ_controller import AdmittanceZController from .algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate from .algorithms.force_planner import ForcePlanner # 导入硬件 from .hardware.dobot_nova5 import dobot_nova5 from .hardware.force_sensor import XjcSensor # 导入工具 from .tools.log import CustomLogger from .tools.Rate import Rate from .tools.yaml_operator import read_yaml from .tools.decorator_tools import custom_decorator,apply_decorators from .tools.serial_tools import find_serial_by_location from .tools.Trajectory import TrajectoryInterpolator except: #导入算法 from algorithms.arm_state import ArmState from algorithms.controller_manager import ControllerManager from algorithms.admittance_controller import AdmittanceController from algorithms.admittanceZ_controller import AdmittanceZController from algorithms.hybrid_controller import HybridController from algorithms.admithybrid_controller import AdmitHybridController from algorithms.position_controller import PositionController from algorithms.hybridPid_controller import HybridPidController from algorithms.hybridAdmit_controller import HybridAdmitController from algorithms.positionerSensor_controller import PositionerSensorController from algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate from algorithms.force_planner import ForcePlanner # 导入硬件 from hardware.dobot_nova5 import dobot_nova5 from hardware.force_sensor import XjcSensor # 导入工具 from tools.log import CustomLogger from tools.Rate import Rate from tools.yaml_operator import read_yaml from tools.decorator_tools import custom_decorator,apply_decorators from tools.serial_tools import find_serial_by_location from tools.Trajectory import TrajectoryInterpolator import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation current_file_path = os.path.abspath(__file__) MassageRobot_Dobot_Path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path))) print(MassageRobot_Dobot_Path) # 添加目标文件夹到系统路径 sys.path.append(MassageRobot_Dobot_Path) from VortXDB.client import VTXClient """ 在 Python 中,atexit.register 用于注册程序退出时要执行的函数。atexit 模块提供了一个简单的接口,用于在程序正常终止时执行清理操作。 当你注册多个函数时,它们的执行顺序遵循先进后出的顺序(LIFO:Last In, First Out)。也就是说,最后注册的函数会最先执行。 """ def track_function(function_name,log = False): def before(self, *args, **kwargs): self.current_function = function_name if log: self.logger.log_info(f"Entering function: {function_name}") def after(self, result, *args, **kwargs): self.current_function = None if log: self.logger.log_info(f"Exiting function: {function_name},code: {result}") return custom_decorator(before=before, after=after) @apply_decorators class MassageRobot: def __init__(self,arm_config_path,is_log=False): self.logger = CustomLogger() if is_log: self.logger_data = CustomLogger() # 日志线程 threading.Thread(target=self.log_thread,daemon=True).start() self.vtxdb = VTXClient() # 当前执行的函数 self.current_function = None self.arm_state = ArmState() self.arm_config = read_yaml(arm_config_path) # arm 实例化时机械臂类内部进行通讯连接 # 初始化坐标系 TOOL=0 BASE=1 self.arm = dobot_nova5(arm_ip = self.arm_config['arm_ip']) self.arm.start() self.arm.chooseRightFrame() self.arm.init() self.thermotherapy = None self.shockwave = None self.stone = None self.heat = None self.ion = None self.force_plan = ForcePlanner() # 传感器 # self.force_sensor = None self.force_sensor = XjcSensor() # self.force_sensor.connect() # 控制器,初始为导纳控制 self.controller_manager = ControllerManager(self.arm_state) self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0]) # self.controller_manager.add_controller(HybridController,'hybrid',self.arm_config['controller'][1]) self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][1]) # self.controller_manager.add_controller(AdmitHybridController,'admithybrid',self.arm_config['controller'][3]) # self.controller_manager.add_controller(HybridPidController,'hybridPid',self.arm_config['controller'][4]) # self.controller_manager.add_controller(HybridAdmitController,'hybridAdmit',self.arm_config['controller'][5]) # self.controller_manager.add_controller(PositionerSensorController,'positionerSensor',self.arm_config['controller'][6]) # self.controller_manager.add_controller(AdmittanceZController,'admittanceZ',self.arm_config['controller'][7]) self.controller_manager.switch_controller('admittance') # 读取数据 self.gravity_base = None self.force_zero = None self.torque_zero = None self.tool_position = None self.mass_center_position = None self.s_tf_matrix_t = None arm_orientation_set0 = np.array([-180,-30,0]) self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix() # 频率数据赋值 self.control_rate = Rate(self.arm_config['control_rate']) self.sensor_rate = Rate(self.arm_config['sensor_rate']) self.command_rate = Rate(self.arm_config['command_rate']) # 低通滤波 self.cutoff_freq = 80.0 # 停止标志位线程 self.exit_event = threading.Event() self.exit_event.set() # 运行初始化为True self.interrupt_event = threading.Event() self.interrupt_event.clear() # 中断初始化为False self.pause_envent = threading.Event() self.pause_envent.clear() # 暂停初始化为False self.skip_envent = threading.Event() self.skip_envent.clear() # 跳过初始化为False # 按摩调整 self.adjust_wrench_envent = threading.Event() self.adjust_wrench_envent.clear() # 调整初始化为False self.is_execute = False self.pos_increment = np.zeros(3,dtype=np.float64) self.adjust_wrench = np.zeros(6,dtype=np.float64) self.skip_pos = np.zeros(6,dtype=np.float64) # 按摩头参数暂时使用本地数据 massage_head_dir = self.arm_config['massage_head_dir'] all_items = os.listdir(massage_head_dir) head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))] self.playload_dict = {} for file in head_config_files: file_address = massage_head_dir + '/' + file play_load = read_yaml(file_address) self.playload_dict[play_load['name']] = play_load # self.playload_dict = self.vtxdb.get("robot_config", "massage_head") # print(self.playload_dict) self.current_head = 'none' self.is_waitting = False self.last_print_time = 0 self.last_record_time = 0 self.last_command_time = 0 self.move_to_point_count = 0 self.width_default = 5 # 卡尔曼滤波相关初始化 self.x_base = np.zeros(6) self.P_base = np.eye(6) # 过程噪声协方差矩阵 self.Q_base = np.eye(6) * 0.01 # 测量噪声协方差矩阵 self.R_base = np.eye(6) * 0.1 self.x_tcp = np.zeros(6) self.P_tcp = np.eye(6) # 过程噪声协方差矩阵 self.Q_tcp = np.eye(6) * 0.01 # 测量噪声协方差矩阵 self.R_tcp = np.eye(6) * 0.1 # 传感器故障计数器 self.sensor_fail_count = 0 # 机械臂初始化,适配中间层 # 读取数据 self.gravity_base = None self.force_zero = None self.torque_zero = None self.tool_position = None self.mass_center_position = None self.s_tf_matrix_t = None arm_orientation_set0 = np.array([-180,0,-90]) self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix() # REF TRAJ self.xr = [] self.vr = [] self.ar = [] self.ts = [] # time sequence self.dt = [] # derivate of time # --- self.last_time = -1 self.cur_time = -1 # 预测步骤 def kalman_predict(self,x, P, Q): # 预测状态(这里假设状态不变) x_predict = x # 预测误差协方差 P_predict = P + Q return x_predict, P_predict # 更新步骤 def kalman_update(self,x_predict, P_predict, z, R): # 卡尔曼增益 # K = P_predict @ np.linalg.inv(P_predict + R) S = P_predict + R s = np.diag(S) # shape (6,) p_diag = np.diag(P_predict) K_diag = np.zeros_like(s) nonzero_mask = s != 0 K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask] K = np.diag(K_diag) # 更新状态 x_update = x_predict + K @ (z - x_predict) # 更新误差协方差 P_update = (np.eye(len(K)) - K) @ P_predict return x_update, P_update def init_hardwares(self,ready_pose): self.ready_pose = np.array(ready_pose) self.switch_payload(self.current_head) print(self.arm.get_arm_position()) time.sleep(0.5) def sensor_set_zero(self): self.arm_position_set0,self.arm_orientation_set0 = self.arm.get_arm_position() arm_orientation_set0 = self.arm_orientation_set0 print("arm_orientation_set0",R.from_quat(arm_orientation_set0).as_euler('xyz',degrees=True)) self.b_rotation_s_set0 = R.from_quat(arm_orientation_set0).as_matrix() ######### max_retries = 5 #no_head_wrong=np.array([-80.01506042, 116.34187317, -87.65788269, -0.32910481, -0.65792173, -0.61110526])#旧版传感器) head_set0=np.array([0, 0, 0, 0, 0, 0])#新版传感器) min_tolerance=0.001 retry_count = 0 self.force_sensor.disable_active_transmission() time.sleep(0.5) while retry_count < max_retries: # 调用读取数据的函数,假设返回值是一个 numpy 数组 data = self.force_sensor.read_data_f32() data = self.force_sensor.read_data_f32() data = self.force_sensor.read_data_f32() # 检查返回值是否为 np.array 类型 if isinstance(data, np.ndarray): # 判断数据是否接近目标值 if np.allclose(data, head_set0, atol=min_tolerance): print(f"检测到力传感器已置零:{data}") break else: print(f"检测到力传感器未置零:{data}") # 禁用传感器传输 self.force_sensor.disable_active_transmission() self.force_sensor.disable_active_transmission() time.sleep(0.5) # 尝试设置传感器清零 code = self.force_sensor.set_zero() code = self.force_sensor.set_zero() max_try = 3 while code != 0 and max_try > 0: self.logger.log_error("Force sensor set zero failed, retrying...") self.force_sensor.disable_active_transmission() self.force_sensor.disable_active_transmission() time.sleep(0.5) code = self.force_sensor.set_zero() code = self.force_sensor.set_zero() max_try -= 1 time.sleep(0.5) # 如果多次尝试后失败 if max_try == 0: self.logger.log_error("Force sensor set zero failed, exiting...") requests.post( "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"} ) requests.post( "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False} ) # sys.exit(0) return -1 # 置零失败 # 成功后跳出主循环 break else: self.logger.log_error("读取数据失败或格式错误,尝试重试...") # 增加重试次数 retry_count += 1 self.force_sensor.disable_active_transmission() self.force_sensor.disconnect() time.sleep(1) self.force_sensor = XjcSensor() time.sleep(1) # 每次重试之间添加一个短暂的延迟 if retry_count == max_retries: self.logger.log_error(f"超过最大重试次数 ({max_retries}),操作失败,退出程序...") requests.post( "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"} ) requests.post( "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False} ) return -2 #读取失败 return 0 #读取成功并置零成功 def sensor_enable(self): code = self.force_sensor.enable_active_transmission() max_try = 3 while code!= 0 and max_try > 0: self.logger.log_error("Force sensor enable_active_transmission failed,retrying...") code = self.force_sensor.enable_active_transmission() max_try -= 1 time.sleep(0.1) if max_try == 0: self.logger.log_error("Force sensor enable_active_transmission failed,exiting...") requests.post( "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"} ) requests.post( "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False} ) # sys.exit(0) return -1 return 0 def update_wrench(self): # 当前的机械臂到末端转换 (实时) b_rotation_s = R.from_quat(self.arm_state.arm_orientation).as_matrix() # 读取数据 sensor_data = self.force_sensor.read() # self.logger.log_info(f"传感器数据{sensor_data}") if sensor_data is None: self.force_sensor.stop_background_reading() self.logger.log_error("传感器数据读取失败") return -1 # 反重力补偿 sensor_data[:3] = sensor_data[:3] + self.force_zero + self.b_rotation_s_set0.T @ self.gravity_base sensor_data[3:] = sensor_data[3:] + self.torque_zero + np.cross(self.mass_center_position, self.b_rotation_s_set0.T @ self.gravity_base) # 传感器数据通过矫正计算得到外来施加力 传感器坐标系下 # 重力 gravity_in_sensor = b_rotation_s.T @ self.gravity_base s_force = sensor_data[:3] - self.force_zero - gravity_in_sensor # 力矩 s_torque = sensor_data[3:] - self.torque_zero - np.cross(self.mass_center_position,gravity_in_sensor) wrench = np.concatenate([s_force,s_torque]) # 传感器到TCP wrench = self.wrench_coordinate_conversion(self.s_tf_matrix_t,wrench) # 交给ARM STATE集中管理 self.arm_state.external_wrench_tcp = wrench # 对tcp坐标系下的外力外矩进行平滑 x_tcp_predict, P_tcp_predict = self.kalman_predict(x = self.x_tcp, P = self.P_tcp, Q = self.Q_tcp) self.x_tcp, self.P_tcp = self.kalman_update(x_predict = x_tcp_predict, P_predict = P_tcp_predict, z = self.arm_state.external_wrench_tcp, R = self.R_tcp) self.arm_state.external_wrench_tcp = self.x_tcp self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp return 0 def switch_payload(self,name): if name in self.playload_dict: self.stop() self.current_head = name compensation_config = self.playload_dict[self.current_head] # 读取数据 self.gravity_base = np.array(compensation_config['gravity_base']) self.force_zero = np.array(compensation_config['force_zero']) self.torque_zero = np.array(compensation_config['torque_zero']) self.tool_position = np.array(compensation_config['tcp_offset']) self.mass_center_position = np.array(compensation_config['mass_center_position']) self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat()) tcp_offset = self.playload_dict[name]["tcp_offset"] tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}" print("tcp_offset_str",tcp_offset_str) self.arm.setEndEffector(i=9,tool_i=tcp_offset_str) self.arm.chooseEndEffector(i=9) print(self.arm.get_arm_position()) self.logger.log_info(f"切换到{name}按摩头") # ####################### 位姿伺服 ########################## def arm_measure_loop(self): self.logger.log_info("机械臂测量线程启动") while (not self.arm.is_exit) and (not self.exit_event.is_set()): try: if not self.is_waitting: self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position() code = self.update_wrench() if code == -1: self.sensor_fail_count += 1 self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}") if self.sensor_fail_count > 10: self.logger.log_error("传感器线程数据读取失败超过10次,程序终止") self.stop() break else: self.sensor_fail_count = 0 except Exception as e: self.logger.log_error(f"机械臂测量线程报错:{e}") self.exit_event.set() # 控制退出while self.sensor_rate.sleep() # 控制频率 def arm_command_loop_traj(self): self.logger.log_info("机械臂控制线程启动") # 离线轨迹启动时间记录 self.traj_start_time = time.time() while (not self.arm.is_exit) and (not self.exit_event.is_set()): try: if not self.is_waitting: process_start_time = time.time() control_cycle = self.control_rate.to_sec() t_now = time.time() - self.traj_start_time i = min(int(t_now / self.dt), len(self.ts) - 2) alpha = max(0.0, min(1.0, (t_now - self.ts[i]) / self.dt)) pos = (1 - alpha) * self.xr[i][:3] + alpha * self.xr[i+1][:3] slerp = Slerp([0, 1], R.concatenate([R.from_quat(self.xr[i][3:]), R.from_quat(self.xr[i+1][3:])])) alpha = 0.5 rot_interp = slerp(alpha).as_quat() xr = np.concatenate([pos, rot_interp]) # xr = (1-alpha) * self.xr[i] + alpha * self.xr[i+1] vr = (1 - alpha) * self.vr[i] + alpha * self.vr[i + 1] ar = (1 - alpha) * self.ar[i] + alpha * self.ar[i + 1] self.arm_state.desired_position = xr[:3] # self.arm_state.desired_orientation = xr[3:] # print(111) self.arm_state.desired_orientation = rot_interp # print(222) self.arm_state.desired_twist = vr self.arm_state.desired_acc = ar print("self.arm_state.desired_position",self.arm_state.desired_position) print("self.arm_state.desired_orientation",self.arm_state.desired_orientation) print("self.arm_state.desired_twist",self.arm_state.desired_twist) print("self.arm_state.desired_acc",self.arm_state.desired_acc) self.controller_manager.step_traj(control_cycle) # self.controller_manager.step(control_cycle) command_pose = np.concatenate([ self.arm_state.arm_position_command * 1000, # 转毫米 self.arm_state.arm_orientation_command ]) ## 输入滤波 status, pose_processed = self.arm.process_command( command_pose[:3], command_pose[3:] ) # print(f"pose_processed:{pose_processed}") tcp_command = ( f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f}," f"{pose_processed[2]:.3f},{pose_processed[3]:.3f}," f"{pose_processed[4]:.3f},{pose_processed[5]:.3f}," "t=0.02, aheadtime=20, gain=200)" ).encode() run_time = time.time() - process_start_time sleep_duration = self.control_rate.to_sec() - run_time b2 =time.time() if sleep_duration > 0: time.sleep(sleep_duration) print(f"real sleep:{time.time()-b2}") # self.arm.dashboard.socket_dobot.sendall(tcp_command) if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.arm.dashboard.Continue() code = self.arm.parseResultId(res)[0] if code == 0: print("机械臂继续已暂停的运动指令") except Exception as e: self.logger.log_error(f"机械臂控制失败:{e}") self.exit_event.set() # print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration) def arm_command_loop(self): self.logger.log_info("机械臂控制线程启动") while (not self.arm.is_exit) and (not self.exit_event.is_set()): try: if not self.is_waitting: process_start_time = time.time() control_cycle = self.control_rate.to_sec() self.controller_manager.step(control_cycle) command_pose = np.concatenate([ self.arm_state.arm_position_command * 1000, # 转毫米 self.arm_state.arm_orientation_command ]) ## 输入滤波 status, pose_processed = self.arm.process_command( command_pose[:3], command_pose[3:] ) # pose_processed = command_pose print(f"pose_processed:{pose_processed}") print(self.arm.feedbackData.robotMode) tcp_command = ( f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f}," f"{pose_processed[2]:.3f},{pose_processed[3]:.3f}," f"{pose_processed[4]:.3f},{pose_processed[5]:.3f}," "t=0.02, aheadtime=20, gain=200)" ).encode() run_time = time.time() - process_start_time sleep_duration = self.control_rate.to_sec() - run_time b2 =time.time() if sleep_duration > 0: time.sleep(sleep_duration) print(f"real sleep:{time.time()-b2}") self.arm.dashboard.socket_dobot.sendall(tcp_command) print("currentCommandID",self.arm.feedbackData.robotCurrentCommandID) if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.arm.dashboard.Continue() code = self.arm.parseResultId(res)[0] if code == 0: print("机械臂继续已暂停的运动指令") except Exception as e: self.logger.log_error(f"机械臂控制失败:{e}") self.exit_event.set() # print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration) ####################### 位姿伺服 ########################## def start(self): if self.exit_event.is_set(): # 线程 self.exit_event.clear() self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop) self.arm_control_thread = threading.Thread(target=self.arm_command_loop) # 线程启动 self.arm_measure_thread.start() ## 测量线程 position,quat_rot = self.arm.get_arm_position() # 初始值 self.arm_state.desired_position = position self.arm_state.arm_position_command = position self.arm_state.desired_orientation = quat_rot self.arm_state.arm_orientation_command = quat_rot self.arm_state.arm_position = position self.arm_state.arm_orientation = quat_rot for i in range(20): self.controller_manager.step(self.control_rate.to_sec()) self.logger.log_blue(f"position command: {self.arm_state.arm_position_command}") self.logger.log_blue(f"orientation command: {R.from_quat(self.arm_state.arm_orientation_command).as_euler('xyz',degrees=True)}") position, quat_rot = self.arm.get_arm_position() self.logger.log_blue(f"position current:{position}") self.logger.log_blue(f"orientation current: {R.from_quat(quat_rot).as_euler('xyz',degrees=False)}") self.command_rate.precise_sleep() position ,quat_rot = self.arm.get_arm_position() # self.arm_state.desired_position = poistion self.arm_state.arm_position_command = position # self.arm_state.desired_orientation = quat_rot self.arm_state.arm_orientation_command = quat_rot ## POSITION CONTROLLER TEST ## # delta_pos = np.array([0.1,0.1,-0.1],dtype=np.float64) # # delta_quat = R.from_euler('xyz', [0, 0, 0], degrees=True).as_quat() # self.arm_state.desired_position = position + delta_pos # self.arm_state.desired_orientation = quat_rot # 线程启动 self.arm_control_thread.start() ## 赋初始值后控制线程 self.logger.log_info("MassageRobot启动") time.sleep(0.5) def stop(self): if not self.exit_event.is_set(): self.exit_event.set() self.interrupt_event.clear() self.arm_control_thread.join() self.arm_measure_thread.join() for i in range(3): self.force_sensor.disable_active_transmission() self.force_sensor.start_background_reading() self.arm.stop_motion() self.logger.log_info("MassageRobot停止") self.controller_manager.switch_controller('position') def log_thread(self): while True: self.logger_data.log_info(f"机械臂位置:{self.arm_state.arm_position},机械臂姿态:{self.arm_state.arm_orientation}",is_print=False) self.logger_data.log_info(f"机械臂期望位置:{self.arm_state.desired_position},机械臂真实位置:{self.arm_state.arm_position}",is_print=False) self.logger_data.log_info(f"机械臂期望姿态:{self.arm_state.desired_orientation},机械臂真实姿态:{self.arm_state.arm_orientation}",is_print=False) self.logger_data.log_info(f"机械臂期望力矩:{self.arm_state.desired_wrench},机械臂真实力矩:{self.arm_state.external_wrench_base}",is_print=False) self.logger_data.log_info(f"当前按摩头:{self.current_head}",is_print=False) time.sleep(1) # 工具函数 def get_tf_matrix(self,position,orientation): tf_matrix = np.eye(4) rotation_matrix = R.from_quat(orientation).as_matrix() tf_matrix[:3,3] = position/1000 tf_matrix[:3,:3] = rotation_matrix return tf_matrix def wrench_coordinate_conversion(self,tf_matrix, wrench): rot_matrix = tf_matrix[:3, :3] vector_p = tf_matrix[:3, 3] temp_force = wrench[:3] torque = wrench[3:] force = rot_matrix.T @ temp_force torque = rot_matrix.T @ np.cross(temp_force,vector_p) + rot_matrix.T @ torque return np.concatenate([force, torque]) @track_function("set_position",True) def set_position(self,pose,is_wait = False): self.logger.log_info(f"set postion:{pose}") x,y,z = pose[0],pose[1],pose[2] roll,pitch,yaw = pose[3],pose[4],pose[5] pose_command = np.concatenate([[x,y,z], [roll,pitch,yaw]]) time.sleep(0.1) code = self.arm.RunPoint_P_inPose_M_RAD(poses_list = pose_command) time.sleep(0.1) return code @track_function("move_to_point",True) def move_to_point(self, t, pose, is_interrupt=True, wait=True, timeout=0.5, interpolation:Literal["linear","circle","cloud_point"] ='linear', algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance",is_switch_controller = False): """ 移动到指定的点 param: t: 时间或时间序列 pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿 is_interrupt: 是否允许中断 wait: 是否等待运动完成 timeout: 超时时间 interpolation: 插值方式 algorithm: 控制算法 return: code: 0表示成功 1表示超时 2表示用户打断 3表示硬件错误 4表示参数错误 """ self.move_to_point_count += 1 self.logger.log_blue(f"move_to_point_count: {self.move_to_point_count}") self.logger.log_yellow(f"move_to_point: {pose[-1]}") # 在函数内部直接规划轨迹 traj = self.traj_generate(t, pose, None, interpolation=interpolation) if traj is None: self.logger.log_error("轨迹未生成或生成错误") return 4 self.logger.log_yellow(f"traj_quat: {R.from_quat(traj['quat'][-1]).as_euler('xyz',degrees=False)}") self.logger.log_yellow(f"traj_pos: {traj['pos'][-1]}") time_seq = traj['t'] pos_traj = traj['pos'] quat_traj = traj['quat'] if is_switch_controller == True: try: self.controller_manager.switch_controller(algorithm) except ValueError as error: self.logger.log_error(error) self.logger.log_error("切换到%s控制器失败" % algorithm) return 4 self.logger.log_info("切换到%s控制器" % algorithm) # self.logger.log_info("当前为%s控制器" % algorithm) for i in range(len(time_seq)): if self.interrupt_event.is_set(): self.interrupt_event.clear() self.logger.log_error("self.interrupt_event.clear()") self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position) self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation) massage_wrench = np.array([0, 0, 0, 0, 0, 0]) self.arm_state.desired_wrench = massage_wrench return 2 if self.pause_envent.is_set(): self.pause_envent.clear() self.logger.log_error("self.pause_envent.clear()") self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position) self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation) massage_wrench = np.array([0, 0, 0, 0, 0, 0]) self.arm_state.desired_wrench = massage_wrench return 5 if self.skip_envent.is_set(): self.logger.log_error("self.skip_envent.clear()") self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position) self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation) massage_wrench = np.array([0, 0, 0, 0, 0, 0]) self.arm_state.desired_wrench = massage_wrench return 6 if self.arm.is_exit or self.exit_event.is_set(): print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set()) return 3 if self.adjust_wrench_envent.is_set() and self.is_execute == True: if self.adjust_wrench is not None: self.arm_state.desired_wrench = self.adjust_wrench self.adjust_wrench_envent.clear() self.arm_state.desired_position = pos_traj[i] self.arm_state.desired_orientation = quat_traj[i] if self.ion is not None: act_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.arm_position desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.desired_position if self.arm_state.positionerSensorData < 0.001: desired_position[2] = desired_position[2] - self.arm_state.desired_high else: desired_position[2] = min(act_position[2] + self.arm_state.positionerSensorData - self.arm_state.desired_high , desired_position[2] - self.arm_state.desired_high) self.arm_state.desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix() @ desired_position self.command_rate.precise_sleep() start = time.time() while wait: if self.interrupt_event.is_set(): self.interrupt_event.clear() return 2 if self.pause_envent.is_set(): self.pause_envent.clear() return 5 if self.skip_envent.is_set(): # self.skip_envent.clear() return 6 if self.arm.is_exit or self.exit_event.is_set(): print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set()) return 3 if time.time() - start > timeout: return 1 self.command_rate.precise_sleep() return 0 @track_function("apply_force",True) def apply_force(self, t, wrench, interpolation:Literal["linear","oscillation"] = "linear", algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance"): """ 施加力 param: t: 时间或时间序列 wrench(nx6): 力矩或力矩序列,当输入为None时,表示力为[0,0,0,0,0,0] is_interrupt: 是否允许中断 wait: 是否等待运动完成 timeout: 超时时间 interpolation: 插值方式 algorithm: 控制算法 return: code: 0表示成功 1表示超时 2表示用户打断 3表示硬件错误 4表示参数错误 """ if t==0: self.arm_state.desired_wrench = wrench return 0 # 在函数内部直接规划力矩轨迹 traj = self.wrench_traj_generate(t, wrench, interpolation=interpolation) # # 在函数内部直接规划力矩轨迹 # traj = self.traj_generate(t, None, wrench, interpolation=interpolation) if traj is None: return 4 time_seq = traj['t'] wrench_traj = traj['wrench'] # try: # self.controller_manager.switch_controller(algorithm) # except ValueError as error: # self.logger.log_error(error) # return 4 self.logger.log_info("切换到%s控制器" % algorithm) for i in range(len(time_seq)): if self.interrupt_event.is_set(): self.interrupt_event.clear() return 2 if self.pause_envent.is_set(): self.pause_envent.clear() return 5 if self.skip_envent.is_set(): self.skip_envent.clear() return 6 if self.arm.is_exit or self.exit_event.is_set(): return 3 if self.adjust_wrench_envent.is_set(): # if wrench_traj is not None: # wrench_traj[i:] = self.adjust_wrench self.adjust_wrench_envent.clear() # print(wrench_traj[i]) self.arm_state.desired_wrench = wrench_traj[i] if wrench_traj is not None else np.zeros(6, dtype=np.float64) self.command_rate.precise_sleep() return 0 @track_function("resample_curve_strict",True) def resample_curve_strict(self, points, num_resampled_points): """ 修正的曲线重新采样函数,移除重复点并确保累积弧长严格递增。 """ # 计算累积弧长 distances = np.sqrt(np.sum(np.diff(points, axis=0)**2, axis=1)) # 计算相邻点之间的距离 cumulative_length = np.insert(np.cumsum(distances), 0, 0) # 计算累积弧长 # 确保累积弧长严格递增(去除重复值) unique_indices = np.where(np.diff(cumulative_length, prepend=-np.inf) > 0)[0] cumulative_length = cumulative_length[unique_indices] points = points[unique_indices] # 生成均匀间隔的新弧长 target_lengths = np.linspace(0, cumulative_length[-1], num_resampled_points) # 在累积弧长上进行插值计算新点位置 x_interp = interp1d(cumulative_length, points[:, 0], kind='linear', fill_value="extrapolate") y_interp = interp1d(cumulative_length, points[:, 1], kind='linear', fill_value="extrapolate") new_x = x_interp(target_lengths) new_y = y_interp(target_lengths) return np.column_stack((np.round(new_x).astype(int), np.round(new_y).astype(int))) @track_function("generate_in_spiral_cloudPoint",True) def generate_in_spiral_cloudPoint(self, start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'): """ 绘制由内向外的螺旋线。 参数: - start: 起点坐标 (x, y) - end: 终点坐标 (x, y) - num_cycle_factor: 螺旋的圈数 - points: 螺旋线上的点的数量 """ # 计算起点和终点之间的距离 dx = end[0] - start[0] dy = end[1] - start[1] distance = np.sqrt(dx**2 + dy**2) if width is None: width = self.width_default if num_cycle_factor is None: num_cycle_factor = 5 num_cycle_factor = np.round(num_cycle_factor).astype(int) if distance < 2: print("距离过短,无法绘制螺旋线") return 0 if width <= 2: width = 2 if num_cycle_factor < 1: print("圈数必须为大于1的整数") num_cycle_factor = 1 angle = np.arctan2(dy, dx) # 计算从起点到终点的角度 points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int) # 螺旋的角度范围 theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points) # 螺旋的半径随着角度增大而线性增长 r = np.linspace(0, distance, points) tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor))) # 计算螺旋的x, y坐标 if direction == "CCW": # 反向旋转时,使用负的角度来反向螺旋 x = r * np.cos(-theta) # 逆时针旋转 y = r * np.sin(-theta)*tempK else: # 正常旋转 x = r * np.cos(theta) y = r * np.sin(theta)*tempK # 旋转螺旋线以对齐起点和终点的角度 x_rotated = x * np.cos(angle) - y * np.sin(angle) y_rotated = x * np.sin(angle) + y * np.cos(angle) # 平移螺旋线,使其从起点开始 x_final = x_rotated + start[0] - x_rotated[0] y_final = y_rotated + start[1] - y_rotated[0] # return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1))) result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1))) return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points)) @track_function("generate_out_spiral_cloudPoint",True) def generate_out_spiral_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'): """ 绘制由外向内的螺旋线。 参数: - start: 起点坐标 (x, y) - end: 终点坐标 (x, y) - num_cycle_factor: 螺旋的圈数 - points: 螺旋线上的点的数量 """ # 计算起点和终点之间的距离 dx = end[0] - start[0] dy = end[1] - start[1] angle = np.arctan2(dy, dx) # 计算从起点到终点的角度 distance = np.sqrt(dx**2 + dy**2) if width is None: width = self.width_default if num_cycle_factor is None: num_cycle_factor = 5 num_cycle_factor = np.round(num_cycle_factor).astype(int) if distance < 2: print("距离过短,无法绘制螺旋线") return 0 if width <= 2: width = 2 if num_cycle_factor < 1: print("圈数必须为大于1的整数") num_cycle_factor = 1 points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int) # 螺旋的角度范围 theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points) # 螺旋的半径随着角度增大而线性增长 r = np.linspace(distance, 0, points) tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor))) # 计算螺旋的x, y坐标 if direction == "CCW": # 反向旋转时,使用负的角度来反向螺旋 x = r * np.cos(-theta) # 逆时针旋转 y = r * np.sin(-theta)*tempK else: # 正常旋转 x = r * np.cos(theta) y = r * np.sin(theta)*tempK # 旋转螺旋线以对齐起点和终点的角度 x = x * np.cos(np.pi) - y * np.sin(np.pi) y = x * np.sin(np.pi) + y * np.cos(np.pi) # 旋转螺旋线以对齐起点和终点的角度 x_rotated = x * np.cos(angle) - y * np.sin(angle) y_rotated = x * np.sin(angle) + y * np.cos(angle) # 平移螺旋线,使其从起点开始 x_final = x_rotated + start[0] - x_rotated[0] y_final = y_rotated + start[1] - y_rotated[0] # return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1))) result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1))) return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points)) @track_function("generate_prolateCycloid_cloudPoint",True) def generate_prolateCycloid_cloudPoint(self, start, end, width=None, num_cycle_factor=None, direction: Literal["CW", "CCW"] = 'CW'): # Parameters step_degree = np.pi / 12 a_prolate = 4.5 radius = 30.0 distance = np.linalg.norm(np.array(end) - np.array(start)) angle = np.arctan2(end[1] - start[1], end[0] - start[0]) if width is None: width = self.width_default if num_cycle_factor is None: num_cycle_factor = 5 num_cycle_factor = np.round(num_cycle_factor).astype(int) if distance < 2: print("距离过短,无法绘制Cycloid线") return 0 if width <= 2: width = 2 if num_cycle_factor < 1: print("圈数必须为大于1的整数") num_cycle_factor = 1 r_cycloid = radius / (2 * a_prolate) num_cycles = int(num_cycle_factor) temp_k = distance / (r_cycloid * (np.pi + 2 * a_prolate + 2 * np.pi * num_cycles)) # Generate the prolate cycloid theta_prolate = np.linspace(0.5 * np.pi, num_cycles * 2 * np.pi + 1.5 * np.pi, round((num_cycles * 2 * np.pi + 1.5 * np.pi) / step_degree)) if direction == 'CW': theta_prolate = np.flip(theta_prolate) x_prolate = r_cycloid * (theta_prolate - a_prolate * np.sin(theta_prolate)) * temp_k y_prolate = r_cycloid * (1 - a_prolate * np.cos(theta_prolate)) # Scale y-prolate based on width y_prolate = (y_prolate / 30) * width # Rotation of cycloid if direction == 'CW': x_prolate, y_prolate = -x_prolate, -y_prolate x_rotated = x_prolate * np.cos(angle) - y_prolate * np.sin(angle) y_rotated = x_prolate * np.sin(angle) + y_prolate * np.cos(angle) # Translate to starting point x_final = x_rotated + start[0] - x_rotated[0] y_final = y_rotated + start[1] - y_rotated[0] return np.column_stack((np.round(x_final).astype(int), np.round(y_final).astype(int))) @track_function("generate_ellipse_cloudPoint",True) def generate_ellipse_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'): # 计算椭圆的中心(两个端点的中点) center = (start + end) / 2 if num_cycle_factor is None: num_cycle_factor = 1 num_cycle_factor = np.round(num_cycle_factor).astype(int) distance = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2) if width is None: width = distance/2 if distance < 2: print("距离过短,无法绘制椭圆") return 0 if width <= 2: print("椭圆半径过小,无法绘制椭圆") width = self.width_default if num_cycle_factor < 1: print("圈数必须为大于1的整数") num_cycle_factor = 1 # 计算两个端点的距离,即长半轴长度 a = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2) / 2 # 假设短半轴长度为常量值radius b = width / 2 # 计算旋转角度(由端点的方向确定) angle = np.arctan2(end[1] - start[1], end[0] - start[0]) points = np.round(((num_cycle_factor + 1) * np.pi * 2)/(np.pi/9)).astype(int) # 绘制椭圆的参数化公式 theta = np.linspace(np.pi, (2+num_cycle_factor*2)* np.pi, points) # 计算螺旋的x, y坐标 if direction == "CCW": # 反向旋转时,使用负的角度来反向螺旋 x_ellipse = a * np.cos(-theta) #顺时针 y_ellipse = b * np.sin(-theta) else: # 正常旋转 x_ellipse = a * np.cos(theta) y_ellipse = b * np.sin(theta) # 绕中心旋转椭圆 x_rot = center[0] + x_ellipse * np.cos(angle) - y_ellipse * np.sin(angle) y_rot = center[1] + x_ellipse * np.sin(angle) + y_ellipse * np.cos(angle) # 返回椭圆点云,保留整数值 return np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1))) @track_function("generate_Lemniscate_cloudPoint",True) def generate_Lemniscate_cloudPoint(self, start, end, width = None,num_cycle_factor = None, direction:Literal["CW","CCW"] ='CW'): """ 生成从start到end的8字形插补路径 :param start: 起点坐标 (x0, y0) :param end: 终点坐标 (x1, y1) :param num_points: 插补点的数量 :return: 插补路径的坐标列表 """ # 起点和终点的坐标 x1, y1 = start x0, y0 = end step_degree=np.pi/12 # 计算轨迹的缩放因子和旋转角度 distance = np.linalg.norm([x1 - x0, y1 - y0]) if width is None: width = self.width_default if num_cycle_factor is None: num_cycle_factor = 1 if distance < 2: print("距离过短,无法绘制螺旋线") return 0 if width <= 2: width = 2 if num_cycle_factor < 1: print("圈数必须为大于1的整数") num_cycle_factor = 1 a = distance / 2 # 轨迹的宽度一半 b = width/2 angle = np.arctan2(y1 - y0, x1 - x0) num_points = np.abs((2*num_cycle_factor+1)*np.pi/step_degree).astype(int) # 生成标准的8字形轨迹 (Gerono卵形线) t_values = np.linspace(0.5*np.pi, (2*num_cycle_factor + 1.5)* np.pi, num_points) x_curve = a * np.sin(t_values) if direction == 'CCW': y_curve = -b * np.sin(2 * t_values) else: y_curve = b * np.sin(2 * t_values) # 构建旋转矩阵 rotation_matrix = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]]) # 旋转并平移轨迹 curve_points = np.vstack((x_curve, y_curve)).T rotated_points = curve_points @ rotation_matrix.T shifted_points = rotated_points + [(x0 + x1) / 2, (y0 + y1) / 2] # 将插值结果四舍五入为整数 return np.round(shifted_points).astype(int) @track_function("generate_circle_cloudPoint",True) def generate_circle_cloudPoint(self, start_point, center, step_degree=np.pi/12, num_cycle_factor = 3): """ center: 圆心坐标,形如 (x_c, y_c) start_point: 起始点坐标,形如 (x_0, y_0) radius: 圆的半径 delta_theta: 每次插补的角度增量 num_turns: 绕圈的次数 """ # 确定总共需要生成的插补点数 num_points = int((2 * np.pi * num_cycle_factor) / step_degree) # 圆心 x_c, y_c = center radius = np.linalg.norm(np.array(start_point) - np.array(center)) # 半径 # 计算起始点的初始角度 x_0, y_0 = start_point theta_0 = np.arctan2(y_0 - y_c, x_0 - x_c) # 初始化存储插补点的列表 circle_points = [] # 生成插补点 for i in range(num_points): # 当前角度 theta_i = theta_0 + i * step_degree # 计算插补点的坐标 x_i = x_c + radius * np.cos(theta_i) y_i = y_c + radius * np.sin(theta_i) # 将点添加到列表中 circle_points.append((np.round(x_i).astype(int), np.round(y_i).astype(int))) circle_points.append((np.round(x_0).astype(int), np.round(y_0).astype(int))) return circle_points @track_function("generate_point_cloudPoint",True) def generate_point_cloudPoint(self, point, num_cycle_factor=None, width=None, direction: Literal["CW", "CCW"] = 'CW'): if num_cycle_factor is None: num_cycle_factor = 12 if num_cycle_factor < 1: print("圈数必须为大于1的整数") num_cycle_factor = 1 num_cycle_factor = np.abs(np.round(num_cycle_factor).astype(int)) if width is None: width = 20 if width < 2: print("宽度必须为大于2的整数") width = 2 width = np.abs(np.round(width).astype(int)) radius = width/2 points = np.round((num_cycle_factor * np.pi * 2)/(np.pi/9)).astype(int) thetas = np.linspace(0, num_cycle_factor * 2 * np.pi, points) radiuss = np.ones_like(thetas) * radius for i in range(len(thetas)): if thetas[i] <= np.pi/2: radiuss[i] = np.sin(thetas[i]) * radius if thetas[i] >= num_cycle_factor * np.pi * 2 - np.pi / 2: radiuss[i] = np.sin(num_cycle_factor * np.pi * 2 - thetas[i]) * radius if direction == "CCW": x_ellipse = radiuss * np.cos(-thetas) y_ellipse = radiuss * np.sin(-thetas) else: x_ellipse = radiuss * np.cos(thetas) y_ellipse = radiuss * np.sin(thetas) x_rot = point[0] + x_ellipse y_rot = point[1] + y_ellipse result_points = np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1))) return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points)) @track_function("generate_line_cloudPoint",True) def generate_line_cloudPoint(self,start, end, step_distance = 5): """ 生成起点和终点之间的线性插值点 参数: start: 起点 (标量或数组,例如 [x1, y1, z1]) end: 终点 (标量或数组,例如 [x2, y2, z2]) num_points: 插值点的数量(包括起点和终点) 返回: 包含线性插值点的数组 """ start = np.array(start) end = np.array(end) distance = np.linalg.norm(end - start) num_points = int(distance / step_distance) if num_points <=2: num_points = 2 # 使用 numpy 的 linspace 在起点和终点之间生成线性插值点 interpolated_points = np.linspace(start, end, num_points) # 将插值结果四舍五入为整数 return np.round(interpolated_points).astype(int) @track_function("generate_repeat_line_cloudPoint",True) def generate_repeat_line_cloudPoint(self, start_point, end_point, num_cycle_factor = 5): """ start_point: 起始点坐标,形如 (x_0, y_0) end_point: 终点坐标,形如 (x_0, y_0) num_cycle_factor: 重复的次数 """ # 初始化存储插补点的列表 result_points = [] # 生成插补点 for i in range(num_cycle_factor): # 将点添加到列表中 result_points.append(start_point) result_points.append(end_point) return np.round(np.array(result_points)).astype(int) @track_function("generate_finger_circle_cloudPoint",True) def generate_finger_circle_cloudPoint(self, center_point, num_cycle_factor=5, radius=30,deviation=15): """ 生成一指禅绕球心的圆弧轨迹 center_point: 圆心坐标 [x, y, z, roll, pitch, yaw] num_cycle_factor: 轨迹的重复次数 radius: 圆的半径 """ result_points = [] result_points.append(copy.deepcopy(center_point)) deviation = deviation * np.pi/180 totalCounts = int(360 / radius) shift = 1 for i in range(num_cycle_factor): for countR in range(0, totalCounts): # 当前旋转矩阵 temp_pose = copy.deepcopy(center_point) rotation = R.from_euler('xyz', temp_pose[3:], degrees=False) current_rotation_matrix = rotation.as_matrix() if i == 0: shift = countR/(90/radius) if countR/(90/radius) < 1 else 1 if i == num_cycle_factor-1: shift = (totalCounts-countR)/(90/radius) if (totalCounts-countR)/(90/radius) < 1 else 1 # 生成增量旋转矩阵 increment_rotation= R.from_euler('xyz', [deviation*shift,0,countR * np.pi*2 /totalCounts], degrees=False).as_matrix() # 更新旋转矩阵 updated_rotation_matrix = current_rotation_matrix @ increment_rotation # 将旋转矩阵转换回欧拉角 temp_pose[3:] = R.from_matrix(updated_rotation_matrix).as_euler('xyz', degrees=False) # 添加当前点到结果列表 result_points.append(copy.deepcopy(temp_pose)) result_points.append(copy.deepcopy(center_point)) return result_points def traj_generate(self, t: Union[int, float, List[float]], pose = None, wrench = None, interpolation: Literal["linear", "cubic","circle","cloud_point"] = 'linear',**kwargs): """ 轨迹生成 param: t: 时间或时间序列 pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿 wrench(nx6): 力矩或力矩序列,当输入为None时,表示保持当前的力 interpolation: 插值方式 return: traj: 生成的轨迹 -> dict """ # 确保时间输入是列表 if isinstance(t, (int, float)): t = [t] elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t): self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)") return None # 检查时间点数量 if len(t) < 1: self.logger.log_error("len(t) < 1") return None # 时间序列 time_points = [0.0] + t time_points = np.array(time_points) dt = self.command_rate.to_sec() for i in range(len(time_points)): time_points[i] = np.round(time_points[i] / dt) * dt if np.abs(time_points[-1]) < dt: time_points[-1] = dt # 由于浮点数精度问题,time_step要乘0.9 time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt) pos_traj = None quat_traj = None wrench_traj = None pos_vel = None angular_vel = None # 位置轨迹与速度轨迹生成 if pose is not None: if interpolation != 'cloud_point': pose = np.array(pose).reshape(-1, 6) if pose.shape[0] != len(t): self.logger.log_error("pose.shape[0] != len(t)") print(pose.shape[0],len(t)) return None pose_matrix = self.convert_to_7d(pose) else: pose = np.array(pose) try: # 插值 if interpolation == 'linear': pos_traj, quat_traj = linear_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt) elif interpolation == 'cubic': pos_traj, quat_traj = spline_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt) elif interpolation == 'circle': if self.ion is not None: pos_traj = circle_trajectory(center=pose, radius=0.07,time_points=time_points,time_step=dt,**kwargs) else: pos_traj = circle_trajectory(center=pose,time_points=time_points,time_step=dt,**kwargs) quat_traj = np.tile(R.from_euler('xyz', np.array(pose[0][3:])).as_quat(), (pos_traj.shape[0], 1)) elif interpolation == 'cloud_point': pos_traj, quat_traj = cloud_point_interpolate(positions=pose[:, :3], quaternions=pose[:, 3:], time_points=time_points, time_step=dt) # self.logger.log_yellow(f'{pos_traj[0]} {pos_traj[-1]}') except ValueError as error: self.logger.log_error(error) return None else: self.logger.log_error("未输入位置进行规划") # TODO 力轨迹生成 改为线性增加 if wrench is not None: wrench = np.array(wrench).reshape(-1, 6) if wrench.shape[0] != len(t): return None indices = np.searchsorted(time_points, time_seq, side='left') - 1 indices = np.clip(indices, 0, len(wrench) - 1) wrench_traj = wrench[indices] # 构建返回数据字典 traj = { 't': time_seq, 'pos': pos_traj, 'quat': quat_traj, 'pos_vel': pos_vel, 'angular_vel': angular_vel, 'wrench': wrench_traj } return traj def wrench_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation"] = 'linear',**kwargs): """ 力/力矩轨迹生成 param: t: 时间或时间序列 wrench(nx6): 力矩或力矩序列 interpolation: 插值方式 return: traj: 生成的轨迹 -> dict """ # 确保时间输入是列表 if isinstance(t, (int, float)): t = [t] elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t): self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)") return None # 检查时间点数量 if len(t) < 1: self.logger.log_error("len(t) < 1") return None if wrench is None: self.logger.log_error("未输入力矩进行规划") return None # 时间序列 time_points = [0.0] + t time_points = np.array(time_points) dt = self.command_rate.to_sec() for i in range(len(time_points)): time_points[i] = np.round(time_points[i] / dt) * dt if np.abs(time_points[-1]) < dt: time_points[-1] = dt # 由于浮点数精度问题,time_step要乘0.9 time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt) wrench_traj = None # TODO 力轨迹生成 改为线性增加 if wrench is not None: wrench = np.array(wrench).reshape(-1, 6) if wrench.shape[0] != len(t): return None if interpolation == 'oscillation': wrench_traj = oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) else: wrench_traj = linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) # indices = np.searchsorted(time_points, time_seq, side='left') - 1 # indices = np.clip(indices, 0, len(wrench) - 1) # wrench_traj = wrench[indices] # 构建返回数据字典 traj = { 't': time_seq, 'wrench': wrench_traj } return traj def force_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation", "auto"] = "auto",**kwargs): """ 力/力矩轨迹生成 param: t: 时间或时间序列 wrench(nx6): 力矩或力矩序列 interpolation: 插值方式 return: traj: 生成的轨迹 -> dict """ # 确保时间输入是列表 if isinstance(t, (int, float)): t = [t] elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t): self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)") return None # 检查时间点数量 if len(t) < 1: self.logger.log_error("len(t) < 1") return None if wrench is None: self.logger.log_error("未输入力矩进行规划") return None # 时间序列 time_points = [0.0] + t time_points = np.array(time_points) dt = self.command_rate.to_sec() for i in range(len(time_points)): time_points[i] = np.round(time_points[i] / dt) * dt if np.abs(time_points[-1]) < dt: time_points[-1] = dt # 由于浮点数精度问题,time_step要乘0.9 time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt) wrench_traj = None # TODO 力轨迹生成 改为线性增加 if wrench is not None: wrench = np.array(wrench).reshape(-1, 6) if wrench.shape[0] != len(t): return None if interpolation == 'oscillation': wrench_traj = self.force_plan.oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) elif interpolation == "auto": # target_wrench = [] # target_wrench.append(wrench) wrench_traj = self.force_plan.S_shaped_wrench_interpolate(start=self.arm_state.desired_wrench, wrench=wrench, time_points=time_points, time_step=dt) else: wrench_traj = self.force_plan.linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) # indices = np.searchsorted(time_points, time_seq, side='left') - 1 # indices = np.clip(indices, 0, len(wrench) - 1) # wrench_traj = wrench[indices] # 构建返回数据字典 traj = { 't': time_seq, 'wrench': wrench_traj } return traj if __name__ == "__main__": # waypoints = [ # {"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0], # "acceleration": [0, 0, 0], # "orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()}, # {"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0], # "acceleration": [0, 0, 0], # "orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()}, # ] ## 单位 m deg # myInterpolate = TrajectoryInterpolator(waypoints) # ts = np.linspace(0,5,100) robot = MassageRobot(arm_config_path="/home/jsfb/jsfb_ws/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml") # xr_list, vr_list, ar_list = [], [], [] # for t in ts: # xr, vr, ar = myInterpolate.interpolate(t) # xr_list.append(xr) # vr_list.append(vr) # ar_list.append(ar) # xr_array = np.array(xr_list) # vr_array = np.array(vr_list) # ar_array = np.array(ar_list) # robot.xr = xr_array # robot.vr = vr_array # robot.ar = ar_array # robot.ts = ts # robot.dt = ts[1] - ts[0] ready_pose = [204.3467,-134.9880,455.3604,-180.0000,-30.0000,0.0042] robot.current_head = 'finger_head' robot.force_sensor.disable_active_transmission() time.sleep(0.5) robot.force_sensor.disable_active_transmission() time.sleep(0.5) robot.force_sensor.disable_active_transmission() time.sleep(0.5) robot.force_sensor.set_zero() time.sleep(0.5) robot.force_sensor.enable_active_transmission() robot.init_hardwares(ready_pose) time.sleep(3) robot.controller_manager.switch_controller('admittance') # robot.controller_manager.switch_controller('position') atexit.register(robot.force_sensor.disconnect) robot.arm_state.desired_wrench = np.array([0,0,-1,0,0,0]) try: robot_thread = threading.Thread(target=robot.start) robot_thread.start() except KeyboardInterrupt: print("用户中断操作。") except Exception as e: print("Exception occurred at line:", e.__traceback__.tb_lineno) print("发生异常:", e) # robot.arm.disableRobot()