From cc63eb47a5fd86a36d8af9fbf38a4b10a4c2ff7f Mon Sep 17 00:00:00 2001 From: swayneleong <15723182159@163.com> Date: Wed, 28 May 2025 15:34:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9massagrobot=5Fnova5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Massage/MassageControl/MassageRobot_nova5.py | 162 ++++++++++--------- 1 file changed, 87 insertions(+), 75 deletions(-) diff --git a/Massage/MassageControl/MassageRobot_nova5.py b/Massage/MassageControl/MassageRobot_nova5.py index f656d81..854a253 100644 --- a/Massage/MassageControl/MassageRobot_nova5.py +++ b/Massage/MassageControl/MassageRobot_nova5.py @@ -108,11 +108,14 @@ class MassageRobot: self.vtxdb = VTXClient() + # 当前执行的函数 + self.current_function = None self.arm_state = ArmState() self.arm_config = read_yaml(arm_config_path) # arm 实例化时机械臂类内部进行通讯连接 self.arm = dobot_nova5(arm_ip = self.arm_config['arm_ip']) self.arm.start() + self.arm.init() self.thermotherapy = None self.shockwave = None @@ -136,26 +139,9 @@ class MassageRobot: self.controller_manager.add_controller(AdmittanceZController,'admittanceZ',self.arm_config['controller'][7]) self.controller_manager.switch_controller('admittance') - # 按摩头参数暂时使用本地数据 - massage_head_dir = self.arm_config['massage_head_dir'] - all_items = os.listdir(massage_head_dir) - head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))] - self.playload_dict = {} - for file in head_config_files: - file_address = massage_head_dir + '/' + file - play_load = read_yaml(file_address) - self.playload_dict[play_load['name']] = play_load - self.current_head = 'none' - # 读取数据 - self.gravity_base = None - self.force_zero = None - self.torque_zero = None - self.tool_position = None - self.mass_center_position = None - self.s_tf_matrix_t = None - arm_orientation_set0 = np.array([-180,0,-90]) - self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix() + + # 频率数据赋值 self.control_rate = Rate(self.arm_config['control_rate']) @@ -177,15 +163,30 @@ class MassageRobot: # 按摩调整 self.adjust_wrench_envent = threading.Event() self.adjust_wrench_envent.clear() # 调整初始化为False + self.is_execute = False self.pos_increment = np.zeros(3,dtype=np.float64) self.adjust_wrench = np.zeros(6,dtype=np.float64) + self.skip_pos = np.zeros(6,dtype=np.float64) + # 按摩头参数暂时使用本地数据 + massage_head_dir = self.arm_config['massage_head_dir'] + all_items = os.listdir(massage_head_dir) + head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))] + self.playload_dict = {} + for file in head_config_files: + file_address = massage_head_dir + '/' + file + play_load = read_yaml(file_address) + self.playload_dict[play_load['name']] = play_load + # self.playload_dict = self.vtxdb.get("robot_config", "massage_head") + # print(self.playload_dict) + self.current_head = 'none' self.is_waitting = False self.last_print_time = 0 self.last_record_time = 0 self.last_command_time = 0 self.move_to_point_count = 0 + self.width_default = 5 # 卡尔曼滤波相关初始化 self.x_base = np.zeros(6) self.P_base = np.eye(6) @@ -203,7 +204,16 @@ class MassageRobot: # 传感器故障计数器 self.sensor_fail_count = 0 # 机械臂初始化,适配中间层 - self.arm.init() + # 读取数据 + self.gravity_base = None + self.force_zero = None + self.torque_zero = None + self.tool_position = None + self.mass_center_position = None + self.s_tf_matrix_t = None + arm_orientation_set0 = np.array([-180,0,-90]) + self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix() + # REF TRAJ self.xr = [] @@ -215,6 +225,36 @@ class MassageRobot: self.last_time = -1 self.cur_time = -1 + # 预测步骤 + def kalman_predict(self,x, P, Q): + # 预测状态(这里假设状态不变) + x_predict = x + # 预测误差协方差 + P_predict = P + Q + return x_predict, P_predict + + # 更新步骤 + def kalman_update(self,x_predict, P_predict, z, R): + # 卡尔曼增益 + # K = P_predict @ np.linalg.inv(P_predict + R) + S = P_predict + R + s = np.diag(S) # shape (6,) + p_diag = np.diag(P_predict) + K_diag = np.zeros_like(s) + nonzero_mask = s != 0 + K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask] + K = np.diag(K_diag) + # 更新状态 + x_update = x_predict + K @ (z - x_predict) + # 更新误差协方差 + P_update = (np.eye(len(K)) - K) @ P_predict + return x_update, P_update + def init_hardwares(self,ready_pose): + self.ready_pose = np.array(ready_pose) + self.switch_payload(self.current_head) + print(self.arm.get_arm_position()) + time.sleep(0.5) + def sensor_set_zero(self): self.arm_position_set0,self.arm_orientation_set0 = self.arm.get_arm_position() @@ -323,30 +363,7 @@ class MassageRobot: return -1 return 0 - # 预测步骤 - def kalman_predict(self,x, P, Q): - # 预测状态(这里假设状态不变) - x_predict = x - # 预测误差协方差 - P_predict = P + Q - return x_predict, P_predict - - # 更新步骤 - def kalman_update(self,x_predict, P_predict, z, R): - # 卡尔曼增益 - # K = P_predict @ np.linalg.inv(P_predict + R) - S = P_predict + R - s = np.diag(S) # shape (6,) - p_diag = np.diag(P_predict) - K_diag = np.zeros_like(s) - nonzero_mask = s != 0 - K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask] - K = np.diag(K_diag) - # 更新状态 - x_update = x_predict + K @ (z - x_predict) - # 更新误差协方差 - P_update = (np.eye(len(K)) - K) @ P_predict - return x_update, P_update + def update_wrench(self): # 当前的机械臂到末端转换 (实时) @@ -383,6 +400,29 @@ class MassageRobot: self.arm_state.external_wrench_tcp = self.x_tcp self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp return 0 + + def switch_payload(self,name): + if name in self.playload_dict: + self.stop() + self.current_head = name + + compensation_config = self.playload_dict[self.current_head] + + # 读取数据 + self.gravity_base = np.array(compensation_config['gravity_base']) + self.force_zero = np.array(compensation_config['force_zero']) + self.torque_zero = np.array(compensation_config['torque_zero']) + self.tool_position = np.array(compensation_config['tcp_offset']) + self.mass_center_position = np.array(compensation_config['mass_center_position']) + self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat()) + + tcp_offset = self.playload_dict[name]["tcp_offset"] + tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}" + print("tcp_offset_str",tcp_offset_str) + self.arm.setEndEffector(i=9,tool_i=tcp_offset_str) + self.arm.chooseEndEffector(i=9) + print(self.arm.get_arm_position()) + self.logger.log_info(f"切换到{name}按摩头") # ####################### 位姿伺服 ########################## @@ -535,7 +575,7 @@ class MassageRobot: # 线程 self.exit_event.clear() self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop) - self.arm_control_thread = threading.Thread(target=self.arm_command_loop) + self.arm_control_thread = threading.Thread(target=self.arm_command_loop_traj) # 线程启动 self.arm_measure_thread.start() ## 测量线程 position,quat_rot = self.arm.get_arm_position() @@ -584,34 +624,7 @@ class MassageRobot: self.arm.stop_motion() self.logger.log_info("MassageRobot停止") - def init_hardwares(self,ready_pose): - self.ready_pose = np.array(ready_pose) - self.switch_payload(self.current_head) - print(self.arm.get_arm_position()) - time.sleep(0.5) - - def switch_payload(self,name): - if name in self.playload_dict: - self.stop() - self.current_head = name - - compensation_config = self.playload_dict[self.current_head] - # 读取数据 - self.gravity_base = np.array(compensation_config['gravity_base']) - self.force_zero = np.array(compensation_config['force_zero']) - self.torque_zero = np.array(compensation_config['torque_zero']) - self.tool_position = np.array(compensation_config['tcp_offset']) - self.mass_center_position = np.array(compensation_config['mass_center_position']) - self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat()) - - tcp_offset = self.playload_dict[name]["tcp_offset"] - tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}" - print("tcp_offset_str",tcp_offset_str) - self.arm.setEndEffector(i=9,tool_i=tcp_offset_str) - self.arm.chooseEndEffector(i=9) - print(self.arm.get_arm_position()) - self.logger.log_info(f"切换到{name}按摩头") self.controller_manager.switch_controller('position') else: self.logger.log_error(f"未找到{name}按摩头") @@ -1608,12 +1621,11 @@ if __name__ == "__main__": try: robot_thread = threading.Thread(target=robot.start) robot_thread.start() - except KeyboardInterrupt: print("用户中断操作。") except Exception as e: print("Exception occurred at line:", e.__traceback__.tb_lineno) print("发生异常:", e) - - robot_thread.join() # 等待机械臂线程结束 + + # robot.arm.disableRobot() \ No newline at end of file