import atexit
import os
import struct
import subprocess
import sys
import threading
import time

import numpy as np
import psutil
import serial
import serial.tools.list_ports
import yaml

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "Massage/MassageControl")))


class XjcSensor:
    """Serial driver for a six-axis force sensor.

    The sensor can be polled (``read_data_f32``: request/response with six
    big-endian float32 values) or put into active transmission mode
    (``read``: 16-byte frames starting with ``0x20 0x4E``).

    The class is a singleton: every construction returns the same object.
    NOTE(review): Python still calls ``__init__`` on every construction, so
    calling ``XjcSensor(...)`` again re-runs ``connect()`` and resets the
    background-reading state of the shared instance.
    """

    # Two-byte marker that starts every actively transmitted frame.
    _FRAME_HEADER = b'\x20\x4E'
    # Number of bytes following the header: 12 data bytes + 2 CRC bytes.
    _FRAME_PAYLOAD_LEN = 14
    # Give up searching for a header after this many consecutive timed-out reads.
    _HEADER_MAX_IDLE_READS = 10
    # Give up searching for a header after scanning this many received bytes.
    _HEADER_MAX_SCAN_BYTES = 1024

    # Active-transmission rate commands, keyed by rate (Hz) then slave address.
    _RATE_COMMANDS = {
        100: {
            0x01: '01 10 01 9A 00 01 02 00 00 AB 6A',
            0x09: '09 10 01 9A 00 01 02 00 00 CC AA',
        },
        250: {
            0x01: '01 10 01 9A 00 01 02 00 01 6A AA',
            0x09: '09 10 01 9A 00 01 02 00 01 0D 6A',
        },
        500: {
            0x01: '01 10 01 9A 00 01 02 00 02 2A AB',
            0x09: '09 10 01 9A 00 01 02 00 02 4D 6B',
        },
    }

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if not hasattr(cls, 'instance'):
            cls.instance = super(XjcSensor, cls).__new__(cls)
        return cls.instance

    def __init__(self, port='/dev/ttyUSB0', baudrate=115200, rate=250):
        """Store the settings and open the serial port.

        Args:
            port: Serial device path. A falsy value triggers a lookup through
                ``find_sensor_port()``.
            baudrate: Serial baud rate.
            rate: Active transmission rate in Hz (100, 250 or 500 are
                understood by ``enable_active_transmission``).
        """
        self.port = port
        self.baudrate = baudrate
        self.rate = rate
        self.crc16_table = self.generate_crc16_table()
        self.slave_adress = 0x01
        self.ser = None

        # State of the optional background polling thread.
        self._background_thread = None
        self._stop_background = False
        self._last_reading = None
        self._reading_lock = threading.Lock()

        if not self.port:
            self.port = self.find_sensor_port()

        try:
            self.connect()
            atexit.register(self.disconnect)
        except KeyboardInterrupt:
            print("Process interrupted by user")
            self.disconnect()

    def connect(self):
        """Open the serial port with a 0.1 s read timeout."""
        self.ser = serial.Serial(self.port, self.baudrate, timeout=0.1)

    def find_sensor_port(self):
        """Find the serial device whose USB device path matches the config file.

        Returns:
            The device name of the matching port, or ``None`` when no port
            matches or any error occurs.
        """
        try:
            yaml_file_path = '/home/jsfb/jsfb_ws/global_config/force_sensor.yaml'
            with open(yaml_file_path, 'r') as file:
                data = yaml.safe_load(file)

            target_usb_device_path = data.get('target_usb_device_path')
            # NOTE(review): this message says the value was "added" to the
            # file, but the code only reads it. Text kept unchanged.
            print(f"已成功将 target_usb_device_path 添加到 {yaml_file_path}")

            for port in serial.tools.list_ports.comports():
                print(port.usb_device_path)
                if target_usb_device_path == port.usb_device_path:
                    print(f"找到六维力传感器在端口: {port.usb_device_path}")
                    return port.device

            print("未找到具有指定描述的六维力传感器。")
        except Exception as e:
            print(f"查找串口时发生错误: {e}")
        return None

    @staticmethod
    def generate_crc16_table():
        """Build the 256-entry lookup table for CRC-16 with polynomial 0xA001."""
        polynomial = 0xA001
        table = []
        for i in range(256):
            crc = i
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ polynomial
                else:
                    crc >>= 1
            table.append(crc)
        return table

    def crc16(self, data: bytes):
        """Compute the CRC-16 of ``data`` (initial value 0xFFFF).

        Returns:
            A ``(high_byte, low_byte)`` tuple.
        """
        crc = 0xFFFF
        for byte in data:
            crc = (crc >> 8) ^ self.crc16_table[(crc ^ byte) & 0xFF]
        return (crc >> 8) & 0xFF, crc & 0xFF

    def _crc_matches(self, frame):
        """Return True when the last two bytes of ``frame`` are its CRC (low byte first)."""
        hi_check, lo_check = self.crc16(frame[:-2])
        return frame[-1] == hi_check and frame[-2] == lo_check

    def _pause_background_reading(self):
        """Stop the background reader if it is running; return whether it was."""
        thread = self._background_thread
        if thread is not None and thread.is_alive():
            self.stop_background_reading()
            return True
        return False

    def set_zero(self):
        """Zero (tare) the sensor; needed again after every power cycle.

        Background reading is paused for the duration of the command and is
        resumed afterwards on every exit path.

        Returns:
            0 on success, -1 on failure.
        """
        was_reading = self._pause_background_reading()
        result = -1
        try:
            # Drop anything still buffered so the reply is not mixed with stale data.
            self.ser.reset_input_buffer()
            self.ser.reset_output_buffer()
            if self.ser.in_waiting > 0:
                self.ser.read(self.ser.in_waiting)

            # Sensor's built-in protocol: clear (zero) the sensor data.
            if self.slave_adress == 0x01:
                zero_cmd = bytes.fromhex('01 10 46 1C 00 02 04 00 00 00 00 E8 95')
            elif self.slave_adress == 0x09:
                zero_cmd = bytes.fromhex('09 10 46 1C 00 02 04 00 00 00 00 C2 F5')
            else:
                raise ValueError(f"Unsupported slave address: {self.slave_adress}")

            self.ser.write(zero_cmd)
            response = bytearray(self.ser.read(8))
            print(f"set zero response: {response}")

            reply_ok = (
                len(response) == 8
                and response[0] == self.slave_adress
                and response[1] == 0x10
                and self._crc_matches(response)
            )
            if reply_ok:
                print("set zero success!")
                result = 0
            else:
                print("set zero failed!")
        except serial.SerialException as e:
            print(f"Serial communication error: {e}")
            print(f"Details - Address: {self.slave_adress}, Port: {self.ser.port}")
        except Exception as e:
            print(f"Unexpected error: {e}")
        finally:
            # Resume background reading if this call interrupted it.
            if was_reading:
                self.start_background_reading()
        return result

    def read_data_f32(self):
        """Poll the sensor once for a six-axis reading (may be slow).

        Returns:
            A NumPy array with the six float values on success, -1 on failure.
        """
        self.ser.reset_input_buffer()
        self.ser.reset_output_buffer()
        if self.ser.in_waiting > 0:
            self.ser.read(self.ser.in_waiting)

        # Sensor's built-in protocol: read one six-axis sample.
        if self.slave_adress == 0x01:
            # Older firmware used '01 04 00 54 00 0C B1 DF'.
            command = bytes.fromhex('01 04 00 00 00 0C F0 0F')
        elif self.slave_adress == 0x09:
            command = bytes.fromhex('09 04 00 54 00 0C B0 97')
        else:
            print(f"Unexpected error: unsupported slave address {self.slave_adress}")
            return -1

        try:
            self.ser.write(command)
            response = bytearray(self.ser.read(29))
            # 29 bytes = address + function + byte count + 24 data bytes + 2 CRC bytes.
            reply_ok = (
                len(response) == 29
                and response[0] == self.slave_adress
                and response[1] == 0x04
                and response[2] == 24
                and self._crc_matches(response)
            )
            if not reply_ok:
                print("read data failed!")
                return -1
            sensor_data = struct.unpack('>ffffff', response[3:27])
        except serial.SerialException as e:
            print(f"Serial communication error: {e}")
            return -1
        except Exception as e:
            print(f"Unexpected error: {e}")
            return -1
        return np.array(sensor_data)

    def enable_active_transmission(self):
        """Switch the sensor to active transmission at ``self.rate`` Hz.

        Background reading is paused for the duration of the command and is
        resumed afterwards on every exit path.

        Returns:
            0 when the command was written, -1 on failure or unsupported rate.
        """
        was_reading = self._pause_background_reading()
        result = -1
        try:
            commands_by_address = self._RATE_COMMANDS.get(self.rate)
            if commands_by_address is None:
                print("Rate not supported")
            else:
                hex_command = commands_by_address.get(self.slave_adress)
                if hex_command is None:
                    raise ValueError(f"Unsupported slave address: {self.slave_adress}")
                print(f"Set mode in {int(self.rate)}Hz")

                if not self.ser.is_open:
                    raise serial.SerialException("Serial port is not open")

                self.ser.reset_input_buffer()
                self.ser.write(bytes.fromhex(hex_command))
                print("Transmission command sent successfully")
                result = 0
        except serial.SerialException as e:
            print(f"Serial communication error: {e}")
            print(f"Details - Rate: {self.rate}, Address: {self.slave_adress}, Port: {self.ser.port}")
        except Exception as e:
            print(f"Unexpected error: {e}")
        finally:
            # Resume background reading if this call interrupted it.
            if was_reading:
                self.start_background_reading()
        return result

    def disable_active_transmission(self):
        """Send the stop sequence (eleven 0xFF bytes) to end active transmission.

        Returns:
            0 when the command was written, -1 on failure.
        """
        try:
            rate_cmd = bytes.fromhex('FF FF FF FF FF FF FF FF FF FF FF')
            print("Disable the sensor's active transmission mode")
            if not self.ser.is_open:
                raise serial.SerialException("Serial port is not open")
            self.ser.reset_input_buffer()
            self.ser.write(rate_cmd)
            print("Transmission disabled successfully")
            return 0
        except serial.SerialException as e:
            print(f"Serial communication error: {e}")
            print(f"Details - Port: {self.ser.port}")
            return -1
        except Exception as e:
            print(f"Unexpected error: {e}")
            return -1

    def _sync_to_frame_header(self):
        """Consume bytes until the two-byte frame header has just been read.

        The search is bounded so a silent or misconfigured sensor cannot
        block the caller forever.

        Returns:
            True when the header was found, False when the search gave up.
        """
        idle_reads = 0
        scanned = 0
        previous = None
        while (idle_reads < self._HEADER_MAX_IDLE_READS
               and scanned < self._HEADER_MAX_SCAN_BYTES):
            chunk = self.ser.read(1)
            if not chunk:
                # The read timed out: nothing arrived within the port timeout.
                idle_reads += 1
                continue
            idle_reads = 0
            scanned += 1
            current = chunk[0]
            if previous == self._FRAME_HEADER[0] and current == self._FRAME_HEADER[1]:
                return True
            # Remember this byte so a header right after a stray 0x20 is still found.
            previous = current
        return False

    def read(self):
        """Read one frame while the sensor is in active transmission mode.

        Returns:
            A NumPy array ``[Fx, Fy, Fz, Mx, My, Mz]``, or ``None`` when no
            valid frame could be read.
        """
        try:
            # Discard stale frames so the value returned is a recent one.
            if self.ser.in_waiting > 0:
                self.ser.read(self.ser.in_waiting)

            if not self._sync_to_frame_header():
                print("Frame header not found.")
                return None

            response = bytearray(self._FRAME_HEADER)
            payload = self.ser.read(self._FRAME_PAYLOAD_LEN)
            if len(payload) < self._FRAME_PAYLOAD_LEN:
                print("Incomplete data received.")
                return None
            response.extend(payload)

            if not self._crc_matches(response):
                print("check failed!")
                return None
            return self.parse_data_passive(response)
        except serial.SerialException as e:
            print(f"Serial communication error: {e}")
            return None

    def parse_data_passive(self, buffer):
        """Decode an active-transmission frame into six values.

        Bytes 2..13 are read as six little-endian signed 16-bit integers; the
        first three are divided by 10 and the last three by 1000.

        Returns:
            A NumPy array ``[Fx, Fy, Fz, Mx, My, Mz]``.
        """
        values = [
            int.from_bytes(buffer[i:i + 2], byteorder='little', signed=True)
            for i in range(2, 14, 2)
        ]
        Fx, Fy, Fz = np.array(values[:3]) / 10.0
        Mx, My, Mz = np.array(values[3:]) / 1000.0
        return np.array([Fx, Fy, Fz, Mx, My, Mz])

    def disconnect(self):
        """Stop background reading, stop transmission and close the serial port.

        Safe to call repeatedly and on an instance whose port was never
        opened. The ``ser`` attribute is removed afterwards.
        """
        if getattr(self, '_background_thread', None) is not None:
            self.stop_background_reading()

        ser = getattr(self, 'ser', None)
        try:
            if ser is not None:
                # 1. Try to stop active transmission (fifty 0xFF bytes, sent three times).
                stop_cmd = bytes.fromhex(" ".join(["FF"] * 50))
                for _ in range(3):
                    try:
                        if ser.is_open:
                            ser.write(stop_cmd)
                            time.sleep(0.1)
                    except Exception as e:
                        print(f"禁用传输失败: {e}")

                # 2. Flush and close the port.
                try:
                    if ser.is_open:
                        ser.flush()
                        ser.reset_input_buffer()
                        ser.reset_output_buffer()
                        ser.close()
                except Exception as e:
                    print(f"关闭串口失败: {e}")
        except Exception as e:
            print(f"断开连接过程中发生错误: {e}")
        finally:
            # Always drop the port attribute, even if closing failed.
            if hasattr(self, 'ser'):
                del self.ser

    def __del__(self):
        """Release the serial port when the object is garbage-collected."""
        self.disconnect()

    def start_background_reading(self):
        """Start the daemon thread that polls the sensor once per second.

        Returns:
            True when a thread was started, False when one is already running.
        """
        if self._background_thread is not None and self._background_thread.is_alive():
            print("Background reading is already running")
            return False

        self._stop_background = False
        self._background_thread = threading.Thread(target=self._background_reading_task)
        self._background_thread.daemon = True
        self._background_thread.start()
        return True

    def stop_background_reading(self):
        """Ask the background thread to stop and wait up to 2 s for it.

        Returns:
            True when a running thread was stopped, False when none was running.
        """
        if self._background_thread is None or not self._background_thread.is_alive():
            print("No background reading is running")
            return False

        self._stop_background = True
        self._background_thread.join(timeout=2.0)
        self._background_thread = None
        return True

    def _background_reading_task(self):
        """Poll the sensor once per second until asked to stop.

        A successful reading is stored for ``get_last_reading``; a failed one
        triggers a reconnect attempt.
        """
        while not self._stop_background:
            try:
                data = self.read_data_f32()
                if isinstance(data, np.ndarray):
                    with self._reading_lock:
                        self._last_reading = data
                else:
                    self.connect()
            except Exception as e:
                print(f"Error in background reading: {e}")
            time.sleep(1.0)

    def get_last_reading(self):
        """Return the most recent background reading, or ``None`` if there is none."""
        with self._reading_lock:
            return self._last_reading


def main():
    """Manual check: poll, zero, then stream and print readings at 250 Hz."""
    import pathlib
    import signal

    sys.path.append(str(pathlib.Path(__file__).parent.parent))
    from tools.Rate import Rate

    rate = Rate(250)
    xjc_sensor = XjcSensor()

    def signal_handler(sig, frame):
        print('Received Ctrl+C, exiting gracefully...')
        sys.exit(0)

    # sys.exit() runs the atexit hook, which disconnects the sensor.
    signal.signal(signal.SIGINT, signal_handler)

    time.sleep(1)
    for _ in range(3):
        xjc_sensor.disable_active_transmission()
    for _ in range(3):
        print(xjc_sensor.read_data_f32())

    time.sleep(1)
    for _ in range(3):
        xjc_sensor.set_zero()

    time.sleep(1)
    xjc_sensor.enable_active_transmission()
    while True:
        sensor_data = xjc_sensor.read()
        current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
        if sensor_data is None:
            print('failed to get force sensor data!')
        print(f"{current_time}-----{sensor_data}")
        rate.sleep(False)


if __name__ == "__main__":
    main()