# NOTE: The following lines are residue from the web code viewer this file was
# copied from; they are not part of the program and are kept only as comments.
#   Last modified: 2025-05-28 21:13:54 +08:00 | 439 lines | 16 KiB | Python
#   Viewer warning: this file contains Unicode characters that might be
#   confused with other characters (presumably the non-ASCII comments and log
#   messages below). If this is intentional, the warning can be ignored.

import struct
import serial
import serial.tools.list_ports
import numpy as np
import atexit
import threading
import time
import yaml
import subprocess
import psutil
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "Massage/MassageControl")))
class XjcSensor:
    """Serial driver for an XJC six-axis force/torque sensor.

    The sensor is addressed with Modbus-RTU style frames (slave address,
    function code, payload, CRC-16 with polynomial 0xA001, low CRC byte
    first).  Two ways of getting data are supported:

    * polled: ``read_data_f32()`` sends a read request and decodes six
      big-endian float32 values from the reply;
    * streamed: after ``enable_active_transmission()`` the sensor pushes
      16-byte frames starting with ``0x20 0x4E`` which ``read()`` decodes.

    The class is a process-wide singleton (see ``__new__``).  Most methods
    report failure through their return value (``-1`` or ``None``) and print
    a diagnostic instead of raising.
    """

    # Port used when the caller does not pass one explicitly.
    DEFAULT_PORT = '/dev/ttyUSB0'

    # "Clear sensor data" (zeroing) frames of the sensor's built-in protocol,
    # keyed by slave address.  The trailing two bytes are the CRC.
    _ZERO_COMMANDS = {
        0x01: bytes.fromhex('01 10 46 1C 00 02 04 00 00 00 00 E8 95'),
        0x09: bytes.fromhex('09 10 46 1C 00 02 04 00 00 00 00 C2 F5'),
    }

    # "Read the six-axis data once" frames, keyed by slave address.
    # Address 0x01 uses the newer protocol revision; the older revision used
    # '01 04 00 54 00 0C B1 DF'.
    _READ_COMMANDS = {
        0x01: bytes.fromhex('01 04 00 00 00 0C F0 0F'),
        0x09: bytes.fromhex('09 04 00 54 00 0C B0 97'),
    }

    # Frames selecting the active-transmission rate, keyed by
    # (rate in Hz, slave address).
    _RATE_COMMANDS = {
        (100, 0x01): bytes.fromhex('01 10 01 9A 00 01 02 00 00 AB 6A'),
        (100, 0x09): bytes.fromhex('09 10 01 9A 00 01 02 00 00 CC AA'),
        (250, 0x01): bytes.fromhex('01 10 01 9A 00 01 02 00 01 6A AA'),
        (250, 0x09): bytes.fromhex('09 10 01 9A 00 01 02 00 01 0D 6A'),
        (500, 0x01): bytes.fromhex('01 10 01 9A 00 01 02 00 02 2A AB'),
        (500, 0x09): bytes.fromhex('09 10 01 9A 00 01 02 00 02 4D 6B'),
    }

    _ZERO_REPLY_LEN = 8      # addr, func, 2x register, 2x count, 2x CRC
    _READ_REPLY_LEN = 29     # addr, func, byte count, 24 data bytes, 2x CRC
    _PASSIVE_FRAME_LEN = 16  # 2 header bytes, 6 x int16, 2x CRC

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if not hasattr(cls, 'instance'):
            cls.instance = super(XjcSensor, cls).__new__(cls)
        return cls.instance

    def __init__(self, port=None, baudrate=115200, rate=250):
        """Open the serial connection to the sensor.

        Args:
            port: Serial device path.  ``None`` selects ``DEFAULT_PORT``
                (the value that was effectively always used before).
                Auto-discovery is not run implicitly; the result of
                ``find_sensor_port()`` can be passed here instead.
            baudrate: Serial baud rate.
            rate: Active-transmission rate in Hz (100, 250 or 500).

        Raises:
            serial.SerialException: if the port cannot be opened.
        """
        # Because of the singleton __new__, __init__ runs again on every
        # XjcSensor() call.  Do not reopen the port or forget a running
        # background thread while the existing connection is still alive.
        live_port = getattr(self, 'ser', None)
        if live_port is not None and getattr(live_port, 'is_open', False):
            return

        self.port = port if port else self.DEFAULT_PORT
        self.baudrate = baudrate
        self.rate = rate
        self.crc16_table = self.generate_crc16_table()
        self.slave_adress = 0x01
        self.ser = None
        # State of the optional background polling thread.
        self._background_thread = None
        self._stop_background = False
        self._last_reading = None
        self._reading_lock = threading.Lock()
        try:
            self.connect()
            # Register the exit hook only once, even if the sensor is
            # re-initialised after a disconnect.
            if not getattr(self, '_atexit_registered', False):
                atexit.register(self.disconnect)
                self._atexit_registered = True
        except KeyboardInterrupt:
            print("Process interrupted by user")
            self.disconnect()

    def connect(self):
        """(Re)open the serial port with a 0.1 s read timeout.

        Any previously opened port object is closed first so that a
        reconnect does not leave a second handle on the same device.
        """
        previous = getattr(self, 'ser', None)
        if previous is not None:
            try:
                if previous.is_open:
                    previous.close()
            except Exception as e:
                print(f"关闭串口失败: {e}")
        self.ser = serial.Serial(self.port, self.baudrate, timeout=0.1)

    def find_sensor_port(self):
        """Find the serial device of the six-axis force sensor.

        Reads ``target_usb_device_path`` from the YAML config file and
        returns the device name of the first serial port whose
        ``usb_device_path`` equals it.

        Returns:
            The device path (e.g. ``/dev/ttyUSB0``) or ``None`` when no port
            matches or any error occurs (the error is printed).
        """
        try:
            # Location of the YAML file holding the expected USB path.
            yaml_file_path = '/home/jsfb/jsfb_ws/global_config/force_sensor.yaml'
            with open(yaml_file_path, 'r') as file:
                data = yaml.safe_load(file)
            target_usb_device_path = data.get('target_usb_device_path')
            # NOTE(review): this message says the value was "added to" the
            # file although it is only read here; text left unchanged.
            print(f"已成功将 target_usb_device_path 添加到 {yaml_file_path}")
            for port in serial.tools.list_ports.comports():
                print(port.usb_device_path)
                if target_usb_device_path == port.usb_device_path:
                    print(f"找到六维力传感器在端口: {port.usb_device_path}")
                    return port.device
            print("未找到具有指定描述的六维力传感器。")
        except Exception as e:
            print(f"查找串口时发生错误: {e}")
        return None

    @staticmethod
    def generate_crc16_table():
        """Build the 256-entry lookup table for CRC-16 with polynomial 0xA001."""
        polynomial = 0xA001
        table = []
        for i in range(256):
            crc = i
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ polynomial
                else:
                    crc >>= 1
            table.append(crc)
        return table

    def crc16(self, data: bytes):
        """Compute the CRC-16 (init 0xFFFF, table-driven) of ``data``.

        Returns:
            ``(high_byte, low_byte)`` of the checksum.  On the wire the low
            byte is sent first, i.e. a frame ends with ``low, high``.
        """
        crc = 0xFFFF
        for byte in data:
            crc = (crc >> 8) ^ self.crc16_table[(crc ^ byte) & 0xFF]
        return (crc >> 8) & 0xFF, crc & 0xFF

    def _pause_background_reading(self):
        """Stop the background thread if it is running; return whether it was."""
        thread = getattr(self, '_background_thread', None)
        if thread is not None and thread.is_alive():
            self.stop_background_reading()
            return True
        return False

    def _port_label(self):
        """Return the port name for diagnostics without risking a new error."""
        return getattr(getattr(self, 'ser', None), 'port', getattr(self, 'port', None))

    def _drain_input(self):
        """Discard everything pending in the serial buffers."""
        self.ser.reset_input_buffer()
        self.ser.reset_output_buffer()
        if self.ser.in_waiting > 0:
            self.ser.read(self.ser.in_waiting)

    def set_zero(self):
        """Zero (tare) the sensor; required after every power cycle.

        Background reading is paused for the duration of the exchange and is
        resumed afterwards, whether or not zeroing succeeded.

        Returns:
            ``0`` on success, ``-1`` on any failure.
        """
        was_reading = self._pause_background_reading()
        try:
            return self._send_zero_command()
        finally:
            # Resume polling on every exit path, including failures.
            if was_reading:
                self.start_background_reading()

    def _send_zero_command(self):
        """Send the zeroing frame and validate the sensor's acknowledgement."""
        zero_cmd = self._ZERO_COMMANDS.get(self.slave_adress)
        if zero_cmd is None:
            print(f"Unsupported slave address: {self.slave_adress}")
            return -1
        try:
            self._drain_input()
            self.ser.write(zero_cmd)
            response = bytearray(self.ser.read(self._ZERO_REPLY_LEN))
            print(f"set zero response: {response}")
            # A timeout yields a short (possibly empty) reply.
            if len(response) < self._ZERO_REPLY_LEN:
                print("set zero failed!")
                return -1
            Hi_check, Lo_check = self.crc16(response[:-2])
            header_ok = response[0] == self.slave_adress and response[1] == 0x10
            crc_ok = response[-1] == Hi_check and response[-2] == Lo_check
            if not (header_ok and crc_ok):
                print("set zero failed!")
                return -1
            print("set zero success!")
            return 0
        except serial.SerialException as e:
            print(f"Serial communication error: {e}")
            print(f"Details - Address: {self.slave_adress}, Port: {self._port_label()}")
            return -1
        except Exception as e:
            print(f"Unexpected error: {e}")
            return -1

    # Polled read; may be comparatively slow (one request/reply per sample).
    def read_data_f32(self):
        """Request one sample and decode it as six float32 values.

        Returns:
            ``numpy.ndarray`` with the six values decoded from the reply, or
            ``-1`` on any failure (unknown address, timeout/short reply, bad
            header, bad CRC, serial error).
        """
        command = self._READ_COMMANDS.get(self.slave_adress)
        if command is None:
            print(f"Unsupported slave address: {self.slave_adress}")
            return -1
        try:
            self._drain_input()
            self.ser.write(command)
            response = bytearray(self.ser.read(self._READ_REPLY_LEN))
            # A timeout yields a short (possibly empty) reply.
            if len(response) < self._READ_REPLY_LEN:
                print("read data failed!")
                return -1
            Hi_check, Lo_check = self.crc16(response[:-2])
            header_ok = (
                response[0] == self.slave_adress
                and response[1] == 0x04
                and response[2] == 24  # byte count: 6 x float32
            )
            crc_ok = response[-1] == Hi_check and response[-2] == Lo_check
            if not (header_ok and crc_ok):
                print("read data failed!")
                return -1
            sensor_data = struct.unpack('>ffffff', response[3:27])
        except serial.SerialException as e:
            print(f"Serial communication error: {e}")
            return -1
        except Exception as e:
            print(f"Unexpected error: {e}")
            return -1
        return np.array(sensor_data)

    def enable_active_transmission(self):
        """Activate the sensor's active transmission mode at ``self.rate``.

        Background reading is paused while the command is sent and resumed
        afterwards on every exit path.

        Returns:
            ``0`` when the command was written, ``-1`` otherwise (unsupported
            rate or address, closed port, serial error).
        """
        was_reading = self._pause_background_reading()
        try:
            return self._send_rate_command()
        finally:
            if was_reading:
                self.start_background_reading()

    def _send_rate_command(self):
        """Pick the rate frame for the current rate/address and write it."""
        if self.rate not in {hz for hz, _ in self._RATE_COMMANDS}:
            print("Rate not supported")
            return -1
        rate_cmd = self._RATE_COMMANDS.get((self.rate, self.slave_adress))
        if rate_cmd is None:
            print(f"Unsupported slave address: {self.slave_adress}")
            return -1
        print(f"Set mode in {int(self.rate)}Hz")
        try:
            if not self.ser.is_open:
                raise serial.SerialException("Serial port is not open")
            self.ser.reset_input_buffer()
            self.ser.write(rate_cmd)
            print("Transmission command sent successfully")
            return 0
        except serial.SerialException as e:
            print(f"Serial communication error: {e}")
            print(f"Details - Rate: {self.rate}, Address: {self.slave_adress}, Port: {self._port_label()}")
            return -1
        except Exception as e:
            print(f"Unexpected error: {e}")
            return -1

    def disable_active_transmission(self):
        """Disable the sensor's active transmission mode.

        Returns:
            ``0`` when the stop frame was written, ``-1`` on error.
        """
        try:
            # Frame of 0xFF bytes that stops active transmission.
            rate_cmd = bytes.fromhex('FF FF FF FF FF FF FF FF FF FF FF')
            print("Disable the sensor's active transmission mode")
            if not self.ser.is_open:
                raise serial.SerialException("Serial port is not open")
            self.ser.reset_input_buffer()
            self.ser.write(rate_cmd)
            print("Transmission disabled successfully")
            return 0
        except serial.SerialException as e:
            print(f"Serial communication error: {e}")
            print(f"Details - Port: {self._port_label()}")
            return -1
        except Exception as e:
            print(f"Unexpected error: {e}")
            return -1

    def read(self, timeout=1.0):
        """Read one frame pushed by the sensor in active transmission mode.

        Stale buffered bytes are dropped, then the stream is scanned for the
        ``0x20 0x4E`` header and the remaining 14 bytes (12 data + 2 CRC)
        are read and verified.

        Args:
            timeout: Maximum time in seconds to wait for the frame header.
                Previously the search never gave up, which hung the caller
                when the sensor was silent.

        Returns:
            ``numpy.ndarray`` ``[Fx, Fy, Fz, Mx, My, Mz]`` or ``None`` on
            timeout, incomplete frame, CRC mismatch or serial error.
        """
        try:
            if self.ser.in_waiting > 0:
                self.ser.read(self.ser.in_waiting)
            deadline = time.monotonic() + timeout
            previous = None
            while True:
                chunk = self.ser.read(1)
                if chunk:
                    current = chunk[0]
                    # Sliding comparison so that "0x20 0x20 0x4E" still
                    # matches on the second 0x20.
                    if previous == 0x20 and current == 0x4E:
                        break
                    previous = current
                if time.monotonic() >= deadline:
                    print("read timed out waiting for frame header")
                    return None
            response = bytearray([0x20, 0x4E])
            response.extend(self.ser.read(self._PASSIVE_FRAME_LEN - 2))
            if len(response) < self._PASSIVE_FRAME_LEN:
                print("check failed!")
                return None
            Hi_check, Lo_check = self.crc16(response[:-2])
            if response[-1] != Hi_check or response[-2] != Lo_check:
                print("check failed!")
                return None
            return self.parse_data_passive(response)
        except serial.SerialException as e:
            print(f"Serial communication error: {e}")
            return None

    def parse_data_passive(self, buffer):
        """Decode an active-transmission frame into six floats.

        Bytes 2..13 of ``buffer`` are read as six little-endian signed
        16-bit integers; the first three are divided by 10 and the last
        three by 1000.

        Returns:
            ``numpy.ndarray`` ``[Fx, Fy, Fz, Mx, My, Mz]``.
        """
        values = [
            int.from_bytes(buffer[i:i + 2], byteorder='little', signed=True)
            for i in range(2, 14, 2)
        ]
        Fx, Fy, Fz = np.array(values[:3]) / 10.0
        Mx, My, Mz = np.array(values[3:]) / 1000.0
        return np.array([Fx, Fy, Fz, Mx, My, Mz])

    def disconnect(self):
        """Stop background work, stop sensor streaming and close the port.

        Safe to call repeatedly and on a partially initialised instance.
        Afterwards ``self.ser`` is ``None`` (the same "not connected" state
        ``__init__`` starts from) rather than a deleted attribute.
        """
        # Stop the background reader first so it cannot race with the close.
        if getattr(self, '_background_thread', None) is not None:
            self.stop_background_reading()

        port = getattr(self, 'ser', None)
        if port is None:
            self.ser = None
            return
        try:
            # 1. Best effort: tell the sensor to stop streaming (sent 3 times).
            stop_cmd = bytes([0xFF]) * 50
            for _ in range(3):
                try:
                    if port.is_open:
                        port.write(stop_cmd)
                        time.sleep(0.1)
                except Exception as e:
                    print(f"禁用传输失败: {e}")
            # 2. Flush and close the port.
            try:
                if port.is_open:
                    port.flush()
                    port.reset_input_buffer()
                    port.reset_output_buffer()
                    port.close()
            except Exception as e:
                print(f"关闭串口失败: {e}")
        except Exception as e:
            print(f"断开连接过程中发生错误: {e}")
        finally:
            self.ser = None

    def __del__(self):
        """Release the serial port when the instance is garbage collected."""
        self.disconnect()

    def start_background_reading(self):
        """Start the daemon thread that polls the sensor once per second.

        Returns:
            ``True`` if a thread was started, ``False`` if one is already
            running.
        """
        if self._background_thread is not None and self._background_thread.is_alive():
            print("Background reading is already running")
            return False
        self._stop_background = False
        self._background_thread = threading.Thread(target=self._background_reading_task)
        self._background_thread.daemon = True  # do not block interpreter exit
        self._background_thread.start()
        return True

    def stop_background_reading(self):
        """Ask the background thread to stop and wait up to 2 s for it.

        Returns:
            ``True`` if a running thread was signalled, ``False`` if none
            was running.
        """
        if self._background_thread is None or not self._background_thread.is_alive():
            print("No background reading is running")
            return False
        self._stop_background = True
        self._background_thread.join(timeout=2.0)
        self._background_thread = None
        return True

    def _background_reading_task(self):
        """Thread body: poll once per second and cache the latest sample.

        A failed poll triggers a reconnect attempt via ``connect()``.
        """
        while not self._stop_background:
            try:
                data = self.read_data_f32()
                if isinstance(data, np.ndarray):
                    with self._reading_lock:
                        self._last_reading = data
                else:
                    self.connect()
            except Exception as e:
                print(f"Error in background reading: {e}")
            time.sleep(1.0)  # poll period: one sample per second

    def get_last_reading(self):
        """Return the most recent sample cached by the background thread.

        Returns:
            The last ``numpy.ndarray`` stored, or ``None`` if no poll has
            succeeded yet.
        """
        with self._reading_lock:
            return self._last_reading
if __name__ == "__main__":
    # Manual smoke test: stop streaming, poll a few samples, zero the
    # sensor, then print polled samples at 250 Hz until Ctrl+C.
    import pathlib
    import signal
    import sys

    # Make the sibling "tools" package importable when run as a script.
    sys.path.append(str(pathlib.Path(__file__).parent.parent))
    from tools.Rate import Rate

    rate = Rate(250)
    xjc_sensor = XjcSensor()

    def signal_handler(sig, frame):
        """Exit cleanly on Ctrl+C; the atexit hook closes the serial port."""
        print('Received Ctrl+C, exiting gracefully...')
        sys.exit(0)

    # Register the named handler (it was previously defined but unused).
    signal.signal(signal.SIGINT, signal_handler)

    time.sleep(1)
    # Sent three times: the sensor may still be streaming from an earlier run.
    for _ in range(3):
        xjc_sensor.disable_active_transmission()
    for _ in range(3):
        print(xjc_sensor.read_data_f32())

    time.sleep(1)
    for _ in range(3):
        xjc_sensor.set_zero()

    # To test streaming instead of polling, call
    # xjc_sensor.enable_active_transmission() here and use xjc_sensor.read()
    # in the loop below.
    while True:
        sensor_data = xjc_sensor.read_data_f32()
        current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
        # read_data_f32() signals failure with -1, never None, so test for
        # the success type instead of "is None".
        if not isinstance(sensor_data, np.ndarray):
            print('failed to get force sensor data!')
        print(f"{current_time}-----{sensor_data}")
        rate.sleep(False)