from hardware.dobot_nova5 import dobot_nova5 from hardware.force_sensor import XjcSensor from algorithms.arm_state import ArmState from algorithms.controller_manager import ControllerManager from algorithms.admittance_controller import AdmittanceController from algorithms.position_controller import PositionController from tools.log import CustomLogger from tools.yaml_operator import read_yaml from tools.Rate import Rate import numpy as np import threading import time import os from scipy.spatial.transform import Rotation as R import ctypes libc = ctypes.CDLL("libc.so.6") import atexit from tools.Trajectory import TrajectoryInterpolator from scipy.spatial.transform import Slerp class MassageRobot: def __init__(self,arm_config_path,is_log=False): self.logger = CustomLogger( log_name="测试日志", log_file="logs/MassageRobot_nova5_test.log", precise_time=True) if is_log: self.logger_data = CustomLogger(log_name="运动数据日志", log_file="logs/MassageRobot_kinematics_data.log", precise_time=True) # 日志线程 threading.Thread(target=self.log_thread,daemon=True).start() self.arm_state = ArmState() self.arm_config = read_yaml(arm_config_path) # arm 实例化时机械臂类内部进行通讯连接 self.arm = dobot_nova5(self.arm_config['arm_ip']) self.arm.start() # 传感器 self.force_sensor = XjcSensor() self.force_sensor.connect() # 控制器初始化(初始化为导纳控制) self.controller_manager = ControllerManager(self.arm_state) self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0]) self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][1]) self.controller_manager.switch_controller('admittance') # 按摩头参数暂时使用本地数据 massage_head_dir = self.arm_config['massage_head_dir'] all_items = os.listdir(massage_head_dir) head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))] self.playload_dict = {} for file in head_config_files: file_address = massage_head_dir + '/' + file play_load = read_yaml(file_address) self.playload_dict[play_load['name']] = play_load self.current_head = 'none' # 读取数据 self.gravity_base = None self.force_zero = None self.torque_zero = None self.tool_position = None self.mass_center_position = None self.s_tf_matrix_t = None arm_orientation_set0 = np.array([-180,0,-90]) self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix() # 频率数据赋值 self.control_rate = Rate(self.arm_config['control_rate']) self.sensor_rate = Rate(self.arm_config['sensor_rate']) self.command_rate = Rate(self.arm_config['command_rate']) # 低通滤波 self.cutoff_freq = 80.0 # 停止标志位线程 self.exit_event = threading.Event() self.exit_event.set() # 运行初始化为True self.interrupt_event = threading.Event() self.interrupt_event.clear() # 中断初始化为False self.pause_envent = threading.Event() self.pause_envent.clear() # 暂停初始化为False self.skip_envent = threading.Event() self.skip_envent.clear() # 跳过初始化为False # 按摩调整 self.adjust_wrench_event = threading.Event() self.adjust_wrench_event.clear() # 调整初始化为False self.pos_increment = np.zeros(3,dtype=np.float64) self.adjust_wrench = np.zeros(6,dtype=np.float64) self.is_waitting = False self.last_print_time = 0 self.last_record_time = 0 self.last_command_time = 0 self.move_to_point_count = 0 # 卡尔曼滤波相关初始化 self.x_base = np.zeros(6) self.P_base = np.eye(6) # 过程噪声协方差矩阵 self.Q_base = np.eye(6) * 0.01 # 测量噪声协方差矩阵 self.R_base = np.eye(6) * 0.1 self.x_tcp = np.zeros(6) self.P_tcp = np.eye(6) # 过程噪声协方差矩阵 self.Q_tcp = np.eye(6) * 0.01 # 测量噪声协方差矩阵 self.R_tcp = np.eye(6) * 0.1 # 传感器故障计数器 self.sensor_fail_count = 0 # 机械臂初始化,适配中间层 self.arm.init() # REF TRAJ self.xr = [] self.vr = [] self.ar = [] self.ts = [] # time sequence self.dt = [] # derivate of time # --- self.last_time = -1 self.cur_time = -1 # 预测步骤 def kalman_predict(self,x, P, Q): # 预测状态(这里假设状态不变) x_predict = x # 预测误差协方差 P_predict = P + Q return x_predict, P_predict # 更新步骤 def kalman_update(self,x_predict, P_predict, z, R): # 卡尔曼增益 # K = P_predict @ np.linalg.inv(P_predict + R) S = P_predict + R s = np.diag(S) # shape (6,) p_diag = np.diag(P_predict) K_diag = np.zeros_like(s) nonzero_mask = s != 0 K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask] K = np.diag(K_diag) # 更新状态 x_update = x_predict + K @ (z - x_predict) # 更新误差协方差 P_update = (np.eye(len(K)) - K) @ P_predict return x_update, P_update def update_wrench(self): # 当前的机械臂到末端转换 (实时) b_rotation_s = R.from_quat(self.arm_state.arm_orientation).as_matrix() # 读取数据 sensor_data = self.force_sensor.read() # self.logger.log_info(f"传感器数据{sensor_data}") if sensor_data is None: self.force_sensor.stop_background_reading() self.logger.log_error("传感器数据读取失败") return -1 # 反重力补偿 sensor_data[:3] = sensor_data[:3] + self.force_zero + self.b_rotation_s_set0.T @ self.gravity_base sensor_data[3:] = sensor_data[3:] + self.torque_zero + np.cross(self.mass_center_position, self.b_rotation_s_set0.T @ self.gravity_base) # 传感器数据通过矫正计算得到外来施加力 传感器坐标系下 # 重力 gravity_in_sensor = b_rotation_s.T @ self.gravity_base s_force = sensor_data[:3] - self.force_zero - gravity_in_sensor # 力矩 s_torque = sensor_data[3:] - self.torque_zero - np.cross(self.mass_center_position,gravity_in_sensor) wrench = np.concatenate([s_force,s_torque]) # 传感器到TCP wrench = self.wrench_coordinate_conversion(self.s_tf_matrix_t,wrench) # 交给ARM STATE集中管理 self.arm_state.external_wrench_tcp = wrench # 对tcp坐标系下的外力外矩进行平滑 x_tcp_predict, P_tcp_predict = self.kalman_predict(x = self.x_tcp, P = self.P_tcp, Q = self.Q_tcp) self.x_tcp, self.P_tcp = self.kalman_update(x_predict = x_tcp_predict, P_predict = P_tcp_predict, z = self.arm_state.external_wrench_tcp, R = self.R_tcp) self.arm_state.external_wrench_tcp = self.x_tcp self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp return 0 # ####################### 位姿伺服 ########################## def arm_measure_loop(self): self.logger.log_info("机械臂测量线程启动") while (not self.arm.is_exit) and (not self.exit_event.is_set()): try: if not self.is_waitting: self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position() code = self.update_wrench() if code == -1: self.sensor_fail_count += 1 self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}") if self.sensor_fail_count > 10: self.logger.log_error("传感器线程数据读取失败超过10次,程序终止") self.stop() break else: self.sensor_fail_count = 0 except Exception as e: self.logger.log_error(f"机械臂测量线程报错:{e}") self.exit_event.set() # 控制退出while self.sensor_rate.sleep() # 控制频率 def arm_command_loop_traj(self): self.logger.log_info("机械臂控制线程启动") # 离线轨迹启动时间记录 self.traj_start_time = time.time() while (not self.arm.is_exit) and (not self.exit_event.is_set()): try: if not self.is_waitting: process_start_time = time.time() control_cycle = self.control_rate.to_sec() t_now = time.time() - self.traj_start_time i = min(int(t_now / self.dt), len(self.ts) - 2) alpha = max(0.0, min(1.0, (t_now - self.ts[i]) / self.dt)) pos = (1 - alpha) * self.xr[i][:3] + alpha * self.xr[i+1][:3] slerp = Slerp([0, 1], R.concatenate([R.from_quat(self.xr[i][3:]), R.from_quat(self.xr[i+1][3:])])) alpha = 0.5 rot_interp = slerp(alpha).as_quat() xr = np.concatenate([pos, rot_interp]) # xr = (1-alpha) * self.xr[i] + alpha * self.xr[i+1] vr = (1 - alpha) * self.vr[i] + alpha * self.vr[i + 1] ar = (1 - alpha) * self.ar[i] + alpha * self.ar[i + 1] self.arm_state.desired_position = xr[:3] # self.arm_state.desired_orientation = xr[3:] # print(111) self.arm_state.desired_orientation = rot_interp # print(222) self.arm_state.desired_twist = vr self.arm_state.desired_acc = ar print("self.arm_state.desired_position",self.arm_state.desired_position) print("self.arm_state.desired_orientation",self.arm_state.desired_orientation) print("self.arm_state.desired_twist",self.arm_state.desired_twist) print("self.arm_state.desired_acc",self.arm_state.desired_acc) self.controller_manager.step_traj(control_cycle) # self.controller_manager.step(control_cycle) command_pose = np.concatenate([ self.arm_state.arm_position_command * 1000, # 转毫米 self.arm_state.arm_orientation_command ]) ## 输入滤波 status, pose_processed = self.arm.process_command( command_pose[:3], command_pose[3:] ) # print(f"pose_processed:{pose_processed}") tcp_command = ( f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f}," f"{pose_processed[2]:.3f},{pose_processed[3]:.3f}," f"{pose_processed[4]:.3f},{pose_processed[5]:.3f}," "t=0.02, aheadtime=20, gain=200)" ).encode() run_time = time.time() - process_start_time sleep_duration = self.control_rate.to_sec() - run_time b2 =time.time() if sleep_duration > 0: time.sleep(sleep_duration) print(f"real sleep:{time.time()-b2}") # self.arm.dashboard.socket_dobot.sendall(tcp_command) if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.arm.dashboard.Continue() code = self.arm.parseResultId(res)[0] if code == 0: print("机械臂继续已暂停的运动指令") except Exception as e: self.logger.log_error(f"机械臂控制失败:{e}") self.exit_event.set() # print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration) def arm_command_loop(self): self.logger.log_info("机械臂控制线程启动") while (not self.arm.is_exit) and (not self.exit_event.is_set()): try: if not self.is_waitting: process_start_time = time.time() control_cycle = self.control_rate.to_sec() self.controller_manager.step(control_cycle) command_pose = np.concatenate([ self.arm_state.arm_position_command * 1000, # 转毫米 self.arm_state.arm_orientation_command ]) ## 输入滤波 status, pose_processed = self.arm.process_command( command_pose[:3], command_pose[3:] ) # print(f"pose_processed:{pose_processed}") tcp_command = ( f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f}," f"{pose_processed[2]:.3f},{pose_processed[3]:.3f}," f"{pose_processed[4]:.3f},{pose_processed[5]:.3f}," "t=0.02, aheadtime=20, gain=200)" ).encode() run_time = time.time() - process_start_time sleep_duration = self.control_rate.to_sec() - run_time b2 =time.time() if sleep_duration > 0: time.sleep(sleep_duration) print(f"real sleep:{time.time()-b2}") self.arm.dashboard.socket_dobot.sendall(tcp_command) if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.arm.dashboard.Continue() code = self.arm.parseResultId(res)[0] if code == 0: print("机械臂继续已暂停的运动指令") except Exception as e: self.logger.log_error(f"机械臂控制失败:{e}") self.exit_event.set() # print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration) ####################### 位姿伺服 ########################## def start(self): if self.exit_event.is_set(): # 线程 self.exit_event.clear() self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop) self.arm_control_thread = threading.Thread(target=self.arm_command_loop) # 线程启动 self.arm_measure_thread.start() ## 测量线程 position,quat_rot = self.arm.get_arm_position() # 初始值 self.arm_state.desired_position = position self.arm_state.arm_position_command = position self.arm_state.desired_orientation = quat_rot self.arm_state.arm_orientation_command = quat_rot self.arm_state.arm_position = position self.arm_state.arm_orientation = quat_rot for i in range(20): self.controller_manager.step(self.control_rate.to_sec()) self.logger.log_blue(f"position command: {self.arm_state.arm_position_command}") self.logger.log_blue(f"orientation command: {R.from_quat(self.arm_state.arm_orientation_command).as_euler('xyz',degrees=True)}") position, quat_rot = self.arm.get_arm_position() self.logger.log_blue(f"position current:{position}") self.logger.log_blue(f"orientation current: {R.from_quat(quat_rot).as_euler('xyz',degrees=False)}") self.command_rate.precise_sleep() position ,quat_rot = self.arm.get_arm_position() # self.arm_state.desired_position = poistion self.arm_state.arm_position_command = position # self.arm_state.desired_orientation = quat_rot self.arm_state.arm_orientation_command = quat_rot ## POSITION CONTROLLER TEST ## # delta_pos = np.array([0.1,0.1,-0.1],dtype=np.float64) # # delta_quat = R.from_euler('xyz', [0, 0, 0], degrees=True).as_quat() # self.arm_state.desired_position = position + delta_pos # self.arm_state.desired_orientation = quat_rot # 线程启动 self.arm_control_thread.start() ## 赋初始值后控制线程 self.logger.log_info("MassageRobot启动") time.sleep(0.5) def stop(self): if not self.exit_event.is_set(): self.exit_event.set() self.interrupt_event.clear() self.arm_control_thread.join() self.arm_measure_thread.join() for i in range(3): self.force_sensor.disable_active_transmission() self.force_sensor.start_background_reading() self.arm.stop_motion() self.logger.log_info("MassageRobot停止") def init_hardwares(self,ready_pose): self.ready_pose = np.array(ready_pose) self.switch_payload(self.current_head) print(self.arm.get_arm_position()) time.sleep(0.5) def switch_payload(self,name): if name in self.playload_dict: self.stop() self.current_head = name compensation_config = self.playload_dict[self.current_head] # 读取数据 self.gravity_base = np.array(compensation_config['gravity_base']) self.force_zero = np.array(compensation_config['force_zero']) self.torque_zero = np.array(compensation_config['torque_zero']) self.tool_position = np.array(compensation_config['tcp_offset']) self.mass_center_position = np.array(compensation_config['mass_center_position']) self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat()) tcp_offset = self.playload_dict[name]["tcp_offset"] tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}" print("tcp_offset_str",tcp_offset_str) self.arm.setEndEffector(i=9,tool_i=tcp_offset_str) self.arm.chooseEndEffector(i=9) print(self.arm.get_arm_position()) self.logger.log_info(f"切换到{name}按摩头") self.controller_manager.switch_controller('position') else: self.logger.log_error(f"未找到{name}按摩头") def log_thread(self): while True: self.logger_data.log_info(f"机械臂位置:{self.arm_state.arm_position},机械臂姿态:{self.arm_state.arm_orientation}",is_print=False) self.logger_data.log_info(f"机械臂期望位置:{self.arm_state.desired_position},机械臂真实位置:{self.arm_state.arm_position}",is_print=False) self.logger_data.log_info(f"机械臂期望姿态:{self.arm_state.desired_orientation},机械臂真实姿态:{self.arm_state.arm_orientation}",is_print=False) self.logger_data.log_info(f"机械臂期望力矩:{self.arm_state.desired_wrench},机械臂真实力矩:{self.arm_state.external_wrench_base}",is_print=False) self.logger_data.log_info(f"当前按摩头:{self.current_head}",is_print=False) time.sleep(1) # 工具函数 def get_tf_matrix(self,position,orientation): tf_matrix = np.eye(4) rotation_matrix = R.from_quat(orientation).as_matrix() tf_matrix[:3,3] = position/1000 tf_matrix[:3,:3] = rotation_matrix return tf_matrix def wrench_coordinate_conversion(self,tf_matrix, wrench): rot_matrix = tf_matrix[:3, :3] vector_p = tf_matrix[:3, 3] temp_force = wrench[:3] torque = wrench[3:] force = rot_matrix.T @ temp_force torque = rot_matrix.T @ np.cross(temp_force,vector_p) + rot_matrix.T @ torque return np.concatenate([force, torque]) if __name__ == "__main__": waypoints = [ {"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0], "acceleration": [0, 0, 0], "orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()}, {"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0], "acceleration": [0, 0, 0], "orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()}, ] ## 单位 m deg myInterpolate = TrajectoryInterpolator(waypoints) ts = np.linspace(0,5,100) robot = MassageRobot(arm_config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml") xr_list, vr_list, ar_list = [], [], [] for t in ts: xr, vr, ar = myInterpolate.interpolate(t) xr_list.append(xr) vr_list.append(vr) ar_list.append(ar) xr_array = np.array(xr_list) vr_array = np.array(vr_list) ar_array = np.array(ar_list) robot.xr = xr_array robot.vr = vr_array robot.ar = ar_array robot.ts = ts robot.dt = ts[1] - ts[0] ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0] robot.current_head = 'finger_head' robot.force_sensor.disable_active_transmission() time.sleep(0.5) robot.force_sensor.disable_active_transmission() time.sleep(0.5) robot.force_sensor.disable_active_transmission() time.sleep(0.5) robot.force_sensor.set_zero() time.sleep(0.5) robot.force_sensor.enable_active_transmission() robot.init_hardwares(ready_pose) time.sleep(3) robot.controller_manager.switch_controller('admittance') # robot.controller_manager.switch_controller('position') atexit.register(robot.force_sensor.disconnect) robot.arm_state.desired_wrench = np.array([0,0,-0.5,0,0,0]) try: robot_thread = threading.Thread(target=robot.start) robot_thread.start() except KeyboardInterrupt: print("用户中断操作。") except Exception as e: print("Exception occurred at line:", e.__traceback__.tb_lineno) print("发生异常:", e) robot_thread.join() # 等待机械臂线程结束 # robot.arm.disableRobot()