MassageRobot_Dobot/Massage/MassageControl/MassageRobot_nova5.py
2025-05-27 22:08:48 +08:00

1618 lines
68 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from functools import wraps
import sys
import numpy as np
import threading
import time
from typing import List, Literal, Union
from scipy.interpolate import interp1d
import requests
import os
from scipy.spatial.transform import Rotation as R
import ctypes
libc = ctypes.CDLL("libc.so.6")
import atexit
import copy
from scipy.spatial.transform import Slerp
try:
# 导入算法
from .algorithms.arm_state import ArmState
from .algorithms.controller_manager import ControllerManager
from .algorithms.admittance_controller import AdmittanceController
from .algorithms.hybrid_controller import HybridController
from .algorithms.admithybrid_controller import AdmitHybridController
from .algorithms.position_controller import PositionController
from .algorithms.hybridPid_controller import HybridPidController
from .algorithms.hybridAdmit_controller import HybridAdmitController
from .algorithms.positionerSensor_controller import PositionerSensorController
from .algorithms.admittanceZ_controller import AdmittanceZController
from .algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate
from .algorithms.force_planner import ForcePlanner
# 导入硬件
from .hardware.dobot_nova5 import dobot_nova5
from .hardware.force_sensor import XjcSensor
# 导入工具
from .tools.log import CustomLogger
from .tools.Rate import Rate
from .tools.yaml_operator import read_yaml
from .tools.decorator_tools import custom_decorator,apply_decorators
from .tools.serial_tools import find_serial_by_location
from .tools.Trajectory import TrajectoryInterpolator
except:
#导入算法
from algorithms.arm_state import ArmState
from algorithms.controller_manager import ControllerManager
from algorithms.admittance_controller import AdmittanceController
from algorithms.admittanceZ_controller import AdmittanceZController
from algorithms.hybrid_controller import HybridController
from algorithms.admithybrid_controller import AdmitHybridController
from algorithms.position_controller import PositionController
from algorithms.hybridPid_controller import HybridPidController
from algorithms.hybridAdmit_controller import HybridAdmitController
from algorithms.positionerSensor_controller import PositionerSensorController
from algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate
from algorithms.force_planner import ForcePlanner
# 导入硬件
from hardware.dobot_nova5 import dobot_nova5
from hardware.force_sensor import XjcSensor
# 导入工具
from tools.log import CustomLogger
from tools.Rate import Rate
from tools.yaml_operator import read_yaml
from tools.decorator_tools import custom_decorator,apply_decorators
from tools.serial_tools import find_serial_by_location
from tools.Trajectory import TrajectoryInterpolator
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
current_file_path = os.path.abspath(__file__)
MassageRobot_Dobot_Path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path)))
print(MassageRobot_Dobot_Path)
# 添加目标文件夹到系统路径
sys.path.append(MassageRobot_Dobot_Path)
from VortXDB.client import VTXClient
"""
在 Python 中atexit.register 用于注册程序退出时要执行的函数。atexit 模块提供了一个简单的接口,用于在程序正常终止时执行清理操作。
当你注册多个函数时它们的执行顺序遵循先进后出的顺序LIFOLast In, First Out。也就是说最后注册的函数会最先执行。
"""
def track_function(function_name,log = False):
def before(self, *args, **kwargs):
self.current_function = function_name
if log:
self.logger.log_info(f"Entering function: {function_name}")
def after(self, result, *args, **kwargs):
self.current_function = None
if log:
self.logger.log_info(f"Exiting function: {function_name},code: {result}")
return custom_decorator(before=before, after=after)
@apply_decorators
class MassageRobot:
def __init__(self,arm_config_path,is_log=False):
self.logger = CustomLogger()
if is_log:
self.logger_data = CustomLogger()
# 日志线程
threading.Thread(target=self.log_thread,daemon=True).start()
self.vtxdb = VTXClient()
self.arm_state = ArmState()
self.arm_config = read_yaml(arm_config_path)
# arm 实例化时机械臂类内部进行通讯连接
self.arm = dobot_nova5(arm_ip = self.arm_config['arm_ip'])
self.arm.start()
self.thermotherapy = None
self.shockwave = None
self.stone = None
self.heat = None
self.ion = None
self.force_plan = ForcePlanner()
# 传感器
# self.force_sensor = None
self.force_sensor = XjcSensor()
# self.force_sensor.connect()
# 控制器,初始为导纳控制
self.controller_manager = ControllerManager(self.arm_state)
self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0])
self.controller_manager.add_controller(HybridController,'hybrid',self.arm_config['controller'][1])
self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][2])
self.controller_manager.add_controller(AdmitHybridController,'admithybrid',self.arm_config['controller'][3])
self.controller_manager.add_controller(HybridPidController,'hybridPid',self.arm_config['controller'][4])
self.controller_manager.add_controller(HybridAdmitController,'hybridAdmit',self.arm_config['controller'][5])
self.controller_manager.add_controller(PositionerSensorController,'positionerSensor',self.arm_config['controller'][6])
self.controller_manager.add_controller(AdmittanceZController,'admittanceZ',self.arm_config['controller'][7])
self.controller_manager.switch_controller('admittance')
# 按摩头参数暂时使用本地数据
massage_head_dir = self.arm_config['massage_head_dir']
all_items = os.listdir(massage_head_dir)
head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))]
self.playload_dict = {}
for file in head_config_files:
file_address = massage_head_dir + '/' + file
play_load = read_yaml(file_address)
self.playload_dict[play_load['name']] = play_load
self.current_head = 'none'
# 读取数据
self.gravity_base = None
self.force_zero = None
self.torque_zero = None
self.tool_position = None
self.mass_center_position = None
self.s_tf_matrix_t = None
arm_orientation_set0 = np.array([-180,0,-90])
self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix()
# 频率数据赋值
self.control_rate = Rate(self.arm_config['control_rate'])
self.sensor_rate = Rate(self.arm_config['sensor_rate'])
self.command_rate = Rate(self.arm_config['command_rate'])
# 低通滤波
self.cutoff_freq = 80.0
# 停止标志位线程
self.exit_event = threading.Event()
self.exit_event.set() # 运行初始化为True
self.interrupt_event = threading.Event()
self.interrupt_event.clear() # 中断初始化为False
self.pause_envent = threading.Event()
self.pause_envent.clear() # 暂停初始化为False
self.skip_envent = threading.Event()
self.skip_envent.clear() # 跳过初始化为False
# 按摩调整
self.adjust_wrench_envent = threading.Event()
self.adjust_wrench_envent.clear() # 调整初始化为False
self.pos_increment = np.zeros(3,dtype=np.float64)
self.adjust_wrench = np.zeros(6,dtype=np.float64)
self.is_waitting = False
self.last_print_time = 0
self.last_record_time = 0
self.last_command_time = 0
self.move_to_point_count = 0
# 卡尔曼滤波相关初始化
self.x_base = np.zeros(6)
self.P_base = np.eye(6)
# 过程噪声协方差矩阵
self.Q_base = np.eye(6) * 0.01
# 测量噪声协方差矩阵
self.R_base = np.eye(6) * 0.1
self.x_tcp = np.zeros(6)
self.P_tcp = np.eye(6)
# 过程噪声协方差矩阵
self.Q_tcp = np.eye(6) * 0.01
# 测量噪声协方差矩阵
self.R_tcp = np.eye(6) * 0.1
# 传感器故障计数器
self.sensor_fail_count = 0
# 机械臂初始化,适配中间层
self.arm.init()
# REF TRAJ
self.xr = []
self.vr = []
self.ar = []
self.ts = [] # time sequence
self.dt = [] # derivate of time
# ---
self.last_time = -1
self.cur_time = -1
def sensor_set_zero(self):
self.arm_position_set0,self.arm_orientation_set0 = self.arm.get_arm_position()
arm_orientation_set0 = self.arm_orientation_set0
print("arm_orientation_set0",R.from_quat(arm_orientation_set0).as_euler('xyz',degrees=True))
self.b_rotation_s_set0 = R.from_quat(arm_orientation_set0).as_matrix()
#########
max_retries = 5
#no_head_wrong=np.array([-80.01506042, 116.34187317, -87.65788269, -0.32910481, -0.65792173, -0.61110526])#旧版传感器)
head_set0=np.array([0, 0, 0, 0, 0, 0])#新版传感器)
min_tolerance=0.001
retry_count = 0
self.force_sensor.disable_active_transmission()
time.sleep(0.5)
while retry_count < max_retries:
# 调用读取数据的函数,假设返回值是一个 numpy 数组
data = self.force_sensor.read_data_f32()
data = self.force_sensor.read_data_f32()
data = self.force_sensor.read_data_f32()
# 检查返回值是否为 np.array 类型
if isinstance(data, np.ndarray):
# 判断数据是否接近目标值
if np.allclose(data, head_set0, atol=min_tolerance):
print(f"检测到力传感器已置零:{data}")
break
else:
print(f"检测到力传感器未置零:{data}")
# 禁用传感器传输
self.force_sensor.disable_active_transmission()
self.force_sensor.disable_active_transmission()
time.sleep(0.5)
# 尝试设置传感器清零
code = self.force_sensor.set_zero()
code = self.force_sensor.set_zero()
max_try = 3
while code != 0 and max_try > 0:
self.logger.log_error("Force sensor set zero failed, retrying...")
self.force_sensor.disable_active_transmission()
self.force_sensor.disable_active_transmission()
time.sleep(0.5)
code = self.force_sensor.set_zero()
code = self.force_sensor.set_zero()
max_try -= 1
time.sleep(0.5)
# 如果多次尝试后失败
if max_try == 0:
self.logger.log_error("Force sensor set zero failed, exiting...")
requests.post(
"http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"}
)
requests.post(
"http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False}
)
# sys.exit(0)
return -1 # 置零失败
# 成功后跳出主循环
break
else:
self.logger.log_error("读取数据失败或格式错误,尝试重试...")
# 增加重试次数
retry_count += 1
self.force_sensor.disable_active_transmission()
self.force_sensor.disconnect()
time.sleep(1)
self.force_sensor = XjcSensor()
time.sleep(1) # 每次重试之间添加一个短暂的延迟
if retry_count == max_retries:
self.logger.log_error(f"超过最大重试次数 ({max_retries}),操作失败,退出程序...")
requests.post(
"http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"}
)
requests.post(
"http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False}
)
return -2 #读取失败
return 0 #读取成功并置零成功
def sensor_enable(self):
code = self.force_sensor.enable_active_transmission()
max_try = 3
while code!= 0 and max_try > 0:
self.logger.log_error("Force sensor enable_active_transmission failed,retrying...")
code = self.force_sensor.enable_active_transmission()
max_try -= 1
time.sleep(0.1)
if max_try == 0:
self.logger.log_error("Force sensor enable_active_transmission failed,exiting...")
requests.post(
"http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"}
)
requests.post(
"http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False}
)
# sys.exit(0)
return -1
return 0
# 预测步骤
def kalman_predict(self,x, P, Q):
# 预测状态(这里假设状态不变)
x_predict = x
# 预测误差协方差
P_predict = P + Q
return x_predict, P_predict
# 更新步骤
def kalman_update(self,x_predict, P_predict, z, R):
# 卡尔曼增益
# K = P_predict @ np.linalg.inv(P_predict + R)
S = P_predict + R
s = np.diag(S) # shape (6,)
p_diag = np.diag(P_predict)
K_diag = np.zeros_like(s)
nonzero_mask = s != 0
K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask]
K = np.diag(K_diag)
# 更新状态
x_update = x_predict + K @ (z - x_predict)
# 更新误差协方差
P_update = (np.eye(len(K)) - K) @ P_predict
return x_update, P_update
def update_wrench(self):
# 当前的机械臂到末端转换 (实时)
b_rotation_s = R.from_quat(self.arm_state.arm_orientation).as_matrix()
# 读取数据
sensor_data = self.force_sensor.read()
# self.logger.log_info(f"传感器数据{sensor_data}")
if sensor_data is None:
self.force_sensor.stop_background_reading()
self.logger.log_error("传感器数据读取失败")
return -1
# 反重力补偿
sensor_data[:3] = sensor_data[:3] + self.force_zero + self.b_rotation_s_set0.T @ self.gravity_base
sensor_data[3:] = sensor_data[3:] + self.torque_zero + np.cross(self.mass_center_position, self.b_rotation_s_set0.T @ self.gravity_base)
# 传感器数据通过矫正计算得到外来施加力 传感器坐标系下
# 重力
gravity_in_sensor = b_rotation_s.T @ self.gravity_base
s_force = sensor_data[:3] - self.force_zero - gravity_in_sensor
# 力矩
s_torque = sensor_data[3:] - self.torque_zero - np.cross(self.mass_center_position,gravity_in_sensor)
wrench = np.concatenate([s_force,s_torque])
# 传感器到TCP
wrench = self.wrench_coordinate_conversion(self.s_tf_matrix_t,wrench)
# 交给ARM STATE集中管理
self.arm_state.external_wrench_tcp = wrench
# 对tcp坐标系下的外力外矩进行平滑
x_tcp_predict, P_tcp_predict = self.kalman_predict(x = self.x_tcp,
P = self.P_tcp,
Q = self.Q_tcp)
self.x_tcp, self.P_tcp = self.kalman_update(x_predict = x_tcp_predict,
P_predict = P_tcp_predict,
z = self.arm_state.external_wrench_tcp,
R = self.R_tcp)
self.arm_state.external_wrench_tcp = self.x_tcp
self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp
return 0
# ####################### 位姿伺服 ##########################
def arm_measure_loop(self):
self.logger.log_info("机械臂测量线程启动")
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
try:
if not self.is_waitting:
self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position()
code = self.update_wrench()
if code == -1:
self.sensor_fail_count += 1
self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}")
if self.sensor_fail_count > 10:
self.logger.log_error("传感器线程数据读取失败超过10次,程序终止")
self.stop()
break
else:
self.sensor_fail_count = 0
except Exception as e:
self.logger.log_error(f"机械臂测量线程报错:{e}")
self.exit_event.set() # 控制退出while
self.sensor_rate.sleep() # 控制频率
def arm_command_loop_traj(self):
self.logger.log_info("机械臂控制线程启动")
# 离线轨迹启动时间记录
self.traj_start_time = time.time()
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
try:
if not self.is_waitting:
process_start_time = time.time()
control_cycle = self.control_rate.to_sec()
t_now = time.time() - self.traj_start_time
i = min(int(t_now / self.dt), len(self.ts) - 2)
alpha = max(0.0, min(1.0, (t_now - self.ts[i]) / self.dt))
pos = (1 - alpha) * self.xr[i][:3] + alpha * self.xr[i+1][:3]
slerp = Slerp([0, 1], R.concatenate([R.from_quat(self.xr[i][3:]), R.from_quat(self.xr[i+1][3:])]))
alpha = 0.5
rot_interp = slerp(alpha).as_quat()
xr = np.concatenate([pos, rot_interp])
# xr = (1-alpha) * self.xr[i] + alpha * self.xr[i+1]
vr = (1 - alpha) * self.vr[i] + alpha * self.vr[i + 1]
ar = (1 - alpha) * self.ar[i] + alpha * self.ar[i + 1]
self.arm_state.desired_position = xr[:3]
# self.arm_state.desired_orientation = xr[3:]
# print(111)
self.arm_state.desired_orientation = rot_interp
# print(222)
self.arm_state.desired_twist = vr
self.arm_state.desired_acc = ar
print("self.arm_state.desired_position",self.arm_state.desired_position)
print("self.arm_state.desired_orientation",self.arm_state.desired_orientation)
print("self.arm_state.desired_twist",self.arm_state.desired_twist)
print("self.arm_state.desired_acc",self.arm_state.desired_acc)
self.controller_manager.step_traj(control_cycle)
# self.controller_manager.step(control_cycle)
command_pose = np.concatenate([
self.arm_state.arm_position_command * 1000, # 转毫米
self.arm_state.arm_orientation_command
])
## 输入滤波
status, pose_processed = self.arm.process_command(
command_pose[:3],
command_pose[3:]
)
# print(f"pose_processed:{pose_processed}")
tcp_command = (
f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f},"
f"{pose_processed[2]:.3f},{pose_processed[3]:.3f},"
f"{pose_processed[4]:.3f},{pose_processed[5]:.3f},"
"t=0.02, aheadtime=20, gain=200)"
).encode()
run_time = time.time() - process_start_time
sleep_duration = self.control_rate.to_sec() - run_time
b2 =time.time()
if sleep_duration > 0:
time.sleep(sleep_duration)
print(f"real sleep:{time.time()-b2}")
# self.arm.dashboard.socket_dobot.sendall(tcp_command)
if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.arm.dashboard.Continue()
code = self.arm.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
except Exception as e:
self.logger.log_error(f"机械臂控制失败:{e}")
self.exit_event.set()
# print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration)
def arm_command_loop(self):
self.logger.log_info("机械臂控制线程启动")
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
try:
if not self.is_waitting:
process_start_time = time.time()
control_cycle = self.control_rate.to_sec()
self.controller_manager.step(control_cycle)
command_pose = np.concatenate([
self.arm_state.arm_position_command * 1000, # 转毫米
self.arm_state.arm_orientation_command
])
## 输入滤波
status, pose_processed = self.arm.process_command(
command_pose[:3],
command_pose[3:]
)
# print(f"pose_processed:{pose_processed}")
tcp_command = (
f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f},"
f"{pose_processed[2]:.3f},{pose_processed[3]:.3f},"
f"{pose_processed[4]:.3f},{pose_processed[5]:.3f},"
"t=0.02, aheadtime=20, gain=200)"
).encode()
run_time = time.time() - process_start_time
sleep_duration = self.control_rate.to_sec() - run_time
b2 =time.time()
if sleep_duration > 0:
time.sleep(sleep_duration)
print(f"real sleep:{time.time()-b2}")
self.arm.dashboard.socket_dobot.sendall(tcp_command)
if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.arm.dashboard.Continue()
code = self.arm.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
except Exception as e:
self.logger.log_error(f"机械臂控制失败:{e}")
self.exit_event.set()
# print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration)
####################### 位姿伺服 ##########################
def start(self):
if self.exit_event.is_set():
# 线程
self.exit_event.clear()
self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop)
self.arm_control_thread = threading.Thread(target=self.arm_command_loop_traj)
# 线程启动
self.arm_measure_thread.start() ## 测量线程
position,quat_rot = self.arm.get_arm_position()
# 初始值
self.arm_state.desired_position = position
self.arm_state.arm_position_command = position
self.arm_state.desired_orientation = quat_rot
self.arm_state.arm_orientation_command = quat_rot
self.arm_state.arm_position = position
self.arm_state.arm_orientation = quat_rot
for i in range(20):
self.controller_manager.step(self.control_rate.to_sec())
self.logger.log_blue(f"position command: {self.arm_state.arm_position_command}")
self.logger.log_blue(f"orientation command: {R.from_quat(self.arm_state.arm_orientation_command).as_euler('xyz',degrees=True)}")
position, quat_rot = self.arm.get_arm_position()
self.logger.log_blue(f"position current:{position}")
self.logger.log_blue(f"orientation current: {R.from_quat(quat_rot).as_euler('xyz',degrees=False)}")
self.command_rate.precise_sleep()
position ,quat_rot = self.arm.get_arm_position()
# self.arm_state.desired_position = poistion
self.arm_state.arm_position_command = position
# self.arm_state.desired_orientation = quat_rot
self.arm_state.arm_orientation_command = quat_rot
## POSITION CONTROLLER TEST ##
# delta_pos = np.array([0.1,0.1,-0.1],dtype=np.float64)
# # delta_quat = R.from_euler('xyz', [0, 0, 0], degrees=True).as_quat()
# self.arm_state.desired_position = position + delta_pos
# self.arm_state.desired_orientation = quat_rot
# 线程启动
self.arm_control_thread.start() ## 赋初始值后控制线程
self.logger.log_info("MassageRobot启动")
time.sleep(0.5)
def stop(self):
if not self.exit_event.is_set():
self.exit_event.set()
self.interrupt_event.clear()
self.arm_control_thread.join()
self.arm_measure_thread.join()
for i in range(3):
self.force_sensor.disable_active_transmission()
self.force_sensor.start_background_reading()
self.arm.stop_motion()
self.logger.log_info("MassageRobot停止")
def init_hardwares(self,ready_pose):
self.ready_pose = np.array(ready_pose)
self.switch_payload(self.current_head)
print(self.arm.get_arm_position())
time.sleep(0.5)
def switch_payload(self,name):
if name in self.playload_dict:
self.stop()
self.current_head = name
compensation_config = self.playload_dict[self.current_head]
# 读取数据
self.gravity_base = np.array(compensation_config['gravity_base'])
self.force_zero = np.array(compensation_config['force_zero'])
self.torque_zero = np.array(compensation_config['torque_zero'])
self.tool_position = np.array(compensation_config['tcp_offset'])
self.mass_center_position = np.array(compensation_config['mass_center_position'])
self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat())
tcp_offset = self.playload_dict[name]["tcp_offset"]
tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}"
print("tcp_offset_str",tcp_offset_str)
self.arm.setEndEffector(i=9,tool_i=tcp_offset_str)
self.arm.chooseEndEffector(i=9)
print(self.arm.get_arm_position())
self.logger.log_info(f"切换到{name}按摩头")
self.controller_manager.switch_controller('position')
else:
self.logger.log_error(f"未找到{name}按摩头")
def log_thread(self):
while True:
self.logger_data.log_info(f"机械臂位置:{self.arm_state.arm_position},机械臂姿态:{self.arm_state.arm_orientation}",is_print=False)
self.logger_data.log_info(f"机械臂期望位置:{self.arm_state.desired_position},机械臂真实位置:{self.arm_state.arm_position}",is_print=False)
self.logger_data.log_info(f"机械臂期望姿态:{self.arm_state.desired_orientation},机械臂真实姿态:{self.arm_state.arm_orientation}",is_print=False)
self.logger_data.log_info(f"机械臂期望力矩:{self.arm_state.desired_wrench},机械臂真实力矩:{self.arm_state.external_wrench_base}",is_print=False)
self.logger_data.log_info(f"当前按摩头:{self.current_head}",is_print=False)
time.sleep(1)
# 工具函数
def get_tf_matrix(self,position,orientation):
tf_matrix = np.eye(4)
rotation_matrix = R.from_quat(orientation).as_matrix()
tf_matrix[:3,3] = position/1000
tf_matrix[:3,:3] = rotation_matrix
return tf_matrix
def wrench_coordinate_conversion(self,tf_matrix, wrench):
rot_matrix = tf_matrix[:3, :3]
vector_p = tf_matrix[:3, 3]
temp_force = wrench[:3]
torque = wrench[3:]
force = rot_matrix.T @ temp_force
torque = rot_matrix.T @ np.cross(temp_force,vector_p) + rot_matrix.T @ torque
return np.concatenate([force, torque])
@track_function("set_position",True)
def set_position(self,pose,is_wait = False):
self.logger.log_info(f"set postion:{pose}")
x,y,z = pose[0],pose[1],pose[2]
roll,pitch,yaw = pose[3],pose[4],pose[5]
pose_command = np.concatenate([[x,y,z], [roll,pitch,yaw]])
time.sleep(0.1)
code = self.arm.RunPoint_P_inPose_M_RAD(poses_list = pose_command)
time.sleep(0.1)
return code
@track_function("move_to_point",True)
def move_to_point(self, t, pose, is_interrupt=True, wait=True, timeout=0.5, interpolation:Literal["linear","circle","cloud_point"] ='linear', algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance",is_switch_controller = False):
"""
移动到指定的点
param:
t: 时间或时间序列
pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿
is_interrupt: 是否允许中断
wait: 是否等待运动完成
timeout: 超时时间
interpolation: 插值方式
algorithm: 控制算法
return:
code: 0表示成功
1表示超时
2表示用户打断
3表示硬件错误
4表示参数错误
"""
self.move_to_point_count += 1
self.logger.log_blue(f"move_to_point_count: {self.move_to_point_count}")
self.logger.log_yellow(f"move_to_point: {pose[-1]}")
# 在函数内部直接规划轨迹
traj = self.traj_generate(t, pose, None, interpolation=interpolation)
if traj is None:
self.logger.log_error("轨迹未生成或生成错误")
return 4
self.logger.log_yellow(f"traj_quat: {R.from_quat(traj['quat'][-1]).as_euler('xyz',degrees=False)}")
self.logger.log_yellow(f"traj_pos: {traj['pos'][-1]}")
time_seq = traj['t']
pos_traj = traj['pos']
quat_traj = traj['quat']
if is_switch_controller == True:
try:
self.controller_manager.switch_controller(algorithm)
except ValueError as error:
self.logger.log_error(error)
self.logger.log_error("切换到%s控制器失败" % algorithm)
return 4
self.logger.log_info("切换到%s控制器" % algorithm)
# self.logger.log_info("当前为%s控制器" % algorithm)
for i in range(len(time_seq)):
if self.interrupt_event.is_set():
self.interrupt_event.clear()
self.logger.log_error("self.interrupt_event.clear()")
self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position)
self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation)
massage_wrench = np.array([0, 0, 0, 0, 0, 0])
self.arm_state.desired_wrench = massage_wrench
return 2
if self.pause_envent.is_set():
self.pause_envent.clear()
self.logger.log_error("self.pause_envent.clear()")
self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position)
self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation)
massage_wrench = np.array([0, 0, 0, 0, 0, 0])
self.arm_state.desired_wrench = massage_wrench
return 5
if self.skip_envent.is_set():
self.logger.log_error("self.skip_envent.clear()")
self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position)
self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation)
massage_wrench = np.array([0, 0, 0, 0, 0, 0])
self.arm_state.desired_wrench = massage_wrench
return 6
if self.arm.is_exit or self.exit_event.is_set():
print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set())
return 3
if self.adjust_wrench_envent.is_set() and self.is_execute == True:
if self.adjust_wrench is not None:
self.arm_state.desired_wrench = self.adjust_wrench
self.adjust_wrench_envent.clear()
self.arm_state.desired_position = pos_traj[i]
self.arm_state.desired_orientation = quat_traj[i]
if self.ion is not None:
act_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.arm_position
desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.desired_position
if self.arm_state.positionerSensorData < 0.001:
desired_position[2] = desired_position[2] - self.arm_state.desired_high
else:
desired_position[2] = min(act_position[2] + self.arm_state.positionerSensorData - self.arm_state.desired_high , desired_position[2] - self.arm_state.desired_high)
self.arm_state.desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix() @ desired_position
self.command_rate.precise_sleep()
start = time.time()
while wait:
if self.interrupt_event.is_set():
self.interrupt_event.clear()
return 2
if self.pause_envent.is_set():
self.pause_envent.clear()
return 5
if self.skip_envent.is_set():
# self.skip_envent.clear()
return 6
if self.arm.is_exit or self.exit_event.is_set():
print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set())
return 3
if time.time() - start > timeout:
return 1
self.command_rate.precise_sleep()
return 0
@track_function("apply_force",True)
def apply_force(self, t, wrench, interpolation:Literal["linear","oscillation"] = "linear", algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance"):
"""
施加力
param:
t: 时间或时间序列
wrench(nx6): 力矩或力矩序列,当输入为None时,表示力为[0,0,0,0,0,0]
is_interrupt: 是否允许中断
wait: 是否等待运动完成
timeout: 超时时间
interpolation: 插值方式
algorithm: 控制算法
return:
code: 0表示成功
1表示超时
2表示用户打断
3表示硬件错误
4表示参数错误
"""
if t==0:
self.arm_state.desired_wrench = wrench
return 0
# 在函数内部直接规划力矩轨迹
traj = self.wrench_traj_generate(t, wrench, interpolation=interpolation)
# # 在函数内部直接规划力矩轨迹
# traj = self.traj_generate(t, None, wrench, interpolation=interpolation)
if traj is None:
return 4
time_seq = traj['t']
wrench_traj = traj['wrench']
# try:
# self.controller_manager.switch_controller(algorithm)
# except ValueError as error:
# self.logger.log_error(error)
# return 4
self.logger.log_info("切换到%s控制器" % algorithm)
for i in range(len(time_seq)):
if self.interrupt_event.is_set():
self.interrupt_event.clear()
return 2
if self.pause_envent.is_set():
self.pause_envent.clear()
return 5
if self.skip_envent.is_set():
self.skip_envent.clear()
return 6
if self.arm.is_exit or self.exit_event.is_set():
return 3
if self.adjust_wrench_envent.is_set():
# if wrench_traj is not None:
# wrench_traj[i:] = self.adjust_wrench
self.adjust_wrench_envent.clear()
# print(wrench_traj[i])
self.arm_state.desired_wrench = wrench_traj[i] if wrench_traj is not None else np.zeros(6, dtype=np.float64)
self.command_rate.precise_sleep()
return 0
@track_function("resample_curve_strict",True)
def resample_curve_strict(self, points, num_resampled_points):
"""
修正的曲线重新采样函数,移除重复点并确保累积弧长严格递增。
"""
# 计算累积弧长
distances = np.sqrt(np.sum(np.diff(points, axis=0)**2, axis=1)) # 计算相邻点之间的距离
cumulative_length = np.insert(np.cumsum(distances), 0, 0) # 计算累积弧长
# 确保累积弧长严格递增(去除重复值)
unique_indices = np.where(np.diff(cumulative_length, prepend=-np.inf) > 0)[0]
cumulative_length = cumulative_length[unique_indices]
points = points[unique_indices]
# 生成均匀间隔的新弧长
target_lengths = np.linspace(0, cumulative_length[-1], num_resampled_points)
# 在累积弧长上进行插值计算新点位置
x_interp = interp1d(cumulative_length, points[:, 0], kind='linear', fill_value="extrapolate")
y_interp = interp1d(cumulative_length, points[:, 1], kind='linear', fill_value="extrapolate")
new_x = x_interp(target_lengths)
new_y = y_interp(target_lengths)
return np.column_stack((np.round(new_x).astype(int), np.round(new_y).astype(int)))
@track_function("generate_in_spiral_cloudPoint",True)
def generate_in_spiral_cloudPoint(self, start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'):
"""
绘制由内向外的螺旋线。
参数:
- start: 起点坐标 (x, y)
- end: 终点坐标 (x, y)
- num_cycle_factor: 螺旋的圈数
- points: 螺旋线上的点的数量
"""
# 计算起点和终点之间的距离
dx = end[0] - start[0]
dy = end[1] - start[1]
distance = np.sqrt(dx**2 + dy**2)
if width is None:
width = self.width_default
if num_cycle_factor is None:
num_cycle_factor = 5
num_cycle_factor = np.round(num_cycle_factor).astype(int)
if distance < 2:
print("距离过短,无法绘制螺旋线")
return 0
if width <= 2:
width = 2
if num_cycle_factor < 1:
print("圈数必须为大于1的整数")
num_cycle_factor = 1
angle = np.arctan2(dy, dx) # 计算从起点到终点的角度
points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int)
# 螺旋的角度范围
theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points)
# 螺旋的半径随着角度增大而线性增长
r = np.linspace(0, distance, points)
tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor)))
# 计算螺旋的x, y坐标
if direction == "CCW":
# 反向旋转时,使用负的角度来反向螺旋
x = r * np.cos(-theta) # 逆时针旋转
y = r * np.sin(-theta)*tempK
else:
# 正常旋转
x = r * np.cos(theta)
y = r * np.sin(theta)*tempK
# 旋转螺旋线以对齐起点和终点的角度
x_rotated = x * np.cos(angle) - y * np.sin(angle)
y_rotated = x * np.sin(angle) + y * np.cos(angle)
# 平移螺旋线,使其从起点开始
x_final = x_rotated + start[0] - x_rotated[0]
y_final = y_rotated + start[1] - y_rotated[0]
# return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1)))
result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1),
np.round(y_final).astype(int).reshape(-1, 1)))
return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points))
@track_function("generate_out_spiral_cloudPoint",True)
def generate_out_spiral_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'):
"""
绘制由外向内的螺旋线。
参数:
- start: 起点坐标 (x, y)
- end: 终点坐标 (x, y)
- num_cycle_factor: 螺旋的圈数
- points: 螺旋线上的点的数量
"""
# 计算起点和终点之间的距离
dx = end[0] - start[0]
dy = end[1] - start[1]
angle = np.arctan2(dy, dx) # 计算从起点到终点的角度
distance = np.sqrt(dx**2 + dy**2)
if width is None:
width = self.width_default
if num_cycle_factor is None:
num_cycle_factor = 5
num_cycle_factor = np.round(num_cycle_factor).astype(int)
if distance < 2:
print("距离过短,无法绘制螺旋线")
return 0
if width <= 2:
width = 2
if num_cycle_factor < 1:
print("圈数必须为大于1的整数")
num_cycle_factor = 1
points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int)
# 螺旋的角度范围
theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points)
# 螺旋的半径随着角度增大而线性增长
r = np.linspace(distance, 0, points)
tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor)))
# 计算螺旋的x, y坐标
if direction == "CCW":
# 反向旋转时,使用负的角度来反向螺旋
x = r * np.cos(-theta) # 逆时针旋转
y = r * np.sin(-theta)*tempK
else:
# 正常旋转
x = r * np.cos(theta)
y = r * np.sin(theta)*tempK
# 旋转螺旋线以对齐起点和终点的角度
x = x * np.cos(np.pi) - y * np.sin(np.pi)
y = x * np.sin(np.pi) + y * np.cos(np.pi)
# 旋转螺旋线以对齐起点和终点的角度
x_rotated = x * np.cos(angle) - y * np.sin(angle)
y_rotated = x * np.sin(angle) + y * np.cos(angle)
# 平移螺旋线,使其从起点开始
x_final = x_rotated + start[0] - x_rotated[0]
y_final = y_rotated + start[1] - y_rotated[0]
# return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1)))
result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1),
np.round(y_final).astype(int).reshape(-1, 1)))
return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points))
@track_function("generate_prolateCycloid_cloudPoint",True)
def generate_prolateCycloid_cloudPoint(self, start, end, width=None, num_cycle_factor=None, direction: Literal["CW", "CCW"] = 'CW'):
# Parameters
step_degree = np.pi / 12
a_prolate = 4.5
radius = 30.0
distance = np.linalg.norm(np.array(end) - np.array(start))
angle = np.arctan2(end[1] - start[1], end[0] - start[0])
if width is None:
width = self.width_default
if num_cycle_factor is None:
num_cycle_factor = 5
num_cycle_factor = np.round(num_cycle_factor).astype(int)
if distance < 2:
print("距离过短无法绘制Cycloid线")
return 0
if width <= 2:
width = 2
if num_cycle_factor < 1:
print("圈数必须为大于1的整数")
num_cycle_factor = 1
r_cycloid = radius / (2 * a_prolate)
num_cycles = int(num_cycle_factor)
temp_k = distance / (r_cycloid * (np.pi + 2 * a_prolate + 2 * np.pi * num_cycles))
# Generate the prolate cycloid
theta_prolate = np.linspace(0.5 * np.pi, num_cycles * 2 * np.pi + 1.5 * np.pi,
round((num_cycles * 2 * np.pi + 1.5 * np.pi) / step_degree))
if direction == 'CW':
theta_prolate = np.flip(theta_prolate)
x_prolate = r_cycloid * (theta_prolate - a_prolate * np.sin(theta_prolate)) * temp_k
y_prolate = r_cycloid * (1 - a_prolate * np.cos(theta_prolate))
# Scale y-prolate based on width
y_prolate = (y_prolate / 30) * width
# Rotation of cycloid
if direction == 'CW':
x_prolate, y_prolate = -x_prolate, -y_prolate
x_rotated = x_prolate * np.cos(angle) - y_prolate * np.sin(angle)
y_rotated = x_prolate * np.sin(angle) + y_prolate * np.cos(angle)
# Translate to starting point
x_final = x_rotated + start[0] - x_rotated[0]
y_final = y_rotated + start[1] - y_rotated[0]
return np.column_stack((np.round(x_final).astype(int), np.round(y_final).astype(int)))
@track_function("generate_ellipse_cloudPoint",True)
def generate_ellipse_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'):
# 计算椭圆的中心(两个端点的中点)
center = (start + end) / 2
if num_cycle_factor is None:
num_cycle_factor = 1
num_cycle_factor = np.round(num_cycle_factor).astype(int)
distance = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2)
if width is None:
width = distance/2
if distance < 2:
print("距离过短,无法绘制椭圆")
return 0
if width <= 2:
print("椭圆半径过小,无法绘制椭圆")
width = self.width_default
if num_cycle_factor < 1:
print("圈数必须为大于1的整数")
num_cycle_factor = 1
# 计算两个端点的距离,即长半轴长度
a = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2) / 2
# 假设短半轴长度为常量值radius
b = width / 2
# 计算旋转角度(由端点的方向确定)
angle = np.arctan2(end[1] - start[1], end[0] - start[0])
points = np.round(((num_cycle_factor + 1) * np.pi * 2)/(np.pi/9)).astype(int)
# 绘制椭圆的参数化公式
theta = np.linspace(np.pi, (2+num_cycle_factor*2)* np.pi, points)
# 计算螺旋的x, y坐标
if direction == "CCW":
# 反向旋转时,使用负的角度来反向螺旋
x_ellipse = a * np.cos(-theta) #顺时针
y_ellipse = b * np.sin(-theta)
else:
# 正常旋转
x_ellipse = a * np.cos(theta)
y_ellipse = b * np.sin(theta)
# 绕中心旋转椭圆
x_rot = center[0] + x_ellipse * np.cos(angle) - y_ellipse * np.sin(angle)
y_rot = center[1] + x_ellipse * np.sin(angle) + y_ellipse * np.cos(angle)
# 返回椭圆点云,保留整数值
return np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1)))
@track_function("generate_Lemniscate_cloudPoint",True)
def generate_Lemniscate_cloudPoint(self, start, end, width = None,num_cycle_factor = None, direction:Literal["CW","CCW"] ='CW'):
"""
生成从start到end的8字形插补路径
:param start: 起点坐标 (x0, y0)
:param end: 终点坐标 (x1, y1)
:param num_points: 插补点的数量
:return: 插补路径的坐标列表
"""
# 起点和终点的坐标
x1, y1 = start
x0, y0 = end
step_degree=np.pi/12
# 计算轨迹的缩放因子和旋转角度
distance = np.linalg.norm([x1 - x0, y1 - y0])
if width is None:
width = self.width_default
if num_cycle_factor is None:
num_cycle_factor = 1
if distance < 2:
print("距离过短,无法绘制螺旋线")
return 0
if width <= 2:
width = 2
if num_cycle_factor < 1:
print("圈数必须为大于1的整数")
num_cycle_factor = 1
a = distance / 2 # 轨迹的宽度一半
b = width/2
angle = np.arctan2(y1 - y0, x1 - x0)
num_points = np.abs((2*num_cycle_factor+1)*np.pi/step_degree).astype(int)
# 生成标准的8字形轨迹 (Gerono卵形线)
t_values = np.linspace(0.5*np.pi, (2*num_cycle_factor + 1.5)* np.pi, num_points)
x_curve = a * np.sin(t_values)
if direction == 'CCW':
y_curve = -b * np.sin(2 * t_values)
else:
y_curve = b * np.sin(2 * t_values)
# 构建旋转矩阵
rotation_matrix = np.array([[np.cos(angle), -np.sin(angle)],
[np.sin(angle), np.cos(angle)]])
# 旋转并平移轨迹
curve_points = np.vstack((x_curve, y_curve)).T
rotated_points = curve_points @ rotation_matrix.T
shifted_points = rotated_points + [(x0 + x1) / 2, (y0 + y1) / 2]
# 将插值结果四舍五入为整数
return np.round(shifted_points).astype(int)
@track_function("generate_circle_cloudPoint",True)
def generate_circle_cloudPoint(self, start_point, center, step_degree=np.pi/12, num_cycle_factor = 3):
"""
center: 圆心坐标,形如 (x_c, y_c)
start_point: 起始点坐标,形如 (x_0, y_0)
radius: 圆的半径
delta_theta: 每次插补的角度增量
num_turns: 绕圈的次数
"""
# 确定总共需要生成的插补点数
num_points = int((2 * np.pi * num_cycle_factor) / step_degree)
# 圆心
x_c, y_c = center
radius = np.linalg.norm(np.array(start_point) - np.array(center)) # 半径
# 计算起始点的初始角度
x_0, y_0 = start_point
theta_0 = np.arctan2(y_0 - y_c, x_0 - x_c)
# 初始化存储插补点的列表
circle_points = []
# 生成插补点
for i in range(num_points):
# 当前角度
theta_i = theta_0 + i * step_degree
# 计算插补点的坐标
x_i = x_c + radius * np.cos(theta_i)
y_i = y_c + radius * np.sin(theta_i)
# 将点添加到列表中
circle_points.append((np.round(x_i).astype(int), np.round(y_i).astype(int)))
circle_points.append((np.round(x_0).astype(int), np.round(y_0).astype(int)))
return circle_points
@track_function("generate_point_cloudPoint",True)
def generate_point_cloudPoint(self, point, num_cycle_factor=None, width=None, direction: Literal["CW", "CCW"] = 'CW'):
if num_cycle_factor is None:
num_cycle_factor = 12
if num_cycle_factor < 1:
print("圈数必须为大于1的整数")
num_cycle_factor = 1
num_cycle_factor = np.abs(np.round(num_cycle_factor).astype(int))
if width is None:
width = 20
if width < 2:
print("宽度必须为大于2的整数")
width = 2
width = np.abs(np.round(width).astype(int))
radius = width/2
points = np.round((num_cycle_factor * np.pi * 2)/(np.pi/9)).astype(int)
thetas = np.linspace(0, num_cycle_factor * 2 * np.pi, points)
radiuss = np.ones_like(thetas) * radius
for i in range(len(thetas)):
if thetas[i] <= np.pi/2:
radiuss[i] = np.sin(thetas[i]) * radius
if thetas[i] >= num_cycle_factor * np.pi * 2 - np.pi / 2:
radiuss[i] = np.sin(num_cycle_factor * np.pi * 2 - thetas[i]) * radius
if direction == "CCW":
x_ellipse = radiuss * np.cos(-thetas)
y_ellipse = radiuss * np.sin(-thetas)
else:
x_ellipse = radiuss * np.cos(thetas)
y_ellipse = radiuss * np.sin(thetas)
x_rot = point[0] + x_ellipse
y_rot = point[1] + y_ellipse
result_points = np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1)))
return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points))
@track_function("generate_line_cloudPoint",True)
def generate_line_cloudPoint(self,start, end, step_distance = 5):
"""
生成起点和终点之间的线性插值点
参数:
start: 起点 (标量或数组,例如 [x1, y1, z1])
end: 终点 (标量或数组,例如 [x2, y2, z2])
num_points: 插值点的数量(包括起点和终点)
返回:
包含线性插值点的数组
"""
start = np.array(start)
end = np.array(end)
distance = np.linalg.norm(end - start)
num_points = int(distance / step_distance)
if num_points <=2:
num_points = 2
# 使用 numpy 的 linspace 在起点和终点之间生成线性插值点
interpolated_points = np.linspace(start, end, num_points)
# 将插值结果四舍五入为整数
return np.round(interpolated_points).astype(int)
@track_function("generate_repeat_line_cloudPoint",True)
def generate_repeat_line_cloudPoint(self, start_point, end_point, num_cycle_factor = 5):
"""
start_point: 起始点坐标,形如 (x_0, y_0)
end_point: 终点坐标,形如 (x_0, y_0)
num_cycle_factor: 重复的次数
"""
# 初始化存储插补点的列表
result_points = []
# 生成插补点
for i in range(num_cycle_factor):
# 将点添加到列表中
result_points.append(start_point)
result_points.append(end_point)
return np.round(np.array(result_points)).astype(int)
@track_function("generate_finger_circle_cloudPoint",True)
def generate_finger_circle_cloudPoint(self, center_point, num_cycle_factor=5, radius=30,deviation=15):
"""
生成一指禅绕球心的圆弧轨迹
center_point: 圆心坐标 [x, y, z, roll, pitch, yaw]
num_cycle_factor: 轨迹的重复次数
radius: 圆的半径
"""
result_points = []
result_points.append(copy.deepcopy(center_point))
deviation = deviation * np.pi/180
totalCounts = int(360 / radius)
shift = 1
for i in range(num_cycle_factor):
for countR in range(0, totalCounts):
# 当前旋转矩阵
temp_pose = copy.deepcopy(center_point)
rotation = R.from_euler('xyz', temp_pose[3:], degrees=False)
current_rotation_matrix = rotation.as_matrix()
if i == 0:
shift = countR/(90/radius) if countR/(90/radius) < 1 else 1
if i == num_cycle_factor-1:
shift = (totalCounts-countR)/(90/radius) if (totalCounts-countR)/(90/radius) < 1 else 1
# 生成增量旋转矩阵
increment_rotation= R.from_euler('xyz', [deviation*shift,0,countR * np.pi*2 /totalCounts], degrees=False).as_matrix()
# 更新旋转矩阵
updated_rotation_matrix = current_rotation_matrix @ increment_rotation
# 将旋转矩阵转换回欧拉角
temp_pose[3:] = R.from_matrix(updated_rotation_matrix).as_euler('xyz', degrees=False)
# 添加当前点到结果列表
result_points.append(copy.deepcopy(temp_pose))
result_points.append(copy.deepcopy(center_point))
return result_points
def traj_generate(self, t: Union[int, float, List[float]], pose = None, wrench = None, interpolation: Literal["linear", "cubic","circle","cloud_point"] = 'linear',**kwargs):
"""
轨迹生成
param:
t: 时间或时间序列
pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿
wrench(nx6): 力矩或力矩序列,当输入为None时,表示保持当前的力
interpolation: 插值方式
return:
traj: 生成的轨迹 -> dict
"""
# 确保时间输入是列表
if isinstance(t, (int, float)):
t = [t]
elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t):
self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)")
return None
# 检查时间点数量
if len(t) < 1:
self.logger.log_error("len(t) < 1")
return None
# 时间序列
time_points = [0.0] + t
time_points = np.array(time_points)
dt = self.command_rate.to_sec()
for i in range(len(time_points)):
time_points[i] = np.round(time_points[i] / dt) * dt
if np.abs(time_points[-1]) < dt:
time_points[-1] = dt
# 由于浮点数精度问题time_step要乘0.9
time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt)
pos_traj = None
quat_traj = None
wrench_traj = None
pos_vel = None
angular_vel = None
# 位置轨迹与速度轨迹生成
if pose is not None:
if interpolation != 'cloud_point':
pose = np.array(pose).reshape(-1, 6)
if pose.shape[0] != len(t):
self.logger.log_error("pose.shape[0] != len(t)")
print(pose.shape[0],len(t))
return None
pose_matrix = self.convert_to_7d(pose)
else:
pose = np.array(pose)
try:
# 插值
if interpolation == 'linear':
pos_traj, quat_traj = linear_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt)
elif interpolation == 'cubic':
pos_traj, quat_traj = spline_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt)
elif interpolation == 'circle':
if self.ion is not None:
pos_traj = circle_trajectory(center=pose, radius=0.07,time_points=time_points,time_step=dt,**kwargs)
else:
pos_traj = circle_trajectory(center=pose,time_points=time_points,time_step=dt,**kwargs)
quat_traj = np.tile(R.from_euler('xyz', np.array(pose[0][3:])).as_quat(), (pos_traj.shape[0], 1))
elif interpolation == 'cloud_point':
pos_traj, quat_traj = cloud_point_interpolate(positions=pose[:, :3], quaternions=pose[:, 3:], time_points=time_points, time_step=dt)
# self.logger.log_yellow(f'{pos_traj[0]} {pos_traj[-1]}')
except ValueError as error:
self.logger.log_error(error)
return None
else:
self.logger.log_error("未输入位置进行规划")
# TODO 力轨迹生成 改为线性增加
if wrench is not None:
wrench = np.array(wrench).reshape(-1, 6)
if wrench.shape[0] != len(t):
return None
indices = np.searchsorted(time_points, time_seq, side='left') - 1
indices = np.clip(indices, 0, len(wrench) - 1)
wrench_traj = wrench[indices]
# 构建返回数据字典
traj = {
't': time_seq,
'pos': pos_traj,
'quat': quat_traj,
'pos_vel': pos_vel,
'angular_vel': angular_vel,
'wrench': wrench_traj
}
return traj
def wrench_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation"] = 'linear',**kwargs):
"""
力/力矩轨迹生成
param:
t: 时间或时间序列
wrench(nx6): 力矩或力矩序列
interpolation: 插值方式
return:
traj: 生成的轨迹 -> dict
"""
# 确保时间输入是列表
if isinstance(t, (int, float)):
t = [t]
elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t):
self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)")
return None
# 检查时间点数量
if len(t) < 1:
self.logger.log_error("len(t) < 1")
return None
if wrench is None:
self.logger.log_error("未输入力矩进行规划")
return None
# 时间序列
time_points = [0.0] + t
time_points = np.array(time_points)
dt = self.command_rate.to_sec()
for i in range(len(time_points)):
time_points[i] = np.round(time_points[i] / dt) * dt
if np.abs(time_points[-1]) < dt:
time_points[-1] = dt
# 由于浮点数精度问题time_step要乘0.9
time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt)
wrench_traj = None
# TODO 力轨迹生成 改为线性增加
if wrench is not None:
wrench = np.array(wrench).reshape(-1, 6)
if wrench.shape[0] != len(t):
return None
if interpolation == 'oscillation':
wrench_traj = oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
else:
wrench_traj = linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
# indices = np.searchsorted(time_points, time_seq, side='left') - 1
# indices = np.clip(indices, 0, len(wrench) - 1)
# wrench_traj = wrench[indices]
# 构建返回数据字典
traj = {
't': time_seq,
'wrench': wrench_traj
}
return traj
def force_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation", "auto"] = "auto",**kwargs):
"""
力/力矩轨迹生成
param:
t: 时间或时间序列
wrench(nx6): 力矩或力矩序列
interpolation: 插值方式
return:
traj: 生成的轨迹 -> dict
"""
# 确保时间输入是列表
if isinstance(t, (int, float)):
t = [t]
elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t):
self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)")
return None
# 检查时间点数量
if len(t) < 1:
self.logger.log_error("len(t) < 1")
return None
if wrench is None:
self.logger.log_error("未输入力矩进行规划")
return None
# 时间序列
time_points = [0.0] + t
time_points = np.array(time_points)
dt = self.command_rate.to_sec()
for i in range(len(time_points)):
time_points[i] = np.round(time_points[i] / dt) * dt
if np.abs(time_points[-1]) < dt:
time_points[-1] = dt
# 由于浮点数精度问题time_step要乘0.9
time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt)
wrench_traj = None
# TODO 力轨迹生成 改为线性增加
if wrench is not None:
wrench = np.array(wrench).reshape(-1, 6)
if wrench.shape[0] != len(t):
return None
if interpolation == 'oscillation':
wrench_traj = self.force_plan.oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
elif interpolation == "auto":
# target_wrench = []
# target_wrench.append(wrench)
wrench_traj = self.force_plan.S_shaped_wrench_interpolate(start=self.arm_state.desired_wrench, wrench=wrench, time_points=time_points, time_step=dt)
else:
wrench_traj = self.force_plan.linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
# indices = np.searchsorted(time_points, time_seq, side='left') - 1
# indices = np.clip(indices, 0, len(wrench) - 1)
# wrench_traj = wrench[indices]
# 构建返回数据字典
traj = {
't': time_seq,
'wrench': wrench_traj
}
return traj
if __name__ == "__main__":
waypoints = [
{"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0],
"acceleration": [0, 0, 0],
"orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()},
{"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0],
"acceleration": [0, 0, 0],
"orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()},
] ## 单位 m deg
myInterpolate = TrajectoryInterpolator(waypoints)
ts = np.linspace(0,5,100)
robot = MassageRobot(arm_config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml")
xr_list, vr_list, ar_list = [], [], []
for t in ts:
xr, vr, ar = myInterpolate.interpolate(t)
xr_list.append(xr)
vr_list.append(vr)
ar_list.append(ar)
xr_array = np.array(xr_list)
vr_array = np.array(vr_list)
ar_array = np.array(ar_list)
robot.xr = xr_array
robot.vr = vr_array
robot.ar = ar_array
robot.ts = ts
robot.dt = ts[1] - ts[0]
ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0]
robot.current_head = 'finger_head'
robot.force_sensor.disable_active_transmission()
time.sleep(0.5)
robot.force_sensor.disable_active_transmission()
time.sleep(0.5)
robot.force_sensor.disable_active_transmission()
time.sleep(0.5)
robot.force_sensor.set_zero()
time.sleep(0.5)
robot.force_sensor.enable_active_transmission()
robot.init_hardwares(ready_pose)
time.sleep(3)
robot.controller_manager.switch_controller('admittance')
# robot.controller_manager.switch_controller('position')
atexit.register(robot.force_sensor.disconnect)
robot.arm_state.desired_wrench = np.array([0,0,-0.5,0,0,0])
try:
robot_thread = threading.Thread(target=robot.start)
robot_thread.start()
except KeyboardInterrupt:
print("用户中断操作。")
except Exception as e:
print("Exception occurred at line:", e.__traceback__.tb_lineno)
print("发生异常:", e)
# robot.arm.disableRobot()