MassageRobot_Dobot/Massage/MassageControl/MassageRobot_nova5.py

506 lines
23 KiB
Python

from hardware.dobot_nova5 import dobot_nova5
from hardware.force_sensor import XjcSensor
from algorithms.arm_state import ArmState
from algorithms.controller_manager import ControllerManager
from algorithms.admittance_controller import AdmittanceController
from algorithms.position_controller import PositionController
from tools.log import CustomLogger
from tools.yaml_operator import read_yaml
from tools.Rate import Rate
import numpy as np
import threading
import time
import os
from scipy.spatial.transform import Rotation as R
import ctypes
libc = ctypes.CDLL("libc.so.6")
import atexit
from tools.Trajectory import TrajectoryInterpolator
from scipy.spatial.transform import Slerp
class MassageRobot:
def __init__(self,arm_config_path,is_log=False):
self.logger = CustomLogger(
log_name="测试日志",
log_file="logs/MassageRobot_nova5_test.log",
precise_time=True)
if is_log:
self.logger_data = CustomLogger(log_name="运动数据日志",
log_file="logs/MassageRobot_kinematics_data.log",
precise_time=True)
# 日志线程
threading.Thread(target=self.log_thread,daemon=True).start()
self.arm_state = ArmState()
self.arm_config = read_yaml(arm_config_path)
# arm 实例化时机械臂类内部进行通讯连接
self.arm = dobot_nova5(self.arm_config['arm_ip'])
self.arm.start()
# 传感器
self.force_sensor = XjcSensor()
self.force_sensor.connect()
# 控制器初始化(初始化为导纳控制)
self.controller_manager = ControllerManager(self.arm_state)
self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0])
self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][1])
self.controller_manager.switch_controller('admittance')
# 按摩头参数暂时使用本地数据
massage_head_dir = self.arm_config['massage_head_dir']
all_items = os.listdir(massage_head_dir)
head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))]
self.playload_dict = {}
for file in head_config_files:
file_address = massage_head_dir + '/' + file
play_load = read_yaml(file_address)
self.playload_dict[play_load['name']] = play_load
self.current_head = 'none'
# 读取数据
self.gravity_base = None
self.force_zero = None
self.torque_zero = None
self.tool_position = None
self.mass_center_position = None
self.s_tf_matrix_t = None
arm_orientation_set0 = np.array([-180,0,-90])
self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix()
# 频率数据赋值
self.control_rate = Rate(self.arm_config['control_rate'])
self.sensor_rate = Rate(self.arm_config['sensor_rate'])
self.command_rate = Rate(self.arm_config['command_rate'])
# 低通滤波
self.cutoff_freq = 80.0
# 停止标志位线程
self.exit_event = threading.Event()
self.exit_event.set() # 运行初始化为True
self.interrupt_event = threading.Event()
self.interrupt_event.clear() # 中断初始化为False
self.pause_envent = threading.Event()
self.pause_envent.clear() # 暂停初始化为False
self.skip_envent = threading.Event()
self.skip_envent.clear() # 跳过初始化为False
# 按摩调整
self.adjust_wrench_event = threading.Event()
self.adjust_wrench_event.clear() # 调整初始化为False
self.pos_increment = np.zeros(3,dtype=np.float64)
self.adjust_wrench = np.zeros(6,dtype=np.float64)
self.is_waitting = False
self.last_print_time = 0
self.last_record_time = 0
self.last_command_time = 0
self.move_to_point_count = 0
# 卡尔曼滤波相关初始化
self.x_base = np.zeros(6)
self.P_base = np.eye(6)
# 过程噪声协方差矩阵
self.Q_base = np.eye(6) * 0.01
# 测量噪声协方差矩阵
self.R_base = np.eye(6) * 0.1
self.x_tcp = np.zeros(6)
self.P_tcp = np.eye(6)
# 过程噪声协方差矩阵
self.Q_tcp = np.eye(6) * 0.01
# 测量噪声协方差矩阵
self.R_tcp = np.eye(6) * 0.1
# 传感器故障计数器
self.sensor_fail_count = 0
# 机械臂初始化,适配中间层
self.arm.init()
# REF TRAJ
self.xr = []
self.vr = []
self.ar = []
self.ts = [] # time sequence
self.dt = [] # derivate of time
# ---
self.last_time = -1
self.cur_time = -1
# 预测步骤
def kalman_predict(self,x, P, Q):
# 预测状态(这里假设状态不变)
x_predict = x
# 预测误差协方差
P_predict = P + Q
return x_predict, P_predict
# 更新步骤
def kalman_update(self,x_predict, P_predict, z, R):
# 卡尔曼增益
# K = P_predict @ np.linalg.inv(P_predict + R)
S = P_predict + R
s = np.diag(S) # shape (6,)
p_diag = np.diag(P_predict)
K_diag = np.zeros_like(s)
nonzero_mask = s != 0
K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask]
K = np.diag(K_diag)
# 更新状态
x_update = x_predict + K @ (z - x_predict)
# 更新误差协方差
P_update = (np.eye(len(K)) - K) @ P_predict
return x_update, P_update
def update_wrench(self):
# 当前的机械臂到末端转换 (实时)
b_rotation_s = R.from_quat(self.arm_state.arm_orientation).as_matrix()
# 读取数据
sensor_data = self.force_sensor.read()
# self.logger.log_info(f"传感器数据{sensor_data}")
if sensor_data is None:
self.force_sensor.stop_background_reading()
self.logger.log_error("传感器数据读取失败")
return -1
# 反重力补偿
sensor_data[:3] = sensor_data[:3] + self.force_zero + self.b_rotation_s_set0.T @ self.gravity_base
sensor_data[3:] = sensor_data[3:] + self.torque_zero + np.cross(self.mass_center_position, self.b_rotation_s_set0.T @ self.gravity_base)
# 传感器数据通过矫正计算得到外来施加力 传感器坐标系下
# 重力
gravity_in_sensor = b_rotation_s.T @ self.gravity_base
s_force = sensor_data[:3] - self.force_zero - gravity_in_sensor
# 力矩
s_torque = sensor_data[3:] - self.torque_zero - np.cross(self.mass_center_position,gravity_in_sensor)
wrench = np.concatenate([s_force,s_torque])
# 传感器到TCP
wrench = self.wrench_coordinate_conversion(self.s_tf_matrix_t,wrench)
# 交给ARM STATE集中管理
self.arm_state.external_wrench_tcp = wrench
# 对tcp坐标系下的外力外矩进行平滑
x_tcp_predict, P_tcp_predict = self.kalman_predict(x = self.x_tcp,
P = self.P_tcp,
Q = self.Q_tcp)
self.x_tcp, self.P_tcp = self.kalman_update(x_predict = x_tcp_predict,
P_predict = P_tcp_predict,
z = self.arm_state.external_wrench_tcp,
R = self.R_tcp)
self.arm_state.external_wrench_tcp = self.x_tcp
self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp
return 0
# ####################### 位姿伺服 ##########################
def arm_measure_loop(self):
self.logger.log_info("机械臂测量线程启动")
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
try:
if not self.is_waitting:
self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position()
code = self.update_wrench()
if code == -1:
self.sensor_fail_count += 1
self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}")
if self.sensor_fail_count > 10:
self.logger.log_error("传感器线程数据读取失败超过10次,程序终止")
self.stop()
break
else:
self.sensor_fail_count = 0
except Exception as e:
self.logger.log_error(f"机械臂测量线程报错:{e}")
self.exit_event.set() # 控制退出while
self.sensor_rate.sleep() # 控制频率
def arm_command_loop_traj(self):
self.logger.log_info("机械臂控制线程启动")
# 离线轨迹启动时间记录
self.traj_start_time = time.time()
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
try:
if not self.is_waitting:
process_start_time = time.time()
control_cycle = self.control_rate.to_sec()
t_now = time.time() - self.traj_start_time
i = min(int(t_now / self.dt), len(self.ts) - 2)
alpha = max(0.0, min(1.0, (t_now - self.ts[i]) / self.dt))
pos = (1 - alpha) * self.xr[i][:3] + alpha * self.xr[i+1][:3]
slerp = Slerp([0, 1], R.concatenate([R.from_quat(self.xr[i][3:]), R.from_quat(self.xr[i+1][3:])]))
alpha = 0.5
rot_interp = slerp(alpha).as_quat()
xr = np.concatenate([pos, rot_interp])
# xr = (1-alpha) * self.xr[i] + alpha * self.xr[i+1]
vr = (1 - alpha) * self.vr[i] + alpha * self.vr[i + 1]
ar = (1 - alpha) * self.ar[i] + alpha * self.ar[i + 1]
self.arm_state.desired_position = xr[:3]
# self.arm_state.desired_orientation = xr[3:]
# print(111)
self.arm_state.desired_orientation = rot_interp
# print(222)
self.arm_state.desired_twist = vr
self.arm_state.desired_acc = ar
print("self.arm_state.desired_position",self.arm_state.desired_position)
print("self.arm_state.desired_orientation",self.arm_state.desired_orientation)
print("self.arm_state.desired_twist",self.arm_state.desired_twist)
print("self.arm_state.desired_acc",self.arm_state.desired_acc)
self.controller_manager.step_traj(control_cycle)
# self.controller_manager.step(control_cycle)
command_pose = np.concatenate([
self.arm_state.arm_position_command * 1000, # 转毫米
self.arm_state.arm_orientation_command
])
## 输入滤波
status, pose_processed = self.arm.process_command(
command_pose[:3],
command_pose[3:]
)
# print(f"pose_processed:{pose_processed}")
tcp_command = (
f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f},"
f"{pose_processed[2]:.3f},{pose_processed[3]:.3f},"
f"{pose_processed[4]:.3f},{pose_processed[5]:.3f},"
"t=0.02, aheadtime=20, gain=200)"
).encode()
run_time = time.time() - process_start_time
sleep_duration = self.control_rate.to_sec() - run_time
b2 =time.time()
if sleep_duration > 0:
time.sleep(sleep_duration)
print(f"real sleep:{time.time()-b2}")
# self.arm.dashboard.socket_dobot.sendall(tcp_command)
if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.arm.dashboard.Continue()
code = self.arm.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
except Exception as e:
self.logger.log_error(f"机械臂控制失败:{e}")
self.exit_event.set()
# print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration)
def arm_command_loop(self):
self.logger.log_info("机械臂控制线程启动")
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
try:
if not self.is_waitting:
process_start_time = time.time()
control_cycle = self.control_rate.to_sec()
self.controller_manager.step(control_cycle)
command_pose = np.concatenate([
self.arm_state.arm_position_command * 1000, # 转毫米
self.arm_state.arm_orientation_command
])
## 输入滤波
status, pose_processed = self.arm.process_command(
command_pose[:3],
command_pose[3:]
)
# print(f"pose_processed:{pose_processed}")
tcp_command = (
f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f},"
f"{pose_processed[2]:.3f},{pose_processed[3]:.3f},"
f"{pose_processed[4]:.3f},{pose_processed[5]:.3f},"
"t=0.02, aheadtime=20, gain=200)"
).encode()
run_time = time.time() - process_start_time
sleep_duration = self.control_rate.to_sec() - run_time
b2 =time.time()
if sleep_duration > 0:
time.sleep(sleep_duration)
print(f"real sleep:{time.time()-b2}")
self.arm.dashboard.socket_dobot.sendall(tcp_command)
if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.arm.dashboard.Continue()
code = self.arm.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
except Exception as e:
self.logger.log_error(f"机械臂控制失败:{e}")
self.exit_event.set()
# print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration)
####################### 位姿伺服 ##########################
def start(self):
if self.exit_event.is_set():
# 线程
self.exit_event.clear()
self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop)
self.arm_control_thread = threading.Thread(target=self.arm_command_loop_traj)
# 线程启动
self.arm_measure_thread.start() ## 测量线程
position,quat_rot = self.arm.get_arm_position()
# 初始值
self.arm_state.desired_position = position
self.arm_state.arm_position_command = position
self.arm_state.desired_orientation = quat_rot
self.arm_state.arm_orientation_command = quat_rot
self.arm_state.arm_position = position
self.arm_state.arm_orientation = quat_rot
for i in range(20):
self.controller_manager.step(self.control_rate.to_sec())
self.logger.log_blue(f"position command: {self.arm_state.arm_position_command}")
self.logger.log_blue(f"orientation command: {R.from_quat(self.arm_state.arm_orientation_command).as_euler('xyz',degrees=True)}")
position, quat_rot = self.arm.get_arm_position()
self.logger.log_blue(f"position current:{position}")
self.logger.log_blue(f"orientation current: {R.from_quat(quat_rot).as_euler('xyz',degrees=False)}")
self.command_rate.precise_sleep()
position ,quat_rot = self.arm.get_arm_position()
# self.arm_state.desired_position = poistion
self.arm_state.arm_position_command = position
# self.arm_state.desired_orientation = quat_rot
self.arm_state.arm_orientation_command = quat_rot
## POSITION CONTROLLER TEST ##
# delta_pos = np.array([0.1,0.1,-0.1],dtype=np.float64)
# # delta_quat = R.from_euler('xyz', [0, 0, 0], degrees=True).as_quat()
# self.arm_state.desired_position = position + delta_pos
# self.arm_state.desired_orientation = quat_rot
# 线程启动
self.arm_control_thread.start() ## 赋初始值后控制线程
self.logger.log_info("MassageRobot启动")
time.sleep(0.5)
def stop(self):
if not self.exit_event.is_set():
self.exit_event.set()
self.interrupt_event.clear()
self.arm_control_thread.join()
self.arm_measure_thread.join()
for i in range(3):
self.force_sensor.disable_active_transmission()
self.force_sensor.start_background_reading()
self.arm.stop_motion()
self.logger.log_info("MassageRobot停止")
def init_hardwares(self,ready_pose):
self.ready_pose = np.array(ready_pose)
self.switch_payload(self.current_head)
print(self.arm.get_arm_position())
time.sleep(0.5)
def switch_payload(self,name):
if name in self.playload_dict:
self.stop()
self.current_head = name
compensation_config = self.playload_dict[self.current_head]
# 读取数据
self.gravity_base = np.array(compensation_config['gravity_base'])
self.force_zero = np.array(compensation_config['force_zero'])
self.torque_zero = np.array(compensation_config['torque_zero'])
self.tool_position = np.array(compensation_config['tcp_offset'])
self.mass_center_position = np.array(compensation_config['mass_center_position'])
self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat())
tcp_offset = self.playload_dict[name]["tcp_offset"]
tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}"
print("tcp_offset_str",tcp_offset_str)
self.arm.setEndEffector(i=9,tool_i=tcp_offset_str)
self.arm.chooseEndEffector(i=9)
print(self.arm.get_arm_position())
self.logger.log_info(f"切换到{name}按摩头")
self.controller_manager.switch_controller('position')
else:
self.logger.log_error(f"未找到{name}按摩头")
def log_thread(self):
while True:
self.logger_data.log_info(f"机械臂位置:{self.arm_state.arm_position},机械臂姿态:{self.arm_state.arm_orientation}",is_print=False)
self.logger_data.log_info(f"机械臂期望位置:{self.arm_state.desired_position},机械臂真实位置:{self.arm_state.arm_position}",is_print=False)
self.logger_data.log_info(f"机械臂期望姿态:{self.arm_state.desired_orientation},机械臂真实姿态:{self.arm_state.arm_orientation}",is_print=False)
self.logger_data.log_info(f"机械臂期望力矩:{self.arm_state.desired_wrench},机械臂真实力矩:{self.arm_state.external_wrench_base}",is_print=False)
self.logger_data.log_info(f"当前按摩头:{self.current_head}",is_print=False)
time.sleep(1)
# 工具函数
def get_tf_matrix(self,position,orientation):
tf_matrix = np.eye(4)
rotation_matrix = R.from_quat(orientation).as_matrix()
tf_matrix[:3,3] = position/1000
tf_matrix[:3,:3] = rotation_matrix
return tf_matrix
def wrench_coordinate_conversion(self,tf_matrix, wrench):
rot_matrix = tf_matrix[:3, :3]
vector_p = tf_matrix[:3, 3]
temp_force = wrench[:3]
torque = wrench[3:]
force = rot_matrix.T @ temp_force
torque = rot_matrix.T @ np.cross(temp_force,vector_p) + rot_matrix.T @ torque
return np.concatenate([force, torque])
if __name__ == "__main__":
waypoints = [
{"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0],
"acceleration": [0, 0, 0],
"orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()},
{"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0],
"acceleration": [0, 0, 0],
"orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()},
] ## 单位 m deg
myInterpolate = TrajectoryInterpolator(waypoints)
ts = np.linspace(0,5,100)
robot = MassageRobot(arm_config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml")
xr_list, vr_list, ar_list = [], [], []
for t in ts:
xr, vr, ar = myInterpolate.interpolate(t)
xr_list.append(xr)
vr_list.append(vr)
ar_list.append(ar)
xr_array = np.array(xr_list)
vr_array = np.array(vr_list)
ar_array = np.array(ar_list)
robot.xr = xr_array
robot.vr = vr_array
robot.ar = ar_array
robot.ts = ts
robot.dt = ts[1] - ts[0]
ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0]
robot.current_head = 'finger_head'
robot.force_sensor.disable_active_transmission()
time.sleep(0.5)
robot.force_sensor.disable_active_transmission()
time.sleep(0.5)
robot.force_sensor.disable_active_transmission()
time.sleep(0.5)
robot.force_sensor.set_zero()
time.sleep(0.5)
robot.force_sensor.enable_active_transmission()
robot.init_hardwares(ready_pose)
time.sleep(3)
robot.controller_manager.switch_controller('admittance')
# robot.controller_manager.switch_controller('position')
atexit.register(robot.force_sensor.disconnect)
robot.arm_state.desired_wrench = np.array([0,0,-0.5,0,0,0])
try:
robot_thread = threading.Thread(target=robot.start)
robot_thread.start()
except KeyboardInterrupt:
print("用户中断操作。")
except Exception as e:
print("Exception occurred at line:", e.__traceback__.tb_lineno)
print("发生异常:", e)
# robot.arm.disableRobot()