暂存修改

This commit is contained in:
Ziwei.He 2025-05-28 21:18:29 +08:00
commit 04364f7a95

View File

@ -1,24 +1,24 @@
from functools import wraps
import sys
import numpy as np
import threading
import time
from typing import List, Literal, Union
import sys
import numpy as np
import threading
import time
from typing import List, Literal, Union
from scipy.interpolate import interp1d
from scipy.interpolate import interp1d
import requests
import os
from scipy.spatial.transform import Rotation as R
import ctypes
libc = ctypes.CDLL("libc.so.6")
import atexit
import copy
import requests
import os
from scipy.spatial.transform import Rotation as R
import ctypes
libc = ctypes.CDLL("libc.so.6")
import atexit
import copy
from scipy.spatial.transform import Slerp
from scipy.spatial.transform import Slerp
try:
try:
# 导入算法
from .algorithms.arm_state import ArmState
from .algorithms.controller_manager import ControllerManager
@ -44,7 +44,7 @@ try:
from .tools.decorator_tools import custom_decorator,apply_decorators
from .tools.serial_tools import find_serial_by_location
from .tools.Trajectory import TrajectoryInterpolator
except:
except:
#导入算法
from algorithms.arm_state import ArmState
from algorithms.controller_manager import ControllerManager
@ -71,22 +71,22 @@ except:
from tools.serial_tools import find_serial_by_location
from tools.Trajectory import TrajectoryInterpolator
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
current_file_path = os.path.abspath(__file__)
MassageRobot_Dobot_Path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path)))
print(MassageRobot_Dobot_Path)
# 添加目标文件夹到系统路径
sys.path.append(MassageRobot_Dobot_Path)
from VortXDB.client import VTXClient
current_file_path = os.path.abspath(__file__)
MassageRobot_Dobot_Path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path)))
print(MassageRobot_Dobot_Path)
# 添加目标文件夹到系统路径
sys.path.append(MassageRobot_Dobot_Path)
from VortXDB.client import VTXClient
"""
Python atexit.register 用于注册程序退出时要执行的函数atexit 模块提供了一个简单的接口用于在程序正常终止时执行清理操作
"""
Python atexit.register 用于注册程序退出时要执行的函数atexit 模块提供了一个简单的接口用于在程序正常终止时执行清理操作
当你注册多个函数时它们的执行顺序遵循先进后出的顺序LIFOLast In, First Out也就是说最后注册的函数会最先执行
"""
"""
def track_function(function_name,log = False):
def track_function(function_name,log = False):
def before(self, *args, **kwargs):
self.current_function = function_name
if log:
@ -97,8 +97,8 @@ def track_function(function_name,log = False):
self.logger.log_info(f"Exiting function: {function_name},code: {result}")
return custom_decorator(before=before, after=after)
@apply_decorators
class MassageRobot:
@apply_decorators
class MassageRobot:
def __init__(self,arm_config_path,is_log=False):
self.logger = CustomLogger()
if is_log:
@ -108,13 +108,14 @@ class MassageRobot:
self.vtxdb = VTXClient()
# 当前执行的函数
self.current_function = None
self.arm_state = ArmState()
self.arm_config = read_yaml(arm_config_path)
# arm 实例化时机械臂类内部进行通讯连接
# 初始化坐标系 TOOL=0 BASE=1
self.arm = dobot_nova5(arm_ip = self.arm_config['arm_ip'])
self.arm.start()
self.arm.chooseRightFrame()
self.arm.init()
self.thermotherapy = None
self.shockwave = None
@ -129,35 +130,18 @@ class MassageRobot:
# 控制器,初始为导纳控制
self.controller_manager = ControllerManager(self.arm_state)
self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0])
# self.controller_manager.add_controller(HybridController,'hybrid',self.arm_config['controller'][1])
self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][1])
# self.controller_manager.add_controller(AdmitHybridController,'admithybrid',self.arm_config['controller'][3])
# self.controller_manager.add_controller(HybridPidController,'hybridPid',self.arm_config['controller'][4])
# self.controller_manager.add_controller(HybridAdmitController,'hybridAdmit',self.arm_config['controller'][5])
# self.controller_manager.add_controller(PositionerSensorController,'positionerSensor',self.arm_config['controller'][6])
# self.controller_manager.add_controller(AdmittanceZController,'admittanceZ',self.arm_config['controller'][7])
self.controller_manager.add_controller(HybridController,'hybrid',self.arm_config['controller'][1])
self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][2])
self.controller_manager.add_controller(AdmitHybridController,'admithybrid',self.arm_config['controller'][3])
self.controller_manager.add_controller(HybridPidController,'hybridPid',self.arm_config['controller'][4])
self.controller_manager.add_controller(HybridAdmitController,'hybridAdmit',self.arm_config['controller'][5])
self.controller_manager.add_controller(PositionerSensorController,'positionerSensor',self.arm_config['controller'][6])
self.controller_manager.add_controller(AdmittanceZController,'admittanceZ',self.arm_config['controller'][7])
self.controller_manager.switch_controller('admittance')
# 按摩头参数暂时使用本地数据
massage_head_dir = self.arm_config['massage_head_dir']
all_items = os.listdir(massage_head_dir)
head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))]
self.playload_dict = {}
for file in head_config_files:
file_address = massage_head_dir + '/' + file
play_load = read_yaml(file_address)
self.playload_dict[play_load['name']] = play_load
self.current_head = 'none'
# 读取数据
self.gravity_base = None
self.force_zero = None
self.torque_zero = None
self.tool_position = None
self.mass_center_position = None
self.s_tf_matrix_t = None
arm_orientation_set0 = np.array([-180,-30,0])
self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix()
# 频率数据赋值
self.control_rate = Rate(self.arm_config['control_rate'])
@ -179,15 +163,30 @@ class MassageRobot:
# 按摩调整
self.adjust_wrench_envent = threading.Event()
self.adjust_wrench_envent.clear() # 调整初始化为False
self.is_execute = False
self.pos_increment = np.zeros(3,dtype=np.float64)
self.adjust_wrench = np.zeros(6,dtype=np.float64)
self.skip_pos = np.zeros(6,dtype=np.float64)
# 按摩头参数暂时使用本地数据
massage_head_dir = self.arm_config['massage_head_dir']
all_items = os.listdir(massage_head_dir)
head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))]
self.playload_dict = {}
for file in head_config_files:
file_address = massage_head_dir + '/' + file
play_load = read_yaml(file_address)
self.playload_dict[play_load['name']] = play_load
# self.playload_dict = self.vtxdb.get("robot_config", "massage_head")
# print(self.playload_dict)
self.current_head = 'none'
self.is_waitting = False
self.last_print_time = 0
self.last_record_time = 0
self.last_command_time = 0
self.move_to_point_count = 0
self.width_default = 5
# 卡尔曼滤波相关初始化
self.x_base = np.zeros(6)
self.P_base = np.eye(6)
@ -205,7 +204,16 @@ class MassageRobot:
# 传感器故障计数器
self.sensor_fail_count = 0
# 机械臂初始化,适配中间层
self.arm.init()
# 读取数据
self.gravity_base = None
self.force_zero = None
self.torque_zero = None
self.tool_position = None
self.mass_center_position = None
self.s_tf_matrix_t = None
arm_orientation_set0 = np.array([-180,0,-90])
self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix()
# REF TRAJ
self.xr = []
@ -217,6 +225,36 @@ class MassageRobot:
self.last_time = -1
self.cur_time = -1
# 预测步骤
def kalman_predict(self,x, P, Q):
# 预测状态(这里假设状态不变)
x_predict = x
# 预测误差协方差
P_predict = P + Q
return x_predict, P_predict
# 更新步骤
def kalman_update(self,x_predict, P_predict, z, R):
# 卡尔曼增益
# K = P_predict @ np.linalg.inv(P_predict + R)
S = P_predict + R
s = np.diag(S) # shape (6,)
p_diag = np.diag(P_predict)
K_diag = np.zeros_like(s)
nonzero_mask = s != 0
K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask]
K = np.diag(K_diag)
# 更新状态
x_update = x_predict + K @ (z - x_predict)
# 更新误差协方差
P_update = (np.eye(len(K)) - K) @ P_predict
return x_update, P_update
def init_hardwares(self,ready_pose):
self.ready_pose = np.array(ready_pose)
self.switch_payload(self.current_head)
print(self.arm.get_arm_position())
time.sleep(0.5)
def sensor_set_zero(self):
self.arm_position_set0,self.arm_orientation_set0 = self.arm.get_arm_position()
@ -325,30 +363,7 @@ class MassageRobot:
return -1
return 0
# 预测步骤
def kalman_predict(self,x, P, Q):
# 预测状态(这里假设状态不变)
x_predict = x
# 预测误差协方差
P_predict = P + Q
return x_predict, P_predict
# 更新步骤
def kalman_update(self,x_predict, P_predict, z, R):
# 卡尔曼增益
# K = P_predict @ np.linalg.inv(P_predict + R)
S = P_predict + R
s = np.diag(S) # shape (6,)
p_diag = np.diag(P_predict)
K_diag = np.zeros_like(s)
nonzero_mask = s != 0
K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask]
K = np.diag(K_diag)
# 更新状态
x_update = x_predict + K @ (z - x_predict)
# 更新误差协方差
P_update = (np.eye(len(K)) - K) @ P_predict
return x_update, P_update
def update_wrench(self):
# 当前的机械臂到末端转换 (实时)
@ -386,6 +401,29 @@ class MassageRobot:
self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp
return 0
def switch_payload(self,name):
if name in self.playload_dict:
self.stop()
self.current_head = name
compensation_config = self.playload_dict[self.current_head]
# 读取数据
self.gravity_base = np.array(compensation_config['gravity_base'])
self.force_zero = np.array(compensation_config['force_zero'])
self.torque_zero = np.array(compensation_config['torque_zero'])
self.tool_position = np.array(compensation_config['tcp_offset'])
self.mass_center_position = np.array(compensation_config['mass_center_position'])
self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat())
tcp_offset = self.playload_dict[name]["tcp_offset"]
tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}"
print("tcp_offset_str",tcp_offset_str)
self.arm.setEndEffector(i=9,tool_i=tcp_offset_str)
self.arm.chooseEndEffector(i=9)
print(self.arm.get_arm_position())
self.logger.log_info(f"切换到{name}按摩头")
# ####################### 位姿伺服 ##########################
def arm_measure_loop(self):
@ -502,9 +540,8 @@ class MassageRobot:
command_pose[:3],
command_pose[3:]
)
# pose_processed = command_pose
print(f"pose_processed:{pose_processed}")
print(self.arm.feedbackData.robotMode)
# print(f"pose_processed:{pose_processed}")
tcp_command = (
f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f},"
f"{pose_processed[2]:.3f},{pose_processed[3]:.3f},"
@ -538,7 +575,7 @@ class MassageRobot:
# 线程
self.exit_event.clear()
self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop)
self.arm_control_thread = threading.Thread(target=self.arm_command_loop)
self.arm_control_thread = threading.Thread(target=self.arm_command_loop_traj)
# 线程启动
self.arm_measure_thread.start() ## 测量线程
position,quat_rot = self.arm.get_arm_position()
@ -587,34 +624,7 @@ class MassageRobot:
self.arm.stop_motion()
self.logger.log_info("MassageRobot停止")
def init_hardwares(self,ready_pose):
self.ready_pose = np.array(ready_pose)
self.switch_payload(self.current_head)
print(self.arm.get_arm_position())
time.sleep(0.5)
def switch_payload(self,name):
if name in self.playload_dict:
self.stop()
self.current_head = name
compensation_config = self.playload_dict[self.current_head]
# 读取数据
self.gravity_base = np.array(compensation_config['gravity_base'])
self.force_zero = np.array(compensation_config['force_zero'])
self.torque_zero = np.array(compensation_config['torque_zero'])
self.tool_position = np.array(compensation_config['tcp_offset'])
self.mass_center_position = np.array(compensation_config['mass_center_position'])
self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat())
tcp_offset = self.playload_dict[name]["tcp_offset"]
tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}"
print("tcp_offset_str",tcp_offset_str)
self.arm.setEndEffector(i=9,tool_i=tcp_offset_str)
self.arm.chooseEndEffector(i=9)
print(self.arm.get_arm_position())
self.logger.log_info(f"切换到{name}按摩头")
self.controller_manager.switch_controller('position')
else:
self.logger.log_error(f"未找到{name}按摩头")
@ -1557,36 +1567,36 @@ class MassageRobot:
if __name__ == "__main__":
# waypoints = [
# {"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0],
# "acceleration": [0, 0, 0],
# "orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()},
# {"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0],
# "acceleration": [0, 0, 0],
# "orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()},
# ] ## 单位 m deg
# myInterpolate = TrajectoryInterpolator(waypoints)
# ts = np.linspace(0,5,100)
if __name__ == "__main__":
waypoints = [
{"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0],
"acceleration": [0, 0, 0],
"orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()},
{"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0],
"acceleration": [0, 0, 0],
"orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()},
] ## 单位 m deg
myInterpolate = TrajectoryInterpolator(waypoints)
ts = np.linspace(0,5,100)
robot = MassageRobot(arm_config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml")
# xr_list, vr_list, ar_list = [], [], []
# for t in ts:
# xr, vr, ar = myInterpolate.interpolate(t)
# xr_list.append(xr)
# vr_list.append(vr)
# ar_list.append(ar)
# xr_array = np.array(xr_list)
# vr_array = np.array(vr_list)
# ar_array = np.array(ar_list)
xr_list, vr_list, ar_list = [], [], []
for t in ts:
xr, vr, ar = myInterpolate.interpolate(t)
xr_list.append(xr)
vr_list.append(vr)
ar_list.append(ar)
xr_array = np.array(xr_list)
vr_array = np.array(vr_list)
ar_array = np.array(ar_list)
# robot.xr = xr_array
# robot.vr = vr_array
# robot.ar = ar_array
# robot.ts = ts
# robot.dt = ts[1] - ts[0]
robot.xr = xr_array
robot.vr = vr_array
robot.ar = ar_array
robot.ts = ts
robot.dt = ts[1] - ts[0]
ready_pose = [194.3467,-134.9880,455.3604,-180.0000,-30.0000,0.0042]
ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0]
robot.current_head = 'finger_head'
robot.force_sensor.disable_active_transmission()
@ -1611,12 +1621,10 @@ if __name__ == "__main__":
try:
robot_thread = threading.Thread(target=robot.start)
robot_thread.start()
except KeyboardInterrupt:
print("用户中断操作。")
except Exception as e:
print("Exception occurred at line:", e.__traceback__.tb_lineno)
print("发生异常:", e)
robot_thread.join() # 等待机械臂线程结束
# robot.arm.disableRobot()