diff --git a/Massage/MassageControl/MassageRobot_nova5.py b/Massage/MassageControl/MassageRobot_nova5.py index 50c77eb..ed45833 100644 --- a/Massage/MassageControl/MassageRobot_nova5.py +++ b/Massage/MassageControl/MassageRobot_nova5.py @@ -1,1630 +1,1649 @@ from functools import wraps - import sys - import numpy as np - import threading - import time - from typing import List, Literal, Union - - from scipy.interpolate import interp1d - - import requests - import os - from scipy.spatial.transform import Rotation as R - import ctypes - libc = ctypes.CDLL("libc.so.6") - import atexit - import copy - - from scipy.spatial.transform import Slerp - - - try: - # 导入算法 - from .algorithms.arm_state import ArmState - from .algorithms.controller_manager import ControllerManager - from .algorithms.admittance_controller import AdmittanceController - from .algorithms.hybrid_controller import HybridController - from .algorithms.admithybrid_controller import AdmitHybridController - from .algorithms.position_controller import PositionController - from .algorithms.hybridPid_controller import HybridPidController - from .algorithms.hybridAdmit_controller import HybridAdmitController - from .algorithms.positionerSensor_controller import PositionerSensorController - from .algorithms.admittanceZ_controller import AdmittanceZController - from .algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate - from .algorithms.force_planner import ForcePlanner - - # 导入硬件 - from .hardware.dobot_nova5 import dobot_nova5 - from .hardware.force_sensor import XjcSensor - - # 导入工具 - from .tools.log import CustomLogger - from .tools.Rate import Rate - from .tools.yaml_operator import read_yaml - from .tools.decorator_tools import custom_decorator,apply_decorators - from .tools.serial_tools import find_serial_by_location - from .tools.Trajectory import TrajectoryInterpolator - except: - #导入算法 - from algorithms.arm_state import ArmState - from algorithms.controller_manager import ControllerManager - from algorithms.admittance_controller import AdmittanceController - from algorithms.admittanceZ_controller import AdmittanceZController - from algorithms.hybrid_controller import HybridController - from algorithms.admithybrid_controller import AdmitHybridController - from algorithms.position_controller import PositionController - from algorithms.hybridPid_controller import HybridPidController - from algorithms.hybridAdmit_controller import HybridAdmitController - from algorithms.positionerSensor_controller import PositionerSensorController - from algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate - from algorithms.force_planner import ForcePlanner - - # 导入硬件 - from hardware.dobot_nova5 import dobot_nova5 - from hardware.force_sensor import XjcSensor - - # 导入工具 - from tools.log import CustomLogger - from tools.Rate import Rate - from tools.yaml_operator import read_yaml - from tools.decorator_tools import custom_decorator,apply_decorators - from tools.serial_tools import find_serial_by_location - from tools.Trajectory import TrajectoryInterpolator - - import matplotlib.pyplot as plt - from matplotlib.animation import FuncAnimation - - current_file_path = os.path.abspath(__file__) - MassageRobot_Dobot_Path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path))) - print(MassageRobot_Dobot_Path) - # 添加目标文件夹到系统路径 - sys.path.append(MassageRobot_Dobot_Path) - from VortXDB.client import VTXClient - - """ - 在 Python 中,atexit.register 用于注册程序退出时要执行的函数。atexit 模块提供了一个简单的接口,用于在程序正常终止时执行清理操作。 +import sys +import numpy as np +import threading +import time +from typing import List, Literal, Union + +from scipy.interpolate import interp1d + +import requests +import os +from scipy.spatial.transform import Rotation as R +import ctypes +libc = ctypes.CDLL("libc.so.6") +import atexit +import copy + +from scipy.spatial.transform import Slerp + + +try: + # 导入算法 + from .algorithms.arm_state import ArmState + from .algorithms.controller_manager import ControllerManager + from .algorithms.admittance_controller import AdmittanceController + from .algorithms.hybrid_controller import HybridController + from .algorithms.admithybrid_controller import AdmitHybridController + from .algorithms.position_controller import PositionController + from .algorithms.hybridPid_controller import HybridPidController + from .algorithms.hybridAdmit_controller import HybridAdmitController + from .algorithms.positionerSensor_controller import PositionerSensorController + from .algorithms.admittanceZ_controller import AdmittanceZController + from .algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate + from .algorithms.force_planner import ForcePlanner + + # 导入硬件 + from .hardware.dobot_nova5 import dobot_nova5 + from .hardware.force_sensor import XjcSensor + + # 导入工具 + from .tools.log import CustomLogger + from .tools.Rate import Rate + from .tools.yaml_operator import read_yaml + from .tools.decorator_tools import custom_decorator,apply_decorators + from .tools.serial_tools import find_serial_by_location + from .tools.Trajectory import TrajectoryInterpolator +except: + #导入算法 + from algorithms.arm_state import ArmState + from algorithms.controller_manager import ControllerManager + from algorithms.admittance_controller import AdmittanceController + from algorithms.admittanceZ_controller import AdmittanceZController + from algorithms.hybrid_controller import HybridController + from algorithms.admithybrid_controller import AdmitHybridController + from algorithms.position_controller import PositionController + from algorithms.hybridPid_controller import HybridPidController + from algorithms.hybridAdmit_controller import HybridAdmitController + from algorithms.positionerSensor_controller import PositionerSensorController + from algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate + from algorithms.force_planner import ForcePlanner + + # 导入硬件 + from hardware.dobot_nova5 import dobot_nova5 + from hardware.force_sensor import XjcSensor + + # 导入工具 + from tools.log import CustomLogger + from tools.Rate import Rate + from tools.yaml_operator import read_yaml + from tools.decorator_tools import custom_decorator,apply_decorators + from tools.serial_tools import find_serial_by_location + from tools.Trajectory import TrajectoryInterpolator + +import matplotlib.pyplot as plt +from matplotlib.animation import FuncAnimation + +current_file_path = os.path.abspath(__file__) +MassageRobot_Dobot_Path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path))) +print(MassageRobot_Dobot_Path) +# 添加目标文件夹到系统路径 +sys.path.append(MassageRobot_Dobot_Path) +from VortXDB.client import VTXClient + +""" +在 Python 中,atexit.register 用于注册程序退出时要执行的函数。atexit 模块提供了一个简单的接口,用于在程序正常终止时执行清理操作。 当你注册多个函数时,它们的执行顺序遵循先进后出的顺序(LIFO:Last In, First Out)。也就是说,最后注册的函数会最先执行。 - """ - - def track_function(function_name,log = False): - def before(self, *args, **kwargs): - self.current_function = function_name - if log: - self.logger.log_info(f"Entering function: {function_name}") - def after(self, result, *args, **kwargs): - self.current_function = None - if log: - self.logger.log_info(f"Exiting function: {function_name},code: {result}") - return custom_decorator(before=before, after=after) - - @apply_decorators - class MassageRobot: - def __init__(self,arm_config_path,is_log=False): - self.logger = CustomLogger() - if is_log: - self.logger_data = CustomLogger() - # 日志线程 - threading.Thread(target=self.log_thread,daemon=True).start() - - self.vtxdb = VTXClient() - - # 当前执行的函数 - self.current_function = None - self.arm_state = ArmState() - self.arm_config = read_yaml(arm_config_path) - # arm 实例化时机械臂类内部进行通讯连接 - self.arm = dobot_nova5(arm_ip = self.arm_config['arm_ip']) - self.arm.start() - self.arm.init() - - self.thermotherapy = None - self.shockwave = None - self.stone = None - self.heat = None - self.ion = None - self.force_plan = ForcePlanner() - # 传感器 - # self.force_sensor = None - self.force_sensor = XjcSensor() - # self.force_sensor.connect() - # 控制器,初始为导纳控制 - self.controller_manager = ControllerManager(self.arm_state) - self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0]) - self.controller_manager.add_controller(HybridController,'hybrid',self.arm_config['controller'][1]) - self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][2]) - self.controller_manager.add_controller(AdmitHybridController,'admithybrid',self.arm_config['controller'][3]) - self.controller_manager.add_controller(HybridPidController,'hybridPid',self.arm_config['controller'][4]) - self.controller_manager.add_controller(HybridAdmitController,'hybridAdmit',self.arm_config['controller'][5]) - self.controller_manager.add_controller(PositionerSensorController,'positionerSensor',self.arm_config['controller'][6]) - self.controller_manager.add_controller(AdmittanceZController,'admittanceZ',self.arm_config['controller'][7]) - - self.controller_manager.switch_controller('admittance') - - - - - # 频率数据赋值 - self.control_rate = Rate(self.arm_config['control_rate']) - self.sensor_rate = Rate(self.arm_config['sensor_rate']) - self.command_rate = Rate(self.arm_config['command_rate']) - - # 低通滤波 - self.cutoff_freq = 80.0 - - # 停止标志位线程 - self.exit_event = threading.Event() - self.exit_event.set() # 运行初始化为True - self.interrupt_event = threading.Event() - self.interrupt_event.clear() # 中断初始化为False - self.pause_envent = threading.Event() - self.pause_envent.clear() # 暂停初始化为False - self.skip_envent = threading.Event() - self.skip_envent.clear() # 跳过初始化为False - # 按摩调整 - self.adjust_wrench_envent = threading.Event() - self.adjust_wrench_envent.clear() # 调整初始化为False - self.is_execute = False - self.pos_increment = np.zeros(3,dtype=np.float64) - self.adjust_wrench = np.zeros(6,dtype=np.float64) - self.skip_pos = np.zeros(6,dtype=np.float64) - - # 按摩头参数暂时使用本地数据 - massage_head_dir = self.arm_config['massage_head_dir'] - all_items = os.listdir(massage_head_dir) - head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))] - self.playload_dict = {} - for file in head_config_files: - file_address = massage_head_dir + '/' + file - play_load = read_yaml(file_address) - self.playload_dict[play_load['name']] = play_load - # self.playload_dict = self.vtxdb.get("robot_config", "massage_head") - # print(self.playload_dict) - self.current_head = 'none' - self.is_waitting = False - - self.last_print_time = 0 - self.last_record_time = 0 - self.last_command_time = 0 - self.move_to_point_count = 0 - self.width_default = 5 - # 卡尔曼滤波相关初始化 - self.x_base = np.zeros(6) - self.P_base = np.eye(6) - # 过程噪声协方差矩阵 - self.Q_base = np.eye(6) * 0.01 - # 测量噪声协方差矩阵 - self.R_base = np.eye(6) * 0.1 - - self.x_tcp = np.zeros(6) - self.P_tcp = np.eye(6) - # 过程噪声协方差矩阵 - self.Q_tcp = np.eye(6) * 0.01 - # 测量噪声协方差矩阵 - self.R_tcp = np.eye(6) * 0.1 - # 传感器故障计数器 - self.sensor_fail_count = 0 - # 机械臂初始化,适配中间层 - # 读取数据 - self.gravity_base = None - self.force_zero = None - self.torque_zero = None - self.tool_position = None - self.mass_center_position = None - self.s_tf_matrix_t = None - arm_orientation_set0 = np.array([-180,0,-90]) - self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix() - - - # REF TRAJ - self.xr = [] - self.vr = [] - self.ar = [] - self.ts = [] # time sequence - self.dt = [] # derivate of time - # --- - self.last_time = -1 - self.cur_time = -1 - - # 预测步骤 - def kalman_predict(self,x, P, Q): - # 预测状态(这里假设状态不变) - x_predict = x - # 预测误差协方差 - P_predict = P + Q - return x_predict, P_predict - - # 更新步骤 - def kalman_update(self,x_predict, P_predict, z, R): - # 卡尔曼增益 - # K = P_predict @ np.linalg.inv(P_predict + R) - S = P_predict + R - s = np.diag(S) # shape (6,) - p_diag = np.diag(P_predict) - K_diag = np.zeros_like(s) - nonzero_mask = s != 0 - K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask] - K = np.diag(K_diag) - # 更新状态 - x_update = x_predict + K @ (z - x_predict) - # 更新误差协方差 - P_update = (np.eye(len(K)) - K) @ P_predict - return x_update, P_update - def init_hardwares(self,ready_pose): - self.ready_pose = np.array(ready_pose) - self.switch_payload(self.current_head) - print(self.arm.get_arm_position()) - time.sleep(0.5) - - - def sensor_set_zero(self): - self.arm_position_set0,self.arm_orientation_set0 = self.arm.get_arm_position() - arm_orientation_set0 = self.arm_orientation_set0 - print("arm_orientation_set0",R.from_quat(arm_orientation_set0).as_euler('xyz',degrees=True)) - self.b_rotation_s_set0 = R.from_quat(arm_orientation_set0).as_matrix() - ######### - max_retries = 5 - #no_head_wrong=np.array([-80.01506042, 116.34187317, -87.65788269, -0.32910481, -0.65792173, -0.61110526])#旧版传感器) - head_set0=np.array([0, 0, 0, 0, 0, 0])#新版传感器) - min_tolerance=0.001 - retry_count = 0 - self.force_sensor.disable_active_transmission() - time.sleep(0.5) - while retry_count < max_retries: - # 调用读取数据的函数,假设返回值是一个 numpy 数组 - data = self.force_sensor.read_data_f32() - data = self.force_sensor.read_data_f32() - data = self.force_sensor.read_data_f32() - - # 检查返回值是否为 np.array 类型 - if isinstance(data, np.ndarray): - # 判断数据是否接近目标值 - - if np.allclose(data, head_set0, atol=min_tolerance): - print(f"检测到力传感器已置零:{data}") - break - else: - print(f"检测到力传感器未置零:{data}") - - # 禁用传感器传输 - self.force_sensor.disable_active_transmission() - self.force_sensor.disable_active_transmission() - time.sleep(0.5) - - # 尝试设置传感器清零 - code = self.force_sensor.set_zero() - code = self.force_sensor.set_zero() - max_try = 3 - - while code != 0 and max_try > 0: - self.logger.log_error("Force sensor set zero failed, retrying...") - self.force_sensor.disable_active_transmission() - self.force_sensor.disable_active_transmission() - time.sleep(0.5) - code = self.force_sensor.set_zero() - code = self.force_sensor.set_zero() - max_try -= 1 - time.sleep(0.5) - - # 如果多次尝试后失败 - if max_try == 0: - self.logger.log_error("Force sensor set zero failed, exiting...") - requests.post( - "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"} - ) - requests.post( - "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False} - ) - # sys.exit(0) - return -1 # 置零失败 - - # 成功后跳出主循环 - break - - else: - self.logger.log_error("读取数据失败或格式错误,尝试重试...") - - # 增加重试次数 - retry_count += 1 - self.force_sensor.disable_active_transmission() - self.force_sensor.disconnect() - time.sleep(1) - self.force_sensor = XjcSensor() - time.sleep(1) # 每次重试之间添加一个短暂的延迟 - - if retry_count == max_retries: - self.logger.log_error(f"超过最大重试次数 ({max_retries}),操作失败,退出程序...") - requests.post( - "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"} - ) - requests.post( - "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False} - ) - return -2 #读取失败 - - return 0 #读取成功并置零成功 - - def sensor_enable(self): - code = self.force_sensor.enable_active_transmission() - max_try = 3 - while code!= 0 and max_try > 0: - self.logger.log_error("Force sensor enable_active_transmission failed,retrying...") - code = self.force_sensor.enable_active_transmission() - max_try -= 1 - time.sleep(0.1) - if max_try == 0: - self.logger.log_error("Force sensor enable_active_transmission failed,exiting...") - requests.post( - "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"} - ) - requests.post( - "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False} - ) - # sys.exit(0) - return -1 - return 0 - - - - def update_wrench(self): - # 当前的机械臂到末端转换 (实时) - b_rotation_s = R.from_quat(self.arm_state.arm_orientation).as_matrix() - # 读取数据 - sensor_data = self.force_sensor.read() - # self.logger.log_info(f"传感器数据{sensor_data}") - if sensor_data is None: - self.force_sensor.stop_background_reading() - self.logger.log_error("传感器数据读取失败") - return -1 - # 反重力补偿 - sensor_data[:3] = sensor_data[:3] + self.force_zero + self.b_rotation_s_set0.T @ self.gravity_base - sensor_data[3:] = sensor_data[3:] + self.torque_zero + np.cross(self.mass_center_position, self.b_rotation_s_set0.T @ self.gravity_base) - # 传感器数据通过矫正计算得到外来施加力 传感器坐标系下 - # 重力 - gravity_in_sensor = b_rotation_s.T @ self.gravity_base - s_force = sensor_data[:3] - self.force_zero - gravity_in_sensor - # 力矩 - s_torque = sensor_data[3:] - self.torque_zero - np.cross(self.mass_center_position,gravity_in_sensor) - wrench = np.concatenate([s_force,s_torque]) - # 传感器到TCP - wrench = self.wrench_coordinate_conversion(self.s_tf_matrix_t,wrench) - # 交给ARM STATE集中管理 - self.arm_state.external_wrench_tcp = wrench - # 对tcp坐标系下的外力外矩进行平滑 - x_tcp_predict, P_tcp_predict = self.kalman_predict(x = self.x_tcp, - P = self.P_tcp, - Q = self.Q_tcp) - self.x_tcp, self.P_tcp = self.kalman_update(x_predict = x_tcp_predict, - P_predict = P_tcp_predict, - z = self.arm_state.external_wrench_tcp, - R = self.R_tcp) - self.arm_state.external_wrench_tcp = self.x_tcp - self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp - return 0 - - def switch_payload(self,name): - if name in self.playload_dict: - self.stop() - self.current_head = name - - compensation_config = self.playload_dict[self.current_head] - - # 读取数据 - self.gravity_base = np.array(compensation_config['gravity_base']) - self.force_zero = np.array(compensation_config['force_zero']) - self.torque_zero = np.array(compensation_config['torque_zero']) - self.tool_position = np.array(compensation_config['tcp_offset']) - self.mass_center_position = np.array(compensation_config['mass_center_position']) - self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat()) - - tcp_offset = self.playload_dict[name]["tcp_offset"] - tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}" - print("tcp_offset_str",tcp_offset_str) - self.arm.setEndEffector(i=9,tool_i=tcp_offset_str) - self.arm.chooseEndEffector(i=9) - print(self.arm.get_arm_position()) - self.logger.log_info(f"切换到{name}按摩头") - - # ####################### 位姿伺服 ########################## - - def arm_measure_loop(self): - self.logger.log_info("机械臂测量线程启动") - while (not self.arm.is_exit) and (not self.exit_event.is_set()): - try: - if not self.is_waitting: - self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position() - code = self.update_wrench() - if code == -1: - self.sensor_fail_count += 1 - self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}") - if self.sensor_fail_count > 10: - self.logger.log_error("传感器线程数据读取失败超过10次,程序终止") - self.stop() - break - else: - self.sensor_fail_count = 0 - except Exception as e: - self.logger.log_error(f"机械臂测量线程报错:{e}") - self.exit_event.set() # 控制退出while - self.sensor_rate.sleep() # 控制频率 - - def arm_command_loop_traj(self): - self.logger.log_info("机械臂控制线程启动") - # 离线轨迹启动时间记录 - self.traj_start_time = time.time() - while (not self.arm.is_exit) and (not self.exit_event.is_set()): - try: - if not self.is_waitting: - process_start_time = time.time() - control_cycle = self.control_rate.to_sec() - t_now = time.time() - self.traj_start_time - i = min(int(t_now / self.dt), len(self.ts) - 2) - alpha = max(0.0, min(1.0, (t_now - self.ts[i]) / self.dt)) - - pos = (1 - alpha) * self.xr[i][:3] + alpha * self.xr[i+1][:3] - - slerp = Slerp([0, 1], R.concatenate([R.from_quat(self.xr[i][3:]), R.from_quat(self.xr[i+1][3:])])) - alpha = 0.5 - rot_interp = slerp(alpha).as_quat() - - xr = np.concatenate([pos, rot_interp]) - # xr = (1-alpha) * self.xr[i] + alpha * self.xr[i+1] - vr = (1 - alpha) * self.vr[i] + alpha * self.vr[i + 1] - ar = (1 - alpha) * self.ar[i] + alpha * self.ar[i + 1] - - self.arm_state.desired_position = xr[:3] - # self.arm_state.desired_orientation = xr[3:] - # print(111) - self.arm_state.desired_orientation = rot_interp - # print(222) - self.arm_state.desired_twist = vr - self.arm_state.desired_acc = ar - - print("self.arm_state.desired_position",self.arm_state.desired_position) - print("self.arm_state.desired_orientation",self.arm_state.desired_orientation) - print("self.arm_state.desired_twist",self.arm_state.desired_twist) - print("self.arm_state.desired_acc",self.arm_state.desired_acc) - - self.controller_manager.step_traj(control_cycle) - # self.controller_manager.step(control_cycle) - command_pose = np.concatenate([ - self.arm_state.arm_position_command * 1000, # 转毫米 - self.arm_state.arm_orientation_command - ]) - ## 输入滤波 - status, pose_processed = self.arm.process_command( - command_pose[:3], - command_pose[3:] - ) - # print(f"pose_processed:{pose_processed}") - - tcp_command = ( - f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f}," - f"{pose_processed[2]:.3f},{pose_processed[3]:.3f}," - f"{pose_processed[4]:.3f},{pose_processed[5]:.3f}," - "t=0.02, aheadtime=20, gain=200)" - ).encode() - - run_time = time.time() - process_start_time - sleep_duration = self.control_rate.to_sec() - run_time - b2 =time.time() - if sleep_duration > 0: - time.sleep(sleep_duration) - print(f"real sleep:{time.time()-b2}") - # self.arm.dashboard.socket_dobot.sendall(tcp_command) - if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停 - print("机械臂为暂停状态") - res = self.arm.dashboard.Continue() - code = self.arm.parseResultId(res)[0] - if code == 0: - print("机械臂继续已暂停的运动指令") - - except Exception as e: - self.logger.log_error(f"机械臂控制失败:{e}") - self.exit_event.set() - # print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration) - - def arm_command_loop(self): - self.logger.log_info("机械臂控制线程启动") - while (not self.arm.is_exit) and (not self.exit_event.is_set()): - try: - if not self.is_waitting: - process_start_time = time.time() - control_cycle = self.control_rate.to_sec() - self.controller_manager.step(control_cycle) - command_pose = np.concatenate([ - self.arm_state.arm_position_command * 1000, # 转毫米 - self.arm_state.arm_orientation_command - ]) - ## 输入滤波 - status, pose_processed = self.arm.process_command( - command_pose[:3], - command_pose[3:] - ) - # print(f"pose_processed:{pose_processed}") - - tcp_command = ( - f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f}," - f"{pose_processed[2]:.3f},{pose_processed[3]:.3f}," - f"{pose_processed[4]:.3f},{pose_processed[5]:.3f}," - "t=0.02, aheadtime=20, gain=200)" - ).encode() - - run_time = time.time() - process_start_time - sleep_duration = self.control_rate.to_sec() - run_time - b2 =time.time() - if sleep_duration > 0: - time.sleep(sleep_duration) - print(f"real sleep:{time.time()-b2}") - self.arm.dashboard.socket_dobot.sendall(tcp_command) - if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停 - print("机械臂为暂停状态") - res = self.arm.dashboard.Continue() - code = self.arm.parseResultId(res)[0] - if code == 0: - print("机械臂继续已暂停的运动指令") - - except Exception as e: - self.logger.log_error(f"机械臂控制失败:{e}") - self.exit_event.set() - # print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration) - - ####################### 位姿伺服 ########################## - - def start(self): - if self.exit_event.is_set(): - # 线程 - self.exit_event.clear() - self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop) - self.arm_control_thread = threading.Thread(target=self.arm_command_loop_traj) - # 线程启动 - self.arm_measure_thread.start() ## 测量线程 - position,quat_rot = self.arm.get_arm_position() - # 初始值 - self.arm_state.desired_position = position - self.arm_state.arm_position_command = position - self.arm_state.desired_orientation = quat_rot - self.arm_state.arm_orientation_command = quat_rot - self.arm_state.arm_position = position - self.arm_state.arm_orientation = quat_rot - for i in range(20): - self.controller_manager.step(self.control_rate.to_sec()) - self.logger.log_blue(f"position command: {self.arm_state.arm_position_command}") - self.logger.log_blue(f"orientation command: {R.from_quat(self.arm_state.arm_orientation_command).as_euler('xyz',degrees=True)}") - position, quat_rot = self.arm.get_arm_position() - self.logger.log_blue(f"position current:{position}") - self.logger.log_blue(f"orientation current: {R.from_quat(quat_rot).as_euler('xyz',degrees=False)}") - self.command_rate.precise_sleep() - - position ,quat_rot = self.arm.get_arm_position() - # self.arm_state.desired_position = poistion - self.arm_state.arm_position_command = position - # self.arm_state.desired_orientation = quat_rot - self.arm_state.arm_orientation_command = quat_rot - - ## POSITION CONTROLLER TEST ## - # delta_pos = np.array([0.1,0.1,-0.1],dtype=np.float64) - # # delta_quat = R.from_euler('xyz', [0, 0, 0], degrees=True).as_quat() - # self.arm_state.desired_position = position + delta_pos - # self.arm_state.desired_orientation = quat_rot - # 线程启动 - self.arm_control_thread.start() ## 赋初始值后控制线程 - - self.logger.log_info("MassageRobot启动") - time.sleep(0.5) - - def stop(self): - if not self.exit_event.is_set(): - self.exit_event.set() - self.interrupt_event.clear() - self.arm_control_thread.join() - self.arm_measure_thread.join() - for i in range(3): - self.force_sensor.disable_active_transmission() - self.force_sensor.start_background_reading() - self.arm.stop_motion() - self.logger.log_info("MassageRobot停止") - - - self.controller_manager.switch_controller('position') - else: - self.logger.log_error(f"未找到{name}按摩头") - - def log_thread(self): - while True: - self.logger_data.log_info(f"机械臂位置:{self.arm_state.arm_position},机械臂姿态:{self.arm_state.arm_orientation}",is_print=False) - self.logger_data.log_info(f"机械臂期望位置:{self.arm_state.desired_position},机械臂真实位置:{self.arm_state.arm_position}",is_print=False) - self.logger_data.log_info(f"机械臂期望姿态:{self.arm_state.desired_orientation},机械臂真实姿态:{self.arm_state.arm_orientation}",is_print=False) - self.logger_data.log_info(f"机械臂期望力矩:{self.arm_state.desired_wrench},机械臂真实力矩:{self.arm_state.external_wrench_base}",is_print=False) - self.logger_data.log_info(f"当前按摩头:{self.current_head}",is_print=False) - time.sleep(1) - - # 工具函数 - def get_tf_matrix(self,position,orientation): - tf_matrix = np.eye(4) - rotation_matrix = R.from_quat(orientation).as_matrix() - tf_matrix[:3,3] = position/1000 - tf_matrix[:3,:3] = rotation_matrix - return tf_matrix - def wrench_coordinate_conversion(self,tf_matrix, wrench): - rot_matrix = tf_matrix[:3, :3] - vector_p = tf_matrix[:3, 3] - temp_force = wrench[:3] - torque = wrench[3:] - force = rot_matrix.T @ temp_force - torque = rot_matrix.T @ np.cross(temp_force,vector_p) + rot_matrix.T @ torque - return np.concatenate([force, torque]) - - @track_function("set_position",True) - def set_position(self,pose,is_wait = False): - self.logger.log_info(f"set postion:{pose}") - x,y,z = pose[0],pose[1],pose[2] - roll,pitch,yaw = pose[3],pose[4],pose[5] - pose_command = np.concatenate([[x,y,z], [roll,pitch,yaw]]) - time.sleep(0.1) - code = self.arm.RunPoint_P_inPose_M_RAD(poses_list = pose_command) - time.sleep(0.1) - return code - - @track_function("move_to_point",True) - def move_to_point(self, t, pose, is_interrupt=True, wait=True, timeout=0.5, interpolation:Literal["linear","circle","cloud_point"] ='linear', algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance",is_switch_controller = False): - """ - 移动到指定的点 - param: - t: 时间或时间序列 - pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿 - is_interrupt: 是否允许中断 - wait: 是否等待运动完成 - timeout: 超时时间 - interpolation: 插值方式 - algorithm: 控制算法 - return: - code: 0表示成功 - 1表示超时 - 2表示用户打断 - 3表示硬件错误 - 4表示参数错误 - """ - self.move_to_point_count += 1 - self.logger.log_blue(f"move_to_point_count: {self.move_to_point_count}") - self.logger.log_yellow(f"move_to_point: {pose[-1]}") - # 在函数内部直接规划轨迹 - traj = self.traj_generate(t, pose, None, interpolation=interpolation) - if traj is None: - self.logger.log_error("轨迹未生成或生成错误") - return 4 - - self.logger.log_yellow(f"traj_quat: {R.from_quat(traj['quat'][-1]).as_euler('xyz',degrees=False)}") - self.logger.log_yellow(f"traj_pos: {traj['pos'][-1]}") - - time_seq = traj['t'] - pos_traj = traj['pos'] - quat_traj = traj['quat'] - - if is_switch_controller == True: - try: - self.controller_manager.switch_controller(algorithm) - except ValueError as error: - self.logger.log_error(error) - self.logger.log_error("切换到%s控制器失败" % algorithm) - return 4 - self.logger.log_info("切换到%s控制器" % algorithm) - - # self.logger.log_info("当前为%s控制器" % algorithm) - - for i in range(len(time_seq)): - if self.interrupt_event.is_set(): - self.interrupt_event.clear() - self.logger.log_error("self.interrupt_event.clear()") - self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position) - self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation) - massage_wrench = np.array([0, 0, 0, 0, 0, 0]) - self.arm_state.desired_wrench = massage_wrench - return 2 - if self.pause_envent.is_set(): - self.pause_envent.clear() - self.logger.log_error("self.pause_envent.clear()") - self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position) - self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation) - massage_wrench = np.array([0, 0, 0, 0, 0, 0]) - self.arm_state.desired_wrench = massage_wrench - return 5 - if self.skip_envent.is_set(): - self.logger.log_error("self.skip_envent.clear()") - self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position) - self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation) - massage_wrench = np.array([0, 0, 0, 0, 0, 0]) - self.arm_state.desired_wrench = massage_wrench - return 6 - if self.arm.is_exit or self.exit_event.is_set(): - print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set()) - return 3 - if self.adjust_wrench_envent.is_set() and self.is_execute == True: - if self.adjust_wrench is not None: - self.arm_state.desired_wrench = self.adjust_wrench - self.adjust_wrench_envent.clear() - - self.arm_state.desired_position = pos_traj[i] - self.arm_state.desired_orientation = quat_traj[i] - - if self.ion is not None: - - act_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.arm_position - desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.desired_position - if self.arm_state.positionerSensorData < 0.001: - desired_position[2] = desired_position[2] - self.arm_state.desired_high - else: - desired_position[2] = min(act_position[2] + self.arm_state.positionerSensorData - self.arm_state.desired_high , desired_position[2] - self.arm_state.desired_high) - self.arm_state.desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix() @ desired_position - self.command_rate.precise_sleep() - - start = time.time() - - while wait: - if self.interrupt_event.is_set(): - self.interrupt_event.clear() - return 2 - if self.pause_envent.is_set(): - self.pause_envent.clear() - return 5 - if self.skip_envent.is_set(): - # self.skip_envent.clear() - return 6 - if self.arm.is_exit or self.exit_event.is_set(): - print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set()) - return 3 - if time.time() - start > timeout: - return 1 - self.command_rate.precise_sleep() - - return 0 - - @track_function("apply_force",True) - def apply_force(self, t, wrench, interpolation:Literal["linear","oscillation"] = "linear", algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance"): - """ - 施加力 - param: - t: 时间或时间序列 - wrench(nx6): 力矩或力矩序列,当输入为None时,表示力为[0,0,0,0,0,0] - is_interrupt: 是否允许中断 - wait: 是否等待运动完成 - timeout: 超时时间 - interpolation: 插值方式 - algorithm: 控制算法 - return: - code: 0表示成功 - 1表示超时 - 2表示用户打断 - 3表示硬件错误 - 4表示参数错误 - """ - - if t==0: - self.arm_state.desired_wrench = wrench - return 0 - - # 在函数内部直接规划力矩轨迹 - traj = self.wrench_traj_generate(t, wrench, interpolation=interpolation) - # # 在函数内部直接规划力矩轨迹 - # traj = self.traj_generate(t, None, wrench, interpolation=interpolation) - if traj is None: - return 4 - - time_seq = traj['t'] - wrench_traj = traj['wrench'] - - # try: - # self.controller_manager.switch_controller(algorithm) - # except ValueError as error: - # self.logger.log_error(error) - # return 4 - - self.logger.log_info("切换到%s控制器" % algorithm) - - for i in range(len(time_seq)): - if self.interrupt_event.is_set(): - self.interrupt_event.clear() - return 2 - if self.pause_envent.is_set(): - self.pause_envent.clear() - return 5 - if self.skip_envent.is_set(): - self.skip_envent.clear() - return 6 - if self.arm.is_exit or self.exit_event.is_set(): - return 3 - if self.adjust_wrench_envent.is_set(): - # if wrench_traj is not None: - # wrench_traj[i:] = self.adjust_wrench - self.adjust_wrench_envent.clear() - # print(wrench_traj[i]) - self.arm_state.desired_wrench = wrench_traj[i] if wrench_traj is not None else np.zeros(6, dtype=np.float64) - self.command_rate.precise_sleep() - - return 0 - - @track_function("resample_curve_strict",True) - def resample_curve_strict(self, points, num_resampled_points): - """ - 修正的曲线重新采样函数,移除重复点并确保累积弧长严格递增。 - """ - # 计算累积弧长 - distances = np.sqrt(np.sum(np.diff(points, axis=0)**2, axis=1)) # 计算相邻点之间的距离 - cumulative_length = np.insert(np.cumsum(distances), 0, 0) # 计算累积弧长 - - # 确保累积弧长严格递增(去除重复值) - unique_indices = np.where(np.diff(cumulative_length, prepend=-np.inf) > 0)[0] - cumulative_length = cumulative_length[unique_indices] - points = points[unique_indices] - - # 生成均匀间隔的新弧长 - target_lengths = np.linspace(0, cumulative_length[-1], num_resampled_points) - - # 在累积弧长上进行插值计算新点位置 - x_interp = interp1d(cumulative_length, points[:, 0], kind='linear', fill_value="extrapolate") - y_interp = interp1d(cumulative_length, points[:, 1], kind='linear', fill_value="extrapolate") - - new_x = x_interp(target_lengths) - new_y = y_interp(target_lengths) - - return np.column_stack((np.round(new_x).astype(int), np.round(new_y).astype(int))) - - @track_function("generate_in_spiral_cloudPoint",True) - def generate_in_spiral_cloudPoint(self, start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'): - """ - 绘制由内向外的螺旋线。 - - 参数: - - start: 起点坐标 (x, y) - - end: 终点坐标 (x, y) - - num_cycle_factor: 螺旋的圈数 - - points: 螺旋线上的点的数量 - """ - # 计算起点和终点之间的距离 - dx = end[0] - start[0] - dy = end[1] - start[1] - distance = np.sqrt(dx**2 + dy**2) - if width is None: - width = self.width_default - if num_cycle_factor is None: - num_cycle_factor = 5 - - num_cycle_factor = np.round(num_cycle_factor).astype(int) - if distance < 2: - print("距离过短,无法绘制螺旋线") - return 0 - if width <= 2: - width = 2 - if num_cycle_factor < 1: - print("圈数必须为大于1的整数") - num_cycle_factor = 1 - angle = np.arctan2(dy, dx) # 计算从起点到终点的角度 - - points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int) - - # 螺旋的角度范围 - theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points) - - # 螺旋的半径随着角度增大而线性增长 - r = np.linspace(0, distance, points) - - tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor))) - - # 计算螺旋的x, y坐标 - if direction == "CCW": - # 反向旋转时,使用负的角度来反向螺旋 - x = r * np.cos(-theta) # 逆时针旋转 - y = r * np.sin(-theta)*tempK - else: - # 正常旋转 - x = r * np.cos(theta) - y = r * np.sin(theta)*tempK - - # 旋转螺旋线以对齐起点和终点的角度 - x_rotated = x * np.cos(angle) - y * np.sin(angle) - y_rotated = x * np.sin(angle) + y * np.cos(angle) - - # 平移螺旋线,使其从起点开始 - x_final = x_rotated + start[0] - x_rotated[0] - y_final = y_rotated + start[1] - y_rotated[0] - - # return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1))) - result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1), - np.round(y_final).astype(int).reshape(-1, 1))) - - return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points)) - - @track_function("generate_out_spiral_cloudPoint",True) - def generate_out_spiral_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'): - """ - 绘制由外向内的螺旋线。 - - 参数: - - start: 起点坐标 (x, y) - - end: 终点坐标 (x, y) - - num_cycle_factor: 螺旋的圈数 - - points: 螺旋线上的点的数量 - """ - # 计算起点和终点之间的距离 - dx = end[0] - start[0] - dy = end[1] - start[1] - angle = np.arctan2(dy, dx) # 计算从起点到终点的角度 - distance = np.sqrt(dx**2 + dy**2) - if width is None: - width = self.width_default - if num_cycle_factor is None: - num_cycle_factor = 5 - - num_cycle_factor = np.round(num_cycle_factor).astype(int) - if distance < 2: - print("距离过短,无法绘制螺旋线") - return 0 - if width <= 2: - width = 2 - if num_cycle_factor < 1: - print("圈数必须为大于1的整数") - num_cycle_factor = 1 - - points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int) - - # 螺旋的角度范围 - theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points) - - # 螺旋的半径随着角度增大而线性增长 - r = np.linspace(distance, 0, points) - - tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor))) - - # 计算螺旋的x, y坐标 - if direction == "CCW": - # 反向旋转时,使用负的角度来反向螺旋 - x = r * np.cos(-theta) # 逆时针旋转 - y = r * np.sin(-theta)*tempK - else: - # 正常旋转 - x = r * np.cos(theta) - y = r * np.sin(theta)*tempK - - # 旋转螺旋线以对齐起点和终点的角度 - x = x * np.cos(np.pi) - y * np.sin(np.pi) - y = x * np.sin(np.pi) + y * np.cos(np.pi) - - # 旋转螺旋线以对齐起点和终点的角度 - x_rotated = x * np.cos(angle) - y * np.sin(angle) - y_rotated = x * np.sin(angle) + y * np.cos(angle) - - # 平移螺旋线,使其从起点开始 - x_final = x_rotated + start[0] - x_rotated[0] - y_final = y_rotated + start[1] - y_rotated[0] - - # return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1))) - result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1), - np.round(y_final).astype(int).reshape(-1, 1))) - - return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points)) - - @track_function("generate_prolateCycloid_cloudPoint",True) - def generate_prolateCycloid_cloudPoint(self, start, end, width=None, num_cycle_factor=None, direction: Literal["CW", "CCW"] = 'CW'): - # Parameters - step_degree = np.pi / 12 - a_prolate = 4.5 - radius = 30.0 - distance = np.linalg.norm(np.array(end) - np.array(start)) - angle = np.arctan2(end[1] - start[1], end[0] - start[0]) - - if width is None: - width = self.width_default - if num_cycle_factor is None: - num_cycle_factor = 5 - num_cycle_factor = np.round(num_cycle_factor).astype(int) - if distance < 2: - print("距离过短,无法绘制Cycloid线") - return 0 - if width <= 2: - width = 2 - if num_cycle_factor < 1: - print("圈数必须为大于1的整数") - num_cycle_factor = 1 - - r_cycloid = radius / (2 * a_prolate) - num_cycles = int(num_cycle_factor) - temp_k = distance / (r_cycloid * (np.pi + 2 * a_prolate + 2 * np.pi * num_cycles)) - - # Generate the prolate cycloid - theta_prolate = np.linspace(0.5 * np.pi, num_cycles * 2 * np.pi + 1.5 * np.pi, - round((num_cycles * 2 * np.pi + 1.5 * np.pi) / step_degree)) - if direction == 'CW': - theta_prolate = np.flip(theta_prolate) - - x_prolate = r_cycloid * (theta_prolate - a_prolate * np.sin(theta_prolate)) * temp_k - y_prolate = r_cycloid * (1 - a_prolate * np.cos(theta_prolate)) - - # Scale y-prolate based on width - y_prolate = (y_prolate / 30) * width - - # Rotation of cycloid - if direction == 'CW': - x_prolate, y_prolate = -x_prolate, -y_prolate - - x_rotated = x_prolate * np.cos(angle) - y_prolate * np.sin(angle) - y_rotated = x_prolate * np.sin(angle) + y_prolate * np.cos(angle) - - # Translate to starting point - x_final = x_rotated + start[0] - x_rotated[0] - y_final = y_rotated + start[1] - y_rotated[0] - - return np.column_stack((np.round(x_final).astype(int), np.round(y_final).astype(int))) - - @track_function("generate_ellipse_cloudPoint",True) - def generate_ellipse_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'): - # 计算椭圆的中心(两个端点的中点) - center = (start + end) / 2 - if num_cycle_factor is None: - num_cycle_factor = 1 - - num_cycle_factor = np.round(num_cycle_factor).astype(int) - - distance = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2) - - if width is None: - width = distance/2 - - - if distance < 2: - print("距离过短,无法绘制椭圆") - return 0 - if width <= 2: - print("椭圆半径过小,无法绘制椭圆") - width = self.width_default - if num_cycle_factor < 1: - print("圈数必须为大于1的整数") - num_cycle_factor = 1 - - # 计算两个端点的距离,即长半轴长度 - a = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2) / 2 - - # 假设短半轴长度为常量值radius - b = width / 2 - - # 计算旋转角度(由端点的方向确定) - angle = np.arctan2(end[1] - start[1], end[0] - start[0]) - points = np.round(((num_cycle_factor + 1) * np.pi * 2)/(np.pi/9)).astype(int) - - # 绘制椭圆的参数化公式 - theta = np.linspace(np.pi, (2+num_cycle_factor*2)* np.pi, points) - - # 计算螺旋的x, y坐标 - if direction == "CCW": - # 反向旋转时,使用负的角度来反向螺旋 - x_ellipse = a * np.cos(-theta) #顺时针 - y_ellipse = b * np.sin(-theta) - else: - # 正常旋转 - x_ellipse = a * np.cos(theta) - y_ellipse = b * np.sin(theta) - - # 绕中心旋转椭圆 - x_rot = center[0] + x_ellipse * np.cos(angle) - y_ellipse * np.sin(angle) - y_rot = center[1] + x_ellipse * np.sin(angle) + y_ellipse * np.cos(angle) - - # 返回椭圆点云,保留整数值 - return np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1))) - - @track_function("generate_Lemniscate_cloudPoint",True) - def generate_Lemniscate_cloudPoint(self, start, end, width = None,num_cycle_factor = None, direction:Literal["CW","CCW"] ='CW'): - """ - 生成从start到end的8字形插补路径 - - :param start: 起点坐标 (x0, y0) - :param end: 终点坐标 (x1, y1) - :param num_points: 插补点的数量 - :return: 插补路径的坐标列表 - """ - # 起点和终点的坐标 - x1, y1 = start - x0, y0 = end - step_degree=np.pi/12 - - # 计算轨迹的缩放因子和旋转角度 - distance = np.linalg.norm([x1 - x0, y1 - y0]) - - if width is None: - width = self.width_default - if num_cycle_factor is None: - num_cycle_factor = 1 - if distance < 2: - print("距离过短,无法绘制螺旋线") - return 0 - if width <= 2: - width = 2 - if num_cycle_factor < 1: - print("圈数必须为大于1的整数") - num_cycle_factor = 1 - - a = distance / 2 # 轨迹的宽度一半 - b = width/2 - angle = np.arctan2(y1 - y0, x1 - x0) - - num_points = np.abs((2*num_cycle_factor+1)*np.pi/step_degree).astype(int) - - # 生成标准的8字形轨迹 (Gerono卵形线) - t_values = np.linspace(0.5*np.pi, (2*num_cycle_factor + 1.5)* np.pi, num_points) - - x_curve = a * np.sin(t_values) - if direction == 'CCW': - y_curve = -b * np.sin(2 * t_values) - else: - y_curve = b * np.sin(2 * t_values) - - # 构建旋转矩阵 - rotation_matrix = np.array([[np.cos(angle), -np.sin(angle)], - [np.sin(angle), np.cos(angle)]]) - - # 旋转并平移轨迹 - curve_points = np.vstack((x_curve, y_curve)).T - rotated_points = curve_points @ rotation_matrix.T - shifted_points = rotated_points + [(x0 + x1) / 2, (y0 + y1) / 2] - - # 将插值结果四舍五入为整数 - return np.round(shifted_points).astype(int) - - @track_function("generate_circle_cloudPoint",True) - def generate_circle_cloudPoint(self, start_point, center, step_degree=np.pi/12, num_cycle_factor = 3): - """ - center: 圆心坐标,形如 (x_c, y_c) - start_point: 起始点坐标,形如 (x_0, y_0) - radius: 圆的半径 - delta_theta: 每次插补的角度增量 - num_turns: 绕圈的次数 - """ - # 确定总共需要生成的插补点数 - num_points = int((2 * np.pi * num_cycle_factor) / step_degree) - - # 圆心 - x_c, y_c = center - - radius = np.linalg.norm(np.array(start_point) - np.array(center)) # 半径 - - # 计算起始点的初始角度 - x_0, y_0 = start_point - theta_0 = np.arctan2(y_0 - y_c, x_0 - x_c) - - # 初始化存储插补点的列表 - circle_points = [] - - # 生成插补点 - for i in range(num_points): - # 当前角度 - theta_i = theta_0 + i * step_degree - - # 计算插补点的坐标 - x_i = x_c + radius * np.cos(theta_i) - y_i = y_c + radius * np.sin(theta_i) - - # 将点添加到列表中 - - circle_points.append((np.round(x_i).astype(int), np.round(y_i).astype(int))) - - circle_points.append((np.round(x_0).astype(int), np.round(y_0).astype(int))) - - return circle_points - - @track_function("generate_point_cloudPoint",True) - def generate_point_cloudPoint(self, point, num_cycle_factor=None, width=None, direction: Literal["CW", "CCW"] = 'CW'): - if num_cycle_factor is None: - num_cycle_factor = 12 - if num_cycle_factor < 1: - print("圈数必须为大于1的整数") - num_cycle_factor = 1 - num_cycle_factor = np.abs(np.round(num_cycle_factor).astype(int)) - if width is None: - width = 20 - if width < 2: - print("宽度必须为大于2的整数") - width = 2 - width = np.abs(np.round(width).astype(int)) - - radius = width/2 - - points = np.round((num_cycle_factor * np.pi * 2)/(np.pi/9)).astype(int) - thetas = np.linspace(0, num_cycle_factor * 2 * np.pi, points) - radiuss = np.ones_like(thetas) * radius - - for i in range(len(thetas)): - if thetas[i] <= np.pi/2: - radiuss[i] = np.sin(thetas[i]) * radius - if thetas[i] >= num_cycle_factor * np.pi * 2 - np.pi / 2: - radiuss[i] = np.sin(num_cycle_factor * np.pi * 2 - thetas[i]) * radius - - - - if direction == "CCW": - x_ellipse = radiuss * np.cos(-thetas) - y_ellipse = radiuss * np.sin(-thetas) - else: - x_ellipse = radiuss * np.cos(thetas) - y_ellipse = radiuss * np.sin(thetas) - x_rot = point[0] + x_ellipse - y_rot = point[1] + y_ellipse - result_points = np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1))) - return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points)) - - @track_function("generate_line_cloudPoint",True) - def generate_line_cloudPoint(self,start, end, step_distance = 5): - """ - 生成起点和终点之间的线性插值点 - - 参数: - start: 起点 (标量或数组,例如 [x1, y1, z1]) - end: 终点 (标量或数组,例如 [x2, y2, z2]) - num_points: 插值点的数量(包括起点和终点) - - 返回: - 包含线性插值点的数组 - """ - start = np.array(start) - end = np.array(end) - distance = np.linalg.norm(end - start) - num_points = int(distance / step_distance) - - if num_points <=2: - num_points = 2 - # 使用 numpy 的 linspace 在起点和终点之间生成线性插值点 - interpolated_points = np.linspace(start, end, num_points) - - # 将插值结果四舍五入为整数 - return np.round(interpolated_points).astype(int) - - @track_function("generate_repeat_line_cloudPoint",True) - def generate_repeat_line_cloudPoint(self, start_point, end_point, num_cycle_factor = 5): - """ - start_point: 起始点坐标,形如 (x_0, y_0) - end_point: 终点坐标,形如 (x_0, y_0) - num_cycle_factor: 重复的次数 - """ - - # 初始化存储插补点的列表 - result_points = [] - # 生成插补点 - for i in range(num_cycle_factor): - # 将点添加到列表中 - result_points.append(start_point) - result_points.append(end_point) - return np.round(np.array(result_points)).astype(int) - - @track_function("generate_finger_circle_cloudPoint",True) - def generate_finger_circle_cloudPoint(self, center_point, num_cycle_factor=5, radius=30,deviation=15): - """ - 生成一指禅绕球心的圆弧轨迹 - center_point: 圆心坐标 [x, y, z, roll, pitch, yaw] - num_cycle_factor: 轨迹的重复次数 - radius: 圆的半径 - """ - result_points = [] - result_points.append(copy.deepcopy(center_point)) - deviation = deviation * np.pi/180 - - totalCounts = int(360 / radius) - shift = 1 - - - for i in range(num_cycle_factor): - for countR in range(0, totalCounts): - # 当前旋转矩阵 - temp_pose = copy.deepcopy(center_point) - rotation = R.from_euler('xyz', temp_pose[3:], degrees=False) - current_rotation_matrix = rotation.as_matrix() - - - if i == 0: - shift = countR/(90/radius) if countR/(90/radius) < 1 else 1 - if i == num_cycle_factor-1: - shift = (totalCounts-countR)/(90/radius) if (totalCounts-countR)/(90/radius) < 1 else 1 - - - # 生成增量旋转矩阵 - increment_rotation= R.from_euler('xyz', [deviation*shift,0,countR * np.pi*2 /totalCounts], degrees=False).as_matrix() - - # 更新旋转矩阵 - updated_rotation_matrix = current_rotation_matrix @ increment_rotation - - # 将旋转矩阵转换回欧拉角 - temp_pose[3:] = R.from_matrix(updated_rotation_matrix).as_euler('xyz', degrees=False) - - # 添加当前点到结果列表 - result_points.append(copy.deepcopy(temp_pose)) - - result_points.append(copy.deepcopy(center_point)) - return result_points - - def traj_generate(self, t: Union[int, float, List[float]], pose = None, wrench = None, interpolation: Literal["linear", "cubic","circle","cloud_point"] = 'linear',**kwargs): - """ - 轨迹生成 - param: - t: 时间或时间序列 - pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿 - wrench(nx6): 力矩或力矩序列,当输入为None时,表示保持当前的力 - interpolation: 插值方式 - return: - traj: 生成的轨迹 -> dict - """ - # 确保时间输入是列表 - if isinstance(t, (int, float)): - t = [t] - elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t): - self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)") - return None - # 检查时间点数量 - if len(t) < 1: - self.logger.log_error("len(t) < 1") - return None - # 时间序列 - time_points = [0.0] + t - time_points = np.array(time_points) - dt = self.command_rate.to_sec() - - for i in range(len(time_points)): - time_points[i] = np.round(time_points[i] / dt) * dt - if np.abs(time_points[-1]) < dt: - time_points[-1] = dt - # 由于浮点数精度问题,time_step要乘0.9 - time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt) - pos_traj = None - quat_traj = None - wrench_traj = None - pos_vel = None - angular_vel = None - # 位置轨迹与速度轨迹生成 - if pose is not None: - if interpolation != 'cloud_point': - pose = np.array(pose).reshape(-1, 6) - if pose.shape[0] != len(t): - self.logger.log_error("pose.shape[0] != len(t)") - print(pose.shape[0],len(t)) - return None - pose_matrix = self.convert_to_7d(pose) - else: - pose = np.array(pose) - try: - # 插值 - if interpolation == 'linear': - pos_traj, quat_traj = linear_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt) - elif interpolation == 'cubic': - pos_traj, quat_traj = spline_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt) - elif interpolation == 'circle': - if self.ion is not None: - pos_traj = circle_trajectory(center=pose, radius=0.07,time_points=time_points,time_step=dt,**kwargs) - else: - pos_traj = circle_trajectory(center=pose,time_points=time_points,time_step=dt,**kwargs) - quat_traj = np.tile(R.from_euler('xyz', np.array(pose[0][3:])).as_quat(), (pos_traj.shape[0], 1)) - elif interpolation == 'cloud_point': - pos_traj, quat_traj = cloud_point_interpolate(positions=pose[:, :3], quaternions=pose[:, 3:], time_points=time_points, time_step=dt) - # self.logger.log_yellow(f'{pos_traj[0]} {pos_traj[-1]}') - except ValueError as error: - self.logger.log_error(error) - return None - else: - self.logger.log_error("未输入位置进行规划") - - # TODO 力轨迹生成 改为线性增加 - if wrench is not None: - wrench = np.array(wrench).reshape(-1, 6) - if wrench.shape[0] != len(t): - return None - indices = np.searchsorted(time_points, time_seq, side='left') - 1 - indices = np.clip(indices, 0, len(wrench) - 1) - wrench_traj = wrench[indices] - # 构建返回数据字典 - traj = { - 't': time_seq, - 'pos': pos_traj, - 'quat': quat_traj, - 'pos_vel': pos_vel, - 'angular_vel': angular_vel, - 'wrench': wrench_traj - } - return traj - - - def wrench_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation"] = 'linear',**kwargs): - """ - 力/力矩轨迹生成 - param: - t: 时间或时间序列 - wrench(nx6): 力矩或力矩序列 - interpolation: 插值方式 - return: - traj: 生成的轨迹 -> dict - """ - # 确保时间输入是列表 - if isinstance(t, (int, float)): - t = [t] - elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t): - self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)") - return None - # 检查时间点数量 - if len(t) < 1: - self.logger.log_error("len(t) < 1") - return None - - if wrench is None: - self.logger.log_error("未输入力矩进行规划") - return None - # 时间序列 - time_points = [0.0] + t - time_points = np.array(time_points) - dt = self.command_rate.to_sec() - - for i in range(len(time_points)): - time_points[i] = np.round(time_points[i] / dt) * dt - if np.abs(time_points[-1]) < dt: - time_points[-1] = dt - # 由于浮点数精度问题,time_step要乘0.9 - time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt) - - wrench_traj = None - - - - - # TODO 力轨迹生成 改为线性增加 - if wrench is not None: - wrench = np.array(wrench).reshape(-1, 6) - - - if wrench.shape[0] != len(t): - return None - if interpolation == 'oscillation': - wrench_traj = oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) - else: - wrench_traj = linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) - - # indices = np.searchsorted(time_points, time_seq, side='left') - 1 - - - # indices = np.clip(indices, 0, len(wrench) - 1) - - # wrench_traj = wrench[indices] - - - # 构建返回数据字典 - traj = { - 't': time_seq, - 'wrench': wrench_traj - } - return traj - - def force_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation", "auto"] = "auto",**kwargs): - """ - 力/力矩轨迹生成 - param: - t: 时间或时间序列 - wrench(nx6): 力矩或力矩序列 - interpolation: 插值方式 - return: - traj: 生成的轨迹 -> dict - """ - # 确保时间输入是列表 - if isinstance(t, (int, float)): - t = [t] - elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t): - self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)") - return None - # 检查时间点数量 - if len(t) < 1: - self.logger.log_error("len(t) < 1") - return None - - if wrench is None: - self.logger.log_error("未输入力矩进行规划") - return None - # 时间序列 - time_points = [0.0] + t - time_points = np.array(time_points) - dt = self.command_rate.to_sec() - - for i in range(len(time_points)): - time_points[i] = np.round(time_points[i] / dt) * dt - if np.abs(time_points[-1]) < dt: - time_points[-1] = dt - # 由于浮点数精度问题,time_step要乘0.9 - time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt) - - wrench_traj = None - - - - - # TODO 力轨迹生成 改为线性增加 - if wrench is not None: - wrench = np.array(wrench).reshape(-1, 6) - - - if wrench.shape[0] != len(t): - return None - if interpolation == 'oscillation': - wrench_traj = self.force_plan.oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) - elif interpolation == "auto": - # target_wrench = [] - # target_wrench.append(wrench) - wrench_traj = self.force_plan.S_shaped_wrench_interpolate(start=self.arm_state.desired_wrench, wrench=wrench, time_points=time_points, time_step=dt) - else: - wrench_traj = self.force_plan.linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) - - # indices = np.searchsorted(time_points, time_seq, side='left') - 1 - - - # indices = np.clip(indices, 0, len(wrench) - 1) - - # wrench_traj = wrench[indices] - - - # 构建返回数据字典 - traj = { - 't': time_seq, - 'wrench': wrench_traj - } - return traj - - - - if __name__ == "__main__": - waypoints = [ - {"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0], - "acceleration": [0, 0, 0], - "orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()}, - {"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0], - "acceleration": [0, 0, 0], - "orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()}, - ] ## 单位 m deg - myInterpolate = TrajectoryInterpolator(waypoints) - ts = np.linspace(0,5,100) - robot = MassageRobot(arm_config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml") - xr_list, vr_list, ar_list = [], [], [] - for t in ts: - xr, vr, ar = myInterpolate.interpolate(t) - xr_list.append(xr) - vr_list.append(vr) - ar_list.append(ar) - xr_array = np.array(xr_list) - vr_array = np.array(vr_list) - ar_array = np.array(ar_list) - - robot.xr = xr_array - robot.vr = vr_array - robot.ar = ar_array - robot.ts = ts - robot.dt = ts[1] - ts[0] - - - ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0] - robot.current_head = 'finger_head' - - robot.force_sensor.disable_active_transmission() - time.sleep(0.5) - robot.force_sensor.disable_active_transmission() - time.sleep(0.5) - robot.force_sensor.disable_active_transmission() - time.sleep(0.5) - robot.force_sensor.set_zero() - time.sleep(0.5) - robot.force_sensor.enable_active_transmission() - - robot.init_hardwares(ready_pose) - time.sleep(3) - robot.controller_manager.switch_controller('admittance') - # robot.controller_manager.switch_controller('position') - - - - atexit.register(robot.force_sensor.disconnect) - robot.arm_state.desired_wrench = np.array([0,0,-0.5,0,0,0]) - try: - robot_thread = threading.Thread(target=robot.start) - robot_thread.start() - except KeyboardInterrupt: - print("用户中断操作。") - except Exception as e: - print("Exception occurred at line:", e.__traceback__.tb_lineno) - print("发生异常:", e) - - \ No newline at end of file +""" + +def track_function(function_name,log = False): + def before(self, *args, **kwargs): + self.current_function = function_name + if log: + self.logger.log_info(f"Entering function: {function_name}") + def after(self, result, *args, **kwargs): + self.current_function = None + if log: + self.logger.log_info(f"Exiting function: {function_name},code: {result}") + return custom_decorator(before=before, after=after) + +@apply_decorators +class MassageRobot: + def __init__(self,arm_config_path,is_log=False): + self.logger = CustomLogger() + if is_log: + self.logger_data = CustomLogger() + # 日志线程 + threading.Thread(target=self.log_thread,daemon=True).start() + + self.vtxdb = VTXClient() + + # 当前执行的函数 + self.current_function = None + self.arm_state = ArmState() + self.arm_config = read_yaml(arm_config_path) + # arm 实例化时机械臂类内部进行通讯连接 + # 初始化坐标系 TOOL=0 BASE=1 + self.arm = dobot_nova5(arm_ip = self.arm_config['arm_ip']) + self.arm.start() +<<<<<<< HEAD + self.arm.chooseRightFrame() +======= + self.arm.init() +>>>>>>> cc63eb47a5fd86a36d8af9fbf38a4b10a4c2ff7f + + self.thermotherapy = None + self.shockwave = None + self.stone = None + self.heat = None + self.ion = None + self.force_plan = ForcePlanner() + # 传感器 + # self.force_sensor = None + self.force_sensor = XjcSensor() + # self.force_sensor.connect() + # 控制器,初始为导纳控制 + self.controller_manager = ControllerManager(self.arm_state) + self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0]) + # self.controller_manager.add_controller(HybridController,'hybrid',self.arm_config['controller'][1]) + self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][1]) + # self.controller_manager.add_controller(AdmitHybridController,'admithybrid',self.arm_config['controller'][3]) + # self.controller_manager.add_controller(HybridPidController,'hybridPid',self.arm_config['controller'][4]) + # self.controller_manager.add_controller(HybridAdmitController,'hybridAdmit',self.arm_config['controller'][5]) + # self.controller_manager.add_controller(PositionerSensorController,'positionerSensor',self.arm_config['controller'][6]) + # self.controller_manager.add_controller(AdmittanceZController,'admittanceZ',self.arm_config['controller'][7]) + + self.controller_manager.switch_controller('admittance') + +<<<<<<< HEAD + # 读取数据 + self.gravity_base = None + self.force_zero = None + self.torque_zero = None + self.tool_position = None + self.mass_center_position = None + self.s_tf_matrix_t = None + arm_orientation_set0 = np.array([-180,-30,0]) + self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix() +======= + + +>>>>>>> cc63eb47a5fd86a36d8af9fbf38a4b10a4c2ff7f + + # 频率数据赋值 + self.control_rate = Rate(self.arm_config['control_rate']) + self.sensor_rate = Rate(self.arm_config['sensor_rate']) + self.command_rate = Rate(self.arm_config['command_rate']) + + # 低通滤波 + self.cutoff_freq = 80.0 + + # 停止标志位线程 + self.exit_event = threading.Event() + self.exit_event.set() # 运行初始化为True + self.interrupt_event = threading.Event() + self.interrupt_event.clear() # 中断初始化为False + self.pause_envent = threading.Event() + self.pause_envent.clear() # 暂停初始化为False + self.skip_envent = threading.Event() + self.skip_envent.clear() # 跳过初始化为False + # 按摩调整 + self.adjust_wrench_envent = threading.Event() + self.adjust_wrench_envent.clear() # 调整初始化为False + self.is_execute = False + self.pos_increment = np.zeros(3,dtype=np.float64) + self.adjust_wrench = np.zeros(6,dtype=np.float64) + self.skip_pos = np.zeros(6,dtype=np.float64) + + # 按摩头参数暂时使用本地数据 + massage_head_dir = self.arm_config['massage_head_dir'] + all_items = os.listdir(massage_head_dir) + head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))] + self.playload_dict = {} + for file in head_config_files: + file_address = massage_head_dir + '/' + file + play_load = read_yaml(file_address) + self.playload_dict[play_load['name']] = play_load + # self.playload_dict = self.vtxdb.get("robot_config", "massage_head") + # print(self.playload_dict) + self.current_head = 'none' + self.is_waitting = False + + self.last_print_time = 0 + self.last_record_time = 0 + self.last_command_time = 0 + self.move_to_point_count = 0 + self.width_default = 5 + # 卡尔曼滤波相关初始化 + self.x_base = np.zeros(6) + self.P_base = np.eye(6) + # 过程噪声协方差矩阵 + self.Q_base = np.eye(6) * 0.01 + # 测量噪声协方差矩阵 + self.R_base = np.eye(6) * 0.1 + + self.x_tcp = np.zeros(6) + self.P_tcp = np.eye(6) + # 过程噪声协方差矩阵 + self.Q_tcp = np.eye(6) * 0.01 + # 测量噪声协方差矩阵 + self.R_tcp = np.eye(6) * 0.1 + # 传感器故障计数器 + self.sensor_fail_count = 0 + # 机械臂初始化,适配中间层 + # 读取数据 + self.gravity_base = None + self.force_zero = None + self.torque_zero = None + self.tool_position = None + self.mass_center_position = None + self.s_tf_matrix_t = None + arm_orientation_set0 = np.array([-180,0,-90]) + self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix() + + + # REF TRAJ + self.xr = [] + self.vr = [] + self.ar = [] + self.ts = [] # time sequence + self.dt = [] # derivate of time + # --- + self.last_time = -1 + self.cur_time = -1 + + # 预测步骤 + def kalman_predict(self,x, P, Q): + # 预测状态(这里假设状态不变) + x_predict = x + # 预测误差协方差 + P_predict = P + Q + return x_predict, P_predict + + # 更新步骤 + def kalman_update(self,x_predict, P_predict, z, R): + # 卡尔曼增益 + # K = P_predict @ np.linalg.inv(P_predict + R) + S = P_predict + R + s = np.diag(S) # shape (6,) + p_diag = np.diag(P_predict) + K_diag = np.zeros_like(s) + nonzero_mask = s != 0 + K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask] + K = np.diag(K_diag) + # 更新状态 + x_update = x_predict + K @ (z - x_predict) + # 更新误差协方差 + P_update = (np.eye(len(K)) - K) @ P_predict + return x_update, P_update + def init_hardwares(self,ready_pose): + self.ready_pose = np.array(ready_pose) + self.switch_payload(self.current_head) + print(self.arm.get_arm_position()) + time.sleep(0.5) + + + def sensor_set_zero(self): + self.arm_position_set0,self.arm_orientation_set0 = self.arm.get_arm_position() + arm_orientation_set0 = self.arm_orientation_set0 + print("arm_orientation_set0",R.from_quat(arm_orientation_set0).as_euler('xyz',degrees=True)) + self.b_rotation_s_set0 = R.from_quat(arm_orientation_set0).as_matrix() + ######### + max_retries = 5 + #no_head_wrong=np.array([-80.01506042, 116.34187317, -87.65788269, -0.32910481, -0.65792173, -0.61110526])#旧版传感器) + head_set0=np.array([0, 0, 0, 0, 0, 0])#新版传感器) + min_tolerance=0.001 + retry_count = 0 + self.force_sensor.disable_active_transmission() + time.sleep(0.5) + while retry_count < max_retries: + # 调用读取数据的函数,假设返回值是一个 numpy 数组 + data = self.force_sensor.read_data_f32() + data = self.force_sensor.read_data_f32() + data = self.force_sensor.read_data_f32() + + # 检查返回值是否为 np.array 类型 + if isinstance(data, np.ndarray): + # 判断数据是否接近目标值 + + if np.allclose(data, head_set0, atol=min_tolerance): + print(f"检测到力传感器已置零:{data}") + break + else: + print(f"检测到力传感器未置零:{data}") + + # 禁用传感器传输 + self.force_sensor.disable_active_transmission() + self.force_sensor.disable_active_transmission() + time.sleep(0.5) + + # 尝试设置传感器清零 + code = self.force_sensor.set_zero() + code = self.force_sensor.set_zero() + max_try = 3 + + while code != 0 and max_try > 0: + self.logger.log_error("Force sensor set zero failed, retrying...") + self.force_sensor.disable_active_transmission() + self.force_sensor.disable_active_transmission() + time.sleep(0.5) + code = self.force_sensor.set_zero() + code = self.force_sensor.set_zero() + max_try -= 1 + time.sleep(0.5) + + # 如果多次尝试后失败 + if max_try == 0: + self.logger.log_error("Force sensor set zero failed, exiting...") + requests.post( + "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"} + ) + requests.post( + "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False} + ) + # sys.exit(0) + return -1 # 置零失败 + + # 成功后跳出主循环 + break + + else: + self.logger.log_error("读取数据失败或格式错误,尝试重试...") + + # 增加重试次数 + retry_count += 1 + self.force_sensor.disable_active_transmission() + self.force_sensor.disconnect() + time.sleep(1) + self.force_sensor = XjcSensor() + time.sleep(1) # 每次重试之间添加一个短暂的延迟 + + if retry_count == max_retries: + self.logger.log_error(f"超过最大重试次数 ({max_retries}),操作失败,退出程序...") + requests.post( + "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"} + ) + requests.post( + "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False} + ) + return -2 #读取失败 + + return 0 #读取成功并置零成功 + + def sensor_enable(self): + code = self.force_sensor.enable_active_transmission() + max_try = 3 + while code!= 0 and max_try > 0: + self.logger.log_error("Force sensor enable_active_transmission failed,retrying...") + code = self.force_sensor.enable_active_transmission() + max_try -= 1 + time.sleep(0.1) + if max_try == 0: + self.logger.log_error("Force sensor enable_active_transmission failed,exiting...") + requests.post( + "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"} + ) + requests.post( + "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False} + ) + # sys.exit(0) + return -1 + return 0 + + + + def update_wrench(self): + # 当前的机械臂到末端转换 (实时) + b_rotation_s = R.from_quat(self.arm_state.arm_orientation).as_matrix() + # 读取数据 + sensor_data = self.force_sensor.read() + # self.logger.log_info(f"传感器数据{sensor_data}") + if sensor_data is None: + self.force_sensor.stop_background_reading() + self.logger.log_error("传感器数据读取失败") + return -1 + # 反重力补偿 + sensor_data[:3] = sensor_data[:3] + self.force_zero + self.b_rotation_s_set0.T @ self.gravity_base + sensor_data[3:] = sensor_data[3:] + self.torque_zero + np.cross(self.mass_center_position, self.b_rotation_s_set0.T @ self.gravity_base) + # 传感器数据通过矫正计算得到外来施加力 传感器坐标系下 + # 重力 + gravity_in_sensor = b_rotation_s.T @ self.gravity_base + s_force = sensor_data[:3] - self.force_zero - gravity_in_sensor + # 力矩 + s_torque = sensor_data[3:] - self.torque_zero - np.cross(self.mass_center_position,gravity_in_sensor) + wrench = np.concatenate([s_force,s_torque]) + # 传感器到TCP + wrench = self.wrench_coordinate_conversion(self.s_tf_matrix_t,wrench) + # 交给ARM STATE集中管理 + self.arm_state.external_wrench_tcp = wrench + # 对tcp坐标系下的外力外矩进行平滑 + x_tcp_predict, P_tcp_predict = self.kalman_predict(x = self.x_tcp, + P = self.P_tcp, + Q = self.Q_tcp) + self.x_tcp, self.P_tcp = self.kalman_update(x_predict = x_tcp_predict, + P_predict = P_tcp_predict, + z = self.arm_state.external_wrench_tcp, + R = self.R_tcp) + self.arm_state.external_wrench_tcp = self.x_tcp + self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp + return 0 + + def switch_payload(self,name): + if name in self.playload_dict: + self.stop() + self.current_head = name + + compensation_config = self.playload_dict[self.current_head] + + # 读取数据 + self.gravity_base = np.array(compensation_config['gravity_base']) + self.force_zero = np.array(compensation_config['force_zero']) + self.torque_zero = np.array(compensation_config['torque_zero']) + self.tool_position = np.array(compensation_config['tcp_offset']) + self.mass_center_position = np.array(compensation_config['mass_center_position']) + self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat()) + + tcp_offset = self.playload_dict[name]["tcp_offset"] + tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}" + print("tcp_offset_str",tcp_offset_str) + self.arm.setEndEffector(i=9,tool_i=tcp_offset_str) + self.arm.chooseEndEffector(i=9) + print(self.arm.get_arm_position()) + self.logger.log_info(f"切换到{name}按摩头") + + # ####################### 位姿伺服 ########################## + + def arm_measure_loop(self): + self.logger.log_info("机械臂测量线程启动") + while (not self.arm.is_exit) and (not self.exit_event.is_set()): + try: + if not self.is_waitting: + self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position() + code = self.update_wrench() + if code == -1: + self.sensor_fail_count += 1 + self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}") + if self.sensor_fail_count > 10: + self.logger.log_error("传感器线程数据读取失败超过10次,程序终止") + self.stop() + break + else: + self.sensor_fail_count = 0 + except Exception as e: + self.logger.log_error(f"机械臂测量线程报错:{e}") + self.exit_event.set() # 控制退出while + self.sensor_rate.sleep() # 控制频率 + + def arm_command_loop_traj(self): + self.logger.log_info("机械臂控制线程启动") + # 离线轨迹启动时间记录 + self.traj_start_time = time.time() + while (not self.arm.is_exit) and (not self.exit_event.is_set()): + try: + if not self.is_waitting: + process_start_time = time.time() + control_cycle = self.control_rate.to_sec() + t_now = time.time() - self.traj_start_time + i = min(int(t_now / self.dt), len(self.ts) - 2) + alpha = max(0.0, min(1.0, (t_now - self.ts[i]) / self.dt)) + + pos = (1 - alpha) * self.xr[i][:3] + alpha * self.xr[i+1][:3] + + slerp = Slerp([0, 1], R.concatenate([R.from_quat(self.xr[i][3:]), R.from_quat(self.xr[i+1][3:])])) + alpha = 0.5 + rot_interp = slerp(alpha).as_quat() + + xr = np.concatenate([pos, rot_interp]) + # xr = (1-alpha) * self.xr[i] + alpha * self.xr[i+1] + vr = (1 - alpha) * self.vr[i] + alpha * self.vr[i + 1] + ar = (1 - alpha) * self.ar[i] + alpha * self.ar[i + 1] + + self.arm_state.desired_position = xr[:3] + # self.arm_state.desired_orientation = xr[3:] + # print(111) + self.arm_state.desired_orientation = rot_interp + # print(222) + self.arm_state.desired_twist = vr + self.arm_state.desired_acc = ar + + print("self.arm_state.desired_position",self.arm_state.desired_position) + print("self.arm_state.desired_orientation",self.arm_state.desired_orientation) + print("self.arm_state.desired_twist",self.arm_state.desired_twist) + print("self.arm_state.desired_acc",self.arm_state.desired_acc) + + self.controller_manager.step_traj(control_cycle) + # self.controller_manager.step(control_cycle) + command_pose = np.concatenate([ + self.arm_state.arm_position_command * 1000, # 转毫米 + self.arm_state.arm_orientation_command + ]) + ## 输入滤波 + status, pose_processed = self.arm.process_command( + command_pose[:3], + command_pose[3:] + ) + # print(f"pose_processed:{pose_processed}") + + tcp_command = ( + f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f}," + f"{pose_processed[2]:.3f},{pose_processed[3]:.3f}," + f"{pose_processed[4]:.3f},{pose_processed[5]:.3f}," + "t=0.02, aheadtime=20, gain=200)" + ).encode() + + run_time = time.time() - process_start_time + sleep_duration = self.control_rate.to_sec() - run_time + b2 =time.time() + if sleep_duration > 0: + time.sleep(sleep_duration) + print(f"real sleep:{time.time()-b2}") + # self.arm.dashboard.socket_dobot.sendall(tcp_command) + if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停 + print("机械臂为暂停状态") + res = self.arm.dashboard.Continue() + code = self.arm.parseResultId(res)[0] + if code == 0: + print("机械臂继续已暂停的运动指令") + + except Exception as e: + self.logger.log_error(f"机械臂控制失败:{e}") + self.exit_event.set() + # print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration) + + def arm_command_loop(self): + self.logger.log_info("机械臂控制线程启动") + while (not self.arm.is_exit) and (not self.exit_event.is_set()): + try: + if not self.is_waitting: + process_start_time = time.time() + control_cycle = self.control_rate.to_sec() + self.controller_manager.step(control_cycle) + command_pose = np.concatenate([ + self.arm_state.arm_position_command * 1000, # 转毫米 + self.arm_state.arm_orientation_command + ]) + ## 输入滤波 + status, pose_processed = self.arm.process_command( + command_pose[:3], + command_pose[3:] + ) + # pose_processed = command_pose + print(f"pose_processed:{pose_processed}") + print(self.arm.feedbackData.robotMode) + tcp_command = ( + f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f}," + f"{pose_processed[2]:.3f},{pose_processed[3]:.3f}," + f"{pose_processed[4]:.3f},{pose_processed[5]:.3f}," + "t=0.02, aheadtime=20, gain=200)" + ).encode() + + run_time = time.time() - process_start_time + sleep_duration = self.control_rate.to_sec() - run_time + b2 =time.time() + if sleep_duration > 0: + time.sleep(sleep_duration) + print(f"real sleep:{time.time()-b2}") + self.arm.dashboard.socket_dobot.sendall(tcp_command) + if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停 + print("机械臂为暂停状态") + res = self.arm.dashboard.Continue() + code = self.arm.parseResultId(res)[0] + if code == 0: + print("机械臂继续已暂停的运动指令") + + except Exception as e: + self.logger.log_error(f"机械臂控制失败:{e}") + self.exit_event.set() + # print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration) + + ####################### 位姿伺服 ########################## + + def start(self): + if self.exit_event.is_set(): + # 线程 + self.exit_event.clear() + self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop) + self.arm_control_thread = threading.Thread(target=self.arm_command_loop_traj) + # 线程启动 + self.arm_measure_thread.start() ## 测量线程 + position,quat_rot = self.arm.get_arm_position() + # 初始值 + self.arm_state.desired_position = position + self.arm_state.arm_position_command = position + self.arm_state.desired_orientation = quat_rot + self.arm_state.arm_orientation_command = quat_rot + self.arm_state.arm_position = position + self.arm_state.arm_orientation = quat_rot + for i in range(20): + self.controller_manager.step(self.control_rate.to_sec()) + self.logger.log_blue(f"position command: {self.arm_state.arm_position_command}") + self.logger.log_blue(f"orientation command: {R.from_quat(self.arm_state.arm_orientation_command).as_euler('xyz',degrees=True)}") + position, quat_rot = self.arm.get_arm_position() + self.logger.log_blue(f"position current:{position}") + self.logger.log_blue(f"orientation current: {R.from_quat(quat_rot).as_euler('xyz',degrees=False)}") + self.command_rate.precise_sleep() + + position ,quat_rot = self.arm.get_arm_position() + # self.arm_state.desired_position = poistion + self.arm_state.arm_position_command = position + # self.arm_state.desired_orientation = quat_rot + self.arm_state.arm_orientation_command = quat_rot + + ## POSITION CONTROLLER TEST ## + # delta_pos = np.array([0.1,0.1,-0.1],dtype=np.float64) + # # delta_quat = R.from_euler('xyz', [0, 0, 0], degrees=True).as_quat() + # self.arm_state.desired_position = position + delta_pos + # self.arm_state.desired_orientation = quat_rot + # 线程启动 + self.arm_control_thread.start() ## 赋初始值后控制线程 + + self.logger.log_info("MassageRobot启动") + time.sleep(0.5) + + def stop(self): + if not self.exit_event.is_set(): + self.exit_event.set() + self.interrupt_event.clear() + self.arm_control_thread.join() + self.arm_measure_thread.join() + for i in range(3): + self.force_sensor.disable_active_transmission() + self.force_sensor.start_background_reading() + self.arm.stop_motion() + self.logger.log_info("MassageRobot停止") + + + self.controller_manager.switch_controller('position') + else: + self.logger.log_error(f"未找到{name}按摩头") + + def log_thread(self): + while True: + self.logger_data.log_info(f"机械臂位置:{self.arm_state.arm_position},机械臂姿态:{self.arm_state.arm_orientation}",is_print=False) + self.logger_data.log_info(f"机械臂期望位置:{self.arm_state.desired_position},机械臂真实位置:{self.arm_state.arm_position}",is_print=False) + self.logger_data.log_info(f"机械臂期望姿态:{self.arm_state.desired_orientation},机械臂真实姿态:{self.arm_state.arm_orientation}",is_print=False) + self.logger_data.log_info(f"机械臂期望力矩:{self.arm_state.desired_wrench},机械臂真实力矩:{self.arm_state.external_wrench_base}",is_print=False) + self.logger_data.log_info(f"当前按摩头:{self.current_head}",is_print=False) + time.sleep(1) + + # 工具函数 + def get_tf_matrix(self,position,orientation): + tf_matrix = np.eye(4) + rotation_matrix = R.from_quat(orientation).as_matrix() + tf_matrix[:3,3] = position/1000 + tf_matrix[:3,:3] = rotation_matrix + return tf_matrix + def wrench_coordinate_conversion(self,tf_matrix, wrench): + rot_matrix = tf_matrix[:3, :3] + vector_p = tf_matrix[:3, 3] + temp_force = wrench[:3] + torque = wrench[3:] + force = rot_matrix.T @ temp_force + torque = rot_matrix.T @ np.cross(temp_force,vector_p) + rot_matrix.T @ torque + return np.concatenate([force, torque]) + + @track_function("set_position",True) + def set_position(self,pose,is_wait = False): + self.logger.log_info(f"set postion:{pose}") + x,y,z = pose[0],pose[1],pose[2] + roll,pitch,yaw = pose[3],pose[4],pose[5] + pose_command = np.concatenate([[x,y,z], [roll,pitch,yaw]]) + time.sleep(0.1) + code = self.arm.RunPoint_P_inPose_M_RAD(poses_list = pose_command) + time.sleep(0.1) + return code + + @track_function("move_to_point",True) + def move_to_point(self, t, pose, is_interrupt=True, wait=True, timeout=0.5, interpolation:Literal["linear","circle","cloud_point"] ='linear', algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance",is_switch_controller = False): + """ + 移动到指定的点 + param: + t: 时间或时间序列 + pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿 + is_interrupt: 是否允许中断 + wait: 是否等待运动完成 + timeout: 超时时间 + interpolation: 插值方式 + algorithm: 控制算法 + return: + code: 0表示成功 + 1表示超时 + 2表示用户打断 + 3表示硬件错误 + 4表示参数错误 + """ + self.move_to_point_count += 1 + self.logger.log_blue(f"move_to_point_count: {self.move_to_point_count}") + self.logger.log_yellow(f"move_to_point: {pose[-1]}") + # 在函数内部直接规划轨迹 + traj = self.traj_generate(t, pose, None, interpolation=interpolation) + if traj is None: + self.logger.log_error("轨迹未生成或生成错误") + return 4 + + self.logger.log_yellow(f"traj_quat: {R.from_quat(traj['quat'][-1]).as_euler('xyz',degrees=False)}") + self.logger.log_yellow(f"traj_pos: {traj['pos'][-1]}") + + time_seq = traj['t'] + pos_traj = traj['pos'] + quat_traj = traj['quat'] + + if is_switch_controller == True: + try: + self.controller_manager.switch_controller(algorithm) + except ValueError as error: + self.logger.log_error(error) + self.logger.log_error("切换到%s控制器失败" % algorithm) + return 4 + self.logger.log_info("切换到%s控制器" % algorithm) + + # self.logger.log_info("当前为%s控制器" % algorithm) + + for i in range(len(time_seq)): + if self.interrupt_event.is_set(): + self.interrupt_event.clear() + self.logger.log_error("self.interrupt_event.clear()") + self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position) + self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation) + massage_wrench = np.array([0, 0, 0, 0, 0, 0]) + self.arm_state.desired_wrench = massage_wrench + return 2 + if self.pause_envent.is_set(): + self.pause_envent.clear() + self.logger.log_error("self.pause_envent.clear()") + self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position) + self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation) + massage_wrench = np.array([0, 0, 0, 0, 0, 0]) + self.arm_state.desired_wrench = massage_wrench + return 5 + if self.skip_envent.is_set(): + self.logger.log_error("self.skip_envent.clear()") + self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position) + self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation) + massage_wrench = np.array([0, 0, 0, 0, 0, 0]) + self.arm_state.desired_wrench = massage_wrench + return 6 + if self.arm.is_exit or self.exit_event.is_set(): + print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set()) + return 3 + if self.adjust_wrench_envent.is_set() and self.is_execute == True: + if self.adjust_wrench is not None: + self.arm_state.desired_wrench = self.adjust_wrench + self.adjust_wrench_envent.clear() + + self.arm_state.desired_position = pos_traj[i] + self.arm_state.desired_orientation = quat_traj[i] + + if self.ion is not None: + + act_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.arm_position + desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.desired_position + if self.arm_state.positionerSensorData < 0.001: + desired_position[2] = desired_position[2] - self.arm_state.desired_high + else: + desired_position[2] = min(act_position[2] + self.arm_state.positionerSensorData - self.arm_state.desired_high , desired_position[2] - self.arm_state.desired_high) + self.arm_state.desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix() @ desired_position + self.command_rate.precise_sleep() + + start = time.time() + + while wait: + if self.interrupt_event.is_set(): + self.interrupt_event.clear() + return 2 + if self.pause_envent.is_set(): + self.pause_envent.clear() + return 5 + if self.skip_envent.is_set(): + # self.skip_envent.clear() + return 6 + if self.arm.is_exit or self.exit_event.is_set(): + print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set()) + return 3 + if time.time() - start > timeout: + return 1 + self.command_rate.precise_sleep() + + return 0 + + @track_function("apply_force",True) + def apply_force(self, t, wrench, interpolation:Literal["linear","oscillation"] = "linear", algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance"): + """ + 施加力 + param: + t: 时间或时间序列 + wrench(nx6): 力矩或力矩序列,当输入为None时,表示力为[0,0,0,0,0,0] + is_interrupt: 是否允许中断 + wait: 是否等待运动完成 + timeout: 超时时间 + interpolation: 插值方式 + algorithm: 控制算法 + return: + code: 0表示成功 + 1表示超时 + 2表示用户打断 + 3表示硬件错误 + 4表示参数错误 + """ + + if t==0: + self.arm_state.desired_wrench = wrench + return 0 + + # 在函数内部直接规划力矩轨迹 + traj = self.wrench_traj_generate(t, wrench, interpolation=interpolation) + # # 在函数内部直接规划力矩轨迹 + # traj = self.traj_generate(t, None, wrench, interpolation=interpolation) + if traj is None: + return 4 + + time_seq = traj['t'] + wrench_traj = traj['wrench'] + + # try: + # self.controller_manager.switch_controller(algorithm) + # except ValueError as error: + # self.logger.log_error(error) + # return 4 + + self.logger.log_info("切换到%s控制器" % algorithm) + + for i in range(len(time_seq)): + if self.interrupt_event.is_set(): + self.interrupt_event.clear() + return 2 + if self.pause_envent.is_set(): + self.pause_envent.clear() + return 5 + if self.skip_envent.is_set(): + self.skip_envent.clear() + return 6 + if self.arm.is_exit or self.exit_event.is_set(): + return 3 + if self.adjust_wrench_envent.is_set(): + # if wrench_traj is not None: + # wrench_traj[i:] = self.adjust_wrench + self.adjust_wrench_envent.clear() + # print(wrench_traj[i]) + self.arm_state.desired_wrench = wrench_traj[i] if wrench_traj is not None else np.zeros(6, dtype=np.float64) + self.command_rate.precise_sleep() + + return 0 + + @track_function("resample_curve_strict",True) + def resample_curve_strict(self, points, num_resampled_points): + """ + 修正的曲线重新采样函数,移除重复点并确保累积弧长严格递增。 + """ + # 计算累积弧长 + distances = np.sqrt(np.sum(np.diff(points, axis=0)**2, axis=1)) # 计算相邻点之间的距离 + cumulative_length = np.insert(np.cumsum(distances), 0, 0) # 计算累积弧长 + + # 确保累积弧长严格递增(去除重复值) + unique_indices = np.where(np.diff(cumulative_length, prepend=-np.inf) > 0)[0] + cumulative_length = cumulative_length[unique_indices] + points = points[unique_indices] + + # 生成均匀间隔的新弧长 + target_lengths = np.linspace(0, cumulative_length[-1], num_resampled_points) + + # 在累积弧长上进行插值计算新点位置 + x_interp = interp1d(cumulative_length, points[:, 0], kind='linear', fill_value="extrapolate") + y_interp = interp1d(cumulative_length, points[:, 1], kind='linear', fill_value="extrapolate") + + new_x = x_interp(target_lengths) + new_y = y_interp(target_lengths) + + return np.column_stack((np.round(new_x).astype(int), np.round(new_y).astype(int))) + + @track_function("generate_in_spiral_cloudPoint",True) + def generate_in_spiral_cloudPoint(self, start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'): + """ + 绘制由内向外的螺旋线。 + + 参数: + - start: 起点坐标 (x, y) + - end: 终点坐标 (x, y) + - num_cycle_factor: 螺旋的圈数 + - points: 螺旋线上的点的数量 + """ + # 计算起点和终点之间的距离 + dx = end[0] - start[0] + dy = end[1] - start[1] + distance = np.sqrt(dx**2 + dy**2) + if width is None: + width = self.width_default + if num_cycle_factor is None: + num_cycle_factor = 5 + + num_cycle_factor = np.round(num_cycle_factor).astype(int) + if distance < 2: + print("距离过短,无法绘制螺旋线") + return 0 + if width <= 2: + width = 2 + if num_cycle_factor < 1: + print("圈数必须为大于1的整数") + num_cycle_factor = 1 + angle = np.arctan2(dy, dx) # 计算从起点到终点的角度 + + points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int) + + # 螺旋的角度范围 + theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points) + + # 螺旋的半径随着角度增大而线性增长 + r = np.linspace(0, distance, points) + + tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor))) + + # 计算螺旋的x, y坐标 + if direction == "CCW": + # 反向旋转时,使用负的角度来反向螺旋 + x = r * np.cos(-theta) # 逆时针旋转 + y = r * np.sin(-theta)*tempK + else: + # 正常旋转 + x = r * np.cos(theta) + y = r * np.sin(theta)*tempK + + # 旋转螺旋线以对齐起点和终点的角度 + x_rotated = x * np.cos(angle) - y * np.sin(angle) + y_rotated = x * np.sin(angle) + y * np.cos(angle) + + # 平移螺旋线,使其从起点开始 + x_final = x_rotated + start[0] - x_rotated[0] + y_final = y_rotated + start[1] - y_rotated[0] + + # return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1))) + result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1), + np.round(y_final).astype(int).reshape(-1, 1))) + + return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points)) + + @track_function("generate_out_spiral_cloudPoint",True) + def generate_out_spiral_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'): + """ + 绘制由外向内的螺旋线。 + + 参数: + - start: 起点坐标 (x, y) + - end: 终点坐标 (x, y) + - num_cycle_factor: 螺旋的圈数 + - points: 螺旋线上的点的数量 + """ + # 计算起点和终点之间的距离 + dx = end[0] - start[0] + dy = end[1] - start[1] + angle = np.arctan2(dy, dx) # 计算从起点到终点的角度 + distance = np.sqrt(dx**2 + dy**2) + if width is None: + width = self.width_default + if num_cycle_factor is None: + num_cycle_factor = 5 + + num_cycle_factor = np.round(num_cycle_factor).astype(int) + if distance < 2: + print("距离过短,无法绘制螺旋线") + return 0 + if width <= 2: + width = 2 + if num_cycle_factor < 1: + print("圈数必须为大于1的整数") + num_cycle_factor = 1 + + points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int) + + # 螺旋的角度范围 + theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points) + + # 螺旋的半径随着角度增大而线性增长 + r = np.linspace(distance, 0, points) + + tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor))) + + # 计算螺旋的x, y坐标 + if direction == "CCW": + # 反向旋转时,使用负的角度来反向螺旋 + x = r * np.cos(-theta) # 逆时针旋转 + y = r * np.sin(-theta)*tempK + else: + # 正常旋转 + x = r * np.cos(theta) + y = r * np.sin(theta)*tempK + + # 旋转螺旋线以对齐起点和终点的角度 + x = x * np.cos(np.pi) - y * np.sin(np.pi) + y = x * np.sin(np.pi) + y * np.cos(np.pi) + + # 旋转螺旋线以对齐起点和终点的角度 + x_rotated = x * np.cos(angle) - y * np.sin(angle) + y_rotated = x * np.sin(angle) + y * np.cos(angle) + + # 平移螺旋线,使其从起点开始 + x_final = x_rotated + start[0] - x_rotated[0] + y_final = y_rotated + start[1] - y_rotated[0] + + # return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1))) + result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1), + np.round(y_final).astype(int).reshape(-1, 1))) + + return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points)) + + @track_function("generate_prolateCycloid_cloudPoint",True) + def generate_prolateCycloid_cloudPoint(self, start, end, width=None, num_cycle_factor=None, direction: Literal["CW", "CCW"] = 'CW'): + # Parameters + step_degree = np.pi / 12 + a_prolate = 4.5 + radius = 30.0 + distance = np.linalg.norm(np.array(end) - np.array(start)) + angle = np.arctan2(end[1] - start[1], end[0] - start[0]) + + if width is None: + width = self.width_default + if num_cycle_factor is None: + num_cycle_factor = 5 + num_cycle_factor = np.round(num_cycle_factor).astype(int) + if distance < 2: + print("距离过短,无法绘制Cycloid线") + return 0 + if width <= 2: + width = 2 + if num_cycle_factor < 1: + print("圈数必须为大于1的整数") + num_cycle_factor = 1 + + r_cycloid = radius / (2 * a_prolate) + num_cycles = int(num_cycle_factor) + temp_k = distance / (r_cycloid * (np.pi + 2 * a_prolate + 2 * np.pi * num_cycles)) + + # Generate the prolate cycloid + theta_prolate = np.linspace(0.5 * np.pi, num_cycles * 2 * np.pi + 1.5 * np.pi, + round((num_cycles * 2 * np.pi + 1.5 * np.pi) / step_degree)) + if direction == 'CW': + theta_prolate = np.flip(theta_prolate) + + x_prolate = r_cycloid * (theta_prolate - a_prolate * np.sin(theta_prolate)) * temp_k + y_prolate = r_cycloid * (1 - a_prolate * np.cos(theta_prolate)) + + # Scale y-prolate based on width + y_prolate = (y_prolate / 30) * width + + # Rotation of cycloid + if direction == 'CW': + x_prolate, y_prolate = -x_prolate, -y_prolate + + x_rotated = x_prolate * np.cos(angle) - y_prolate * np.sin(angle) + y_rotated = x_prolate * np.sin(angle) + y_prolate * np.cos(angle) + + # Translate to starting point + x_final = x_rotated + start[0] - x_rotated[0] + y_final = y_rotated + start[1] - y_rotated[0] + + return np.column_stack((np.round(x_final).astype(int), np.round(y_final).astype(int))) + + @track_function("generate_ellipse_cloudPoint",True) + def generate_ellipse_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'): + # 计算椭圆的中心(两个端点的中点) + center = (start + end) / 2 + if num_cycle_factor is None: + num_cycle_factor = 1 + + num_cycle_factor = np.round(num_cycle_factor).astype(int) + + distance = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2) + + if width is None: + width = distance/2 + + + if distance < 2: + print("距离过短,无法绘制椭圆") + return 0 + if width <= 2: + print("椭圆半径过小,无法绘制椭圆") + width = self.width_default + if num_cycle_factor < 1: + print("圈数必须为大于1的整数") + num_cycle_factor = 1 + + # 计算两个端点的距离,即长半轴长度 + a = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2) / 2 + + # 假设短半轴长度为常量值radius + b = width / 2 + + # 计算旋转角度(由端点的方向确定) + angle = np.arctan2(end[1] - start[1], end[0] - start[0]) + points = np.round(((num_cycle_factor + 1) * np.pi * 2)/(np.pi/9)).astype(int) + + # 绘制椭圆的参数化公式 + theta = np.linspace(np.pi, (2+num_cycle_factor*2)* np.pi, points) + + # 计算螺旋的x, y坐标 + if direction == "CCW": + # 反向旋转时,使用负的角度来反向螺旋 + x_ellipse = a * np.cos(-theta) #顺时针 + y_ellipse = b * np.sin(-theta) + else: + # 正常旋转 + x_ellipse = a * np.cos(theta) + y_ellipse = b * np.sin(theta) + + # 绕中心旋转椭圆 + x_rot = center[0] + x_ellipse * np.cos(angle) - y_ellipse * np.sin(angle) + y_rot = center[1] + x_ellipse * np.sin(angle) + y_ellipse * np.cos(angle) + + # 返回椭圆点云,保留整数值 + return np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1))) + + @track_function("generate_Lemniscate_cloudPoint",True) + def generate_Lemniscate_cloudPoint(self, start, end, width = None,num_cycle_factor = None, direction:Literal["CW","CCW"] ='CW'): + """ + 生成从start到end的8字形插补路径 + + :param start: 起点坐标 (x0, y0) + :param end: 终点坐标 (x1, y1) + :param num_points: 插补点的数量 + :return: 插补路径的坐标列表 + """ + # 起点和终点的坐标 + x1, y1 = start + x0, y0 = end + step_degree=np.pi/12 + + # 计算轨迹的缩放因子和旋转角度 + distance = np.linalg.norm([x1 - x0, y1 - y0]) + + if width is None: + width = self.width_default + if num_cycle_factor is None: + num_cycle_factor = 1 + if distance < 2: + print("距离过短,无法绘制螺旋线") + return 0 + if width <= 2: + width = 2 + if num_cycle_factor < 1: + print("圈数必须为大于1的整数") + num_cycle_factor = 1 + + a = distance / 2 # 轨迹的宽度一半 + b = width/2 + angle = np.arctan2(y1 - y0, x1 - x0) + + num_points = np.abs((2*num_cycle_factor+1)*np.pi/step_degree).astype(int) + + # 生成标准的8字形轨迹 (Gerono卵形线) + t_values = np.linspace(0.5*np.pi, (2*num_cycle_factor + 1.5)* np.pi, num_points) + + x_curve = a * np.sin(t_values) + if direction == 'CCW': + y_curve = -b * np.sin(2 * t_values) + else: + y_curve = b * np.sin(2 * t_values) + + # 构建旋转矩阵 + rotation_matrix = np.array([[np.cos(angle), -np.sin(angle)], + [np.sin(angle), np.cos(angle)]]) + + # 旋转并平移轨迹 + curve_points = np.vstack((x_curve, y_curve)).T + rotated_points = curve_points @ rotation_matrix.T + shifted_points = rotated_points + [(x0 + x1) / 2, (y0 + y1) / 2] + + # 将插值结果四舍五入为整数 + return np.round(shifted_points).astype(int) + + @track_function("generate_circle_cloudPoint",True) + def generate_circle_cloudPoint(self, start_point, center, step_degree=np.pi/12, num_cycle_factor = 3): + """ + center: 圆心坐标,形如 (x_c, y_c) + start_point: 起始点坐标,形如 (x_0, y_0) + radius: 圆的半径 + delta_theta: 每次插补的角度增量 + num_turns: 绕圈的次数 + """ + # 确定总共需要生成的插补点数 + num_points = int((2 * np.pi * num_cycle_factor) / step_degree) + + # 圆心 + x_c, y_c = center + + radius = np.linalg.norm(np.array(start_point) - np.array(center)) # 半径 + + # 计算起始点的初始角度 + x_0, y_0 = start_point + theta_0 = np.arctan2(y_0 - y_c, x_0 - x_c) + + # 初始化存储插补点的列表 + circle_points = [] + + # 生成插补点 + for i in range(num_points): + # 当前角度 + theta_i = theta_0 + i * step_degree + + # 计算插补点的坐标 + x_i = x_c + radius * np.cos(theta_i) + y_i = y_c + radius * np.sin(theta_i) + + # 将点添加到列表中 + + circle_points.append((np.round(x_i).astype(int), np.round(y_i).astype(int))) + + circle_points.append((np.round(x_0).astype(int), np.round(y_0).astype(int))) + + return circle_points + + @track_function("generate_point_cloudPoint",True) + def generate_point_cloudPoint(self, point, num_cycle_factor=None, width=None, direction: Literal["CW", "CCW"] = 'CW'): + if num_cycle_factor is None: + num_cycle_factor = 12 + if num_cycle_factor < 1: + print("圈数必须为大于1的整数") + num_cycle_factor = 1 + num_cycle_factor = np.abs(np.round(num_cycle_factor).astype(int)) + if width is None: + width = 20 + if width < 2: + print("宽度必须为大于2的整数") + width = 2 + width = np.abs(np.round(width).astype(int)) + + radius = width/2 + + points = np.round((num_cycle_factor * np.pi * 2)/(np.pi/9)).astype(int) + thetas = np.linspace(0, num_cycle_factor * 2 * np.pi, points) + radiuss = np.ones_like(thetas) * radius + + for i in range(len(thetas)): + if thetas[i] <= np.pi/2: + radiuss[i] = np.sin(thetas[i]) * radius + if thetas[i] >= num_cycle_factor * np.pi * 2 - np.pi / 2: + radiuss[i] = np.sin(num_cycle_factor * np.pi * 2 - thetas[i]) * radius + + + + if direction == "CCW": + x_ellipse = radiuss * np.cos(-thetas) + y_ellipse = radiuss * np.sin(-thetas) + else: + x_ellipse = radiuss * np.cos(thetas) + y_ellipse = radiuss * np.sin(thetas) + x_rot = point[0] + x_ellipse + y_rot = point[1] + y_ellipse + result_points = np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1))) + return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points)) + + @track_function("generate_line_cloudPoint",True) + def generate_line_cloudPoint(self,start, end, step_distance = 5): + """ + 生成起点和终点之间的线性插值点 + + 参数: + start: 起点 (标量或数组,例如 [x1, y1, z1]) + end: 终点 (标量或数组,例如 [x2, y2, z2]) + num_points: 插值点的数量(包括起点和终点) + + 返回: + 包含线性插值点的数组 + """ + start = np.array(start) + end = np.array(end) + distance = np.linalg.norm(end - start) + num_points = int(distance / step_distance) + + if num_points <=2: + num_points = 2 + # 使用 numpy 的 linspace 在起点和终点之间生成线性插值点 + interpolated_points = np.linspace(start, end, num_points) + + # 将插值结果四舍五入为整数 + return np.round(interpolated_points).astype(int) + + @track_function("generate_repeat_line_cloudPoint",True) + def generate_repeat_line_cloudPoint(self, start_point, end_point, num_cycle_factor = 5): + """ + start_point: 起始点坐标,形如 (x_0, y_0) + end_point: 终点坐标,形如 (x_0, y_0) + num_cycle_factor: 重复的次数 + """ + + # 初始化存储插补点的列表 + result_points = [] + # 生成插补点 + for i in range(num_cycle_factor): + # 将点添加到列表中 + result_points.append(start_point) + result_points.append(end_point) + return np.round(np.array(result_points)).astype(int) + + @track_function("generate_finger_circle_cloudPoint",True) + def generate_finger_circle_cloudPoint(self, center_point, num_cycle_factor=5, radius=30,deviation=15): + """ + 生成一指禅绕球心的圆弧轨迹 + center_point: 圆心坐标 [x, y, z, roll, pitch, yaw] + num_cycle_factor: 轨迹的重复次数 + radius: 圆的半径 + """ + result_points = [] + result_points.append(copy.deepcopy(center_point)) + deviation = deviation * np.pi/180 + + totalCounts = int(360 / radius) + shift = 1 + + + for i in range(num_cycle_factor): + for countR in range(0, totalCounts): + # 当前旋转矩阵 + temp_pose = copy.deepcopy(center_point) + rotation = R.from_euler('xyz', temp_pose[3:], degrees=False) + current_rotation_matrix = rotation.as_matrix() + + + if i == 0: + shift = countR/(90/radius) if countR/(90/radius) < 1 else 1 + if i == num_cycle_factor-1: + shift = (totalCounts-countR)/(90/radius) if (totalCounts-countR)/(90/radius) < 1 else 1 + + + # 生成增量旋转矩阵 + increment_rotation= R.from_euler('xyz', [deviation*shift,0,countR * np.pi*2 /totalCounts], degrees=False).as_matrix() + + # 更新旋转矩阵 + updated_rotation_matrix = current_rotation_matrix @ increment_rotation + + # 将旋转矩阵转换回欧拉角 + temp_pose[3:] = R.from_matrix(updated_rotation_matrix).as_euler('xyz', degrees=False) + + # 添加当前点到结果列表 + result_points.append(copy.deepcopy(temp_pose)) + + result_points.append(copy.deepcopy(center_point)) + return result_points + + def traj_generate(self, t: Union[int, float, List[float]], pose = None, wrench = None, interpolation: Literal["linear", "cubic","circle","cloud_point"] = 'linear',**kwargs): + """ + 轨迹生成 + param: + t: 时间或时间序列 + pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿 + wrench(nx6): 力矩或力矩序列,当输入为None时,表示保持当前的力 + interpolation: 插值方式 + return: + traj: 生成的轨迹 -> dict + """ + # 确保时间输入是列表 + if isinstance(t, (int, float)): + t = [t] + elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t): + self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)") + return None + # 检查时间点数量 + if len(t) < 1: + self.logger.log_error("len(t) < 1") + return None + # 时间序列 + time_points = [0.0] + t + time_points = np.array(time_points) + dt = self.command_rate.to_sec() + + for i in range(len(time_points)): + time_points[i] = np.round(time_points[i] / dt) * dt + if np.abs(time_points[-1]) < dt: + time_points[-1] = dt + # 由于浮点数精度问题,time_step要乘0.9 + time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt) + pos_traj = None + quat_traj = None + wrench_traj = None + pos_vel = None + angular_vel = None + # 位置轨迹与速度轨迹生成 + if pose is not None: + if interpolation != 'cloud_point': + pose = np.array(pose).reshape(-1, 6) + if pose.shape[0] != len(t): + self.logger.log_error("pose.shape[0] != len(t)") + print(pose.shape[0],len(t)) + return None + pose_matrix = self.convert_to_7d(pose) + else: + pose = np.array(pose) + try: + # 插值 + if interpolation == 'linear': + pos_traj, quat_traj = linear_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt) + elif interpolation == 'cubic': + pos_traj, quat_traj = spline_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt) + elif interpolation == 'circle': + if self.ion is not None: + pos_traj = circle_trajectory(center=pose, radius=0.07,time_points=time_points,time_step=dt,**kwargs) + else: + pos_traj = circle_trajectory(center=pose,time_points=time_points,time_step=dt,**kwargs) + quat_traj = np.tile(R.from_euler('xyz', np.array(pose[0][3:])).as_quat(), (pos_traj.shape[0], 1)) + elif interpolation == 'cloud_point': + pos_traj, quat_traj = cloud_point_interpolate(positions=pose[:, :3], quaternions=pose[:, 3:], time_points=time_points, time_step=dt) + # self.logger.log_yellow(f'{pos_traj[0]} {pos_traj[-1]}') + except ValueError as error: + self.logger.log_error(error) + return None + else: + self.logger.log_error("未输入位置进行规划") + + # TODO 力轨迹生成 改为线性增加 + if wrench is not None: + wrench = np.array(wrench).reshape(-1, 6) + if wrench.shape[0] != len(t): + return None + indices = np.searchsorted(time_points, time_seq, side='left') - 1 + indices = np.clip(indices, 0, len(wrench) - 1) + wrench_traj = wrench[indices] + # 构建返回数据字典 + traj = { + 't': time_seq, + 'pos': pos_traj, + 'quat': quat_traj, + 'pos_vel': pos_vel, + 'angular_vel': angular_vel, + 'wrench': wrench_traj + } + return traj + + + def wrench_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation"] = 'linear',**kwargs): + """ + 力/力矩轨迹生成 + param: + t: 时间或时间序列 + wrench(nx6): 力矩或力矩序列 + interpolation: 插值方式 + return: + traj: 生成的轨迹 -> dict + """ + # 确保时间输入是列表 + if isinstance(t, (int, float)): + t = [t] + elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t): + self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)") + return None + # 检查时间点数量 + if len(t) < 1: + self.logger.log_error("len(t) < 1") + return None + + if wrench is None: + self.logger.log_error("未输入力矩进行规划") + return None + # 时间序列 + time_points = [0.0] + t + time_points = np.array(time_points) + dt = self.command_rate.to_sec() + + for i in range(len(time_points)): + time_points[i] = np.round(time_points[i] / dt) * dt + if np.abs(time_points[-1]) < dt: + time_points[-1] = dt + # 由于浮点数精度问题,time_step要乘0.9 + time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt) + + wrench_traj = None + + + + + # TODO 力轨迹生成 改为线性增加 + if wrench is not None: + wrench = np.array(wrench).reshape(-1, 6) + + + if wrench.shape[0] != len(t): + return None + if interpolation == 'oscillation': + wrench_traj = oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) + else: + wrench_traj = linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) + + # indices = np.searchsorted(time_points, time_seq, side='left') - 1 + + + # indices = np.clip(indices, 0, len(wrench) - 1) + + # wrench_traj = wrench[indices] + + + # 构建返回数据字典 + traj = { + 't': time_seq, + 'wrench': wrench_traj + } + return traj + + def force_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation", "auto"] = "auto",**kwargs): + """ + 力/力矩轨迹生成 + param: + t: 时间或时间序列 + wrench(nx6): 力矩或力矩序列 + interpolation: 插值方式 + return: + traj: 生成的轨迹 -> dict + """ + # 确保时间输入是列表 + if isinstance(t, (int, float)): + t = [t] + elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t): + self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)") + return None + # 检查时间点数量 + if len(t) < 1: + self.logger.log_error("len(t) < 1") + return None + + if wrench is None: + self.logger.log_error("未输入力矩进行规划") + return None + # 时间序列 + time_points = [0.0] + t + time_points = np.array(time_points) + dt = self.command_rate.to_sec() + + for i in range(len(time_points)): + time_points[i] = np.round(time_points[i] / dt) * dt + if np.abs(time_points[-1]) < dt: + time_points[-1] = dt + # 由于浮点数精度问题,time_step要乘0.9 + time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt) + + wrench_traj = None + + + + + # TODO 力轨迹生成 改为线性增加 + if wrench is not None: + wrench = np.array(wrench).reshape(-1, 6) + + + if wrench.shape[0] != len(t): + return None + if interpolation == 'oscillation': + wrench_traj = self.force_plan.oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) + elif interpolation == "auto": + # target_wrench = [] + # target_wrench.append(wrench) + wrench_traj = self.force_plan.S_shaped_wrench_interpolate(start=self.arm_state.desired_wrench, wrench=wrench, time_points=time_points, time_step=dt) + else: + wrench_traj = self.force_plan.linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt) + + # indices = np.searchsorted(time_points, time_seq, side='left') - 1 + + + # indices = np.clip(indices, 0, len(wrench) - 1) + + # wrench_traj = wrench[indices] + + + # 构建返回数据字典 + traj = { + 't': time_seq, + 'wrench': wrench_traj + } + return traj + + + +if __name__ == "__main__": + # waypoints = [ + # {"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0], + # "acceleration": [0, 0, 0], + # "orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()}, + # {"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0], + # "acceleration": [0, 0, 0], + # "orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()}, + # ] ## 单位 m deg + # myInterpolate = TrajectoryInterpolator(waypoints) + # ts = np.linspace(0,5,100) + robot = MassageRobot(arm_config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml") + # xr_list, vr_list, ar_list = [], [], [] + # for t in ts: + # xr, vr, ar = myInterpolate.interpolate(t) + # xr_list.append(xr) + # vr_list.append(vr) + # ar_list.append(ar) + # xr_array = np.array(xr_list) + # vr_array = np.array(vr_list) + # ar_array = np.array(ar_list) + + # robot.xr = xr_array + # robot.vr = vr_array + # robot.ar = ar_array + # robot.ts = ts + # robot.dt = ts[1] - ts[0] + + + ready_pose = [194.3467,-134.9880,455.3604,-180.0000,-30.0000,0.0042] + robot.current_head = 'finger_head' + + robot.force_sensor.disable_active_transmission() + time.sleep(0.5) + robot.force_sensor.disable_active_transmission() + time.sleep(0.5) + robot.force_sensor.disable_active_transmission() + time.sleep(0.5) + robot.force_sensor.set_zero() + time.sleep(0.5) + robot.force_sensor.enable_active_transmission() + + robot.init_hardwares(ready_pose) + time.sleep(3) + robot.controller_manager.switch_controller('admittance') + # robot.controller_manager.switch_controller('position') + + + + atexit.register(robot.force_sensor.disconnect) + robot.arm_state.desired_wrench = np.array([0,0,-0.5,0,0,0]) + try: + robot_thread = threading.Thread(target=robot.start) + robot_thread.start() + except KeyboardInterrupt: + print("用户中断操作。") + except Exception as e: + print("Exception occurred at line:", e.__traceback__.tb_lineno) + print("发生异常:", e) + + + # robot.arm.disableRobot() \ No newline at end of file