From 4bf67089fe4b1f43332e22d330bf267c63fdbf98 Mon Sep 17 00:00:00 2001 From: "ziwei.he" Date: Mon, 12 May 2025 17:54:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=AD=E9=97=B4=E5=B1=82=E5=8A=9B=E6=8E=A7?= =?UTF-8?q?=E5=BC=80=E5=8F=91=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90=EF=BC=8C?= =?UTF-8?q?=E5=BE=85=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Massage/MassageControl/MassageRobot_nova5.py | 180 +++++- .../algorithms/position_controller.py | 68 +++ .../MassageControl/hardware/dobot_nova5.py | 37 +- .../MassageControl/hardware/force_sensor.py | 572 ++++++++++++++++++ 4 files changed, 833 insertions(+), 24 deletions(-) create mode 100644 Massage/MassageControl/algorithms/position_controller.py diff --git a/Massage/MassageControl/MassageRobot_nova5.py b/Massage/MassageControl/MassageRobot_nova5.py index e5efba2..20f10aa 100644 --- a/Massage/MassageControl/MassageRobot_nova5.py +++ b/Massage/MassageControl/MassageRobot_nova5.py @@ -1,8 +1,10 @@ from hardware.dobot_nova5 import dobot_nova5 +from hardware.force_sensor import XjcSensor from algorithms.arm_state import ArmState from algorithms.controller_manager import ControllerManager from algorithms.admittance_controller import AdmittanceController +from algorithms.position_controller import PositionController from tools.log import CustomLogger from tools.yaml_operator import read_yaml @@ -11,6 +13,7 @@ import numpy as np import threading import time import os +from scipy.spatial.transform import Rotation as R class MassageRobot: def __init__(self,arm_config_path,is_log=False): self.logger = CustomLogger( @@ -26,14 +29,16 @@ class MassageRobot: self.arm_state = ArmState() self.arm_config = read_yaml(arm_config_path) + # arm 实例化时机械臂类内部进行通讯连接 self.arm = dobot_nova5(arm_ip=self.arm_config['arm_ip']) - self.force_sensor = None + self.force_sensor = XjcSensor(host=self.arm_config['arm_ip'],port=60000) - # controller + # 控制器初始化(初始化为导纳控制) self.controller_manager = ControllerManager(self.arm_state) self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0]) - - # massage heads + self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][1]) + self.controller_manager.switch_controller('admittance') + # 按摩头参数暂时使用本地数据 massage_head_dir = self.arm_config['massage_head_dir'] all_items = os.listdir(massage_head_dir) head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))] @@ -44,7 +49,7 @@ class MassageRobot: self.playload_dict[play_load['name']] = play_load self.current_head = 'none' - # 频率 + # 频率数据赋值 self.control_rate = Rate(self.arm_config['control_rate']) self.sensor_rate = Rate(self.arm_config['sensor_rate']) self.command_rate = Rate(self.arm_config['command_rate']) @@ -52,15 +57,20 @@ class MassageRobot: # 低通滤波 self.cutoff_freq = 80.0 - # flags + # 停止标志位线程 self.exit_event = threading.Event() - self.exit_event.set() # 运行 True + self.exit_event.set() # 运行初始化为True self.interrupt_event = threading.Event() - self.interrupt_event.clear() # 中断 False + self.interrupt_event.clear() # 中断初始化为False self.pause_envent = threading.Event() - self.pause_envent.clear() # 暂停 False + self.pause_envent.clear() # 暂停初始化为False self.skip_envent = threading.Event() - self.skip_envent.clear() # 跳过 False + self.skip_envent.clear() # 跳过初始化为False + # 按摩调整 + self.adjust_wrench_event = threading.Event() + self.adjust_wrench_event.clear() # 调整初始化为False + self.pos_increment = np.zeros(3,dtype=np.float64) + self.adjust_wrench = np.zeros(6,dtype=np.float64) self.is_waitting = False @@ -68,8 +78,7 @@ class MassageRobot: self.last_record_time = 0 self.last_command_time = 0 self.move_to_point_count = 0 - self.width_default = 5 - + # 卡尔曼滤波相关初始化 self.x_base = np.zeros(6) self.P_base = np.eye(6) # 过程噪声协方差矩阵 @@ -86,8 +95,8 @@ class MassageRobot: # 传感器故障计数器 self.sensor_fail_count = 0 - # init - self.arm.__init__() + # 机械臂初始化,适配中间层 + self.arm.init() # 预测步骤 def kalman_predict(self,x, P, Q): @@ -107,14 +116,101 @@ class MassageRobot: P_update = (np.eye(len(K)) - K) @ P_predict return x_update, P_update + def update_wrench(self): + compensation_config = self.playload_dict[self.current_head] + # 读取数据 + gravity_base = np.array(compensation_config['gravity_base']) + force_zero = np.array(compensation_config['force_zero']) + torque_zero = np.array(compensation_config['torque_zero']) + tool_position = np.array(compensation_config['tcp_offset']) + mass_center_position = np.array(compensation_config['mass_center_position']) + # 当前的机械臂到末端转换 (实时) + b_rotation_s = R.from_quat(self.arm_state.arm_orientation).as_matrix() + # 读取数据 + sensor_data = self.force_sensor.read() + if sensor_data is None: + self.force_sensor.stop_background_reading() + self.logger.log_error("传感器数据读取失败") + return -1 + # 传感器数据通过矫正计算得到外来施加力 传感器坐标系下 + # 重力和零位 + gravity_in_sensor = b_rotation_s.T @ gravity_base + s_force = sensor_data[:3] - force_zero - gravity_in_sensor + # 力矩 + s_torque = sensor_data[3:] - torque_zero - np.cross(mass_center_position,gravity_in_sensor) + wrench = np.concatenate([s_force,s_torque]) + # 传感器工具转换 + s_tf_matrix_t = self.get_tf_matrix(tool_position[:3], R.from_euler('xyz',tool_position[3:],degrees=False).as_quat()) + # 传感器到TCP + wrench = self.wrench_coordinate_conversion(s_tf_matrix_t,wrench) + # 交给ARM STATE集中管理 + self.arm_state.external_wrench_tcp = wrench + self.arm_state.external_wrench_base = np.concatenate([b_rotation_s @ self.arm_state.external_wrench_tcp[:3], + b_rotation_s @ self.arm_state.external_wrench_tcp[3:]]) + # 卡尔曼滤波 + x_base_predict, P_base_predict = self.kalman_predict(x = self.x_base, + P = self.P_base, + Q = self.Q_base) + self.x_base, self.P_base = self.kalman_update(x_predict = x_base_predict, + P_predict = P_base_predict, + z = self.arm_state.external_wrench_base, + R = self.R_base) + self.arm_state.external_wrench_base = self.x_base + self.arm_state.last_external_wrench_base = self.arm_state.external_wrench_base + # 对tcp坐标系下的外力外矩进行平滑 + x_tcp_predict, P_tcp_predict = self.kalman_predict(x = self.x_tcp, + P = self.P_tcp, + Q = self.Q_tcp) + self.x_tcp, self.P_tcp = self.kalman_update(x_predict = x_tcp_predict, + P_predict = P_tcp_predict, + z = self.arm_state.external_wrench_tcp, + R = self.R_tcp) + self.arm_state.external_wrench_tcp = self.x_tcp + self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp + return 0 + def arm_measure_loop(self): - return + self.logger.log_info("机械臂测量线程启动") + while (not self.arm.is_exit) and (not self.exit_event.is_set()): + try: + if not self.is_waitting: + self.arm_state.arm_position, + self.arm_state.arm_orientation = self.arm.get_arm_position() + code = self.update_wrench() + if code == -1: + self.sensor_fail_count += 1 + self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}") + if self.sensor_fail_count > 10: + self.logger.log_error("传感器线程数据读取失败超过10次,程序终止") + self.stop() + break + else: + self.sensor_fail_count = 0 + except Exception as e: + self.logger.log_error(f"机械臂测量线程报错:{e}") + self.exit_event.set() # 控制退出while + self.sensor_rate.precise_sleep() # 控制频率 def arm_command_loop(self): - return - + self.logger.log_info("机械臂控制线程启动") + while (not self.arm.is_exit) and (not self.exit_event.is_set()): + try: + if not self.is_waitting: + self.controller_manager.step(self.control_rate.to_sec()) + self.last_command_time += 1 + code = self.arm.ServoPose(self.arm_state.arm_position, + self.arm_state.arm_orientation_command) + if code == -1: + self.logger.log_error("机械臂急停") + # self.stop() # 底层已经做了急停处理 + break + except Exception as e: + self.logger.log_error(f"机械臂控制失败:{e}") + self.exit_event.set() + self.control_rate.precise_sleep() def start(self): if self.exit_event.is_set(): + # 线程 self.exit_event.clear() self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop) self.arm_control_thread = threading.Thread(target=self.arm_command_loop) @@ -125,26 +221,48 @@ class MassageRobot: self.arm_state.arm_position_command = position self.arm_state.desired_orientation = quat_rot self.arm_state.arm_orientation_command = quat_rot + self.logger.log_info("MassageRobot启动") time.sleep(0.5) - - def stop(self): if not self.exit_event.is_set(): self.exit_event.set() self.interrupt_event.clear() self.arm_control_thread.join() self.arm_measure_thread.join() + for i in range(3): + self.force_sensor.disable_active_transmission() + self.force_sensor.start_background_reading() + self.arm.stop_motion() + self.logger.log_info("MassageRobot停止") - return - def init_hardwares(self,ready_pose): self.ready_pose = np.array(ready_pose) + self.switch_payload(self.current_head) + self.arm_state.desired_orientation = self.ready_pose[:3] + euler_angles = self.ready_pose + self.arm_state.desired_orientation = R.from_euler('xyz',euler_angles).as_quat() + time.sleep(0.5) def switch_payload(self,name): if name in self.playload_dict: - return - + self.stop() + self.current_head = name + tcp_offset = self.playload_dict[name]["tcp_offset"] + tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}" + print(tcp_offset_str) + self.arm.setEndEffector(i=1,tool_i=tcp_offset_str) + self.arm.chooseEndEffector(i=1) + self.logger.log_info(f"切换到{name}按摩头") + R_matrix = R.from_euler('xyz',self.ready_pose[3:] ,degrees=False).as_matrix() + ready_position = self.ready_pose[:3] + R_matrix @ self.playload_dict[self.current_head]['tcp_offset'][:3] + ready_orientation = R_matrix @ R.from_euler('xyz',self.playload_dict[self.current_head]['tcp_offset'][3:],degrees=False).as_matrix() + ready_orientation_euler = R.from_matrix(ready_orientation).as_euler('xyz',degrees=False) + self.arm_state.desired_position = ready_position + self.arm_state.desired_orientation = R.from_euler('xyz',ready_orientation_euler,degrees=False).as_quat() + self.controller_manager.switch_controller('position') + else: + self.logger.log_error(f"未找到{name}按摩头") def log_thread(self): while True: @@ -155,5 +273,21 @@ class MassageRobot: self.logger_data.log_info(f"当前按摩头:{self.current_head}",is_print=False) time.sleep(1) + # 工具函数 + def get_tf_matrix(position,orientation): + tf_matrix = np.eye(4) + rotation_matrix = R.from_quat(orientation).as_matrix() + tf_matrix[:3,3] = position + tf_matrix[:3,:3] = rotation_matrix + return tf_matrix + def wrench_coordinate_conversion(tf_matrix, wrench): + rot_matrix = tf_matrix[:3, :3] + vector_p = tf_matrix[:3, 3] + temp_force = wrench[:3] + torque = wrench[3:] + force = rot_matrix.T @ temp_force + torque = rot_matrix.T @ np.cross(temp_force,vector_p) + rot_matrix.T @ torque + return np.concatenate([force, torque]) + if __name__ == "__main__": robot = MassageRobot(arm_config_path="MassageControl/config/robot_config.yaml") \ No newline at end of file diff --git a/Massage/MassageControl/algorithms/position_controller.py b/Massage/MassageControl/algorithms/position_controller.py new file mode 100644 index 0000000..9e4da73 --- /dev/null +++ b/Massage/MassageControl/algorithms/position_controller.py @@ -0,0 +1,68 @@ +import sys +import numpy as np +from scipy.spatial.transform import Rotation as R +from .base_controller import BaseController +from .arm_state import ArmState +from pathlib import Path +import time +sys.path.append(str(Path(__file__).resolve().parent.parent)) + +from tools.yaml_operator import read_yaml + +class PositionController(BaseController): + def __init__(self, name, state:ArmState,config_path) -> None: + super().__init__(name, state) + self.load_config(config_path) + + self.laset_print_time = 0 + + def load_config(self, config_path): + config_dict = read_yaml(config_path) + if self.name != config_dict['name']: + raise ValueError(f"Controller name {self.name} does not match config name {config_dict['name']}") + self.Kp = np.diag(np.array(config_dict['Kp'])) + self.Ki = np.diag(np.array(config_dict['Ki'])) + self.Kd = np.diag(np.array(config_dict['Kd'])) + + self.pose_integral_error = np.zeros(6) + + def step(self,dt): + # print(f"desired position: {self.state.desired_position}, desired orientation: {R.from_quat(self.state.desired_orientation).as_euler('xyz',degrees=False)}") + # print(f"current position: {self.state.arm_position}, current orientation: {R.from_quat(self.state.arm_orientation).as_euler('xyz',degrees=False)}") + self.state.pose_error[:3] = self.state.arm_position - self.state.desired_position + if self.state.desired_orientation.dot(self.state.arm_orientation) < 0: + self.state.arm_orientation = -self.state.arm_orientation + rot_err_mat = R.from_quat(self.state.arm_orientation).as_matrix() @ R.from_quat(self.state.desired_orientation).as_matrix().T + rot_err_rotvex = R.from_matrix(rot_err_mat).as_rotvec(degrees=False) + self.state.pose_error[3:] = rot_err_rotvex + + self.pose_integral_error += self.state.pose_error * dt + + self.state.arm_desired_acc = - self.Kd @ (self.state.arm_desired_twist - self.state.desired_twist) - self.Kp @ self.state.pose_error - self.Ki @ self.pose_integral_error + self.clip_command(self.state.arm_desired_acc,"acc") + self.state.arm_desired_twist = self.state.arm_desired_acc * dt + self.state.arm_desired_twist + self.clip_command(self.state.arm_desired_twist,"vel") + delta_pose = self.state.arm_desired_twist * dt + self.clip_command(delta_pose,"pose") + delta_ori_mat = R.from_rotvec(delta_pose[3:]).as_matrix() + arm_ori_mat = delta_ori_mat @ R.from_quat(self.state.arm_orientation).as_matrix() + + self.state.arm_orientation_command = R.from_matrix(arm_ori_mat).as_quat() + # 归一化四元数 + self.state.arm_orientation_command /= np.linalg.norm(self.state.arm_orientation_command) + self.state.arm_position_command = self.state.arm_position + delta_pose[:3] + + # if time.time() - self.laset_print_time > 0.5: + # print("---------------positioner-------------------------------------") + # print("arm_position:",self.state.arm_position) + # print("desired_position:",self.state.desired_position) + # print("arm_orientation",R.from_quat(self.state.arm_orientation).as_euler('xyz',degrees=True)) + # print("desired_orientation",R.from_quat(self.state.desired_orientation).as_euler('xyz',degrees=True)) + # print("arm_position_command",self.state.arm_position_command) + # print("arm_orientation_command",R.from_quat(self.state.arm_orientation_command).as_euler('xyz',degrees=True)) + # print("delta_pose:",delta_pose) + # self.laset_print_time = time.time() + + # if time.time() - self.last_print_time > 1: + # print(self.state.arm_position_command,R.from_quat(self.state.arm_orientation_command).as_euler('xyz',degrees=True)) + # self.last_print_time = time.time() diff --git a/Massage/MassageControl/hardware/dobot_nova5.py b/Massage/MassageControl/hardware/dobot_nova5.py index 92f3d10..a51d21d 100644 --- a/Massage/MassageControl/hardware/dobot_nova5.py +++ b/Massage/MassageControl/hardware/dobot_nova5.py @@ -125,6 +125,7 @@ class dobot_nova5: print("该次运动完成") break sleep(0.1) + return 0 def RunPoint_P_inJoint(self,joints_list): ''' @@ -160,6 +161,7 @@ class dobot_nova5: print("该次运动完成") break sleep(0.1) # 避免 CPU 占用过高 + return 0 def RunPoint_L_inPose(self,poses_list): ''' @@ -188,6 +190,7 @@ class dobot_nova5: print("该次运动完成") break sleep(0.1) + return 0 def RunPoint_L_inJoint(self,joints_list): ''' @@ -216,6 +219,7 @@ class dobot_nova5: print("该次运动完成") break sleep(0.1) + return 0 def ServoJoint(self,joints_list,t=-1,aheadtime=-1,gain=-1): tcp_command = f"servoj({joints_list[0]},{joints_list[1]},{joints_list[2]},{joints_list[3]},{joints_list[4]},{joints_list[5]},t={t},aheadtime={aheadtime},gain={gain})" @@ -225,12 +229,14 @@ class dobot_nova5: self.dashboard.Stop() self.dashboard.EmergencyStop() print("当前工作停止,机械臂处于急停状态") + return -1 if self.feedbackData.robotMode == 10: # 10 当前工程暂停 print("机械臂为暂停状态") res = self.dashboard.Continue() code = int(res[0]) if code == 0: print("机械臂继续已暂停的运动指令") + return 0 def ServoPose(self,poses_list,t=-1,aheadtime=-1,gain=-1): poses_list = list(map(float, poses_list)) @@ -427,12 +433,40 @@ class dobot_nova5: else: print("拖拽模式关闭失败") return + + def stop_motion(self): + res = self.dashboard.Stop() + code = int(res[0]) + if code == 0: + print("机械臂停止下发运动指令队列") + else: + print("停止下发运动失败,需立即拍下急停") + return def __del__(self): del self.dashboard del self.feedFour - # 适配中间层接口 + ## 适配中间层的再封装接口 + + # 为状态管理而封装的初始化函数 + def init(self): + self.is_exit = False + self.traj_list = None + self.last_pose_command = np.zeros(6) + self.last_input_command = None + self.tcp_offset = None + self.init_J = [0,30,-120,0,90,0] + sleep(1) + self.is_standby = False + code = self.RunPoint_P_inJoint(self.init_J) + if code == 0: + print("机械臂初始化成功且到达初始位置") + else: + print("机械臂初始化失败") + + + def get_arm_position(self): pose = self.getPose() x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree @@ -442,6 +476,7 @@ class dobot_nova5: + if __name__ == "__main__": dobot = dobot_nova5("192.168.5.1") dobot.start() diff --git a/Massage/MassageControl/hardware/force_sensor.py b/Massage/MassageControl/hardware/force_sensor.py index e69de29..e1df33a 100644 --- a/Massage/MassageControl/hardware/force_sensor.py +++ b/Massage/MassageControl/hardware/force_sensor.py @@ -0,0 +1,572 @@ +import time +import socket +import struct +import numpy as np +import atexit +import threading +from typing import Optional, Tuple +from matplotlib import pyplot as plt +import csv +from datetime import datetime + +class XjcSensor: + def __init__(self, host: str, port: int, rate: int = 250): + """ + 初始化 TCP 连接的力传感器 + :param host: 设备 IP 地址(如 "192.168.1.100") + :param port: 设备端口(如 502) + :param rate: 数据采样率(100/250/500Hz) + """ + self.host = host + self.port = port + self.rate = rate + self.slave_address = 0x01 # 默认从机地址 + self.sock = None # TCP socket 对象 + self.crc16_table = self.generate_crc16_table() + # 后台读取相关的属性 + self._background_thread = None + self._stop_background = False + self._last_reading = None + self._reading_lock = threading.Lock() + # 建立 TCP 连接 + self.connect() + + # 注册退出时的清理函数 + atexit.register(self.disconnect) + + def __new__(cls, *args, **kwargs): + """单例模式""" + if not hasattr(cls, 'instance'): + cls.instance = super(XjcSensor, cls).__new__(cls) + return cls.instance + + def connect(self): + """建立 TCP 连接""" + try: + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.settimeout(1.0) # 设置超时时间 + self.sock.connect((self.host, self.port)) + print(f"Connected to {self.host}:{self.port}") + except Exception as e: + print(f"TCP connection failed: {e}") + raise + + def send_and_receive(self, data: bytes, expected_response_len: int) -> Optional[bytes]: + """ + 发送数据并接收响应 + :param data: 要发送的字节数据 + :param expected_response_len: 预期响应的长度 + :return: 收到的字节数据(失败返回 None) + """ + try: + self.sock.sendall(data) + response = self.sock.recv(expected_response_len) + return response + except socket.timeout: + print("Timeout while waiting for response") + return None + except Exception as e: + print(f"TCP communication error: {e}") + return None + + def set_zero(self) -> int: + """传感器置零(TCP 版本)""" + if self.slave_address == 0x01: + zero_cmd = bytes.fromhex('01 10 46 1C 00 02 04 00 00 00 00 E8 95') + elif self.slave_address == 0x09: + zero_cmd = bytes.fromhex('09 10 46 1C 00 02 04 00 00 00 00 C2 F5') + + response = self.send_and_receive(zero_cmd, 8) + if not response: + print("set zero fail") + return -1 + + # CRC 校验(假设协议要求) + hi_check, lo_check = self.crc16(response[:-2]) + if (response[0] != self.slave_address or + response[1] != 0x10 or + response[-2] != lo_check or + response[-1] != hi_check): + print("Set zero failed: CRC check error") + return -1 + + print("Set zero success!") + return 0 + + def read_data_f32(self) -> np.ndarray: + """ + TCP版本 - 读取六维力传感器数据 (float32格式) + 返回: + np.ndarray(6,) - 六维力数据 [Fx,Fy,Fz,Mx,My,Mz] + 读取失败返回 -1 + """ + # 构造读取命令 + + if self.slave_address == 0x01: + command = bytes.fromhex('01 04 00 00 00 0C F0 0F') + elif self.slave_address == 0x09: + command = bytes.fromhex('09 04 00 54 00 0C B0 97') + + try: + # 清空接收缓冲区 + self._clear_tcp_buffer() + + # 接收完整响应 + response = self.send_and_receive(command, 29) + + # CRC校验 + Hi_check, Lo_check = self.crc16(response[:-2]) + if (response[0] != self.slave_address or + response[1] != 0x04 or + response[2] != 24 or # 数据长度字节 + response[-2] != Lo_check or + response[-1] != Hi_check): + print(f"Protocol error! Received: {response.hex(' ')}") + print(f"Expected slave:{self.slave_address}, " + f"func:0x04, len:24, " + f"CRC:{Lo_check:02X}{Hi_check:02X}") + return -1 + + # 解析6个float32数据 (大端序) + sensor_data = struct.unpack('>ffffff', response[3:27]) + return np.array(sensor_data) + + except socket.timeout: + print("Timeout while waiting for sensor response") + return -1 + except ConnectionError as e: + print(f"TCP connection error: {e}") + return -1 + except Exception as e: + print(f"Unexpected error in read_data_f32: {str(e)}") + return -1 + + def _clear_tcp_buffer(self): + """清空TCP接收缓冲区""" + self.sock.settimeout(0.1) # 短暂超时,避免阻塞 + try: + while True: + data = self.sock.recv(1024) + if not data: # 连接关闭或无数据 + break + print(f"Cleared TCP buffer: {data.hex(' ')}") # 调试输出 + except socket.timeout: + pass # 预期内的超时,表示缓冲区已空 + finally: + self.sock.settimeout(2.0) # 恢复默认超时 + + def enable_active_transmission(self) -> int: + """启用主动传输模式(TCP 版本)""" + if self.rate == 100: + cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 00 AB 6A') if self.slave_address == 0x01 else \ + bytes.fromhex('09 10 01 9A 00 01 02 00 00 CC AA') + elif self.rate == 250: + cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 01 6A AA') if self.slave_address == 0x01 else \ + bytes.fromhex('09 10 01 9A 00 01 02 00 01 0D 6A') + elif self.rate == 500: + cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 02 2A AB') if self.slave_address == 0x01 else \ + bytes.fromhex('09 10 01 9A 00 01 02 00 02 4D 6B') + else: + print("Unsupported rate") + return -1 + + response = self.send_and_receive(cmd, 8) + if response and response[1] == 0x10: # 检查响应功能码 + print(f"Active transmission enabled at {self.rate}Hz") + return 0 + else: + print("Failed to enable active transmission") + return -1 + + def disconnect(self): + """关闭 TCP 连接""" + if self.sock: + try: + self.sock.close() + print("TCP connection closed") + except Exception as e: + print(f"Error while closing socket: {e}") + self.sock = None + def start_background_reading(self): + """ + 启动后台读取任务,每秒读取一次传感器数据 + """ + if self._background_thread is not None and self._background_thread.is_alive(): + print("Background reading is already running") + return False + + self._stop_background = False + self._background_thread = threading.Thread(target=self._background_reading_task) + self._background_thread.daemon = True # 设置为守护线程 + self._background_thread.start() + return True + + def stop_background_reading(self): + """ + 停止后台读取任务 + """ + if self._background_thread is None or not self._background_thread.is_alive(): + print("No background reading is running") + return False + + self._stop_background = True + self._background_thread.join(timeout=2.0) # 等待线程结束,最多等待2秒 + self._background_thread = None + return True + + def _background_reading_task(self): + """ + 后台读取任务的实现 + """ + while not self._stop_background: + try: + data = self.read_data_f32() + if isinstance(data, np.ndarray): + with self._reading_lock: + self._last_reading = data + else: + self.connect() + except Exception as e: + print(f"Error in background reading: {e}") + time.sleep(1.0) # 每秒读取一次 + + def get_last_reading(self): + """ + 获取最近一次的读数 + """ + with self._reading_lock: + return self._last_reading + + # ------------------------- 以下方法保持不变 ------------------------- + @staticmethod + def generate_crc16_table(): + """CRC16 表生成(与原代码相同)""" + table = [] + polynomial = 0xA001 + for i in range(256): + crc = i + for _ in range(8): + if crc & 0x0001: + crc = (crc >> 1) ^ polynomial + else: + crc >>= 1 + table.append(crc) + return table + + def crc16(self, data: bytes): + """CRC16 计算(与原代码相同)""" + crc = 0xFFFF + for byte in data: + crc = (crc >> 8) ^ self.crc16_table[(crc ^ byte) & 0xFF] + return (crc >> 8) & 0xFF, crc & 0xFF + + def __del__(self): + """析构时自动断开连接""" + self.disconnect() + + def disable_active_transmission(self) -> int: + """ + 禁用传感器的主动传输模式(TCP 版本) + Returns: + 0 表示成功,-1 表示失败 + """ + try: + # 构造禁用命令(原代码中的 FF x11 字节流) + disable_cmd = bytes.fromhex('FF FF FF FF FF FF FF FF FF FF FF') + print("Disabling active transmission mode...") + + # 检查 TCP 连接是否有效 + if not self.sock: + raise ConnectionError("TCP connection is not established") + + # 发送禁用命令 + self.sock.sendall(disable_cmd) + print("Active transmission disabled successfully") + return 0 + + except ConnectionError as e: + print(f"Connection error: {e}") + return -1 + except socket.timeout: + print("Timeout while sending disable command") + return -1 + except Exception as e: + print(f"Unexpected error: {e}") + return -1 + + def read(self): + """ + TCP Version - Read the sensor's data in passive mode. + Returns: + np.ndarray(6,) - Six-axis force/torque data [Fx, Fy, Fz, Mx, My, Mz] + None if reading fails + """ + try: + # Clear any existing data in the TCP buffer first + # self._clear_tcp_buffer() + + # Search for frame header (0x20 0x4E) + header_found = False + start_time = time.time() + timeout = 1.0 # 1 second timeout + + while not header_found and (time.time() - start_time < timeout): + # Read one byte at a time looking for header + byte1 = self.sock.recv(1) + if not byte1: + continue # No data available yet + + if byte1 == b'\x20': + byte2 = self.sock.recv(1) + if byte2 == b'\x4E': + header_found = True + + if not header_found: + print("Frame header not found within timeout period") + return None + + # Now read the remaining 14 bytes of the frame + response = bytearray([0x20, 0x4E]) # Start with the header we found + remaining_bytes = 14 + bytes_received = 0 + start_time = time.time() + + while bytes_received < remaining_bytes and (time.time() - start_time < timeout): + chunk = self.sock.recv(remaining_bytes - bytes_received) + if chunk: + response.extend(chunk) + bytes_received += len(chunk) + + if bytes_received < remaining_bytes: + print(f"Incomplete frame received. Got {len(response)} bytes, expected 16") + return None + + # Verify CRC checksum + Hi_check, Lo_check = self.crc16(response[:-2]) + if response[-1] != Hi_check or response[-2] != Lo_check: + print("CRC check failed!") + print(f"Received CRC: {response[-2]:02X}{response[-1]:02X}") + print(f"Calculated CRC: {Lo_check:02X}{Hi_check:02X}") + return None + + # Parse the sensor data + sensor_data = self.parse_data_passive(response) + return sensor_data + + except socket.timeout: + print("Timeout while waiting for sensor data") + return None + except ConnectionError as e: + print(f"TCP connection error: {e}") + return None + except Exception as e: + print(f"Unexpected error in TCP read(): {str(e)}") + return None + + def parse_data_passive(self, buffer): + values = [ + int.from_bytes(buffer[i:i+2], byteorder='little', signed=True) + for i in range(2, 14, 2) + ] + Fx, Fy, Fz = np.array(values[:3]) / 10.0 + Mx, My, Mz = np.array(values[3:]) / 1000.0 + return np.array([Fx, Fy, Fz, Mx, My, Mz]) + + +# 外部 +def test_sensor_frequency(sensor, mode='active', duration=5.0): + """ + 测试传感器在不同模式下的实际数据获取频率 + + 参数: + sensor: XjcSensor实例 + mode: 'active'(主动模式) 或 'polling'(查询模式) + duration: 测试持续时间(秒) + + 返回: + dict: 包含测试结果的字典 + """ + # 准备测试环境 + if mode == 'active': + print("\n=== 测试主动传输模式 ===") + sensor.enable_active_transmission() + read_func = sensor.read + else: + print("\n=== 测试查询模式 ===") + sensor.disable_active_transmission() # 确保不在主动模式 + time.sleep(0.5) + sensor.disable_active_transmission() # 确保不在主动模式 + time.sleep(0.5) + sensor.disable_active_transmission() # 确保不在主动模式 + time.sleep(0.5) + read_func = sensor.read_data_f32 + + # 初始化测试变量 + timestamps = [] + data_count = 0 + start_time = time.perf_counter() + end_time = start_time + duration + + print(f"开始测试,持续时间 {duration} 秒...") + + # 主测试循环 + while time.perf_counter() < end_time: + data = read_func() + print(data) + if data is not None: + timestamps.append(time.perf_counter()) + data_count += 1 + # print(timestamps) + # 计算统计结果 + if data_count < 2: + print("警告: 采集到的数据点不足") + return None + + intervals = np.diff(timestamps) + save_intervals_to_csv(timestamps) + avg_interval = np.mean(intervals) + min_interval = np.min(intervals) + max_interval = np.max(intervals) + std_interval = np.std(intervals) + avg_freq = 1.0 / avg_interval + + # 打印结果 + print("\n测试结果:") + print(f"总数据点数: {data_count}") + print(f"平均频率: {avg_freq:.2f} Hz") + print(f"最小间隔: {min_interval*1000:.3f} ms") + print(f"最大间隔: {max_interval*1000:.3f} ms") + print(f"间隔标准差: {std_interval*1000:.3f} ms") + + # 返回结果字典 + return { + 'mode': mode, + 'duration': duration, + 'data_count': data_count, + 'avg_freq': avg_freq, + 'min_interval': min_interval, + 'max_interval': max_interval, + 'std_interval': std_interval, + 'timestamps': timestamps, + 'intervals': intervals + } + +def plot_test_results(active_results, polling_results): + """绘制两种模式的测试结果对比图""" + plt.figure(figsize=(12, 8)) + + # 间隔时间分布图 + plt.subplot(2, 1, 1) + plt.hist(active_results['intervals']*1000, bins=50, alpha=0.7, label='主动模式') + plt.hist(polling_results['intervals']*1000, bins=50, alpha=0.7, label='查询模式') + plt.xlabel('间隔时间 (ms)') + plt.ylabel('出现次数') + plt.title('数据间隔时间分布') + plt.legend() + plt.grid(True) + + # 间隔时间序列图 + plt.subplot(2, 1, 2) + plt.plot(np.arange(len(active_results['intervals'])), + active_results['intervals']*1000, 'b.', label='主动模式') + plt.plot(np.arange(len(polling_results['intervals'])), + polling_results['intervals']*1000, 'r.', label='查询模式') + plt.xlabel('数据点序号') + plt.ylabel('间隔时间 (ms)') + plt.title('数据间隔时间序列') + plt.legend() + plt.grid(True) + + plt.tight_layout() + plt.show() + +def compare_modes(sensor, duration=5.0): + """比较两种工作模式的性能""" + # 测试主动模式 + active_results = test_sensor_frequency(sensor, 'active', duration) + time.sleep(1) # 模式切换间隔 + + # 测试查询模式 + polling_results = test_sensor_frequency(sensor, 'polling', duration) + time.sleep(1) + + # 打印对比报告 + print("\n=== 模式对比报告 ===") + print(f"{'指标':<15} {'主动模式':>15} {'查询模式':>15}") + print(f"{'平均频率(Hz)':<15} {active_results['avg_freq']:>15.2f} {polling_results['avg_freq']:>15.2f}") + print(f"{'最小间隔(ms)':<15} {active_results['min_interval']*1000:>15.3f} {polling_results['min_interval']*1000:>15.3f}") + print(f"{'最大间隔(ms)':<15} {active_results['max_interval']*1000:>15.3f} {polling_results['max_interval']*1000:>15.3f}") + print(f"{'间隔标准差(ms)':<15} {active_results['std_interval']*1000:>15.3f} {polling_results['std_interval']*1000:>15.3f}") + + # 绘制对比图表 + plot_test_results(active_results, polling_results) + + return active_results, polling_results + +def save_intervals_to_csv(timestamps, filename="sensor_intervals.csv"): + """ + Save timestamp intervals to CSV file with metadata + + Args: + timestamps: List of timestamp values (from time.perf_counter()) + filename: Output CSV filename + """ + if len(timestamps) < 2: + print("Not enough timestamps to calculate intervals") + return + + # Calculate intervals in milliseconds + intervals_ms = np.diff(timestamps) * 1000 + + # Prepare data rows + rows = [] + for i, interval in enumerate(intervals_ms): + rows.append({ + "reading_number": i+1, + "interval_ms": f"{interval:.4f}", + "timestamp": timestamps[i], + "expected_interval_ms": (1000/250) if i==0 else "", # For 250Hz + "deviation_ms": f"{interval - (1000/250):.4f}" if i>0 else "" + }) + + # CSV header + fieldnames = ["reading_number", "interval_ms", "timestamp", + "expected_interval_ms", "deviation_ms"] + + # Write to file + with open(filename, 'w', newline='') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + print(f"Saved {len(intervals_ms)} intervals to {filename}") + +if __name__ == "__main__": + # 替换为你的传感器 IP 和端口 + sensor = XjcSensor("192.168.5.1", 60000) + + # 示例操作 + + sensor.disable_active_transmission() + time.sleep(0.5) + sensor.disable_active_transmission() + time.sleep(0.5) + sensor.disable_active_transmission() + time.sleep(0.5) + sensor.set_zero() + + # sensor.set_zero() + # sensor.disable_active_transmission() + + # while True: + # data = sensor.read_data_f32() + # if data is not None: + # print(f"Force data: {data}") + # sensor.enable_active_transmission() + # while True: + # sensor_data = sensor.read() + # if sensor_data is None: + # print('failed to get force sensor data!') + # print(sensor_data) + test_sensor_frequency(sensor) + # compare_modes(sensor) \ No newline at end of file