力控部分开发完成,后续调整底层数学单位后,可开发上层

This commit is contained in:
Ziwei.He 2025-05-19 10:42:57 +08:00
parent 2064831435
commit b1322495b0
16 changed files with 9233 additions and 568 deletions

View File

@ -36,7 +36,7 @@ class MassageRobot:
self.arm = dobot_nova5(self.arm_config['arm_ip'])
self.arm.start()
# 传感器
self.force_sensor = XjcSensor(self.arm_config['arm_ip'],60000)
self.force_sensor = XjcSensor()
self.force_sensor.connect()
# 控制器初始化(初始化为导纳控制)
self.controller_manager = ControllerManager(self.arm_state)
@ -135,6 +135,7 @@ class MassageRobot:
b_rotation_s = R.from_quat(self.arm_state.arm_orientation).as_matrix()
# 读取数据
sensor_data = self.force_sensor.read()
# self.logger.log_info(f"传感器数据{sensor_data}")
if sensor_data is None:
self.force_sensor.stop_background_reading()
self.logger.log_error("传感器数据读取失败")
@ -153,20 +154,6 @@ class MassageRobot:
wrench = self.wrench_coordinate_conversion(self.s_tf_matrix_t,wrench)
# 交给ARM STATE集中管理
self.arm_state.external_wrench_tcp = wrench
# self.arm_state.external_wrench_base = np.concatenate([b_rotation_s @ self.arm_state.external_wrench_tcp[:3],
# b_rotation_s @ self.arm_state.external_wrench_tcp[3:]])
# 卡尔曼滤波
# x_base_predict, P_base_predict = self.kalman_predict(x = self.x_base,
# P = self.P_base,
# Q = self.Q_base)
# self.x_base, self.P_base = self.kalman_update(x_predict = x_base_predict,
# P_predict = P_base_predict,
# z = self.arm_state.external_wrench_base,
# R = self.R_base)
# self.arm_state.external_wrench_base = self.x_base
# self.arm_state.last_external_wrench_base = self.arm_state.external_wrench_base
# 对tcp坐标系下的外力外矩进行平滑
x_tcp_predict, P_tcp_predict = self.kalman_predict(x = self.x_tcp,
P = self.P_tcp,
@ -179,6 +166,70 @@ class MassageRobot:
self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp
return 0
# def arm_measure_loop(self):
# self.logger.log_info("机械臂测量线程启动")
# while (not self.arm.is_exit) and (not self.exit_event.is_set()):
# try:
# if not self.is_waitting:
# self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position()
# code = self.update_wrench()
# self.logger.log_blue(f"measure position: {self.arm_state.arm_position}")
# self.logger.log_blue(f"measure orientation: {self.arm_state.arm_orientation}")
# self.logger.log_blue(f"measure update wrench:{self.arm_state.external_wrench_tcp}")
# if code == -1:
# self.sensor_fail_count += 1
# self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}")
# if self.sensor_fail_count > 10:
# self.logger.log_error("传感器线程数据读取失败超过10次,程序终止")
# self.stop()
# break
# else:
# self.sensor_fail_count = 0
# except Exception as e:
# self.logger.log_error(f"机械臂测量线程报错:{e}")
# self.exit_event.set() # 控制退出while
# self.sensor_rate.precise_sleep() # 控制频率
# def arm_command_loop(self):
# self.logger.log_info("机械臂控制线程启动")
# self.arm.filter_matirx = np.zeros((8,7)) # 滑动窗口
# while (not self.arm.is_exit) and (not self.exit_event.is_set()):
# try:
# if not self.is_waitting:
# process_start_time = time.time()
# self.controller_manager.step(self.control_rate.to_sec())
# status, joints_processed = self.arm.process_command(self.arm_state.arm_position_command,
# self.arm_state.arm_orientation_command)
# self.logger.log_blue(f"处理后的关节角度: {joints_processed}")
# run_time = time.time() - process_start_time
# sleep_duration = self.control_rate.to_sec() - run_time
# delta_command = np.linalg.norm(self.arm_state.arm_position_command - self.arm_state.arm_position * 1000)
# self.logger.log_blue(f"期望位置:{self.arm_state.desired_position}")
# self.logger.log_blue(f"现在的位置:{self.arm_state.arm_position}")
# self.logger.log_blue(f"指令位置:{self.arm_state.arm_position_command}")
# self.logger.log_blue(f"位置变化量: {delta_command}")
# if delta_command > 20: # 20 mm
# self.logger.log_error("机械臂位置变化过大,可能是数据异常")
# break
# if sleep_duration > 0:
# libc.usleep(int(sleep_duration * 1e6))
# if status == 0:
# # self.logger.log_blue("不进行伺服运动,只计算")
# self.logger.log_info("进行伺服运动")
# code = self.arm.ServoJoint(joints_processed,t=0.008,aheadtime=20,gain=200)
# else:
# self.logger.log_error("滑动窗口数据处理失败")
# self.exit_event.set()
# if code == -1:
# self.logger.log_error("机械臂急停")
# # self.stop() # 底层已经做了急停处理
# break
# except Exception as e:
# self.logger.log_error(f"机械臂控制失败:{e}")
# self.exit_event.set()
####################### TEST ##########################3
def arm_measure_loop(self):
self.logger.log_info("机械臂测量线程启动")
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
@ -186,9 +237,9 @@ class MassageRobot:
if not self.is_waitting:
self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position()
code = self.update_wrench()
self.logger.log_blue(f"measure position: {self.arm_state.arm_position}")
self.logger.log_blue(f"measure orientation: {self.arm_state.arm_orientation}")
self.logger.log_blue(f"measure update wrench:{self.arm_state.external_wrench_tcp}")
# self.logger.log_blue(f"measure position: {self.arm_state.arm_position}")
# self.logger.log_blue(f"measure orientation: {self.arm_state.arm_orientation}")
# self.logger.log_blue(f"measure update wrench:{self.arm_state.external_wrench_tcp}")
if code == -1:
self.sensor_fail_count += 1
self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}")
@ -201,38 +252,47 @@ class MassageRobot:
except Exception as e:
self.logger.log_error(f"机械臂测量线程报错:{e}")
self.exit_event.set() # 控制退出while
self.sensor_rate.precise_sleep() # 控制频率
self.sensor_rate.sleep() # 控制频率
def arm_command_loop(self):
self.logger.log_info("机械臂控制线程启动")
self.arm.filter_matirx = np.zeros((8,7)) # 滑动窗口
# self.arm.filter_matirx = np.zeros((8,7)) # 滑动窗口 ## 暂时禁用滑动窗口
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
try:
if not self.is_waitting:
process_start_time = time.time()
self.controller_manager.step(self.control_rate.to_sec())
status, joints_processed = self.arm.process_command(self.arm_state.arm_position_command,
self.arm_state.arm_orientation_command)
self.logger.log_blue(f"处理后的关节角度: {joints_processed}")
# self.logger.log_blue(f"目标位姿: {self.arm_state.arm_position_command},{self.arm_state.arm_orientation_command}")
run_time = time.time() - process_start_time
sleep_duration = self.control_rate.to_sec() - run_time
delta_command = np.linalg.norm(self.arm_state.arm_position_command - self.arm_state.arm_position * 1000)
self.logger.log_blue(f"期望位置:{self.arm_state.desired_position}")
self.logger.log_blue(f"现在的位置:{self.arm_state.arm_position}")
self.logger.log_blue(f"指令位置:{self.arm_state.arm_position_command}")
self.logger.log_blue(f"位置变化量: {delta_command}")
if delta_command > 20: # 20 mm
delta_command = np.linalg.norm(self.arm_state.arm_position_command - self.arm_state.arm_position)
# self.logger.log_blue(f"期望位置:{self.arm_state.desired_position}")
# self.logger.log_blue(f"现在的位置:{self.arm_state.arm_position}")
# self.logger.log_blue(f"指令位置:{self.arm_state.arm_position_command}")
# self.logger.log_blue(f"位置变化量: {delta_command}")
if delta_command > 0.1: # m
self.logger.log_error("机械臂位置变化过大,可能是数据异常")
break
if sleep_duration > 0:
libc.usleep(int(sleep_duration * 1e6))
if status == 0:
# self.logger.log_blue("不进行伺服运动,只计算")
self.logger.log_info("进行伺服运动")
code = self.arm.ServoJoint(joints_processed,t=0.008,aheadtime=20,gain=200)
else:
self.logger.log_error("滑动窗口数据处理失败")
self.exit_event.set()
# self.logger.log_blue(f"输入位置:{self.arm_state.arm_position_command}")
# self.logger.log_blue(f"输入姿态:{self.arm_state.arm_orientation_command}")
command_pos_in_mm = self.arm_state.arm_position_command * 1000
command_ori_in_degree = R.from_quat(self.arm_state.arm_orientation_command).as_euler('xyz',degrees=True)
combined = np.concatenate([command_pos_in_mm, command_ori_in_degree]) # 合并成一个数组
# self.logger.log_blue(f"合并后的位姿: {combined.tolist()}")
# self.logger.log_info("进行伺服运动")
# 数据记录
# self.logger.log_info("desired_pos",self.arm_state.desired_position)
# self.logger.log_info("desired_ori",self.arm_state.desired_orientation)
# self.logger.log_info("command_pos",self.arm_state.arm_position_command)
# self.logger.log_info("command_ori",self.arm_state.arm_orientation_command)
code = self.arm.ServoPose(combined)
# else:
# self.logger.log_error("滑动窗口数据处理失败")
# self.exit_event.set()
if code == -1:
self.logger.log_error("机械臂急停")
# self.stop() # 底层已经做了急停处理
@ -240,6 +300,11 @@ class MassageRobot:
except Exception as e:
self.logger.log_error(f"机械臂控制失败:{e}")
self.exit_event.set()
def start(self):
if self.exit_event.is_set():
# 线程
@ -333,7 +398,7 @@ class MassageRobot:
def get_tf_matrix(self,position,orientation):
tf_matrix = np.eye(4)
rotation_matrix = R.from_quat(orientation).as_matrix()
tf_matrix[:3,3] = position
tf_matrix[:3,3] = position/1000
tf_matrix[:3,:3] = rotation_matrix
return tf_matrix
def wrench_coordinate_conversion(self,tf_matrix, wrench):
@ -375,4 +440,5 @@ if __name__ == "__main__":
print("Exception occurred at line:", e.__traceback__.tb_lineno)
print("发生异常:", e)
# robot.arm.disableRobot()

View File

@ -6,8 +6,8 @@ from .arm_state import ArmState
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent.parent))
import time
from tools.yaml_operator import read_yaml
import random
class AdmittanceController(BaseController):
def __init__(self, name, state:ArmState,config_path) -> None:
super().__init__(name, state)
@ -57,7 +57,7 @@ class AdmittanceController(BaseController):
#wrench_err = self.state.external_wrench_base - self.state.desired_wrench
wrench_err = self.state.external_wrench_tcp - self.state.desired_wrench
# if time.time() - self.laset_print_time > 5:
# if time.time() - self.laset_print_time > 1:
# print(f'wrench_err: {wrench_err} ||| external_wrench_tcp: {self.state.external_wrench_tcp} ||| desired_wrench: {self.state.desired_wrench}')
self.state.arm_desired_acc = np.linalg.inv(self.M) @ (wrench_err - self.D @ (self.state.arm_desired_twist -self.state.desired_twist) - self.K @ self.state.pose_error)
# if time.time() - self.laset_print_time > 5:
@ -67,8 +67,8 @@ class AdmittanceController(BaseController):
self.clip_command(self.state.arm_desired_twist,"vel")
delta_pose = self.state.arm_desired_twist * dt
delta_pose[:3] = self.pos_scale_factor * delta_pose[:3]
delta_pose[3:] = self.rot_scale_factor * delta_pose[3:]
# delta_pose[:3] = self.pos_scale_factor * delta_pose[:3]
# delta_pose[3:] = self.rot_scale_factor * delta_pose[3:]
# if time.time() - self.laset_print_time > 5:
# print("delta_pose:",delta_pose)
@ -79,32 +79,33 @@ class AdmittanceController(BaseController):
self.clip_command(delta_pose,"pose")
# testlsy
delta_ori_mat = R.from_rotvec(delta_pose[3:]).as_matrix()
# delta_ori_mat = R.from_rotvec(delta_pose[3:]).as_matrix()
#arm_ori_mat = delta_ori_mat @ R.from_quat(self.state.arm_orientation).as_matrix()
random_array = np.random.rand(3) # 生成长度为 3 的数组
delta_ori_mat = R.from_rotvec(random_array/10000).as_matrix()
arm_ori_mat = delta_ori_mat @ R.from_quat(self.state.arm_orientation).as_matrix()
arm_ori_mat = R.from_quat(self.state.arm_orientation).as_matrix() @ delta_ori_mat
# self.state.arm_orientation_command = R.from_matrix(arm_ori_mat).as_quat()
self.state.arm_orientation_command = R.from_matrix(arm_ori_mat).as_quat()
# 归一化四元数
self.state.arm_orientation_command /= np.linalg.norm(self.state.arm_orientation_command)
# arm_ori_mat = R.from_quat(self.state.arm_orientation).as_rotvec(degrees=False) + delta_pose[3:]
# self.state.arm_orientation_command = R.from_rotvec(arm_ori_mat).as_quat()
# self.state.arm_orientation_command = R.from_matrix(arm_ori_mat).as_quat()
self.state.arm_position_command = self.state.arm_position + delta_pose[:3]
if time.time() - self.laset_print_time > 0.1:
print("-------------admittance_1-------------------------------")
print("arm_position:",self.state.arm_position)
print("desired_position:",self.state.desired_position)
print("arm_orientation",R.from_quat(self.state.arm_orientation).as_euler('xyz',degrees=True))
print("desired_orientation",R.from_quat(self.state.desired_orientation).as_euler('xyz',degrees=True))
print("arm_position_command",self.state.arm_position_command)
print("arm_orientation_command",R.from_quat(self.state.arm_orientation_command).as_euler('xyz',degrees=True))
if time.time() - self.laset_print_time > 2:
# print("-------------admittance_1-------------------------------")
# print("arm_position:",self.state.arm_position)
# print("desired_position:",self.state.desired_position)
print(f'wrench_err: {wrench_err} ||| external_wrench_tcp: {self.state.external_wrench_tcp} ||| desired_wrench: {self.state.desired_wrench}')
print("self.state.arm_desired_acc:",self.state.arm_desired_acc)
# print("arm_orientation",R.from_quat(self.state.arm_orientation).as_euler('xyz',degrees=True))
# print("desired_orientation",R.from_quat(self.state.desired_orientation).as_euler('xyz',degrees=True))
# print("arm_position_command",self.state.arm_position_command)
# print("arm_orientation_command",R.from_quat(self.state.arm_orientation_command).as_euler('xyz',degrees=True))
print("delta_pose:",delta_pose)
print("delta_ori_mat",delta_ori_mat)
# print("delta_ori_mat",delta_ori_mat)
self.laset_print_time = time.time()

View File

@ -53,14 +53,14 @@ rot_scale_factor: 1
mass_tran: [2.0, 2.0, 2.0]
mass_rot: [0.2, 0.2, 0.2]
stiff_tran: [400, 400, 400]
stiff_rot: [4, 4, 4]
mass_tran: [9.0, 9.0, 9.0]
mass_rot: [0.8, 0.8, 0.8]
stiff_tran: [0, 0, 0]
stiff_rot: [7, 7, 7]
# stiff_tran: [100, 100, 100]
# stiff_rot: [1, 1, 1]
damp_tran: [20, 20, 20]
damp_rot: [0.6, 0.6, 0.6]
damp_tran: [100, 100, 100]
damp_rot: [1, 1, 1]
# damp_tran: [40, 40, 40]
# damp_rot: [3, 3, 3]

View File

@ -8,6 +8,6 @@ controller: ['Massage/MassageControl/config/admittance.yaml',
# massage heads diretory
massage_head_dir: 'Massage/MassageControl/config/massage_head'
control_rate: 120
sensor_rate: 120
command_rate: 120
control_rate: 100
sensor_rate: 100
command_rate: 100

View File

@ -51,7 +51,14 @@ class dobot_nova5:
self.MessageSize = -1
self.DigitalInputs = -1
self.DigitalOutputs = -1
self.QActual = -1
self.ActualQuaternion = -1
self.ToolVectorActual = -1
# ... 自定义反馈数据
'''
self.UserValue = -1
self.ToolValue = -1
'''
self.feedbackData = feedbackItem() # 自定义反馈数据结构体
def parseResultId(self,valueRecv):
@ -74,11 +81,16 @@ class dobot_nova5:
self.feedbackData.DigitalInputs = feedInfo['DigitalInputs'][0]
self.feedbackData.DigitalOutputs = feedInfo['DigitalOutputs'][0]
self.feedbackData.robotCurrentCommandID = feedInfo['CurrentCommandId'][0]
self.feedbackData.QActual = feedInfo['QActual'][0]
self.feedbackData.ActualQuaternion = feedInfo['ActualQuaternion'][0]
self.feedbackData.ToolVectorActual = feedInfo['ToolVectorActual'][0]
# 自定义添加所需反馈数据
'''
self.feedData.DigitalOutputs = int(feedInfo['DigitalOutputs'][0])
self.feedData.RobotMode = int(feedInfo['RobotMode'][0])
self.feedData.TimeStamp = int(feedInfo['TimeStamp'][0])
self.feedbackData.UserValue = feedInfo['UserValue[6]'][0]
self.feedbackData.ToolValue = feedInfo['ToolValue[6]'][0]
'''
def start(self):
@ -119,7 +131,7 @@ class dobot_nova5:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
@ -184,7 +196,7 @@ class dobot_nova5:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
@ -213,7 +225,7 @@ class dobot_nova5:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
@ -233,7 +245,7 @@ class dobot_nova5:
print("ServoJ:",tcp_command)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
return -1
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
@ -244,14 +256,14 @@ class dobot_nova5:
print("机械臂继续已暂停的运动指令")
return 0
def ServoPose(self,poses_list,t=-1,aheadtime=-1,gain=-1):
def ServoPose(self,poses_list):
poses_list = list(map(float, poses_list))
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t={t}, aheadtime={aheadtime}, gain={gain})"
self.dashboard.send_data(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
print("ServoP:",tcp_command)
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=200)"
res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
print("ServoP msg:",res)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
@ -338,7 +350,7 @@ class dobot_nova5:
if (user == -1) != (tool == -1): ## XOR 必须同时设置或者同时不设置
print("必须同时设置或同时不设置坐标系参数")
res = self.dashboard.GetPose(user,tool) # 返回字符串
# print("get pose",res)
print("get pose",res)
code = self.parseResultId(res)[0]
if code == 0:
pass
@ -458,6 +470,16 @@ class dobot_nova5:
del self.dashboard
del self.feedFour
## 为反馈线程封装
def Get_feedInfo_angle(self):
# print("反馈当前角度",self.feedbackData.QActual)
return self.feedbackData.QActual
def Get_feedInfo_pose(self):
# print("反馈当前TCP位置",self.feedbackData.ToolVectorActual)
return self.feedbackData.ToolVectorActual
## 适配中间层的再封装接口
# 为状态管理而封装的初始化函数
@ -480,7 +502,8 @@ class dobot_nova5:
def get_arm_position(self):
pose = self.getPose()
pose = self.Get_feedInfo_pose()
# print("pose",pose)
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
x /= 1000
y /= 1000
@ -497,10 +520,9 @@ class dobot_nova5:
print("pose_command",pose_command)
if (not self.is_exit) and (not self.is_standby):
cur = self.getAngel() # 参考关节角度
target = self.getInverseKin(pose_command,-1,-1,1,"{" + ",".join(map(str, cur)) + "}")
# print("target",target)
self.joint_current = copy.deepcopy(cur)
self.joint_current = cur
self.joint_target = target
ress = np.array(target)
ress = np.append(ress,[1])
@ -520,13 +542,23 @@ if __name__ == "__main__":
posJ = [0,30,-120,0,90,0]
dobot.RunPoint_P_inJoint(posJ)
sleep(1)
dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
dobot.chooseEndEffector(i=9)
print(dobot.getPose())
# dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
# dobot.chooseEndEffector(i=9)
# print(dobot.getPose())
# print(dobot.get_arm_position())
# pos,rot_quat = dobot.get_arm_position()
# print("pos",pos)
# print("rot_quat",rot_quat)
# while(True):
# print("反馈角度",dobot.feedbackData.QActual)
# print("反馈四元数",dobot.feedbackData.ActualQuaternion)
# print("反馈TCP笛卡尔实际坐标值",dobot.feedbackData.ToolVectorActual)
# sleep(0.008)
dobot.Get_feedInfo_angle()
sleep(0.008)
dobot.Get_feedInfo_pose()
sleep(0.5)
# for i in range(400):
# posJ[2] += 0.04

View File

@ -1,189 +1,323 @@
import time
import socket
import struct
import serial
import numpy as np
import atexit
import threading
from typing import Optional, Tuple
from matplotlib import pyplot as plt
import csv
from datetime import datetime
import time
import subprocess
import psutil
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "Massage/MassageControl")))
class XjcSensor:
def __init__(self, host: str, port: int, rate: int = 250):
"""
初始化 TCP 连接的力传感器
:param host: 设备 IP 地址 "192.168.1.100"
:param port: 设备端口 502
:param rate: 数据采样率100/250/500Hz
"""
self.host = host
def __init__(self, port="/dev/ttyUSB0", baudrate=115200, rate=250):
self.port = port
self.baudrate = baudrate
self.rate = rate
self.slave_address = 0x01 # 默认从机地址
self.sock = None # TCP socket 对象
self.crc16_table = self.generate_crc16_table()
self.slave_adress = 0x01
self.ser = None
# 后台读取相关的属性
self._background_thread = None
self._stop_background = False
self._last_reading = None
self._reading_lock = threading.Lock()
# 注册退出时的清理函数
try:
self.connect()
atexit.register(self.disconnect)
except KeyboardInterrupt:
print("Process interrupted by user")
self.disconnect()
def __new__(cls, *args, **kwargs):
"""单例模式"""
"""
Making this class singleton
"""
if not hasattr(cls, 'instance'):
cls.instance = super(XjcSensor, cls).__new__(cls)
return cls.instance
def connect(self):
"""建立 TCP 连接"""
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(1.0) # 设置超时时间
self.sock.connect((self.host, self.port))
print(f"Connected to {self.host}:{self.port}")
except Exception as e:
print(f"TCP connection failed: {e}")
raise
self.ser = serial.Serial(self.port, self.baudrate,timeout=0.1)
def send_and_receive(self, data: bytes, expected_response_len: int) -> Optional[bytes]:
"""
发送数据并接收响应
:param data: 要发送的字节数据
:param expected_response_len: 预期响应的长度
:return: 收到的字节数据失败返回 None
"""
try:
self.sock.sendall(data)
response = self.sock.recv(expected_response_len)
return response
except socket.timeout:
print("Timeout while waiting for response")
return None
except Exception as e:
print(f"TCP communication error: {e}")
return None
@staticmethod
def generate_crc16_table():
table = []
polynomial = 0xA001
for i in range(256):
crc = i
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ polynomial
else:
crc >>= 1
table.append(crc)
return table
def set_zero(self) -> int:
"""传感器置零TCP 版本)"""
if self.slave_address == 0x01:
zero_cmd = bytes.fromhex('01 10 46 1C 00 02 04 00 00 00 00 E8 95')
elif self.slave_address == 0x09:
def crc16(self,data: bytes):
crc = 0xFFFF
for byte in data:
crc = (crc >> 8) ^ self.crc16_table[(crc ^ byte) & 0xFF]
return (crc >> 8) & 0xFF, crc & 0xFF
def set_zero(self):
"""
传感器置零重新上电之后需要置零
"""
# 确保后台读取已停止
was_reading = False
if self._background_thread is not None and self._background_thread.is_alive():
was_reading = True
self.stop_background_reading()
try:
# 重置输入输出缓冲区
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
# 清除串口缓冲区中的数据
if self.ser.in_waiting > 0:
self.ser.read(self.ser.in_waiting)
# 根据地址设置置零命令
if self.slave_adress == 0x01:
zero_cmd = bytes.fromhex('01 10 46 1C 00 02 04 00 00 00 00 E8 95') # 传感器内置通信协议--清零传感器数据
elif self.slave_adress == 0x09:
zero_cmd = bytes.fromhex('09 10 46 1C 00 02 04 00 00 00 00 C2 F5')
response = self.send_and_receive(zero_cmd, 8)
if not response:
print("set zero fail")
return -1
# 发送置零命令
self.ser.write(zero_cmd)
response = bytearray(self.ser.read(8))
print(f"set zero response: {response}")
# CRC 校验(假设协议要求)
hi_check, lo_check = self.crc16(response[:-2])
if (response[0] != self.slave_address or
response[1] != 0x10 or
response[-2] != lo_check or
response[-1] != hi_check):
print("Set zero failed: CRC check error")
return -1
print("Set zero success!")
return 0
def read_data_f32(self) -> np.ndarray:
"""
TCP版本 - 读取六维力传感器数据 (float32格式)
返回:
np.ndarray(6,) - 六维力数据 [Fx,Fy,Fz,Mx,My,Mz]
读取失败返回 -1
"""
# 构造读取命令
if self.slave_address == 0x01:
command = bytes.fromhex('01 04 00 00 00 0C F0 0F')
elif self.slave_address == 0x09:
command = bytes.fromhex('09 04 00 54 00 0C B0 97')
try:
# 清空接收缓冲区
self._clear_tcp_buffer()
# 接收完整响应
response = self.send_and_receive(command, 29)
# CRC校验
# CRC 校验
Hi_check, Lo_check = self.crc16(response[:-2])
if (response[0] != self.slave_address or
response[1] != 0x04 or
response[2] != 24 or # 数据长度字节
response[-2] != Lo_check or
response[-1] != Hi_check):
print(f"Protocol error! Received: {response.hex(' ')}")
print(f"Expected slave:{self.slave_address}, "
f"func:0x04, len:24, "
f"CRC:{Lo_check:02X}{Hi_check:02X}")
if response[0] != self.slave_adress or response[1] != 0x10 or \
response[-1] != Hi_check or response[-2] != Lo_check:
print("set zero failed!")
return -1
# 解析6个float32数据 (大端序)
print("set zero success!")
result = 0
except serial.SerialException as e:
print(f"Serial communication error: {e}")
print(f"Details - Address: {self.slave_adress}, Port: {self.ser.port}")
result = -1
except Exception as e:
print(f"Unexpected error: {e}")
result = -1
# 如果之前在进行后台读取,恢复它
if was_reading:
self.start_background_reading()
return result
# 可能比较慢
def read_data_f32(self):
self.ser.reset_input_buffer() # 清除输入缓冲区
self.ser.reset_output_buffer() # 清除输出缓冲区
if self.ser.in_waiting > 0:
self.ser.read(self.ser.in_waiting)
if self.slave_adress == 0x01:
# command = bytes.fromhex('01 04 00 54 00 0C B1 DF') # (旧版)传感器内置通信协议--读取一次六维力数据
command = bytes.fromhex('01 04 00 00 00 0C F0 0F') # (新版)传感器内置通信协议--读取一次六维力数据
elif self.slave_adress == 0x09:
command = bytes.fromhex('09 04 00 54 00 0C B0 97') # 传感器内置通信协议--读取一次六维力数据
try:
self.ser.write(command)
response = bytearray(self.ser.read(29))
# print(response)
Hi_check, Lo_check = self.crc16(response[:-2])
if response[0] != self.slave_adress or response[1] != 0x04 or \
response[-1] != Hi_check or response[-2] != Lo_check or response[2] != 24:
print("read data failed!")
return -1
sensor_data = struct.unpack('>ffffff', response[3:27])
except serial.SerialException as e:
print(f"Serial communication error: {e}")
return -1
except Exception as e:
print(f"Unexpected error: {e}")
return -1
return np.array(sensor_data)
except socket.timeout:
print("Timeout while waiting for sensor response")
return -1
except ConnectionError as e:
print(f"TCP connection error: {e}")
def enable_active_transmission(self):
"""
Activate the sensor's active transmission mode
"""
# 确保后台读取已停止
was_reading = False
if self._background_thread is not None and self._background_thread.is_alive():
was_reading = True
self.stop_background_reading()
try:
# 根据不同的速率设置命令
if self.rate == 100:
if self.slave_adress == 0x01:
rate_cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 00 AB 6A') # 传感器内置通信协议--100Hz
elif self.slave_adress == 0x09:
rate_cmd = bytes.fromhex('09 10 01 9A 00 01 02 00 00 CC AA')
print("Set mode in 100Hz")
elif self.rate == 250:
if self.slave_adress == 0x01:
rate_cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 01 6A AA')
elif self.slave_adress == 0x09:
rate_cmd = bytes.fromhex('09 10 01 9A 00 01 02 00 01 0D 6A')
print("Set mode in 250Hz")
elif self.rate == 500:
if self.slave_adress == 0x01:
rate_cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 02 2A AB') # 传感器内置通信协议--500Hz
elif self.slave_adress == 0x09:
rate_cmd = bytes.fromhex('09 10 01 9A 00 01 02 00 02 4D 6B')
print("Set mode in 500Hz")
else:
print("Rate not supported")
result = -1
return result
# 检查串口是否打开
if not self.ser.is_open:
raise serial.SerialException("Serial port is not open")
# 重置输入缓冲区
self.ser.reset_input_buffer()
# 写入指令
self.ser.write(rate_cmd)
print("Transmission command sent successfully")
result = 0
except serial.SerialException as e:
print(f"Serial communication error: {e}")
print(f"Details - Rate: {self.rate}, Address: {self.slave_adress}, Port: {self.ser.port}")
result = -1
except Exception as e:
print(f"Unexpected error: {e}")
result = -1
# 如果之前在进行后台读取,恢复它
if was_reading:
self.start_background_reading()
return result
def disable_active_transmission(self):
"""
Disable the sensor's active transmission mode
"""
try:
# 禁用传感器活动传输模式的命令
rate_cmd = bytes.fromhex('FF FF FF FF FF FF FF FF FF FF FF')
print("Disable the sensor's active transmission mode")
# 重置输入缓冲区
if not self.ser.is_open:
raise serial.SerialException("Serial port is not open")
self.ser.reset_input_buffer()
# 发送禁用传输模式的命令
self.ser.write(rate_cmd)
print("Transmission disabled successfully")
return 0
except serial.SerialException as e:
print(f"Serial communication error: {e}")
print(f"Details - Port: {self.ser.port}")
return -1
except Exception as e:
print(f"Unexpected error in read_data_f32: {str(e)}")
print(f"Unexpected error: {e}")
return -1
def _clear_tcp_buffer(self):
"""清空TCP接收缓冲区"""
self.sock.settimeout(0.1) # 短暂超时,避免阻塞
def read(self):
"""
Read the sensor's data.
"""
try:
if self.ser.in_waiting > 0:
self.ser.read(self.ser.in_waiting)
while True:
data = self.sock.recv(1024)
if not data: # 连接关闭或无数据
if int.from_bytes(self.ser.read(1), byteorder='big') == 0x20:
if int.from_bytes(self.ser.read(1), byteorder='big') == 0x4E:
break
print(f"Cleared TCP buffer: {data.hex(' ')}") # 调试输出
except socket.timeout:
pass # 预期内的超时,表示缓冲区已空
finally:
self.sock.settimeout(2.0) # 恢复默认超时
response = bytearray([0x20, 0x4E]) # 使用bytearray创建16进制数据的列表
response.extend(self.ser.read(14)) # 读取12个字节的数据并添加到列表中
Hi_check, Lo_check = self.crc16(response[:-2])
if response[-1] != Hi_check or response[-2] != Lo_check:
print("check failed!")
return None
sensor_data = self.parse_data_passive(response)
return sensor_data
except serial.SerialException as e:
print(f"Serial communication error: {e}")
return None
def enable_active_transmission(self) -> int:
"""启用主动传输模式TCP 版本)"""
if self.rate == 100:
cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 00 AB 6A') if self.slave_address == 0x01 else \
bytes.fromhex('09 10 01 9A 00 01 02 00 00 CC AA')
elif self.rate == 250:
cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 01 6A AA') if self.slave_address == 0x01 else \
bytes.fromhex('09 10 01 9A 00 01 02 00 01 0D 6A')
elif self.rate == 500:
cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 02 2A AB') if self.slave_address == 0x01 else \
bytes.fromhex('09 10 01 9A 00 01 02 00 02 4D 6B')
else:
print("Unsupported rate")
return -1
response = self.send_and_receive(cmd, 8)
if response and response[1] == 0x10: # 检查响应功能码
print(f"Active transmission enabled at {self.rate}Hz")
return 0
else:
print("Failed to enable active transmission")
return -1
def parse_data_passive(self, buffer):
values = [
int.from_bytes(buffer[i:i+2], byteorder='little', signed=True)
for i in range(2, 14, 2)
]
Fx, Fy, Fz = np.array(values[:3]) / 10.0
Mx, My, Mz = np.array(values[3:]) / 1000.0
return np.array([Fx, Fy, Fz, Mx, My, Mz])
def disconnect(self):
"""关闭 TCP 连接"""
if self.sock:
"""
完全断开传感器连接并清理相关资源
"""
# 首先停止后台读取任务
if hasattr(self, '_background_thread') and self._background_thread is not None:
self.stop_background_reading()
try:
self.sock.close()
print("TCP connection closed")
# 1. 尝试禁用传输
for _ in range(3):
try:
send_str = "FF " * 50
send_str = send_str[:-1]
rate_cmd = bytes.fromhex(send_str)
if hasattr(self, 'ser') and self.ser.is_open:
self.ser.write(rate_cmd)
time.sleep(0.1)
except Exception as e:
print(f"Error while closing socket: {e}")
self.sock = None
print(f"禁用传输失败: {e}")
# 2. 关闭并清理串口
if hasattr(self, 'ser'):
try:
if self.ser.is_open:
self.ser.flush() # 清空缓冲区
self.ser.reset_input_buffer() # 清空输入缓冲区
self.ser.reset_output_buffer() # 清空输出缓冲区
self.ser.close()
del self.ser # 删除串口对象
except Exception as e:
print(f"关闭串口失败: {e}")
except Exception as e:
print(f"断开连接过程中发生错误: {e}")
finally:
# 确保清理所有属性
if hasattr(self, 'ser'):
del self.ser
def __del__(self):
self.disconnect()
def start_background_reading(self):
"""
启动后台读取任务每秒读取一次传感器数据
@ -234,336 +368,40 @@ class XjcSensor:
with self._reading_lock:
return self._last_reading
# ------------------------- 以下方法保持不变 -------------------------
@staticmethod
def generate_crc16_table():
"""CRC16 表生成(与原代码相同)"""
table = []
polynomial = 0xA001
for i in range(256):
crc = i
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ polynomial
else:
crc >>= 1
table.append(crc)
return table
def crc16(self, data: bytes):
"""CRC16 计算(与原代码相同)"""
crc = 0xFFFF
for byte in data:
crc = (crc >> 8) ^ self.crc16_table[(crc ^ byte) & 0xFF]
return (crc >> 8) & 0xFF, crc & 0xFF
def __del__(self):
"""析构时断开连接"""
self.disconnect()
def disable_active_transmission(self) -> int:
"""
禁用传感器的主动传输模式TCP 版本
Returns:
0 表示成功-1 表示失败
"""
try:
# 构造禁用命令(原代码中的 FF x11 字节流)
disable_cmd = bytes.fromhex('FF FF FF FF FF FF FF FF FF FF FF')
print("Disabling active transmission mode...")
# 检查 TCP 连接是否有效
if not self.sock:
raise ConnectionError("TCP connection is not established")
# 发送禁用命令
self.sock.sendall(disable_cmd)
print("Active transmission disabled successfully")
return 0
except ConnectionError as e:
print(f"Connection error: {e}")
return -1
except socket.timeout:
print("Timeout while sending disable command")
return -1
except Exception as e:
print(f"Unexpected error: {e}")
return -1
def read(self):
"""
TCP Version - Read the sensor's data in passive mode.
Returns:
np.ndarray(6,) - Six-axis force/torque data [Fx, Fy, Fz, Mx, My, Mz]
None if reading fails
"""
try:
# Clear any existing data in the TCP buffer first
# self._clear_tcp_buffer()
# Search for frame header (0x20 0x4E)
header_found = False
start_time = time.time()
timeout = 1.0 # 1 second timeout
while not header_found and (time.time() - start_time < timeout):
# Read one byte at a time looking for header
byte1 = self.sock.recv(1)
if not byte1:
continue # No data available yet
if byte1 == b'\x20':
byte2 = self.sock.recv(1)
if byte2 == b'\x4E':
header_found = True
if not header_found:
print("Frame header not found within timeout period")
return None
# Now read the remaining 14 bytes of the frame
response = bytearray([0x20, 0x4E]) # Start with the header we found
remaining_bytes = 14
bytes_received = 0
start_time = time.time()
while bytes_received < remaining_bytes and (time.time() - start_time < timeout):
chunk = self.sock.recv(remaining_bytes - bytes_received)
if chunk:
response.extend(chunk)
bytes_received += len(chunk)
if bytes_received < remaining_bytes:
print(f"Incomplete frame received. Got {len(response)} bytes, expected 16")
return None
# Verify CRC checksum
Hi_check, Lo_check = self.crc16(response[:-2])
if response[-1] != Hi_check or response[-2] != Lo_check:
print("CRC check failed!")
print(f"Received CRC: {response[-2]:02X}{response[-1]:02X}")
print(f"Calculated CRC: {Lo_check:02X}{Hi_check:02X}")
return None
# Parse the sensor data
sensor_data = self.parse_data_passive(response)
return sensor_data
except socket.timeout:
print("Timeout while waiting for sensor data")
return None
except ConnectionError as e:
print(f"TCP connection error: {e}")
return None
except Exception as e:
print(f"Unexpected error in TCP read(): {str(e)}")
return None
def parse_data_passive(self, buffer):
values = [
int.from_bytes(buffer[i:i+2], byteorder='little', signed=True)
for i in range(2, 14, 2)
]
Fx, Fy, Fz = np.array(values[:3]) / 10.0
Mx, My, Mz = np.array(values[3:]) / 1000.0
return np.array([Fx, Fy, Fz, Mx, My, Mz])
# 外部
def test_sensor_frequency(sensor, mode='active', duration=5.0):
"""
测试传感器在不同模式下的实际数据获取频率
参数:
sensor: XjcSensor实例
mode: 'active'(主动模式) 'polling'(查询模式)
duration: 测试持续时间()
返回:
dict: 包含测试结果的字典
"""
# 准备测试环境
if mode == 'active':
print("\n=== 测试主动传输模式 ===")
sensor.enable_active_transmission()
read_func = sensor.read
else:
print("\n=== 测试查询模式 ===")
sensor.disable_active_transmission() # 确保不在主动模式
time.sleep(0.5)
sensor.disable_active_transmission() # 确保不在主动模式
time.sleep(0.5)
sensor.disable_active_transmission() # 确保不在主动模式
time.sleep(0.5)
read_func = sensor.read_data_f32
# 初始化测试变量
timestamps = []
data_count = 0
start_time = time.perf_counter()
end_time = start_time + duration
print(f"开始测试,持续时间 {duration} 秒...")
# 主测试循环
while time.perf_counter() < end_time:
data = read_func()
print(data)
if data is not None:
timestamps.append(time.perf_counter())
data_count += 1
# print(timestamps)
# 计算统计结果
if data_count < 2:
print("警告: 采集到的数据点不足")
return None
intervals = np.diff(timestamps)
save_intervals_to_csv(timestamps)
avg_interval = np.mean(intervals)
min_interval = np.min(intervals)
max_interval = np.max(intervals)
std_interval = np.std(intervals)
avg_freq = 1.0 / avg_interval
# 打印结果
print("\n测试结果:")
print(f"总数据点数: {data_count}")
print(f"平均频率: {avg_freq:.2f} Hz")
print(f"最小间隔: {min_interval*1000:.3f} ms")
print(f"最大间隔: {max_interval*1000:.3f} ms")
print(f"间隔标准差: {std_interval*1000:.3f} ms")
# 返回结果字典
return {
'mode': mode,
'duration': duration,
'data_count': data_count,
'avg_freq': avg_freq,
'min_interval': min_interval,
'max_interval': max_interval,
'std_interval': std_interval,
'timestamps': timestamps,
'intervals': intervals
}
def plot_test_results(active_results, polling_results):
"""绘制两种模式的测试结果对比图"""
plt.figure(figsize=(12, 8))
# 间隔时间分布图
plt.subplot(2, 1, 1)
plt.hist(active_results['intervals']*1000, bins=50, alpha=0.7, label='主动模式')
plt.hist(polling_results['intervals']*1000, bins=50, alpha=0.7, label='查询模式')
plt.xlabel('间隔时间 (ms)')
plt.ylabel('出现次数')
plt.title('数据间隔时间分布')
plt.legend()
plt.grid(True)
# 间隔时间序列图
plt.subplot(2, 1, 2)
plt.plot(np.arange(len(active_results['intervals'])),
active_results['intervals']*1000, 'b.', label='主动模式')
plt.plot(np.arange(len(polling_results['intervals'])),
polling_results['intervals']*1000, 'r.', label='查询模式')
plt.xlabel('数据点序号')
plt.ylabel('间隔时间 (ms)')
plt.title('数据间隔时间序列')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
def compare_modes(sensor, duration=5.0):
"""比较两种工作模式的性能"""
# 测试主动模式
active_results = test_sensor_frequency(sensor, 'active', duration)
time.sleep(1) # 模式切换间隔
# 测试查询模式
polling_results = test_sensor_frequency(sensor, 'polling', duration)
time.sleep(1)
# 打印对比报告
print("\n=== 模式对比报告 ===")
print(f"{'指标':<15} {'主动模式':>15} {'查询模式':>15}")
print(f"{'平均频率(Hz)':<15} {active_results['avg_freq']:>15.2f} {polling_results['avg_freq']:>15.2f}")
print(f"{'最小间隔(ms)':<15} {active_results['min_interval']*1000:>15.3f} {polling_results['min_interval']*1000:>15.3f}")
print(f"{'最大间隔(ms)':<15} {active_results['max_interval']*1000:>15.3f} {polling_results['max_interval']*1000:>15.3f}")
print(f"{'间隔标准差(ms)':<15} {active_results['std_interval']*1000:>15.3f} {polling_results['std_interval']*1000:>15.3f}")
# 绘制对比图表
plot_test_results(active_results, polling_results)
return active_results, polling_results
def save_intervals_to_csv(timestamps, filename="sensor_intervals.csv"):
"""
Save timestamp intervals to CSV file with metadata
Args:
timestamps: List of timestamp values (from time.perf_counter())
filename: Output CSV filename
"""
if len(timestamps) < 2:
print("Not enough timestamps to calculate intervals")
return
# Calculate intervals in milliseconds
intervals_ms = np.diff(timestamps) * 1000
# Prepare data rows
rows = []
for i, interval in enumerate(intervals_ms):
rows.append({
"reading_number": i+1,
"interval_ms": f"{interval:.4f}",
"timestamp": timestamps[i],
"expected_interval_ms": (1000/250) if i==0 else "", # For 250Hz
"deviation_ms": f"{interval - (1000/250):.4f}" if i>0 else ""
})
# CSV header
fieldnames = ["reading_number", "interval_ms", "timestamp",
"expected_interval_ms", "deviation_ms"]
# Write to file
with open(filename, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
print(f"Saved {len(intervals_ms)} intervals to {filename}")
if __name__ == "__main__":
# 替换为你的传感器 IP 和端口
sensor = XjcSensor("192.168.5.1", 60000)
sensor.connect()
# 示例操作
sensor.disable_active_transmission()
time.sleep(0.5)
sensor.disable_active_transmission()
time.sleep(0.5)
sensor.disable_active_transmission()
time.sleep(0.5)
sensor.set_zero()
# sensor.set_zero()
# sensor.disable_active_transmission()
# while True:
# data = sensor.read_data_f32()
# if data is not None:
# print(f"Force data: {data}")
# sensor.enable_active_transmission()
# while True:
# sensor_data = sensor.read()
# if sensor_data is None:
# print('failed to get force sensor data!')
# print(sensor_data)
test_sensor_frequency(sensor)
# compare_modes(sensor)
import sys
import pathlib
sys.path.append(str(pathlib.Path(__file__).parent.parent))
from tools.Rate import Rate
import signal
import sys
rate = Rate(250)
xjc_sensor = XjcSensor()
def signal_handler(sig, frame):
print('Received Ctrl+C, exiting gracefully...')
sys.exit(0)
signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(0))
# atexit.register(xjc_sensor.disconnect)
time.sleep(1)
xjc_sensor.disable_active_transmission()
xjc_sensor.disable_active_transmission()
xjc_sensor.disable_active_transmission()
print(xjc_sensor.read_data_f32())
print(xjc_sensor.read_data_f32())
print(xjc_sensor.read_data_f32())
# print(xjc_sensor.read())
time.sleep(1)
xjc_sensor.set_zero()
xjc_sensor.set_zero()
xjc_sensor.set_zero()
# time.sleep(1)
# xjc_sensor.enable_active_transmission()
while True:
# sensor_data = xjc_sensor.read()
sensor_data = xjc_sensor.read_data_f32()
current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
if sensor_data is None:
print('failed to get force sensor data!')
print(f"{current_time}-----{sensor_data}")
rate.sleep(False)
# break

File diff suppressed because it is too large Load Diff

5
logs/test.py Normal file
View File

@ -0,0 +1,5 @@
import random
import numpy as np
random_array = np.random.rand(3) # 生成长度为 3 的数组
print(random_array/1000)