定点伺服导纳调试,优化部分代码

This commit is contained in:
Ziwei.He 2025-05-21 16:43:45 +08:00
parent b1322495b0
commit 80f0074e9a
18 changed files with 15249 additions and 8930 deletions

View File

@ -111,6 +111,11 @@ class MassageRobot:
self.sensor_fail_count = 0
# 机械臂初始化,适配中间层
self.arm.init()
# ---
self.last_time = -1
self.cur_time = -1
# 预测步骤
def kalman_predict(self,x, P, Q):
@ -165,70 +170,8 @@ class MassageRobot:
self.arm_state.external_wrench_tcp = self.x_tcp
self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp
return 0
# def arm_measure_loop(self):
# self.logger.log_info("机械臂测量线程启动")
# while (not self.arm.is_exit) and (not self.exit_event.is_set()):
# try:
# if not self.is_waitting:
# self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position()
# code = self.update_wrench()
# self.logger.log_blue(f"measure position: {self.arm_state.arm_position}")
# self.logger.log_blue(f"measure orientation: {self.arm_state.arm_orientation}")
# self.logger.log_blue(f"measure update wrench:{self.arm_state.external_wrench_tcp}")
# if code == -1:
# self.sensor_fail_count += 1
# self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}")
# if self.sensor_fail_count > 10:
# self.logger.log_error("传感器线程数据读取失败超过10次,程序终止")
# self.stop()
# break
# else:
# self.sensor_fail_count = 0
# except Exception as e:
# self.logger.log_error(f"机械臂测量线程报错:{e}")
# self.exit_event.set() # 控制退出while
# self.sensor_rate.precise_sleep() # 控制频率
# def arm_command_loop(self):
# self.logger.log_info("机械臂控制线程启动")
# self.arm.filter_matirx = np.zeros((8,7)) # 滑动窗口
# while (not self.arm.is_exit) and (not self.exit_event.is_set()):
# try:
# if not self.is_waitting:
# process_start_time = time.time()
# self.controller_manager.step(self.control_rate.to_sec())
# status, joints_processed = self.arm.process_command(self.arm_state.arm_position_command,
# self.arm_state.arm_orientation_command)
# self.logger.log_blue(f"处理后的关节角度: {joints_processed}")
# run_time = time.time() - process_start_time
# sleep_duration = self.control_rate.to_sec() - run_time
# delta_command = np.linalg.norm(self.arm_state.arm_position_command - self.arm_state.arm_position * 1000)
# self.logger.log_blue(f"期望位置:{self.arm_state.desired_position}")
# self.logger.log_blue(f"现在的位置:{self.arm_state.arm_position}")
# self.logger.log_blue(f"指令位置:{self.arm_state.arm_position_command}")
# self.logger.log_blue(f"位置变化量: {delta_command}")
# if delta_command > 20: # 20 mm
# self.logger.log_error("机械臂位置变化过大,可能是数据异常")
# break
# if sleep_duration > 0:
# libc.usleep(int(sleep_duration * 1e6))
# if status == 0:
# # self.logger.log_blue("不进行伺服运动,只计算")
# self.logger.log_info("进行伺服运动")
# code = self.arm.ServoJoint(joints_processed,t=0.008,aheadtime=20,gain=200)
# else:
# self.logger.log_error("滑动窗口数据处理失败")
# self.exit_event.set()
# if code == -1:
# self.logger.log_error("机械臂急停")
# # self.stop() # 底层已经做了急停处理
# break
# except Exception as e:
# self.logger.log_error(f"机械臂控制失败:{e}")
# self.exit_event.set()
####################### TEST ##########################3
# ####################### 位姿伺服 ##########################
def arm_measure_loop(self):
self.logger.log_info("机械臂测量线程启动")
@ -237,9 +180,6 @@ class MassageRobot:
if not self.is_waitting:
self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position()
code = self.update_wrench()
# self.logger.log_blue(f"measure position: {self.arm_state.arm_position}")
# self.logger.log_blue(f"measure orientation: {self.arm_state.arm_orientation}")
# self.logger.log_blue(f"measure update wrench:{self.arm_state.external_wrench_tcp}")
if code == -1:
self.sensor_fail_count += 1
self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}")
@ -256,54 +196,51 @@ class MassageRobot:
def arm_command_loop(self):
self.logger.log_info("机械臂控制线程启动")
# self.arm.filter_matirx = np.zeros((8,7)) # 滑动窗口 ## 暂时禁用滑动窗口
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
try:
if not self.is_waitting:
process_start_time = time.time()
self.controller_manager.step(self.control_rate.to_sec())
# self.logger.log_blue(f"目标位姿: {self.arm_state.arm_position_command},{self.arm_state.arm_orientation_command}")
control_cycle = self.control_rate.to_sec()
self.controller_manager.step(control_cycle)
command_pose = np.concatenate([
self.arm_state.arm_position_command * 1000, # 转毫米
self.arm_state.arm_orientation_command
])
## 输入滤波
status, pose_processed = self.arm.process_command(
command_pose[:3],
command_pose[3:]
)
# print(f"pose_processed:{pose_processed}")
tcp_command = (
f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f},"
f"{pose_processed[2]:.3f},{pose_processed[3]:.3f},"
f"{pose_processed[4]:.3f},{pose_processed[5]:.3f},"
"t=0.02, aheadtime=20, gain=200)"
).encode()
run_time = time.time() - process_start_time
sleep_duration = self.control_rate.to_sec() - run_time
delta_command = np.linalg.norm(self.arm_state.arm_position_command - self.arm_state.arm_position)
# self.logger.log_blue(f"期望位置:{self.arm_state.desired_position}")
# self.logger.log_blue(f"现在的位置:{self.arm_state.arm_position}")
# self.logger.log_blue(f"指令位置:{self.arm_state.arm_position_command}")
# self.logger.log_blue(f"位置变化量: {delta_command}")
if delta_command > 0.1: # m
self.logger.log_error("机械臂位置变化过大,可能是数据异常")
break
b2 =time.time()
if sleep_duration > 0:
libc.usleep(int(sleep_duration * 1e6))
# self.logger.log_blue("不进行伺服运动,只计算")
# self.logger.log_blue(f"输入位置:{self.arm_state.arm_position_command}")
# self.logger.log_blue(f"输入姿态:{self.arm_state.arm_orientation_command}")
command_pos_in_mm = self.arm_state.arm_position_command * 1000
command_ori_in_degree = R.from_quat(self.arm_state.arm_orientation_command).as_euler('xyz',degrees=True)
combined = np.concatenate([command_pos_in_mm, command_ori_in_degree]) # 合并成一个数组
# self.logger.log_blue(f"合并后的位姿: {combined.tolist()}")
# self.logger.log_info("进行伺服运动")
# 数据记录
# self.logger.log_info("desired_pos",self.arm_state.desired_position)
# self.logger.log_info("desired_ori",self.arm_state.desired_orientation)
# self.logger.log_info("command_pos",self.arm_state.arm_position_command)
# self.logger.log_info("command_ori",self.arm_state.arm_orientation_command)
code = self.arm.ServoPose(combined)
# else:
# self.logger.log_error("滑动窗口数据处理失败")
# self.exit_event.set()
if code == -1:
self.logger.log_error("机械臂急停")
# self.stop() # 底层已经做了急停处理
break
time.sleep(sleep_duration)
print(f"real sleep:{time.time()-b2}")
self.arm.dashboard.socket_dobot.sendall(tcp_command)
if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.arm.dashboard.Continue()
code = self.arm.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
except Exception as e:
self.logger.log_error(f"机械臂控制失败:{e}")
self.exit_event.set()
# print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration)
####################### 位姿伺服 ##########################
def start(self):
if self.exit_event.is_set():

View File

@ -79,12 +79,10 @@ class AdmittanceController(BaseController):
self.clip_command(delta_pose,"pose")
# testlsy
# delta_ori_mat = R.from_rotvec(delta_pose[3:]).as_matrix()
delta_ori_mat = R.from_rotvec(delta_pose[3:]).as_matrix()
random_array = np.random.rand(3) # 生成长度为 3 的数组
delta_ori_mat = R.from_rotvec(random_array/10000).as_matrix()
# random_array = np.random.rand(3) # 生成长度为 3 的数组
# delta_ori_mat = R.from_rotvec(random_array/100000).as_matrix()
arm_ori_mat = delta_ori_mat @ R.from_quat(self.state.arm_orientation).as_matrix()
arm_ori_mat = R.from_quat(self.state.arm_orientation).as_matrix() @ delta_ori_mat
@ -94,19 +92,19 @@ class AdmittanceController(BaseController):
self.state.arm_position_command = self.state.arm_position + delta_pose[:3]
if time.time() - self.laset_print_time > 2:
# if time.time() - self.laset_print_time > 2:
# print("-------------admittance_1-------------------------------")
# print("arm_position:",self.state.arm_position)
# print("desired_position:",self.state.desired_position)
print(f'wrench_err: {wrench_err} ||| external_wrench_tcp: {self.state.external_wrench_tcp} ||| desired_wrench: {self.state.desired_wrench}')
print("self.state.arm_desired_acc:",self.state.arm_desired_acc)
# print(f'wrench_err: {wrench_err} ||| external_wrench_tcp: {self.state.external_wrench_tcp} ||| desired_wrench: {self.state.desired_wrench}')
# print("self.state.arm_desired_acc:",self.state.arm_desired_acc)
# print("arm_orientation",R.from_quat(self.state.arm_orientation).as_euler('xyz',degrees=True))
# print("desired_orientation",R.from_quat(self.state.desired_orientation).as_euler('xyz',degrees=True))
# print("arm_position_command",self.state.arm_position_command)
# print("arm_orientation_command",R.from_quat(self.state.arm_orientation_command).as_euler('xyz',degrees=True))
print("delta_pose:",delta_pose)
# print("delta_pose:",delta_pose)
# print("delta_ori_mat",delta_ori_mat)
self.laset_print_time = time.time()
# self.laset_print_time = time.time()

View File

@ -1,66 +1,20 @@
name: admittance
# mass_tran: [5,5,5]
# mass_rot: [5,5,5]
# stiff_tran: [500,500,500]
# stiff_rot: [500,500,500]
# desired_xi: 0.7
# damp_tran: [-125,-125,-125]
# damp_rot: [-125,-125,-125]
# pos_scale_factor: 1
# rot_scale_factor: 1
# name: admittance
# mass_tran: [1,1,1]
# mass_rot: [0.5,0.5,0.5]
# stiff_tran: [800,800,800]
# stiff_rot: [100,100,100]
# desired_xi: 1.1
# damp_tran: [-2,-2,-2]
# damp_rot: [-10,-10,-10]
# pos_scale_factor: 1
# rot_scale_factor: 1
# mass_tran: [4,4,4]
# mass_rot: [0.4,0.4,0.4]
# stiff_tran: [500,500,500]
# stiff_rot: [6,6,6]
# desired_xi: 1.1
# damp_tran: [-1,-1,-1]
# damp_rot: [-1,-1,-1]
# pos_scale_factor: 1
# rot_scale_factor: 12
# mass_tran: [1.3,0.4,2.4]
# mass_rot: [0.08,0.02,0.01]
# stiff_tran: [200,200,200]
# stiff_rot: [200,200,200]
# desired_xi: 1.1
# damp_tran: [-1,-1,-1]
# damp_rot: [-1,-1,-1]
# pos_scale_factor: 1
# rot_scale_factor: 1
desired_xi: 1
pos_scale_factor: 1
rot_scale_factor: 1
# mass_tran: [2.0, 2.0, 2.0]
# mass_rot: [0.2, 0.2, 0.2]
# A
mass_tran: [3.0, 3.0, 3.0]
mass_rot: [0.11, 0.11, 0.11]
# P
# stiff_tran: [300, 300, 300]
# stiff_rot: [3, 3, 3]
# damp_tran: [30, 30, 30]
# damp_rot: [1.0, 1.0, 1.0]
mass_tran: [9.0, 9.0, 9.0]
mass_rot: [0.8, 0.8, 0.8]
stiff_tran: [0, 0, 0]
stiff_rot: [7, 7, 7]
# stiff_rot: [7, 7, 7]
stiff_tran: [270, 270, 270]
stiff_rot: [11, 11, 11]
# stiff_tran: [100, 100, 100]
# stiff_rot: [1, 1, 1]
damp_tran: [100, 100, 100]
damp_rot: [1, 1, 1]
# D
damp_tran: [30, 30, 30]
damp_rot: [1.1, 1.1, 1.1]
# damp_tran: [40, 40, 40]
# damp_rot: [3, 3, 3]

View File

@ -8,6 +8,6 @@ controller: ['Massage/MassageControl/config/admittance.yaml',
# massage heads diretory
massage_head_dir: 'Massage/MassageControl/config/massage_head'
control_rate: 100
sensor_rate: 100
command_rate: 100
control_rate: 55
sensor_rate: 55
command_rate: 55

File diff suppressed because it is too large Load Diff

View File

@ -258,9 +258,10 @@ class dobot_nova5:
def ServoPose(self,poses_list):
poses_list = list(map(float, poses_list))
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=200)"
res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
print("ServoP msg:",res)
tcp_command = f"ServoP({poses_list[0]},{poses_list[1]},{poses_list[2]},{poses_list[3]},{poses_list[4]},{poses_list[5]}, t=0.01, aheadtime=20, gain=300)"
# res = self.dashboard.sendRecvMsg(tcp_command) # 通过反馈状态来进行异常处理,故这里只需要发,不需要收
self.dashboard.send_data(tcp_command)
# print("ServoP msg:",res)
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
@ -490,7 +491,8 @@ class dobot_nova5:
self.last_input_command = None
self.tcp_offset = None
self.init_J = [0,30,-120,0,90,0]
self.filter_matirx = np.zeros((8,7))
self.filter_matirx = np.zeros((1,7)) # 位姿伺服用
self.filter_matrix = np.zeros((1,6)) # 角度伺服用
sleep(1)
self.is_standby = False
code = self.RunPoint_P_inJoint(self.init_J)
@ -512,26 +514,64 @@ class dobot_nova5:
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
return pos,quat_rot
def process_command(self,arm_position_command, arm_orientation_command):
rot_euler = R.from_quat(arm_orientation_command).as_euler('xyz',degrees=True)
rot_euler = np.clip(rot_euler,-180,180)
arm_position_command *= 1000
pose_command = np.concatenate([arm_position_command, rot_euler])
print("pose_command",pose_command)
def __circular_mean(self,angles):
# 将角度转换为复数单位圆上的点,再计算均值
# 为处理周期性问题
angles_rad = np.deg2rad(angles)
x = np.mean(np.cos(angles_rad))
y = np.mean(np.sin(angles_rad))
return np.rad2deg(np.arctan2(y, x))
def process_command_angle(self,arm_joint_command):
if (not self.is_exit) and (not self.is_standby):
cur = self.getAngel() # 参考关节角度
target = self.getInverseKin(pose_command,-1,-1,1,"{" + ",".join(map(str, cur)) + "}")
# print("target",target)
self.joint_current = cur
self.joint_target = target
ress = np.array(target)
ress = np.append(ress,[1])
self.filter_matirx = np.append(self.filter_matirx,[ress],axis=0)
self.filter_matirx = np.delete(self.filter_matirx,0,axis=0)
sum_joint = np.sum(self.filter_matirx,axis=0)
sum_joint[:6] = sum_joint[:6]/sum_joint[6]
result = sum_joint[:6]
current_joint = np.asarray(arm_joint_command) # degree
# FIFO
self.filter_matrix = np.append(self.filter_matrix,[current_joint],axis=0)
if len(self.filter_matrix)<7:
return 0,current_joint
self.filter_matrix = np.delete(self.filter_matrix,0,axis = 0)
avg_joint = np.array([self.__circular_mean(self.filter_matrix[:, i]) for i in range(len(current_joint))])
return 0,avg_joint
return -1,None
def process_command(self, arm_position_command, arm_orientation_command):
if (not self.is_exit) and (not self.is_standby):
# 构造当前位姿(位置 + 四元数)
current_pose = np.concatenate([
arm_position_command, # [x, y, z]
arm_orientation_command # [qx, qy, qz, qw]
])
# 更新滑动窗口FIFO
self.filter_matirx = np.append(self.filter_matirx, [current_pose], axis=0)
# 如果窗口未填满,直接返回当前值(不滤波)
if len(self.filter_matirx) < 7:
rot_euler = R.from_quat(arm_orientation_command).as_euler('xyz', degrees=True)
result = np.concatenate([arm_position_command, rot_euler])
return 0, result
# 窗口已填满,删除最旧数据
self.filter_matirx = np.delete(self.filter_matirx, 0, axis=0)
# --- 位置分量:滑动平均(仅对有效数据)---
avg_position = np.mean(self.filter_matirx[:, :3], axis=0)
# --- 四元数分量:加权平均(处理符号歧义)---
quats = self.filter_matirx[:, 3:7]
for i in range(1, len(quats)):
if np.dot(quats[0], quats[i]) < 0:
quats[i] = -quats[i] # 确保所有四元数在同一半球
avg_quat = np.mean(quats, axis=0)
avg_quat = avg_quat / np.linalg.norm(avg_quat) # 归一化
# 转换回欧拉角
rot_euler = R.from_quat(avg_quat).as_euler('xyz', degrees=True)
# 合并结果 [x, y, z, roll, pitch, yaw]
result = np.concatenate([avg_position, rot_euler])
return 0, result
return -1, None
@ -542,8 +582,8 @@ if __name__ == "__main__":
posJ = [0,30,-120,0,90,0]
dobot.RunPoint_P_inJoint(posJ)
sleep(1)
# dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
# dobot.chooseEndEffector(i=9)
dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
dobot.chooseEndEffector(i=9)
# print(dobot.getPose())
# print(dobot.get_arm_position())
# pos,rot_quat = dobot.get_arm_position()
@ -560,9 +600,17 @@ if __name__ == "__main__":
sleep(0.008)
dobot.Get_feedInfo_pose()
sleep(0.5)
cur = dobot.Get_feedInfo_angle() # 参考关节角度
short_cur = [f"{x:.3f}" for x in cur]
target = dobot.getInverseKin([250.000045,-134.999537,344.115807,-179.999982,-0.000408,-90.000642],-1,-1,1,"{" + ",".join(map(str, short_cur)) + "}")
print(target)
# for i in range(400):
# posJ[2] += 0.04
# dobot.ServoJoint(posJ,t=0.008,aheadtime=20,gain=200)
# sleep(0.007)
# dobot.start_drag()
dobot.disableRobot()

View File

@ -13,7 +13,7 @@ sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "Massage/MassageContro
class XjcSensor:
def __init__(self, port="/dev/ttyUSB0", baudrate=115200, rate=250):
def __init__(self, port="/dev/ttyUSB0", baudrate=115200, rate=100):
self.port = port
self.baudrate = baudrate
self.rate = rate

View File

@ -0,0 +1,572 @@
import time
import socket
import struct
import numpy as np
import atexit
import threading
from typing import Optional, Tuple
from matplotlib import pyplot as plt
import csv
from datetime import datetime
class XjcSensor:
def __init__(self, host: str, port: int, rate: int = 250):
"""
初始化 TCP 连接的力传感器
:param host: 设备 IP 地址 "192.168.1.100"
:param port: 设备端口 502
:param rate: 数据采样率100/250/500Hz
"""
self.host = host
self.port = port
self.rate = rate
self.slave_address = 0x01 # 默认从机地址
self.sock = None # TCP socket 对象
self.crc16_table = self.generate_crc16_table()
# 后台读取相关的属性
self._background_thread = None
self._stop_background = False
self._last_reading = None
self._reading_lock = threading.Lock()
# 建立 TCP 连接
self.connect()
# 注册退出时的清理函数
atexit.register(self.disconnect)
def __new__(cls, *args, **kwargs):
"""单例模式"""
if not hasattr(cls, 'instance'):
cls.instance = super(XjcSensor, cls).__new__(cls)
return cls.instance
def connect(self):
"""建立 TCP 连接"""
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(1.0) # 设置超时时间
self.sock.connect((self.host, self.port))
print(f"Connected to {self.host}:{self.port}")
except Exception as e:
print(f"TCP connection failed: {e}")
raise
def send_and_receive(self, data: bytes, expected_response_len: int) -> Optional[bytes]:
"""
发送数据并接收响应
:param data: 要发送的字节数据
:param expected_response_len: 预期响应的长度
:return: 收到的字节数据失败返回 None
"""
try:
self.sock.sendall(data)
response = self.sock.recv(expected_response_len)
return response
except socket.timeout:
print("Timeout while waiting for response")
return None
except Exception as e:
print(f"TCP communication error: {e}")
return None
def set_zero(self) -> int:
"""传感器置零TCP 版本)"""
if self.slave_address == 0x01:
zero_cmd = bytes.fromhex('01 10 46 1C 00 02 04 00 00 00 00 E8 95')
elif self.slave_address == 0x09:
zero_cmd = bytes.fromhex('09 10 46 1C 00 02 04 00 00 00 00 C2 F5')
response = self.send_and_receive(zero_cmd, 8)
if not response:
print("set zero fail")
return -1
# CRC 校验(假设协议要求)
hi_check, lo_check = self.crc16(response[:-2])
if (response[0] != self.slave_address or
response[1] != 0x10 or
response[-2] != lo_check or
response[-1] != hi_check):
print("Set zero failed: CRC check error")
return -1
print("Set zero success!")
return 0
def read_data_f32(self) -> np.ndarray:
"""
TCP版本 - 读取六维力传感器数据 (float32格式)
返回:
np.ndarray(6,) - 六维力数据 [Fx,Fy,Fz,Mx,My,Mz]
读取失败返回 -1
"""
# 构造读取命令
if self.slave_address == 0x01:
command = bytes.fromhex('01 04 00 00 00 0C F0 0F')
elif self.slave_address == 0x09:
command = bytes.fromhex('09 04 00 54 00 0C B0 97')
try:
# 清空接收缓冲区
self._clear_tcp_buffer()
# 接收完整响应
response = self.send_and_receive(command, 29)
# CRC校验
Hi_check, Lo_check = self.crc16(response[:-2])
if (response[0] != self.slave_address or
response[1] != 0x04 or
response[2] != 24 or # 数据长度字节
response[-2] != Lo_check or
response[-1] != Hi_check):
print(f"Protocol error! Received: {response.hex(' ')}")
print(f"Expected slave:{self.slave_address}, "
f"func:0x04, len:24, "
f"CRC:{Lo_check:02X}{Hi_check:02X}")
return -1
# 解析6个float32数据 (大端序)
sensor_data = struct.unpack('>ffffff', response[3:27])
return np.array(sensor_data)
except socket.timeout:
print("Timeout while waiting for sensor response")
return -1
except ConnectionError as e:
print(f"TCP connection error: {e}")
return -1
except Exception as e:
print(f"Unexpected error in read_data_f32: {str(e)}")
return -1
def _clear_tcp_buffer(self):
"""清空TCP接收缓冲区"""
self.sock.settimeout(0.1) # 短暂超时,避免阻塞
try:
while True:
data = self.sock.recv(1024)
if not data: # 连接关闭或无数据
break
print(f"Cleared TCP buffer: {data.hex(' ')}") # 调试输出
except socket.timeout:
pass # 预期内的超时,表示缓冲区已空
finally:
self.sock.settimeout(2.0) # 恢复默认超时
def enable_active_transmission(self) -> int:
"""启用主动传输模式TCP 版本)"""
if self.rate == 100:
cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 00 AB 6A') if self.slave_address == 0x01 else \
bytes.fromhex('09 10 01 9A 00 01 02 00 00 CC AA')
elif self.rate == 250:
cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 01 6A AA') if self.slave_address == 0x01 else \
bytes.fromhex('09 10 01 9A 00 01 02 00 01 0D 6A')
elif self.rate == 500:
cmd = bytes.fromhex('01 10 01 9A 00 01 02 00 02 2A AB') if self.slave_address == 0x01 else \
bytes.fromhex('09 10 01 9A 00 01 02 00 02 4D 6B')
else:
print("Unsupported rate")
return -1
response = self.send_and_receive(cmd, 8)
if response and response[1] == 0x10: # 检查响应功能码
print(f"Active transmission enabled at {self.rate}Hz")
return 0
else:
print("Failed to enable active transmission")
return -1
def disconnect(self):
"""关闭 TCP 连接"""
if self.sock:
try:
self.sock.close()
print("TCP connection closed")
except Exception as e:
print(f"Error while closing socket: {e}")
self.sock = None
def start_background_reading(self):
"""
启动后台读取任务每秒读取一次传感器数据
"""
if self._background_thread is not None and self._background_thread.is_alive():
print("Background reading is already running")
return False
self._stop_background = False
self._background_thread = threading.Thread(target=self._background_reading_task)
self._background_thread.daemon = True # 设置为守护线程
self._background_thread.start()
return True
def stop_background_reading(self):
"""
停止后台读取任务
"""
if self._background_thread is None or not self._background_thread.is_alive():
print("No background reading is running")
return False
self._stop_background = True
self._background_thread.join(timeout=2.0) # 等待线程结束最多等待2秒
self._background_thread = None
return True
def _background_reading_task(self):
"""
后台读取任务的实现
"""
while not self._stop_background:
try:
data = self.read_data_f32()
if isinstance(data, np.ndarray):
with self._reading_lock:
self._last_reading = data
else:
self.connect()
except Exception as e:
print(f"Error in background reading: {e}")
time.sleep(1.0) # 每秒读取一次
def get_last_reading(self):
"""
获取最近一次的读数
"""
with self._reading_lock:
return self._last_reading
# ------------------------- 以下方法保持不变 -------------------------
@staticmethod
def generate_crc16_table():
"""CRC16 表生成(与原代码相同)"""
table = []
polynomial = 0xA001
for i in range(256):
crc = i
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ polynomial
else:
crc >>= 1
table.append(crc)
return table
def crc16(self, data: bytes):
"""CRC16 计算(与原代码相同)"""
crc = 0xFFFF
for byte in data:
crc = (crc >> 8) ^ self.crc16_table[(crc ^ byte) & 0xFF]
return (crc >> 8) & 0xFF, crc & 0xFF
def __del__(self):
"""析构时自动断开连接"""
self.disconnect()
def disable_active_transmission(self) -> int:
"""
禁用传感器的主动传输模式TCP 版本
Returns:
0 表示成功-1 表示失败
"""
try:
# 构造禁用命令(原代码中的 FF x11 字节流)
disable_cmd = bytes.fromhex('FF FF FF FF FF FF FF FF FF FF FF')
print("Disabling active transmission mode...")
# 检查 TCP 连接是否有效
if not self.sock:
raise ConnectionError("TCP connection is not established")
# 发送禁用命令
self.sock.sendall(disable_cmd)
print("Active transmission disabled successfully")
return 0
except ConnectionError as e:
print(f"Connection error: {e}")
return -1
except socket.timeout:
print("Timeout while sending disable command")
return -1
except Exception as e:
print(f"Unexpected error: {e}")
return -1
def read(self):
"""
TCP Version - Read the sensor's data in passive mode.
Returns:
np.ndarray(6,) - Six-axis force/torque data [Fx, Fy, Fz, Mx, My, Mz]
None if reading fails
"""
try:
# Clear any existing data in the TCP buffer first
# self._clear_tcp_buffer()
# Search for frame header (0x20 0x4E)
header_found = False
start_time = time.time()
timeout = 1.0 # 1 second timeout
while not header_found and (time.time() - start_time < timeout):
# Read one byte at a time looking for header
byte1 = self.sock.recv(1)
if not byte1:
continue # No data available yet
if byte1 == b'\x20':
byte2 = self.sock.recv(1)
if byte2 == b'\x4E':
header_found = True
if not header_found:
print("Frame header not found within timeout period")
return None
# Now read the remaining 14 bytes of the frame
response = bytearray([0x20, 0x4E]) # Start with the header we found
remaining_bytes = 14
bytes_received = 0
start_time = time.time()
while bytes_received < remaining_bytes and (time.time() - start_time < timeout):
chunk = self.sock.recv(remaining_bytes - bytes_received)
if chunk:
response.extend(chunk)
bytes_received += len(chunk)
if bytes_received < remaining_bytes:
print(f"Incomplete frame received. Got {len(response)} bytes, expected 16")
return None
# Verify CRC checksum
Hi_check, Lo_check = self.crc16(response[:-2])
if response[-1] != Hi_check or response[-2] != Lo_check:
print("CRC check failed!")
print(f"Received CRC: {response[-2]:02X}{response[-1]:02X}")
print(f"Calculated CRC: {Lo_check:02X}{Hi_check:02X}")
return None
# Parse the sensor data
sensor_data = self.parse_data_passive(response)
return sensor_data
except socket.timeout:
print("Timeout while waiting for sensor data")
return None
except ConnectionError as e:
print(f"TCP connection error: {e}")
return None
except Exception as e:
print(f"Unexpected error in TCP read(): {str(e)}")
return None
def parse_data_passive(self, buffer):
values = [
int.from_bytes(buffer[i:i+2], byteorder='little', signed=True)
for i in range(2, 14, 2)
]
Fx, Fy, Fz = np.array(values[:3]) / 10.0
Mx, My, Mz = np.array(values[3:]) / 1000.0
return np.array([Fx, Fy, Fz, Mx, My, Mz])
# 外部
def test_sensor_frequency(sensor, mode='active', duration=5.0):
"""
测试传感器在不同模式下的实际数据获取频率
参数:
sensor: XjcSensor实例
mode: 'active'(主动模式) 'polling'(查询模式)
duration: 测试持续时间()
返回:
dict: 包含测试结果的字典
"""
# 准备测试环境
if mode == 'active':
print("\n=== 测试主动传输模式 ===")
sensor.enable_active_transmission()
read_func = sensor.read
else:
print("\n=== 测试查询模式 ===")
sensor.disable_active_transmission() # 确保不在主动模式
time.sleep(0.5)
sensor.disable_active_transmission() # 确保不在主动模式
time.sleep(0.5)
sensor.disable_active_transmission() # 确保不在主动模式
time.sleep(0.5)
read_func = sensor.read_data_f32
# 初始化测试变量
timestamps = []
data_count = 0
start_time = time.perf_counter()
end_time = start_time + duration
print(f"开始测试,持续时间 {duration} 秒...")
# 主测试循环
while time.perf_counter() < end_time:
data = read_func()
print(data)
if data is not None:
timestamps.append(time.perf_counter())
data_count += 1
# print(timestamps)
# 计算统计结果
if data_count < 2:
print("警告: 采集到的数据点不足")
return None
intervals = np.diff(timestamps)
save_intervals_to_csv(timestamps)
avg_interval = np.mean(intervals)
min_interval = np.min(intervals)
max_interval = np.max(intervals)
std_interval = np.std(intervals)
avg_freq = 1.0 / avg_interval
# 打印结果
print("\n测试结果:")
print(f"总数据点数: {data_count}")
print(f"平均频率: {avg_freq:.2f} Hz")
print(f"最小间隔: {min_interval*1000:.3f} ms")
print(f"最大间隔: {max_interval*1000:.3f} ms")
print(f"间隔标准差: {std_interval*1000:.3f} ms")
# 返回结果字典
return {
'mode': mode,
'duration': duration,
'data_count': data_count,
'avg_freq': avg_freq,
'min_interval': min_interval,
'max_interval': max_interval,
'std_interval': std_interval,
'timestamps': timestamps,
'intervals': intervals
}
def plot_test_results(active_results, polling_results):
"""绘制两种模式的测试结果对比图"""
plt.figure(figsize=(12, 8))
# 间隔时间分布图
plt.subplot(2, 1, 1)
plt.hist(active_results['intervals']*1000, bins=50, alpha=0.7, label='主动模式')
plt.hist(polling_results['intervals']*1000, bins=50, alpha=0.7, label='查询模式')
plt.xlabel('间隔时间 (ms)')
plt.ylabel('出现次数')
plt.title('数据间隔时间分布')
plt.legend()
plt.grid(True)
# 间隔时间序列图
plt.subplot(2, 1, 2)
plt.plot(np.arange(len(active_results['intervals'])),
active_results['intervals']*1000, 'b.', label='主动模式')
plt.plot(np.arange(len(polling_results['intervals'])),
polling_results['intervals']*1000, 'r.', label='查询模式')
plt.xlabel('数据点序号')
plt.ylabel('间隔时间 (ms)')
plt.title('数据间隔时间序列')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
def compare_modes(sensor, duration=5.0):
"""比较两种工作模式的性能"""
# 测试主动模式
active_results = test_sensor_frequency(sensor, 'active', duration)
time.sleep(1) # 模式切换间隔
# 测试查询模式
polling_results = test_sensor_frequency(sensor, 'polling', duration)
time.sleep(1)
# 打印对比报告
print("\n=== 模式对比报告 ===")
print(f"{'指标':<15} {'主动模式':>15} {'查询模式':>15}")
print(f"{'平均频率(Hz)':<15} {active_results['avg_freq']:>15.2f} {polling_results['avg_freq']:>15.2f}")
print(f"{'最小间隔(ms)':<15} {active_results['min_interval']*1000:>15.3f} {polling_results['min_interval']*1000:>15.3f}")
print(f"{'最大间隔(ms)':<15} {active_results['max_interval']*1000:>15.3f} {polling_results['max_interval']*1000:>15.3f}")
print(f"{'间隔标准差(ms)':<15} {active_results['std_interval']*1000:>15.3f} {polling_results['std_interval']*1000:>15.3f}")
# 绘制对比图表
plot_test_results(active_results, polling_results)
return active_results, polling_results
def save_intervals_to_csv(timestamps, filename="sensor_intervals.csv"):
"""
Save timestamp intervals to CSV file with metadata
Args:
timestamps: List of timestamp values (from time.perf_counter())
filename: Output CSV filename
"""
if len(timestamps) < 2:
print("Not enough timestamps to calculate intervals")
return
# Calculate intervals in milliseconds
intervals_ms = np.diff(timestamps) * 1000
# Prepare data rows
rows = []
for i, interval in enumerate(intervals_ms):
rows.append({
"reading_number": i+1,
"interval_ms": f"{interval:.4f}",
"timestamp": timestamps[i],
"expected_interval_ms": (1000/250) if i==0 else "", # For 250Hz
"deviation_ms": f"{interval - (1000/250):.4f}" if i>0 else ""
})
# CSV header
fieldnames = ["reading_number", "interval_ms", "timestamp",
"expected_interval_ms", "deviation_ms"]
# Write to file
with open(filename, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
print(f"Saved {len(intervals_ms)} intervals to {filename}")
if __name__ == "__main__":
# 替换为你的传感器 IP 和端口
sensor = XjcSensor("192.168.5.1", 60000)
# 示例操作
sensor.disable_active_transmission()
time.sleep(0.5)
sensor.disable_active_transmission()
time.sleep(0.5)
sensor.disable_active_transmission()
time.sleep(0.5)
sensor.set_zero()
# sensor.set_zero()
# sensor.disable_active_transmission()
# while True:
# data = sensor.read_data_f32()
# if data is not None:
# print(f"Force data: {data}")
# sensor.enable_active_transmission()
# while True:
# sensor_data = sensor.read()
# if sensor_data is None:
# print('failed to get force sensor data!')
# print(sensor_data)
test_sensor_frequency(sensor)
# compare_modes(sensor)

View File

@ -29,14 +29,14 @@ def ServoP(client, pt_floats):
client.send(tcp_command)
def ServoJ(client, J1_angle):
tcp_command = "servoj(0,%f,%f,0,0,0,t=0.008,aheadtime=20, gain=200)" % (J1_angle,J1_angle/2)
tcp_command = "servop(249,%f,%f,179,0,-90,t=0.008,aheadtime=20, gain=200)" % (J1_angle-134,497+J1_angle/2)
tcp_command = tcp_command.encode()
client.sendall(tcp_command)
# buf = client.recv(200).decode() # 接收反馈信息的长度
# print(buf)
def servoj_2(client, J1_angle):
tcp_command = "servoj(%f,0,0,0,0,0,t=2,aheadtime=50, gain=500)" % (J1_angle)
tcp_command = "servop(249,-135,497,179,0,-90,t=2,aheadtime=50, gain=500)"
tcp_command = tcp_command.encode()
print(tcp_command)
client.send(tcp_command)
@ -57,17 +57,17 @@ def enableStatus(client):
servoj_2(client, 0)
sleep(3)
J1 = 0
bias = 0.04
bias = 0.2
pl = bias
while stop:
time_home = time.time()
if J1 > 60 and bias == pl:
if J1 > 50 and bias == pl:
bias = -pl
elif J1 < -1 and bias == -pl:
bias = pl
J1 = J1 + bias
ServoJ(client, J1)
time.sleep(0.007)
time.sleep(0.008)
tie = time.time()-time_home
logger.info(f"time: {tie}")
print(tie)
@ -90,7 +90,7 @@ def worker(): #子线程接受模式
try:
data = client_socket1.recv(1024).decode()
if data:
print(data)
#print(data)
index_str = data[data.find('{') + 1:data.find('}')]
if (index_str == ''):
stop = False
@ -107,6 +107,7 @@ try:
feed_thread.daemon = True
feed_thread.start()
enableStatus(client_socket1)
# servoj_2(client_socket1, 0)
except KeyboardInterrupt:
print("用户中断操作。")
print((time.time()-time_h)/60)

View File

@ -5,6 +5,7 @@ class Rate:
def __init__(self, hz):
self.interval = 1.0 / hz
self.last_time = time.monotonic()
# self.last_time =time.time()
self.start_time = self.last_time
def sleep(self,print_duration=False):
@ -47,9 +48,11 @@ class Rate:
return max(0.0, remaining_time)
def cycle_time(self):
now = time.monotonic()
cycle_duration = now - self.start_time
self.start_time = now
# now = time.monotonic()
now = time.time()
# cycle_duration = now - self.start_time
cycle_duration = now - self.last_time
self.last_time = now
return cycle_duration
def to_sec(self):

File diff suppressed because it is too large Load Diff