1619 lines
68 KiB
Python
1619 lines
68 KiB
Python
from functools import wraps
|
||
import sys
|
||
import numpy as np
|
||
import threading
|
||
import time
|
||
from typing import List, Literal, Union
|
||
|
||
from scipy.interpolate import interp1d
|
||
|
||
import requests
|
||
import os
|
||
from scipy.spatial.transform import Rotation as R
|
||
import ctypes
|
||
libc = ctypes.CDLL("libc.so.6")
|
||
import atexit
|
||
import copy
|
||
|
||
from scipy.spatial.transform import Slerp
|
||
|
||
|
||
try:
|
||
# 导入算法
|
||
from .algorithms.arm_state import ArmState
|
||
from .algorithms.controller_manager import ControllerManager
|
||
from .algorithms.admittance_controller import AdmittanceController
|
||
from .algorithms.hybrid_controller import HybridController
|
||
from .algorithms.admithybrid_controller import AdmitHybridController
|
||
from .algorithms.position_controller import PositionController
|
||
from .algorithms.hybridPid_controller import HybridPidController
|
||
from .algorithms.hybridAdmit_controller import HybridAdmitController
|
||
from .algorithms.positionerSensor_controller import PositionerSensorController
|
||
from .algorithms.admittanceZ_controller import AdmittanceZController
|
||
from .algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate
|
||
from .algorithms.force_planner import ForcePlanner
|
||
|
||
# 导入硬件
|
||
from .hardware.dobot_nova5 import dobot_nova5
|
||
from .hardware.force_sensor import XjcSensor
|
||
|
||
# 导入工具
|
||
from .tools.log import CustomLogger
|
||
from .tools.Rate import Rate
|
||
from .tools.yaml_operator import read_yaml
|
||
from .tools.decorator_tools import custom_decorator,apply_decorators
|
||
from .tools.serial_tools import find_serial_by_location
|
||
from .tools.Trajectory import TrajectoryInterpolator
|
||
except:
|
||
#导入算法
|
||
from algorithms.arm_state import ArmState
|
||
from algorithms.controller_manager import ControllerManager
|
||
from algorithms.admittance_controller import AdmittanceController
|
||
from algorithms.admittanceZ_controller import AdmittanceZController
|
||
from algorithms.hybrid_controller import HybridController
|
||
from algorithms.admithybrid_controller import AdmitHybridController
|
||
from algorithms.position_controller import PositionController
|
||
from algorithms.hybridPid_controller import HybridPidController
|
||
from algorithms.hybridAdmit_controller import HybridAdmitController
|
||
from algorithms.positionerSensor_controller import PositionerSensorController
|
||
from algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate
|
||
from algorithms.force_planner import ForcePlanner
|
||
|
||
# 导入硬件
|
||
from hardware.dobot_nova5 import dobot_nova5
|
||
from hardware.force_sensor import XjcSensor
|
||
|
||
# 导入工具
|
||
from tools.log import CustomLogger
|
||
from tools.Rate import Rate
|
||
from tools.yaml_operator import read_yaml
|
||
from tools.decorator_tools import custom_decorator,apply_decorators
|
||
from tools.serial_tools import find_serial_by_location
|
||
from tools.Trajectory import TrajectoryInterpolator
|
||
|
||
import matplotlib.pyplot as plt
|
||
from matplotlib.animation import FuncAnimation
|
||
|
||
current_file_path = os.path.abspath(__file__)
|
||
MassageRobot_Dobot_Path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path)))
|
||
print(MassageRobot_Dobot_Path)
|
||
# 添加目标文件夹到系统路径
|
||
sys.path.append(MassageRobot_Dobot_Path)
|
||
from VortXDB.client import VTXClient
|
||
|
||
"""
|
||
在 Python 中,atexit.register 用于注册程序退出时要执行的函数。atexit 模块提供了一个简单的接口,用于在程序正常终止时执行清理操作。
|
||
当你注册多个函数时,它们的执行顺序遵循先进后出的顺序(LIFO:Last In, First Out)。也就是说,最后注册的函数会最先执行。
|
||
"""
|
||
|
||
def track_function(function_name,log = False):
|
||
def before(self, *args, **kwargs):
|
||
self.current_function = function_name
|
||
if log:
|
||
self.logger.log_info(f"Entering function: {function_name}")
|
||
def after(self, result, *args, **kwargs):
|
||
self.current_function = None
|
||
if log:
|
||
self.logger.log_info(f"Exiting function: {function_name},code: {result}")
|
||
return custom_decorator(before=before, after=after)
|
||
|
||
@apply_decorators
|
||
class MassageRobot:
|
||
def __init__(self,arm_config_path,is_log=False):
|
||
self.logger = CustomLogger()
|
||
if is_log:
|
||
self.logger_data = CustomLogger()
|
||
# 日志线程
|
||
threading.Thread(target=self.log_thread,daemon=True).start()
|
||
|
||
self.vtxdb = VTXClient()
|
||
|
||
self.arm_state = ArmState()
|
||
self.arm_config = read_yaml(arm_config_path)
|
||
# arm 实例化时机械臂类内部进行通讯连接
|
||
self.arm = dobot_nova5(arm_ip = self.arm_config['arm_ip'])
|
||
self.arm.start()
|
||
|
||
self.thermotherapy = None
|
||
self.shockwave = None
|
||
self.stone = None
|
||
self.heat = None
|
||
self.ion = None
|
||
self.force_plan = ForcePlanner()
|
||
# 传感器
|
||
# self.force_sensor = None
|
||
self.force_sensor = XjcSensor()
|
||
# self.force_sensor.connect()
|
||
# 控制器,初始为导纳控制
|
||
self.controller_manager = ControllerManager(self.arm_state)
|
||
self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0])
|
||
self.controller_manager.add_controller(HybridController,'hybrid',self.arm_config['controller'][1])
|
||
self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][2])
|
||
self.controller_manager.add_controller(AdmitHybridController,'admithybrid',self.arm_config['controller'][3])
|
||
self.controller_manager.add_controller(HybridPidController,'hybridPid',self.arm_config['controller'][4])
|
||
self.controller_manager.add_controller(HybridAdmitController,'hybridAdmit',self.arm_config['controller'][5])
|
||
self.controller_manager.add_controller(PositionerSensorController,'positionerSensor',self.arm_config['controller'][6])
|
||
self.controller_manager.add_controller(AdmittanceZController,'admittanceZ',self.arm_config['controller'][7])
|
||
|
||
self.controller_manager.switch_controller('admittance')
|
||
# 按摩头参数暂时使用本地数据
|
||
massage_head_dir = self.arm_config['massage_head_dir']
|
||
all_items = os.listdir(massage_head_dir)
|
||
head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))]
|
||
self.playload_dict = {}
|
||
for file in head_config_files:
|
||
file_address = massage_head_dir + '/' + file
|
||
play_load = read_yaml(file_address)
|
||
self.playload_dict[play_load['name']] = play_load
|
||
self.current_head = 'none'
|
||
|
||
# 读取数据
|
||
self.gravity_base = None
|
||
self.force_zero = None
|
||
self.torque_zero = None
|
||
self.tool_position = None
|
||
self.mass_center_position = None
|
||
self.s_tf_matrix_t = None
|
||
arm_orientation_set0 = np.array([-180,0,-90])
|
||
self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix()
|
||
|
||
# 频率数据赋值
|
||
self.control_rate = Rate(self.arm_config['control_rate'])
|
||
self.sensor_rate = Rate(self.arm_config['sensor_rate'])
|
||
self.command_rate = Rate(self.arm_config['command_rate'])
|
||
|
||
# 低通滤波
|
||
self.cutoff_freq = 80.0
|
||
|
||
# 停止标志位线程
|
||
self.exit_event = threading.Event()
|
||
self.exit_event.set() # 运行初始化为True
|
||
self.interrupt_event = threading.Event()
|
||
self.interrupt_event.clear() # 中断初始化为False
|
||
self.pause_envent = threading.Event()
|
||
self.pause_envent.clear() # 暂停初始化为False
|
||
self.skip_envent = threading.Event()
|
||
self.skip_envent.clear() # 跳过初始化为False
|
||
# 按摩调整
|
||
self.adjust_wrench_envent = threading.Event()
|
||
self.adjust_wrench_envent.clear() # 调整初始化为False
|
||
self.pos_increment = np.zeros(3,dtype=np.float64)
|
||
self.adjust_wrench = np.zeros(6,dtype=np.float64)
|
||
|
||
self.is_waitting = False
|
||
|
||
self.last_print_time = 0
|
||
self.last_record_time = 0
|
||
self.last_command_time = 0
|
||
self.move_to_point_count = 0
|
||
# 卡尔曼滤波相关初始化
|
||
self.x_base = np.zeros(6)
|
||
self.P_base = np.eye(6)
|
||
# 过程噪声协方差矩阵
|
||
self.Q_base = np.eye(6) * 0.01
|
||
# 测量噪声协方差矩阵
|
||
self.R_base = np.eye(6) * 0.1
|
||
|
||
self.x_tcp = np.zeros(6)
|
||
self.P_tcp = np.eye(6)
|
||
# 过程噪声协方差矩阵
|
||
self.Q_tcp = np.eye(6) * 0.01
|
||
# 测量噪声协方差矩阵
|
||
self.R_tcp = np.eye(6) * 0.1
|
||
# 传感器故障计数器
|
||
self.sensor_fail_count = 0
|
||
# 机械臂初始化,适配中间层
|
||
self.arm.init()
|
||
|
||
# REF TRAJ
|
||
self.xr = []
|
||
self.vr = []
|
||
self.ar = []
|
||
self.ts = [] # time sequence
|
||
self.dt = [] # derivate of time
|
||
# ---
|
||
self.last_time = -1
|
||
self.cur_time = -1
|
||
|
||
|
||
def sensor_set_zero(self):
|
||
self.arm_position_set0,self.arm_orientation_set0 = self.arm.get_arm_position()
|
||
arm_orientation_set0 = self.arm_orientation_set0
|
||
print("arm_orientation_set0",R.from_quat(arm_orientation_set0).as_euler('xyz',degrees=True))
|
||
self.b_rotation_s_set0 = R.from_quat(arm_orientation_set0).as_matrix()
|
||
#########
|
||
max_retries = 5
|
||
#no_head_wrong=np.array([-80.01506042, 116.34187317, -87.65788269, -0.32910481, -0.65792173, -0.61110526])#旧版传感器)
|
||
head_set0=np.array([0, 0, 0, 0, 0, 0])#新版传感器)
|
||
min_tolerance=0.001
|
||
retry_count = 0
|
||
self.force_sensor.disable_active_transmission()
|
||
time.sleep(0.5)
|
||
while retry_count < max_retries:
|
||
# 调用读取数据的函数,假设返回值是一个 numpy 数组
|
||
data = self.force_sensor.read_data_f32()
|
||
data = self.force_sensor.read_data_f32()
|
||
data = self.force_sensor.read_data_f32()
|
||
|
||
# 检查返回值是否为 np.array 类型
|
||
if isinstance(data, np.ndarray):
|
||
# 判断数据是否接近目标值
|
||
|
||
if np.allclose(data, head_set0, atol=min_tolerance):
|
||
print(f"检测到力传感器已置零:{data}")
|
||
break
|
||
else:
|
||
print(f"检测到力传感器未置零:{data}")
|
||
|
||
# 禁用传感器传输
|
||
self.force_sensor.disable_active_transmission()
|
||
self.force_sensor.disable_active_transmission()
|
||
time.sleep(0.5)
|
||
|
||
# 尝试设置传感器清零
|
||
code = self.force_sensor.set_zero()
|
||
code = self.force_sensor.set_zero()
|
||
max_try = 3
|
||
|
||
while code != 0 and max_try > 0:
|
||
self.logger.log_error("Force sensor set zero failed, retrying...")
|
||
self.force_sensor.disable_active_transmission()
|
||
self.force_sensor.disable_active_transmission()
|
||
time.sleep(0.5)
|
||
code = self.force_sensor.set_zero()
|
||
code = self.force_sensor.set_zero()
|
||
max_try -= 1
|
||
time.sleep(0.5)
|
||
|
||
# 如果多次尝试后失败
|
||
if max_try == 0:
|
||
self.logger.log_error("Force sensor set zero failed, exiting...")
|
||
requests.post(
|
||
"http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"}
|
||
)
|
||
requests.post(
|
||
"http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False}
|
||
)
|
||
# sys.exit(0)
|
||
return -1 # 置零失败
|
||
|
||
# 成功后跳出主循环
|
||
break
|
||
|
||
else:
|
||
self.logger.log_error("读取数据失败或格式错误,尝试重试...")
|
||
|
||
# 增加重试次数
|
||
retry_count += 1
|
||
self.force_sensor.disable_active_transmission()
|
||
self.force_sensor.disconnect()
|
||
time.sleep(1)
|
||
self.force_sensor = XjcSensor()
|
||
time.sleep(1) # 每次重试之间添加一个短暂的延迟
|
||
|
||
if retry_count == max_retries:
|
||
self.logger.log_error(f"超过最大重试次数 ({max_retries}),操作失败,退出程序...")
|
||
requests.post(
|
||
"http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"}
|
||
)
|
||
requests.post(
|
||
"http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False}
|
||
)
|
||
return -2 #读取失败
|
||
|
||
return 0 #读取成功并置零成功
|
||
|
||
def sensor_enable(self):
|
||
code = self.force_sensor.enable_active_transmission()
|
||
max_try = 3
|
||
while code!= 0 and max_try > 0:
|
||
self.logger.log_error("Force sensor enable_active_transmission failed,retrying...")
|
||
code = self.force_sensor.enable_active_transmission()
|
||
max_try -= 1
|
||
time.sleep(0.1)
|
||
if max_try == 0:
|
||
self.logger.log_error("Force sensor enable_active_transmission failed,exiting...")
|
||
requests.post(
|
||
"http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"}
|
||
)
|
||
requests.post(
|
||
"http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False}
|
||
)
|
||
# sys.exit(0)
|
||
return -1
|
||
return 0
|
||
|
||
# 预测步骤
|
||
def kalman_predict(self,x, P, Q):
|
||
# 预测状态(这里假设状态不变)
|
||
x_predict = x
|
||
# 预测误差协方差
|
||
P_predict = P + Q
|
||
return x_predict, P_predict
|
||
|
||
# 更新步骤
|
||
def kalman_update(self,x_predict, P_predict, z, R):
|
||
# 卡尔曼增益
|
||
# K = P_predict @ np.linalg.inv(P_predict + R)
|
||
S = P_predict + R
|
||
s = np.diag(S) # shape (6,)
|
||
p_diag = np.diag(P_predict)
|
||
K_diag = np.zeros_like(s)
|
||
nonzero_mask = s != 0
|
||
K_diag[nonzero_mask] = p_diag[nonzero_mask] / s[nonzero_mask]
|
||
K = np.diag(K_diag)
|
||
# 更新状态
|
||
x_update = x_predict + K @ (z - x_predict)
|
||
# 更新误差协方差
|
||
P_update = (np.eye(len(K)) - K) @ P_predict
|
||
return x_update, P_update
|
||
|
||
def update_wrench(self):
|
||
# 当前的机械臂到末端转换 (实时)
|
||
b_rotation_s = R.from_quat(self.arm_state.arm_orientation).as_matrix()
|
||
# 读取数据
|
||
sensor_data = self.force_sensor.read()
|
||
# self.logger.log_info(f"传感器数据{sensor_data}")
|
||
if sensor_data is None:
|
||
self.force_sensor.stop_background_reading()
|
||
self.logger.log_error("传感器数据读取失败")
|
||
return -1
|
||
# 反重力补偿
|
||
sensor_data[:3] = sensor_data[:3] + self.force_zero + self.b_rotation_s_set0.T @ self.gravity_base
|
||
sensor_data[3:] = sensor_data[3:] + self.torque_zero + np.cross(self.mass_center_position, self.b_rotation_s_set0.T @ self.gravity_base)
|
||
# 传感器数据通过矫正计算得到外来施加力 传感器坐标系下
|
||
# 重力
|
||
gravity_in_sensor = b_rotation_s.T @ self.gravity_base
|
||
s_force = sensor_data[:3] - self.force_zero - gravity_in_sensor
|
||
# 力矩
|
||
s_torque = sensor_data[3:] - self.torque_zero - np.cross(self.mass_center_position,gravity_in_sensor)
|
||
wrench = np.concatenate([s_force,s_torque])
|
||
# 传感器到TCP
|
||
wrench = self.wrench_coordinate_conversion(self.s_tf_matrix_t,wrench)
|
||
# 交给ARM STATE集中管理
|
||
self.arm_state.external_wrench_tcp = wrench
|
||
# 对tcp坐标系下的外力外矩进行平滑
|
||
x_tcp_predict, P_tcp_predict = self.kalman_predict(x = self.x_tcp,
|
||
P = self.P_tcp,
|
||
Q = self.Q_tcp)
|
||
self.x_tcp, self.P_tcp = self.kalman_update(x_predict = x_tcp_predict,
|
||
P_predict = P_tcp_predict,
|
||
z = self.arm_state.external_wrench_tcp,
|
||
R = self.R_tcp)
|
||
self.arm_state.external_wrench_tcp = self.x_tcp
|
||
self.arm_state.last_external_wrench_tcp = self.arm_state.external_wrench_tcp
|
||
return 0
|
||
|
||
# ####################### 位姿伺服 ##########################
|
||
|
||
def arm_measure_loop(self):
|
||
self.logger.log_info("机械臂测量线程启动")
|
||
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
|
||
try:
|
||
if not self.is_waitting:
|
||
self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position()
|
||
code = self.update_wrench()
|
||
if code == -1:
|
||
self.sensor_fail_count += 1
|
||
self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}")
|
||
if self.sensor_fail_count > 10:
|
||
self.logger.log_error("传感器线程数据读取失败超过10次,程序终止")
|
||
self.stop()
|
||
break
|
||
else:
|
||
self.sensor_fail_count = 0
|
||
except Exception as e:
|
||
self.logger.log_error(f"机械臂测量线程报错:{e}")
|
||
self.exit_event.set() # 控制退出while
|
||
self.sensor_rate.sleep() # 控制频率
|
||
|
||
def arm_command_loop_traj(self):
|
||
self.logger.log_info("机械臂控制线程启动")
|
||
# 离线轨迹启动时间记录
|
||
self.traj_start_time = time.time()
|
||
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
|
||
try:
|
||
if not self.is_waitting:
|
||
process_start_time = time.time()
|
||
control_cycle = self.control_rate.to_sec()
|
||
t_now = time.time() - self.traj_start_time
|
||
i = min(int(t_now / self.dt), len(self.ts) - 2)
|
||
alpha = max(0.0, min(1.0, (t_now - self.ts[i]) / self.dt))
|
||
|
||
pos = (1 - alpha) * self.xr[i][:3] + alpha * self.xr[i+1][:3]
|
||
|
||
slerp = Slerp([0, 1], R.concatenate([R.from_quat(self.xr[i][3:]), R.from_quat(self.xr[i+1][3:])]))
|
||
alpha = 0.5
|
||
rot_interp = slerp(alpha).as_quat()
|
||
|
||
xr = np.concatenate([pos, rot_interp])
|
||
# xr = (1-alpha) * self.xr[i] + alpha * self.xr[i+1]
|
||
vr = (1 - alpha) * self.vr[i] + alpha * self.vr[i + 1]
|
||
ar = (1 - alpha) * self.ar[i] + alpha * self.ar[i + 1]
|
||
|
||
self.arm_state.desired_position = xr[:3]
|
||
# self.arm_state.desired_orientation = xr[3:]
|
||
# print(111)
|
||
self.arm_state.desired_orientation = rot_interp
|
||
# print(222)
|
||
self.arm_state.desired_twist = vr
|
||
self.arm_state.desired_acc = ar
|
||
|
||
print("self.arm_state.desired_position",self.arm_state.desired_position)
|
||
print("self.arm_state.desired_orientation",self.arm_state.desired_orientation)
|
||
print("self.arm_state.desired_twist",self.arm_state.desired_twist)
|
||
print("self.arm_state.desired_acc",self.arm_state.desired_acc)
|
||
|
||
self.controller_manager.step_traj(control_cycle)
|
||
# self.controller_manager.step(control_cycle)
|
||
command_pose = np.concatenate([
|
||
self.arm_state.arm_position_command * 1000, # 转毫米
|
||
self.arm_state.arm_orientation_command
|
||
])
|
||
## 输入滤波
|
||
status, pose_processed = self.arm.process_command(
|
||
command_pose[:3],
|
||
command_pose[3:]
|
||
)
|
||
# print(f"pose_processed:{pose_processed}")
|
||
|
||
tcp_command = (
|
||
f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f},"
|
||
f"{pose_processed[2]:.3f},{pose_processed[3]:.3f},"
|
||
f"{pose_processed[4]:.3f},{pose_processed[5]:.3f},"
|
||
"t=0.02, aheadtime=20, gain=200)"
|
||
).encode()
|
||
|
||
run_time = time.time() - process_start_time
|
||
sleep_duration = self.control_rate.to_sec() - run_time
|
||
b2 =time.time()
|
||
if sleep_duration > 0:
|
||
time.sleep(sleep_duration)
|
||
print(f"real sleep:{time.time()-b2}")
|
||
# self.arm.dashboard.socket_dobot.sendall(tcp_command)
|
||
if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.arm.dashboard.Continue()
|
||
code = self.arm.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
|
||
except Exception as e:
|
||
self.logger.log_error(f"机械臂控制失败:{e}")
|
||
self.exit_event.set()
|
||
# print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration)
|
||
|
||
def arm_command_loop(self):
|
||
self.logger.log_info("机械臂控制线程启动")
|
||
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
|
||
try:
|
||
if not self.is_waitting:
|
||
process_start_time = time.time()
|
||
control_cycle = self.control_rate.to_sec()
|
||
self.controller_manager.step(control_cycle)
|
||
command_pose = np.concatenate([
|
||
self.arm_state.arm_position_command * 1000, # 转毫米
|
||
self.arm_state.arm_orientation_command
|
||
])
|
||
## 输入滤波
|
||
status, pose_processed = self.arm.process_command(
|
||
command_pose[:3],
|
||
command_pose[3:]
|
||
)
|
||
# print(f"pose_processed:{pose_processed}")
|
||
|
||
tcp_command = (
|
||
f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f},"
|
||
f"{pose_processed[2]:.3f},{pose_processed[3]:.3f},"
|
||
f"{pose_processed[4]:.3f},{pose_processed[5]:.3f},"
|
||
"t=0.02, aheadtime=20, gain=200)"
|
||
).encode()
|
||
|
||
run_time = time.time() - process_start_time
|
||
sleep_duration = self.control_rate.to_sec() - run_time
|
||
b2 =time.time()
|
||
if sleep_duration > 0:
|
||
time.sleep(sleep_duration)
|
||
print(f"real sleep:{time.time()-b2}")
|
||
self.arm.dashboard.socket_dobot.sendall(tcp_command)
|
||
if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停
|
||
print("机械臂为暂停状态")
|
||
res = self.arm.dashboard.Continue()
|
||
code = self.arm.parseResultId(res)[0]
|
||
if code == 0:
|
||
print("机械臂继续已暂停的运动指令")
|
||
|
||
except Exception as e:
|
||
self.logger.log_error(f"机械臂控制失败:{e}")
|
||
self.exit_event.set()
|
||
# print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration)
|
||
|
||
####################### 位姿伺服 ##########################
|
||
|
||
def start(self):
|
||
if self.exit_event.is_set():
|
||
# 线程
|
||
self.exit_event.clear()
|
||
self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop)
|
||
self.arm_control_thread = threading.Thread(target=self.arm_command_loop)
|
||
# 线程启动
|
||
self.arm_measure_thread.start() ## 测量线程
|
||
position,quat_rot = self.arm.get_arm_position()
|
||
# 初始值
|
||
self.arm_state.desired_position = position
|
||
self.arm_state.arm_position_command = position
|
||
self.arm_state.desired_orientation = quat_rot
|
||
self.arm_state.arm_orientation_command = quat_rot
|
||
self.arm_state.arm_position = position
|
||
self.arm_state.arm_orientation = quat_rot
|
||
for i in range(20):
|
||
self.controller_manager.step(self.control_rate.to_sec())
|
||
self.logger.log_blue(f"position command: {self.arm_state.arm_position_command}")
|
||
self.logger.log_blue(f"orientation command: {R.from_quat(self.arm_state.arm_orientation_command).as_euler('xyz',degrees=True)}")
|
||
position, quat_rot = self.arm.get_arm_position()
|
||
self.logger.log_blue(f"position current:{position}")
|
||
self.logger.log_blue(f"orientation current: {R.from_quat(quat_rot).as_euler('xyz',degrees=False)}")
|
||
self.command_rate.precise_sleep()
|
||
|
||
position ,quat_rot = self.arm.get_arm_position()
|
||
# self.arm_state.desired_position = poistion
|
||
self.arm_state.arm_position_command = position
|
||
# self.arm_state.desired_orientation = quat_rot
|
||
self.arm_state.arm_orientation_command = quat_rot
|
||
|
||
## POSITION CONTROLLER TEST ##
|
||
# delta_pos = np.array([0.1,0.1,-0.1],dtype=np.float64)
|
||
# # delta_quat = R.from_euler('xyz', [0, 0, 0], degrees=True).as_quat()
|
||
# self.arm_state.desired_position = position + delta_pos
|
||
# self.arm_state.desired_orientation = quat_rot
|
||
# 线程启动
|
||
self.arm_control_thread.start() ## 赋初始值后控制线程
|
||
|
||
self.logger.log_info("MassageRobot启动")
|
||
time.sleep(0.5)
|
||
|
||
def stop(self):
|
||
if not self.exit_event.is_set():
|
||
self.exit_event.set()
|
||
self.interrupt_event.clear()
|
||
self.arm_control_thread.join()
|
||
self.arm_measure_thread.join()
|
||
for i in range(3):
|
||
self.force_sensor.disable_active_transmission()
|
||
self.force_sensor.start_background_reading()
|
||
self.arm.stop_motion()
|
||
self.logger.log_info("MassageRobot停止")
|
||
|
||
def init_hardwares(self,ready_pose):
|
||
self.ready_pose = np.array(ready_pose)
|
||
self.switch_payload(self.current_head)
|
||
print(self.arm.get_arm_position())
|
||
time.sleep(0.5)
|
||
|
||
def switch_payload(self,name):
|
||
if name in self.playload_dict:
|
||
self.stop()
|
||
self.current_head = name
|
||
|
||
compensation_config = self.playload_dict[self.current_head]
|
||
|
||
# 读取数据
|
||
self.gravity_base = np.array(compensation_config['gravity_base'])
|
||
self.force_zero = np.array(compensation_config['force_zero'])
|
||
self.torque_zero = np.array(compensation_config['torque_zero'])
|
||
self.tool_position = np.array(compensation_config['tcp_offset'])
|
||
self.mass_center_position = np.array(compensation_config['mass_center_position'])
|
||
self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat())
|
||
|
||
tcp_offset = self.playload_dict[name]["tcp_offset"]
|
||
tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}"
|
||
print("tcp_offset_str",tcp_offset_str)
|
||
self.arm.setEndEffector(i=9,tool_i=tcp_offset_str)
|
||
self.arm.chooseEndEffector(i=9)
|
||
print(self.arm.get_arm_position())
|
||
self.logger.log_info(f"切换到{name}按摩头")
|
||
self.controller_manager.switch_controller('position')
|
||
else:
|
||
self.logger.log_error(f"未找到{name}按摩头")
|
||
|
||
def log_thread(self):
|
||
while True:
|
||
self.logger_data.log_info(f"机械臂位置:{self.arm_state.arm_position},机械臂姿态:{self.arm_state.arm_orientation}",is_print=False)
|
||
self.logger_data.log_info(f"机械臂期望位置:{self.arm_state.desired_position},机械臂真实位置:{self.arm_state.arm_position}",is_print=False)
|
||
self.logger_data.log_info(f"机械臂期望姿态:{self.arm_state.desired_orientation},机械臂真实姿态:{self.arm_state.arm_orientation}",is_print=False)
|
||
self.logger_data.log_info(f"机械臂期望力矩:{self.arm_state.desired_wrench},机械臂真实力矩:{self.arm_state.external_wrench_base}",is_print=False)
|
||
self.logger_data.log_info(f"当前按摩头:{self.current_head}",is_print=False)
|
||
time.sleep(1)
|
||
|
||
# 工具函数
|
||
def get_tf_matrix(self,position,orientation):
|
||
tf_matrix = np.eye(4)
|
||
rotation_matrix = R.from_quat(orientation).as_matrix()
|
||
tf_matrix[:3,3] = position/1000
|
||
tf_matrix[:3,:3] = rotation_matrix
|
||
return tf_matrix
|
||
def wrench_coordinate_conversion(self,tf_matrix, wrench):
|
||
rot_matrix = tf_matrix[:3, :3]
|
||
vector_p = tf_matrix[:3, 3]
|
||
temp_force = wrench[:3]
|
||
torque = wrench[3:]
|
||
force = rot_matrix.T @ temp_force
|
||
torque = rot_matrix.T @ np.cross(temp_force,vector_p) + rot_matrix.T @ torque
|
||
return np.concatenate([force, torque])
|
||
|
||
@track_function("set_position",True)
|
||
def set_position(self,pose,is_wait = False):
|
||
self.logger.log_info(f"set postion:{pose}")
|
||
x,y,z = pose[0],pose[1],pose[2]
|
||
roll,pitch,yaw = pose[3],pose[4],pose[5]
|
||
pose_command = np.concatenate([[x,y,z], [roll,pitch,yaw]])
|
||
time.sleep(0.1)
|
||
code = self.arm.RunPoint_P_inPose_M_RAD(poses_list = pose_command)
|
||
time.sleep(0.1)
|
||
return code
|
||
|
||
@track_function("move_to_point",True)
|
||
def move_to_point(self, t, pose, is_interrupt=True, wait=True, timeout=0.5, interpolation:Literal["linear","circle","cloud_point"] ='linear', algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance",is_switch_controller = False):
|
||
"""
|
||
移动到指定的点
|
||
param:
|
||
t: 时间或时间序列
|
||
pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿
|
||
is_interrupt: 是否允许中断
|
||
wait: 是否等待运动完成
|
||
timeout: 超时时间
|
||
interpolation: 插值方式
|
||
algorithm: 控制算法
|
||
return:
|
||
code: 0表示成功
|
||
1表示超时
|
||
2表示用户打断
|
||
3表示硬件错误
|
||
4表示参数错误
|
||
"""
|
||
self.move_to_point_count += 1
|
||
self.logger.log_blue(f"move_to_point_count: {self.move_to_point_count}")
|
||
self.logger.log_yellow(f"move_to_point: {pose[-1]}")
|
||
# 在函数内部直接规划轨迹
|
||
traj = self.traj_generate(t, pose, None, interpolation=interpolation)
|
||
if traj is None:
|
||
self.logger.log_error("轨迹未生成或生成错误")
|
||
return 4
|
||
|
||
self.logger.log_yellow(f"traj_quat: {R.from_quat(traj['quat'][-1]).as_euler('xyz',degrees=False)}")
|
||
self.logger.log_yellow(f"traj_pos: {traj['pos'][-1]}")
|
||
|
||
time_seq = traj['t']
|
||
pos_traj = traj['pos']
|
||
quat_traj = traj['quat']
|
||
|
||
if is_switch_controller == True:
|
||
try:
|
||
self.controller_manager.switch_controller(algorithm)
|
||
except ValueError as error:
|
||
self.logger.log_error(error)
|
||
self.logger.log_error("切换到%s控制器失败" % algorithm)
|
||
return 4
|
||
self.logger.log_info("切换到%s控制器" % algorithm)
|
||
|
||
# self.logger.log_info("当前为%s控制器" % algorithm)
|
||
|
||
for i in range(len(time_seq)):
|
||
if self.interrupt_event.is_set():
|
||
self.interrupt_event.clear()
|
||
self.logger.log_error("self.interrupt_event.clear()")
|
||
self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position)
|
||
self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation)
|
||
massage_wrench = np.array([0, 0, 0, 0, 0, 0])
|
||
self.arm_state.desired_wrench = massage_wrench
|
||
return 2
|
||
if self.pause_envent.is_set():
|
||
self.pause_envent.clear()
|
||
self.logger.log_error("self.pause_envent.clear()")
|
||
self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position)
|
||
self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation)
|
||
massage_wrench = np.array([0, 0, 0, 0, 0, 0])
|
||
self.arm_state.desired_wrench = massage_wrench
|
||
return 5
|
||
if self.skip_envent.is_set():
|
||
self.logger.log_error("self.skip_envent.clear()")
|
||
self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position)
|
||
self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation)
|
||
massage_wrench = np.array([0, 0, 0, 0, 0, 0])
|
||
self.arm_state.desired_wrench = massage_wrench
|
||
return 6
|
||
if self.arm.is_exit or self.exit_event.is_set():
|
||
print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set())
|
||
return 3
|
||
if self.adjust_wrench_envent.is_set() and self.is_execute == True:
|
||
if self.adjust_wrench is not None:
|
||
self.arm_state.desired_wrench = self.adjust_wrench
|
||
self.adjust_wrench_envent.clear()
|
||
|
||
self.arm_state.desired_position = pos_traj[i]
|
||
self.arm_state.desired_orientation = quat_traj[i]
|
||
|
||
if self.ion is not None:
|
||
|
||
act_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.arm_position
|
||
desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.desired_position
|
||
if self.arm_state.positionerSensorData < 0.001:
|
||
desired_position[2] = desired_position[2] - self.arm_state.desired_high
|
||
else:
|
||
desired_position[2] = min(act_position[2] + self.arm_state.positionerSensorData - self.arm_state.desired_high , desired_position[2] - self.arm_state.desired_high)
|
||
self.arm_state.desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix() @ desired_position
|
||
self.command_rate.precise_sleep()
|
||
|
||
start = time.time()
|
||
|
||
while wait:
|
||
if self.interrupt_event.is_set():
|
||
self.interrupt_event.clear()
|
||
return 2
|
||
if self.pause_envent.is_set():
|
||
self.pause_envent.clear()
|
||
return 5
|
||
if self.skip_envent.is_set():
|
||
# self.skip_envent.clear()
|
||
return 6
|
||
if self.arm.is_exit or self.exit_event.is_set():
|
||
print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set())
|
||
return 3
|
||
if time.time() - start > timeout:
|
||
return 1
|
||
self.command_rate.precise_sleep()
|
||
|
||
return 0
|
||
|
||
@track_function("apply_force",True)
|
||
def apply_force(self, t, wrench, interpolation:Literal["linear","oscillation"] = "linear", algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance"):
|
||
"""
|
||
施加力
|
||
param:
|
||
t: 时间或时间序列
|
||
wrench(nx6): 力矩或力矩序列,当输入为None时,表示力为[0,0,0,0,0,0]
|
||
is_interrupt: 是否允许中断
|
||
wait: 是否等待运动完成
|
||
timeout: 超时时间
|
||
interpolation: 插值方式
|
||
algorithm: 控制算法
|
||
return:
|
||
code: 0表示成功
|
||
1表示超时
|
||
2表示用户打断
|
||
3表示硬件错误
|
||
4表示参数错误
|
||
"""
|
||
|
||
if t==0:
|
||
self.arm_state.desired_wrench = wrench
|
||
return 0
|
||
|
||
# 在函数内部直接规划力矩轨迹
|
||
traj = self.wrench_traj_generate(t, wrench, interpolation=interpolation)
|
||
# # 在函数内部直接规划力矩轨迹
|
||
# traj = self.traj_generate(t, None, wrench, interpolation=interpolation)
|
||
if traj is None:
|
||
return 4
|
||
|
||
time_seq = traj['t']
|
||
wrench_traj = traj['wrench']
|
||
|
||
# try:
|
||
# self.controller_manager.switch_controller(algorithm)
|
||
# except ValueError as error:
|
||
# self.logger.log_error(error)
|
||
# return 4
|
||
|
||
self.logger.log_info("切换到%s控制器" % algorithm)
|
||
|
||
for i in range(len(time_seq)):
|
||
if self.interrupt_event.is_set():
|
||
self.interrupt_event.clear()
|
||
return 2
|
||
if self.pause_envent.is_set():
|
||
self.pause_envent.clear()
|
||
return 5
|
||
if self.skip_envent.is_set():
|
||
self.skip_envent.clear()
|
||
return 6
|
||
if self.arm.is_exit or self.exit_event.is_set():
|
||
return 3
|
||
if self.adjust_wrench_envent.is_set():
|
||
# if wrench_traj is not None:
|
||
# wrench_traj[i:] = self.adjust_wrench
|
||
self.adjust_wrench_envent.clear()
|
||
# print(wrench_traj[i])
|
||
self.arm_state.desired_wrench = wrench_traj[i] if wrench_traj is not None else np.zeros(6, dtype=np.float64)
|
||
self.command_rate.precise_sleep()
|
||
|
||
return 0
|
||
|
||
@track_function("resample_curve_strict",True)
|
||
def resample_curve_strict(self, points, num_resampled_points):
|
||
"""
|
||
修正的曲线重新采样函数,移除重复点并确保累积弧长严格递增。
|
||
"""
|
||
# 计算累积弧长
|
||
distances = np.sqrt(np.sum(np.diff(points, axis=0)**2, axis=1)) # 计算相邻点之间的距离
|
||
cumulative_length = np.insert(np.cumsum(distances), 0, 0) # 计算累积弧长
|
||
|
||
# 确保累积弧长严格递增(去除重复值)
|
||
unique_indices = np.where(np.diff(cumulative_length, prepend=-np.inf) > 0)[0]
|
||
cumulative_length = cumulative_length[unique_indices]
|
||
points = points[unique_indices]
|
||
|
||
# 生成均匀间隔的新弧长
|
||
target_lengths = np.linspace(0, cumulative_length[-1], num_resampled_points)
|
||
|
||
# 在累积弧长上进行插值计算新点位置
|
||
x_interp = interp1d(cumulative_length, points[:, 0], kind='linear', fill_value="extrapolate")
|
||
y_interp = interp1d(cumulative_length, points[:, 1], kind='linear', fill_value="extrapolate")
|
||
|
||
new_x = x_interp(target_lengths)
|
||
new_y = y_interp(target_lengths)
|
||
|
||
return np.column_stack((np.round(new_x).astype(int), np.round(new_y).astype(int)))
|
||
|
||
@track_function("generate_in_spiral_cloudPoint",True)
|
||
def generate_in_spiral_cloudPoint(self, start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'):
|
||
"""
|
||
绘制由内向外的螺旋线。
|
||
|
||
参数:
|
||
- start: 起点坐标 (x, y)
|
||
- end: 终点坐标 (x, y)
|
||
- num_cycle_factor: 螺旋的圈数
|
||
- points: 螺旋线上的点的数量
|
||
"""
|
||
# 计算起点和终点之间的距离
|
||
dx = end[0] - start[0]
|
||
dy = end[1] - start[1]
|
||
distance = np.sqrt(dx**2 + dy**2)
|
||
if width is None:
|
||
width = self.width_default
|
||
if num_cycle_factor is None:
|
||
num_cycle_factor = 5
|
||
|
||
num_cycle_factor = np.round(num_cycle_factor).astype(int)
|
||
if distance < 2:
|
||
print("距离过短,无法绘制螺旋线")
|
||
return 0
|
||
if width <= 2:
|
||
width = 2
|
||
if num_cycle_factor < 1:
|
||
print("圈数必须为大于1的整数")
|
||
num_cycle_factor = 1
|
||
angle = np.arctan2(dy, dx) # 计算从起点到终点的角度
|
||
|
||
points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int)
|
||
|
||
# 螺旋的角度范围
|
||
theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points)
|
||
|
||
# 螺旋的半径随着角度增大而线性增长
|
||
r = np.linspace(0, distance, points)
|
||
|
||
tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor)))
|
||
|
||
# 计算螺旋的x, y坐标
|
||
if direction == "CCW":
|
||
# 反向旋转时,使用负的角度来反向螺旋
|
||
x = r * np.cos(-theta) # 逆时针旋转
|
||
y = r * np.sin(-theta)*tempK
|
||
else:
|
||
# 正常旋转
|
||
x = r * np.cos(theta)
|
||
y = r * np.sin(theta)*tempK
|
||
|
||
# 旋转螺旋线以对齐起点和终点的角度
|
||
x_rotated = x * np.cos(angle) - y * np.sin(angle)
|
||
y_rotated = x * np.sin(angle) + y * np.cos(angle)
|
||
|
||
# 平移螺旋线,使其从起点开始
|
||
x_final = x_rotated + start[0] - x_rotated[0]
|
||
y_final = y_rotated + start[1] - y_rotated[0]
|
||
|
||
# return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1)))
|
||
result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1),
|
||
np.round(y_final).astype(int).reshape(-1, 1)))
|
||
|
||
return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points))
|
||
|
||
@track_function("generate_out_spiral_cloudPoint",True)
|
||
def generate_out_spiral_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'):
|
||
"""
|
||
绘制由外向内的螺旋线。
|
||
|
||
参数:
|
||
- start: 起点坐标 (x, y)
|
||
- end: 终点坐标 (x, y)
|
||
- num_cycle_factor: 螺旋的圈数
|
||
- points: 螺旋线上的点的数量
|
||
"""
|
||
# 计算起点和终点之间的距离
|
||
dx = end[0] - start[0]
|
||
dy = end[1] - start[1]
|
||
angle = np.arctan2(dy, dx) # 计算从起点到终点的角度
|
||
distance = np.sqrt(dx**2 + dy**2)
|
||
if width is None:
|
||
width = self.width_default
|
||
if num_cycle_factor is None:
|
||
num_cycle_factor = 5
|
||
|
||
num_cycle_factor = np.round(num_cycle_factor).astype(int)
|
||
if distance < 2:
|
||
print("距离过短,无法绘制螺旋线")
|
||
return 0
|
||
if width <= 2:
|
||
width = 2
|
||
if num_cycle_factor < 1:
|
||
print("圈数必须为大于1的整数")
|
||
num_cycle_factor = 1
|
||
|
||
points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int)
|
||
|
||
# 螺旋的角度范围
|
||
theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points)
|
||
|
||
# 螺旋的半径随着角度增大而线性增长
|
||
r = np.linspace(distance, 0, points)
|
||
|
||
tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor)))
|
||
|
||
# 计算螺旋的x, y坐标
|
||
if direction == "CCW":
|
||
# 反向旋转时,使用负的角度来反向螺旋
|
||
x = r * np.cos(-theta) # 逆时针旋转
|
||
y = r * np.sin(-theta)*tempK
|
||
else:
|
||
# 正常旋转
|
||
x = r * np.cos(theta)
|
||
y = r * np.sin(theta)*tempK
|
||
|
||
# 旋转螺旋线以对齐起点和终点的角度
|
||
x = x * np.cos(np.pi) - y * np.sin(np.pi)
|
||
y = x * np.sin(np.pi) + y * np.cos(np.pi)
|
||
|
||
# 旋转螺旋线以对齐起点和终点的角度
|
||
x_rotated = x * np.cos(angle) - y * np.sin(angle)
|
||
y_rotated = x * np.sin(angle) + y * np.cos(angle)
|
||
|
||
# 平移螺旋线,使其从起点开始
|
||
x_final = x_rotated + start[0] - x_rotated[0]
|
||
y_final = y_rotated + start[1] - y_rotated[0]
|
||
|
||
# return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1)))
|
||
result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1),
|
||
np.round(y_final).astype(int).reshape(-1, 1)))
|
||
|
||
return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points))
|
||
|
||
@track_function("generate_prolateCycloid_cloudPoint",True)
|
||
def generate_prolateCycloid_cloudPoint(self, start, end, width=None, num_cycle_factor=None, direction: Literal["CW", "CCW"] = 'CW'):
|
||
# Parameters
|
||
step_degree = np.pi / 12
|
||
a_prolate = 4.5
|
||
radius = 30.0
|
||
distance = np.linalg.norm(np.array(end) - np.array(start))
|
||
angle = np.arctan2(end[1] - start[1], end[0] - start[0])
|
||
|
||
if width is None:
|
||
width = self.width_default
|
||
if num_cycle_factor is None:
|
||
num_cycle_factor = 5
|
||
num_cycle_factor = np.round(num_cycle_factor).astype(int)
|
||
if distance < 2:
|
||
print("距离过短,无法绘制Cycloid线")
|
||
return 0
|
||
if width <= 2:
|
||
width = 2
|
||
if num_cycle_factor < 1:
|
||
print("圈数必须为大于1的整数")
|
||
num_cycle_factor = 1
|
||
|
||
r_cycloid = radius / (2 * a_prolate)
|
||
num_cycles = int(num_cycle_factor)
|
||
temp_k = distance / (r_cycloid * (np.pi + 2 * a_prolate + 2 * np.pi * num_cycles))
|
||
|
||
# Generate the prolate cycloid
|
||
theta_prolate = np.linspace(0.5 * np.pi, num_cycles * 2 * np.pi + 1.5 * np.pi,
|
||
round((num_cycles * 2 * np.pi + 1.5 * np.pi) / step_degree))
|
||
if direction == 'CW':
|
||
theta_prolate = np.flip(theta_prolate)
|
||
|
||
x_prolate = r_cycloid * (theta_prolate - a_prolate * np.sin(theta_prolate)) * temp_k
|
||
y_prolate = r_cycloid * (1 - a_prolate * np.cos(theta_prolate))
|
||
|
||
# Scale y-prolate based on width
|
||
y_prolate = (y_prolate / 30) * width
|
||
|
||
# Rotation of cycloid
|
||
if direction == 'CW':
|
||
x_prolate, y_prolate = -x_prolate, -y_prolate
|
||
|
||
x_rotated = x_prolate * np.cos(angle) - y_prolate * np.sin(angle)
|
||
y_rotated = x_prolate * np.sin(angle) + y_prolate * np.cos(angle)
|
||
|
||
# Translate to starting point
|
||
x_final = x_rotated + start[0] - x_rotated[0]
|
||
y_final = y_rotated + start[1] - y_rotated[0]
|
||
|
||
return np.column_stack((np.round(x_final).astype(int), np.round(y_final).astype(int)))
|
||
|
||
@track_function("generate_ellipse_cloudPoint",True)
|
||
def generate_ellipse_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'):
|
||
# 计算椭圆的中心(两个端点的中点)
|
||
center = (start + end) / 2
|
||
if num_cycle_factor is None:
|
||
num_cycle_factor = 1
|
||
|
||
num_cycle_factor = np.round(num_cycle_factor).astype(int)
|
||
|
||
distance = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2)
|
||
|
||
if width is None:
|
||
width = distance/2
|
||
|
||
|
||
if distance < 2:
|
||
print("距离过短,无法绘制椭圆")
|
||
return 0
|
||
if width <= 2:
|
||
print("椭圆半径过小,无法绘制椭圆")
|
||
width = self.width_default
|
||
if num_cycle_factor < 1:
|
||
print("圈数必须为大于1的整数")
|
||
num_cycle_factor = 1
|
||
|
||
# 计算两个端点的距离,即长半轴长度
|
||
a = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2) / 2
|
||
|
||
# 假设短半轴长度为常量值radius
|
||
b = width / 2
|
||
|
||
# 计算旋转角度(由端点的方向确定)
|
||
angle = np.arctan2(end[1] - start[1], end[0] - start[0])
|
||
points = np.round(((num_cycle_factor + 1) * np.pi * 2)/(np.pi/9)).astype(int)
|
||
|
||
# 绘制椭圆的参数化公式
|
||
theta = np.linspace(np.pi, (2+num_cycle_factor*2)* np.pi, points)
|
||
|
||
# 计算螺旋的x, y坐标
|
||
if direction == "CCW":
|
||
# 反向旋转时,使用负的角度来反向螺旋
|
||
x_ellipse = a * np.cos(-theta) #顺时针
|
||
y_ellipse = b * np.sin(-theta)
|
||
else:
|
||
# 正常旋转
|
||
x_ellipse = a * np.cos(theta)
|
||
y_ellipse = b * np.sin(theta)
|
||
|
||
# 绕中心旋转椭圆
|
||
x_rot = center[0] + x_ellipse * np.cos(angle) - y_ellipse * np.sin(angle)
|
||
y_rot = center[1] + x_ellipse * np.sin(angle) + y_ellipse * np.cos(angle)
|
||
|
||
# 返回椭圆点云,保留整数值
|
||
return np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1)))
|
||
|
||
@track_function("generate_Lemniscate_cloudPoint",True)
|
||
def generate_Lemniscate_cloudPoint(self, start, end, width = None,num_cycle_factor = None, direction:Literal["CW","CCW"] ='CW'):
|
||
"""
|
||
生成从start到end的8字形插补路径
|
||
|
||
:param start: 起点坐标 (x0, y0)
|
||
:param end: 终点坐标 (x1, y1)
|
||
:param num_points: 插补点的数量
|
||
:return: 插补路径的坐标列表
|
||
"""
|
||
# 起点和终点的坐标
|
||
x1, y1 = start
|
||
x0, y0 = end
|
||
step_degree=np.pi/12
|
||
|
||
# 计算轨迹的缩放因子和旋转角度
|
||
distance = np.linalg.norm([x1 - x0, y1 - y0])
|
||
|
||
if width is None:
|
||
width = self.width_default
|
||
if num_cycle_factor is None:
|
||
num_cycle_factor = 1
|
||
if distance < 2:
|
||
print("距离过短,无法绘制螺旋线")
|
||
return 0
|
||
if width <= 2:
|
||
width = 2
|
||
if num_cycle_factor < 1:
|
||
print("圈数必须为大于1的整数")
|
||
num_cycle_factor = 1
|
||
|
||
a = distance / 2 # 轨迹的宽度一半
|
||
b = width/2
|
||
angle = np.arctan2(y1 - y0, x1 - x0)
|
||
|
||
num_points = np.abs((2*num_cycle_factor+1)*np.pi/step_degree).astype(int)
|
||
|
||
# 生成标准的8字形轨迹 (Gerono卵形线)
|
||
t_values = np.linspace(0.5*np.pi, (2*num_cycle_factor + 1.5)* np.pi, num_points)
|
||
|
||
x_curve = a * np.sin(t_values)
|
||
if direction == 'CCW':
|
||
y_curve = -b * np.sin(2 * t_values)
|
||
else:
|
||
y_curve = b * np.sin(2 * t_values)
|
||
|
||
# 构建旋转矩阵
|
||
rotation_matrix = np.array([[np.cos(angle), -np.sin(angle)],
|
||
[np.sin(angle), np.cos(angle)]])
|
||
|
||
# 旋转并平移轨迹
|
||
curve_points = np.vstack((x_curve, y_curve)).T
|
||
rotated_points = curve_points @ rotation_matrix.T
|
||
shifted_points = rotated_points + [(x0 + x1) / 2, (y0 + y1) / 2]
|
||
|
||
# 将插值结果四舍五入为整数
|
||
return np.round(shifted_points).astype(int)
|
||
|
||
@track_function("generate_circle_cloudPoint",True)
|
||
def generate_circle_cloudPoint(self, start_point, center, step_degree=np.pi/12, num_cycle_factor = 3):
|
||
"""
|
||
center: 圆心坐标,形如 (x_c, y_c)
|
||
start_point: 起始点坐标,形如 (x_0, y_0)
|
||
radius: 圆的半径
|
||
delta_theta: 每次插补的角度增量
|
||
num_turns: 绕圈的次数
|
||
"""
|
||
# 确定总共需要生成的插补点数
|
||
num_points = int((2 * np.pi * num_cycle_factor) / step_degree)
|
||
|
||
# 圆心
|
||
x_c, y_c = center
|
||
|
||
radius = np.linalg.norm(np.array(start_point) - np.array(center)) # 半径
|
||
|
||
# 计算起始点的初始角度
|
||
x_0, y_0 = start_point
|
||
theta_0 = np.arctan2(y_0 - y_c, x_0 - x_c)
|
||
|
||
# 初始化存储插补点的列表
|
||
circle_points = []
|
||
|
||
# 生成插补点
|
||
for i in range(num_points):
|
||
# 当前角度
|
||
theta_i = theta_0 + i * step_degree
|
||
|
||
# 计算插补点的坐标
|
||
x_i = x_c + radius * np.cos(theta_i)
|
||
y_i = y_c + radius * np.sin(theta_i)
|
||
|
||
# 将点添加到列表中
|
||
|
||
circle_points.append((np.round(x_i).astype(int), np.round(y_i).astype(int)))
|
||
|
||
circle_points.append((np.round(x_0).astype(int), np.round(y_0).astype(int)))
|
||
|
||
return circle_points
|
||
|
||
@track_function("generate_point_cloudPoint",True)
|
||
def generate_point_cloudPoint(self, point, num_cycle_factor=None, width=None, direction: Literal["CW", "CCW"] = 'CW'):
|
||
if num_cycle_factor is None:
|
||
num_cycle_factor = 12
|
||
if num_cycle_factor < 1:
|
||
print("圈数必须为大于1的整数")
|
||
num_cycle_factor = 1
|
||
num_cycle_factor = np.abs(np.round(num_cycle_factor).astype(int))
|
||
if width is None:
|
||
width = 20
|
||
if width < 2:
|
||
print("宽度必须为大于2的整数")
|
||
width = 2
|
||
width = np.abs(np.round(width).astype(int))
|
||
|
||
radius = width/2
|
||
|
||
points = np.round((num_cycle_factor * np.pi * 2)/(np.pi/9)).astype(int)
|
||
thetas = np.linspace(0, num_cycle_factor * 2 * np.pi, points)
|
||
radiuss = np.ones_like(thetas) * radius
|
||
|
||
for i in range(len(thetas)):
|
||
if thetas[i] <= np.pi/2:
|
||
radiuss[i] = np.sin(thetas[i]) * radius
|
||
if thetas[i] >= num_cycle_factor * np.pi * 2 - np.pi / 2:
|
||
radiuss[i] = np.sin(num_cycle_factor * np.pi * 2 - thetas[i]) * radius
|
||
|
||
|
||
|
||
if direction == "CCW":
|
||
x_ellipse = radiuss * np.cos(-thetas)
|
||
y_ellipse = radiuss * np.sin(-thetas)
|
||
else:
|
||
x_ellipse = radiuss * np.cos(thetas)
|
||
y_ellipse = radiuss * np.sin(thetas)
|
||
x_rot = point[0] + x_ellipse
|
||
y_rot = point[1] + y_ellipse
|
||
result_points = np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1)))
|
||
return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points))
|
||
|
||
@track_function("generate_line_cloudPoint",True)
|
||
def generate_line_cloudPoint(self,start, end, step_distance = 5):
|
||
"""
|
||
生成起点和终点之间的线性插值点
|
||
|
||
参数:
|
||
start: 起点 (标量或数组,例如 [x1, y1, z1])
|
||
end: 终点 (标量或数组,例如 [x2, y2, z2])
|
||
num_points: 插值点的数量(包括起点和终点)
|
||
|
||
返回:
|
||
包含线性插值点的数组
|
||
"""
|
||
start = np.array(start)
|
||
end = np.array(end)
|
||
distance = np.linalg.norm(end - start)
|
||
num_points = int(distance / step_distance)
|
||
|
||
if num_points <=2:
|
||
num_points = 2
|
||
# 使用 numpy 的 linspace 在起点和终点之间生成线性插值点
|
||
interpolated_points = np.linspace(start, end, num_points)
|
||
|
||
# 将插值结果四舍五入为整数
|
||
return np.round(interpolated_points).astype(int)
|
||
|
||
@track_function("generate_repeat_line_cloudPoint",True)
|
||
def generate_repeat_line_cloudPoint(self, start_point, end_point, num_cycle_factor = 5):
|
||
"""
|
||
start_point: 起始点坐标,形如 (x_0, y_0)
|
||
end_point: 终点坐标,形如 (x_0, y_0)
|
||
num_cycle_factor: 重复的次数
|
||
"""
|
||
|
||
# 初始化存储插补点的列表
|
||
result_points = []
|
||
# 生成插补点
|
||
for i in range(num_cycle_factor):
|
||
# 将点添加到列表中
|
||
result_points.append(start_point)
|
||
result_points.append(end_point)
|
||
return np.round(np.array(result_points)).astype(int)
|
||
|
||
@track_function("generate_finger_circle_cloudPoint",True)
|
||
def generate_finger_circle_cloudPoint(self, center_point, num_cycle_factor=5, radius=30,deviation=15):
|
||
"""
|
||
生成一指禅绕球心的圆弧轨迹
|
||
center_point: 圆心坐标 [x, y, z, roll, pitch, yaw]
|
||
num_cycle_factor: 轨迹的重复次数
|
||
radius: 圆的半径
|
||
"""
|
||
result_points = []
|
||
result_points.append(copy.deepcopy(center_point))
|
||
deviation = deviation * np.pi/180
|
||
|
||
totalCounts = int(360 / radius)
|
||
shift = 1
|
||
|
||
|
||
for i in range(num_cycle_factor):
|
||
for countR in range(0, totalCounts):
|
||
# 当前旋转矩阵
|
||
temp_pose = copy.deepcopy(center_point)
|
||
rotation = R.from_euler('xyz', temp_pose[3:], degrees=False)
|
||
current_rotation_matrix = rotation.as_matrix()
|
||
|
||
|
||
if i == 0:
|
||
shift = countR/(90/radius) if countR/(90/radius) < 1 else 1
|
||
if i == num_cycle_factor-1:
|
||
shift = (totalCounts-countR)/(90/radius) if (totalCounts-countR)/(90/radius) < 1 else 1
|
||
|
||
|
||
# 生成增量旋转矩阵
|
||
increment_rotation= R.from_euler('xyz', [deviation*shift,0,countR * np.pi*2 /totalCounts], degrees=False).as_matrix()
|
||
|
||
# 更新旋转矩阵
|
||
updated_rotation_matrix = current_rotation_matrix @ increment_rotation
|
||
|
||
# 将旋转矩阵转换回欧拉角
|
||
temp_pose[3:] = R.from_matrix(updated_rotation_matrix).as_euler('xyz', degrees=False)
|
||
|
||
# 添加当前点到结果列表
|
||
result_points.append(copy.deepcopy(temp_pose))
|
||
|
||
result_points.append(copy.deepcopy(center_point))
|
||
return result_points
|
||
|
||
def traj_generate(self, t: Union[int, float, List[float]], pose = None, wrench = None, interpolation: Literal["linear", "cubic","circle","cloud_point"] = 'linear',**kwargs):
|
||
"""
|
||
轨迹生成
|
||
param:
|
||
t: 时间或时间序列
|
||
pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿
|
||
wrench(nx6): 力矩或力矩序列,当输入为None时,表示保持当前的力
|
||
interpolation: 插值方式
|
||
return:
|
||
traj: 生成的轨迹 -> dict
|
||
"""
|
||
# 确保时间输入是列表
|
||
if isinstance(t, (int, float)):
|
||
t = [t]
|
||
elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t):
|
||
self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)")
|
||
return None
|
||
# 检查时间点数量
|
||
if len(t) < 1:
|
||
self.logger.log_error("len(t) < 1")
|
||
return None
|
||
# 时间序列
|
||
time_points = [0.0] + t
|
||
time_points = np.array(time_points)
|
||
dt = self.command_rate.to_sec()
|
||
|
||
for i in range(len(time_points)):
|
||
time_points[i] = np.round(time_points[i] / dt) * dt
|
||
if np.abs(time_points[-1]) < dt:
|
||
time_points[-1] = dt
|
||
# 由于浮点数精度问题,time_step要乘0.9
|
||
time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt)
|
||
pos_traj = None
|
||
quat_traj = None
|
||
wrench_traj = None
|
||
pos_vel = None
|
||
angular_vel = None
|
||
# 位置轨迹与速度轨迹生成
|
||
if pose is not None:
|
||
if interpolation != 'cloud_point':
|
||
pose = np.array(pose).reshape(-1, 6)
|
||
if pose.shape[0] != len(t):
|
||
self.logger.log_error("pose.shape[0] != len(t)")
|
||
print(pose.shape[0],len(t))
|
||
return None
|
||
pose_matrix = self.convert_to_7d(pose)
|
||
else:
|
||
pose = np.array(pose)
|
||
try:
|
||
# 插值
|
||
if interpolation == 'linear':
|
||
pos_traj, quat_traj = linear_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt)
|
||
elif interpolation == 'cubic':
|
||
pos_traj, quat_traj = spline_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt)
|
||
elif interpolation == 'circle':
|
||
if self.ion is not None:
|
||
pos_traj = circle_trajectory(center=pose, radius=0.07,time_points=time_points,time_step=dt,**kwargs)
|
||
else:
|
||
pos_traj = circle_trajectory(center=pose,time_points=time_points,time_step=dt,**kwargs)
|
||
quat_traj = np.tile(R.from_euler('xyz', np.array(pose[0][3:])).as_quat(), (pos_traj.shape[0], 1))
|
||
elif interpolation == 'cloud_point':
|
||
pos_traj, quat_traj = cloud_point_interpolate(positions=pose[:, :3], quaternions=pose[:, 3:], time_points=time_points, time_step=dt)
|
||
# self.logger.log_yellow(f'{pos_traj[0]} {pos_traj[-1]}')
|
||
except ValueError as error:
|
||
self.logger.log_error(error)
|
||
return None
|
||
else:
|
||
self.logger.log_error("未输入位置进行规划")
|
||
|
||
# TODO 力轨迹生成 改为线性增加
|
||
if wrench is not None:
|
||
wrench = np.array(wrench).reshape(-1, 6)
|
||
if wrench.shape[0] != len(t):
|
||
return None
|
||
indices = np.searchsorted(time_points, time_seq, side='left') - 1
|
||
indices = np.clip(indices, 0, len(wrench) - 1)
|
||
wrench_traj = wrench[indices]
|
||
# 构建返回数据字典
|
||
traj = {
|
||
't': time_seq,
|
||
'pos': pos_traj,
|
||
'quat': quat_traj,
|
||
'pos_vel': pos_vel,
|
||
'angular_vel': angular_vel,
|
||
'wrench': wrench_traj
|
||
}
|
||
return traj
|
||
|
||
|
||
def wrench_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation"] = 'linear',**kwargs):
|
||
"""
|
||
力/力矩轨迹生成
|
||
param:
|
||
t: 时间或时间序列
|
||
wrench(nx6): 力矩或力矩序列
|
||
interpolation: 插值方式
|
||
return:
|
||
traj: 生成的轨迹 -> dict
|
||
"""
|
||
# 确保时间输入是列表
|
||
if isinstance(t, (int, float)):
|
||
t = [t]
|
||
elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t):
|
||
self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)")
|
||
return None
|
||
# 检查时间点数量
|
||
if len(t) < 1:
|
||
self.logger.log_error("len(t) < 1")
|
||
return None
|
||
|
||
if wrench is None:
|
||
self.logger.log_error("未输入力矩进行规划")
|
||
return None
|
||
# 时间序列
|
||
time_points = [0.0] + t
|
||
time_points = np.array(time_points)
|
||
dt = self.command_rate.to_sec()
|
||
|
||
for i in range(len(time_points)):
|
||
time_points[i] = np.round(time_points[i] / dt) * dt
|
||
if np.abs(time_points[-1]) < dt:
|
||
time_points[-1] = dt
|
||
# 由于浮点数精度问题,time_step要乘0.9
|
||
time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt)
|
||
|
||
wrench_traj = None
|
||
|
||
|
||
|
||
|
||
# TODO 力轨迹生成 改为线性增加
|
||
if wrench is not None:
|
||
wrench = np.array(wrench).reshape(-1, 6)
|
||
|
||
|
||
if wrench.shape[0] != len(t):
|
||
return None
|
||
if interpolation == 'oscillation':
|
||
wrench_traj = oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
|
||
else:
|
||
wrench_traj = linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
|
||
|
||
# indices = np.searchsorted(time_points, time_seq, side='left') - 1
|
||
|
||
|
||
# indices = np.clip(indices, 0, len(wrench) - 1)
|
||
|
||
# wrench_traj = wrench[indices]
|
||
|
||
|
||
# 构建返回数据字典
|
||
traj = {
|
||
't': time_seq,
|
||
'wrench': wrench_traj
|
||
}
|
||
return traj
|
||
|
||
def force_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation", "auto"] = "auto",**kwargs):
|
||
"""
|
||
力/力矩轨迹生成
|
||
param:
|
||
t: 时间或时间序列
|
||
wrench(nx6): 力矩或力矩序列
|
||
interpolation: 插值方式
|
||
return:
|
||
traj: 生成的轨迹 -> dict
|
||
"""
|
||
# 确保时间输入是列表
|
||
if isinstance(t, (int, float)):
|
||
t = [t]
|
||
elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t):
|
||
self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)")
|
||
return None
|
||
# 检查时间点数量
|
||
if len(t) < 1:
|
||
self.logger.log_error("len(t) < 1")
|
||
return None
|
||
|
||
if wrench is None:
|
||
self.logger.log_error("未输入力矩进行规划")
|
||
return None
|
||
# 时间序列
|
||
time_points = [0.0] + t
|
||
time_points = np.array(time_points)
|
||
dt = self.command_rate.to_sec()
|
||
|
||
for i in range(len(time_points)):
|
||
time_points[i] = np.round(time_points[i] / dt) * dt
|
||
if np.abs(time_points[-1]) < dt:
|
||
time_points[-1] = dt
|
||
# 由于浮点数精度问题,time_step要乘0.9
|
||
time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt)
|
||
|
||
wrench_traj = None
|
||
|
||
|
||
|
||
|
||
# TODO 力轨迹生成 改为线性增加
|
||
if wrench is not None:
|
||
wrench = np.array(wrench).reshape(-1, 6)
|
||
|
||
|
||
if wrench.shape[0] != len(t):
|
||
return None
|
||
if interpolation == 'oscillation':
|
||
wrench_traj = self.force_plan.oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
|
||
elif interpolation == "auto":
|
||
# target_wrench = []
|
||
# target_wrench.append(wrench)
|
||
wrench_traj = self.force_plan.S_shaped_wrench_interpolate(start=self.arm_state.desired_wrench, wrench=wrench, time_points=time_points, time_step=dt)
|
||
else:
|
||
wrench_traj = self.force_plan.linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
|
||
|
||
# indices = np.searchsorted(time_points, time_seq, side='left') - 1
|
||
|
||
|
||
# indices = np.clip(indices, 0, len(wrench) - 1)
|
||
|
||
# wrench_traj = wrench[indices]
|
||
|
||
|
||
# 构建返回数据字典
|
||
traj = {
|
||
't': time_seq,
|
||
'wrench': wrench_traj
|
||
}
|
||
return traj
|
||
|
||
|
||
|
||
if __name__ == "__main__":
|
||
waypoints = [
|
||
{"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0],
|
||
"acceleration": [0, 0, 0],
|
||
"orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()},
|
||
{"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0],
|
||
"acceleration": [0, 0, 0],
|
||
"orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()},
|
||
] ## 单位 m deg
|
||
myInterpolate = TrajectoryInterpolator(waypoints)
|
||
ts = np.linspace(0,5,100)
|
||
robot = MassageRobot(arm_config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml")
|
||
xr_list, vr_list, ar_list = [], [], []
|
||
for t in ts:
|
||
xr, vr, ar = myInterpolate.interpolate(t)
|
||
xr_list.append(xr)
|
||
vr_list.append(vr)
|
||
ar_list.append(ar)
|
||
xr_array = np.array(xr_list)
|
||
vr_array = np.array(vr_list)
|
||
ar_array = np.array(ar_list)
|
||
|
||
robot.xr = xr_array
|
||
robot.vr = vr_array
|
||
robot.ar = ar_array
|
||
robot.ts = ts
|
||
robot.dt = ts[1] - ts[0]
|
||
|
||
|
||
ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0]
|
||
robot.current_head = 'finger_head'
|
||
|
||
robot.force_sensor.disable_active_transmission()
|
||
time.sleep(0.5)
|
||
robot.force_sensor.disable_active_transmission()
|
||
time.sleep(0.5)
|
||
robot.force_sensor.disable_active_transmission()
|
||
time.sleep(0.5)
|
||
robot.force_sensor.set_zero()
|
||
time.sleep(0.5)
|
||
robot.force_sensor.enable_active_transmission()
|
||
|
||
robot.init_hardwares(ready_pose)
|
||
time.sleep(3)
|
||
robot.controller_manager.switch_controller('admittance')
|
||
# robot.controller_manager.switch_controller('position')
|
||
|
||
|
||
|
||
atexit.register(robot.force_sensor.disconnect)
|
||
robot.arm_state.desired_wrench = np.array([0,0,-0.5,0,0,0])
|
||
try:
|
||
robot_thread = threading.Thread(target=robot.start)
|
||
robot_thread.start()
|
||
|
||
except KeyboardInterrupt:
|
||
print("用户中断操作。")
|
||
except Exception as e:
|
||
print("Exception occurred at line:", e.__traceback__.tb_lineno)
|
||
print("发生异常:", e)
|
||
|
||
robot_thread.join() # 等待机械臂线程结束
|
||
# robot.arm.disableRobot() |