From 5c8f3ef75dc99755efcbcc1fd2b88ee6b6d0daa7 Mon Sep 17 00:00:00 2001
From: swayneleong <15723182159@163.com>
Date: Tue, 27 May 2025 22:08:48 +0800
Subject: [PATCH] =?UTF-8?q?massage=5Fnova5=E5=88=9D=E6=AD=A5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 Massage/MassageControl/MassageRobot_nova5.py  | 1164 ++++++++++++++++-
 .../MassageControl/algorithms/arm_state.py    |    4 +
 .../MassageControl/hardware/dobot_nova5.py    |    9 +-
 .../MassageControl/tools/decorator_tools.py   |   29 +
 Massage/MassageControl/tools/serial_tools.py  |   94 ++
 5 files changed, 1272 insertions(+), 28 deletions(-)
 create mode 100755 Massage/MassageControl/tools/decorator_tools.py
 create mode 100755 Massage/MassageControl/tools/serial_tools.py

diff --git a/Massage/MassageControl/MassageRobot_nova5.py b/Massage/MassageControl/MassageRobot_nova5.py
index 58f8627..f0add3d 100644
--- a/Massage/MassageControl/MassageRobot_nova5.py
+++ b/Massage/MassageControl/MassageRobot_nova5.py
@@ -1,49 +1,140 @@
-from hardware.dobot_nova5 import dobot_nova5
-from hardware.force_sensor import XjcSensor
-
-from algorithms.arm_state import ArmState
-from algorithms.controller_manager import ControllerManager
-from algorithms.admittance_controller import AdmittanceController
-from algorithms.position_controller import PositionController
-
-from tools.log import CustomLogger
-from tools.yaml_operator import read_yaml
-from tools.Rate import Rate
+from functools import wraps
+import sys
 import numpy as np
 import threading
 import time
+from typing import List, Literal, Union
+
+from scipy.interpolate import interp1d
+
+import requests
 import os
 from scipy.spatial.transform import Rotation as R
 import ctypes
 libc = ctypes.CDLL("libc.so.6")
 import atexit
-from tools.Trajectory import TrajectoryInterpolator
+import copy
+
 from scipy.spatial.transform import Slerp
+
+
+try:
+    # 导入算法
+    from .algorithms.arm_state import ArmState
+    from .algorithms.controller_manager import ControllerManager
+    from .algorithms.admittance_controller import AdmittanceController
+    from .algorithms.hybrid_controller import HybridController
+    from .algorithms.admithybrid_controller import AdmitHybridController
+    from .algorithms.position_controller import PositionController
+    from .algorithms.hybridPid_controller import HybridPidController
+    from .algorithms.hybridAdmit_controller import HybridAdmitController
+    from .algorithms.positionerSensor_controller import PositionerSensorController
+    from .algorithms.admittanceZ_controller import AdmittanceZController
+    from .algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate
+    from .algorithms.force_planner import ForcePlanner
+
+    # 导入硬件
+    from .hardware.dobot_nova5 import dobot_nova5
+    from .hardware.force_sensor import XjcSensor
+
+    # 导入工具
+    from .tools.log import CustomLogger
+    from .tools.Rate import Rate
+    from .tools.yaml_operator import read_yaml
+    from .tools.decorator_tools import custom_decorator,apply_decorators
+    from .tools.serial_tools import find_serial_by_location
+    from .tools.Trajectory import TrajectoryInterpolator
+except:
+    #导入算法
+    from algorithms.arm_state import ArmState
+    from algorithms.controller_manager import ControllerManager
+    from algorithms.admittance_controller import AdmittanceController
+    from algorithms.admittanceZ_controller import AdmittanceZController
+    from algorithms.hybrid_controller import HybridController
+    from algorithms.admithybrid_controller import AdmitHybridController
+    from algorithms.position_controller import PositionController
+    from algorithms.hybridPid_controller import HybridPidController
+    from algorithms.hybridAdmit_controller import HybridAdmitController
+    from algorithms.positionerSensor_controller import PositionerSensorController
+    from algorithms.interpolation import linear_interpolate,spline_interpolate,circle_trajectory,cloud_point_interpolate,oscillation_wrench_interpolate,linear_wrench_interpolate
+    from algorithms.force_planner import ForcePlanner
+
+    # 导入硬件
+    from hardware.dobot_nova5 import dobot_nova5
+    from hardware.force_sensor import XjcSensor
+
+    # 导入工具
+    from tools.log import CustomLogger
+    from tools.Rate import Rate
+    from tools.yaml_operator import read_yaml
+    from tools.decorator_tools import custom_decorator,apply_decorators
+    from tools.serial_tools import find_serial_by_location
+    from tools.Trajectory import TrajectoryInterpolator
+
+import matplotlib.pyplot as plt
+from matplotlib.animation import FuncAnimation 
+
+current_file_path = os.path.abspath(__file__)
+MassageRobot_Dobot_Path = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path)))
+print(MassageRobot_Dobot_Path)
+# 添加目标文件夹到系统路径
+sys.path.append(MassageRobot_Dobot_Path)
+from VortXDB.client import VTXClient
+
+"""
+在 Python 中,atexit.register 用于注册程序退出时要执行的函数。atexit 模块提供了一个简单的接口,用于在程序正常终止时执行清理操作。
+当你注册多个函数时,它们的执行顺序遵循先进后出的顺序(LIFO:Last In, First Out)。也就是说,最后注册的函数会最先执行。
+"""
+
+def track_function(function_name,log = False):
+    def before(self, *args, **kwargs):
+        self.current_function = function_name
+        if log:
+            self.logger.log_info(f"Entering function: {function_name}")
+    def after(self, result, *args, **kwargs):
+        self.current_function = None
+        if log:
+            self.logger.log_info(f"Exiting function: {function_name},code: {result}")
+    return custom_decorator(before=before, after=after)
+
+@apply_decorators
 class MassageRobot:
     def __init__(self,arm_config_path,is_log=False):
-        self.logger = CustomLogger(
-            log_name="测试日志",
-            log_file="logs/MassageRobot_nova5_test.log",
-            precise_time=True)
+        self.logger = CustomLogger()
         if is_log:
-            self.logger_data = CustomLogger(log_name="运动数据日志",
-                                            log_file="logs/MassageRobot_kinematics_data.log",
-                                            precise_time=True)
+            self.logger_data = CustomLogger()
             # 日志线程    
             threading.Thread(target=self.log_thread,daemon=True).start()
+
+        self.vtxdb = VTXClient()
         
         self.arm_state = ArmState()
         self.arm_config = read_yaml(arm_config_path)
         # arm 实例化时机械臂类内部进行通讯连接
-        self.arm = dobot_nova5(self.arm_config['arm_ip'])
+        self.arm = dobot_nova5(arm_ip = self.arm_config['arm_ip'])
         self.arm.start()
+
+        self.thermotherapy = None
+        self.shockwave = None
+        self.stone = None
+        self.heat = None
+        self.ion = None
+        self.force_plan = ForcePlanner()
         # 传感器
+        # self.force_sensor = None
         self.force_sensor = XjcSensor()
-        self.force_sensor.connect()
-        # 控制器初始化(初始化为导纳控制)
+        # self.force_sensor.connect()
+        # 控制器,初始为导纳控制
         self.controller_manager = ControllerManager(self.arm_state)
         self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0])
-        self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][1])
+        self.controller_manager.add_controller(HybridController,'hybrid',self.arm_config['controller'][1])
+        self.controller_manager.add_controller(PositionController,'position',self.arm_config['controller'][2])
+        self.controller_manager.add_controller(AdmitHybridController,'admithybrid',self.arm_config['controller'][3])
+        self.controller_manager.add_controller(HybridPidController,'hybridPid',self.arm_config['controller'][4])
+        self.controller_manager.add_controller(HybridAdmitController,'hybridAdmit',self.arm_config['controller'][5])
+        self.controller_manager.add_controller(PositionerSensorController,'positionerSensor',self.arm_config['controller'][6])
+        self.controller_manager.add_controller(AdmittanceZController,'admittanceZ',self.arm_config['controller'][7])
+        
         self.controller_manager.switch_controller('admittance')
         # 按摩头参数暂时使用本地数据
         massage_head_dir = self.arm_config['massage_head_dir']
@@ -84,8 +175,8 @@ class MassageRobot:
         self.skip_envent = threading.Event()
         self.skip_envent.clear() # 跳过初始化为False
         # 按摩调整
-        self.adjust_wrench_event = threading.Event()
-        self.adjust_wrench_event.clear() # 调整初始化为False
+        self.adjust_wrench_envent = threading.Event()
+        self.adjust_wrench_envent.clear() # 调整初始化为False
         self.pos_increment = np.zeros(3,dtype=np.float64)
         self.adjust_wrench = np.zeros(6,dtype=np.float64)
 
@@ -123,6 +214,114 @@ class MassageRobot:
         # ---
         self.last_time = -1
         self.cur_time = -1
+
+
+    def sensor_set_zero(self):
+        self.arm_position_set0,self.arm_orientation_set0 = self.arm.get_arm_position()
+        arm_orientation_set0 = self.arm_orientation_set0
+        print("arm_orientation_set0",R.from_quat(arm_orientation_set0).as_euler('xyz',degrees=True))
+        self.b_rotation_s_set0 = R.from_quat(arm_orientation_set0).as_matrix()
+        #########
+        max_retries = 5
+        #no_head_wrong=np.array([-80.01506042, 116.34187317, -87.65788269, -0.32910481, -0.65792173, -0.61110526])#旧版传感器)
+        head_set0=np.array([0, 0, 0, 0, 0, 0])#新版传感器)
+        min_tolerance=0.001
+        retry_count = 0
+        self.force_sensor.disable_active_transmission()
+        time.sleep(0.5)
+        while retry_count < max_retries:
+            # 调用读取数据的函数,假设返回值是一个 numpy 数组
+            data = self.force_sensor.read_data_f32()
+            data = self.force_sensor.read_data_f32()
+            data = self.force_sensor.read_data_f32()
+            
+            # 检查返回值是否为 np.array 类型
+            if isinstance(data, np.ndarray):
+                # 判断数据是否接近目标值
+
+                if np.allclose(data, head_set0, atol=min_tolerance):
+                    print(f"检测到力传感器已置零:{data}") 
+                    break
+                else:
+                    print(f"检测到力传感器未置零:{data}") 
+
+                    # 禁用传感器传输
+                    self.force_sensor.disable_active_transmission()
+                    self.force_sensor.disable_active_transmission()
+                    time.sleep(0.5)
+
+                    # 尝试设置传感器清零
+                    code = self.force_sensor.set_zero()
+                    code = self.force_sensor.set_zero()
+                    max_try = 3
+
+                    while code != 0 and max_try > 0:
+                        self.logger.log_error("Force sensor set zero failed, retrying...")
+                        self.force_sensor.disable_active_transmission()
+                        self.force_sensor.disable_active_transmission()
+                        time.sleep(0.5)
+                        code = self.force_sensor.set_zero()
+                        code = self.force_sensor.set_zero()
+                        max_try -= 1
+                        time.sleep(0.5)
+
+                    # 如果多次尝试后失败
+                    if max_try == 0:
+                        self.logger.log_error("Force sensor set zero failed, exiting...")
+                        requests.post(
+                            "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"}
+                        )
+                        requests.post(
+                            "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False}
+                        )
+                        # sys.exit(0)
+                        return -1  # 置零失败
+
+                    # 成功后跳出主循环
+                    break
+
+            else:
+                self.logger.log_error("读取数据失败或格式错误,尝试重试...")
+
+            # 增加重试次数
+            retry_count += 1
+            self.force_sensor.disable_active_transmission()
+            self.force_sensor.disconnect()
+            time.sleep(1)
+            self.force_sensor = XjcSensor()
+            time.sleep(1)  # 每次重试之间添加一个短暂的延迟
+
+        if retry_count == max_retries:
+            self.logger.log_error(f"超过最大重试次数 ({max_retries}),操作失败,退出程序...")
+            requests.post(
+                "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"}
+            )
+            requests.post(
+                "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False}
+            )
+            return -2 #读取失败
+        
+        return 0 #读取成功并置零成功
+
+    def sensor_enable(self):
+        code  = self.force_sensor.enable_active_transmission()
+        max_try = 3
+        while code!= 0 and max_try > 0:
+            self.logger.log_error("Force sensor enable_active_transmission failed,retrying...")
+            code  = self.force_sensor.enable_active_transmission()
+            max_try -= 1
+            time.sleep(0.1)
+        if max_try == 0:
+            self.logger.log_error("Force sensor enable_active_transmission failed,exiting...")
+            requests.post(
+                "http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"}
+            )
+            requests.post(
+                "http://127.0.0.1:5000/update_massage_status", data={"massage_service_started": False}
+            )
+            # sys.exit(0)
+            return -1
+        return 0
     
     # 预测步骤
     def kalman_predict(self,x, P, Q):
@@ -131,7 +330,7 @@ class MassageRobot:
         # 预测误差协方差
         P_predict = P + Q
         return x_predict, P_predict
-
+    
     # 更新步骤
     def kalman_update(self,x_predict, P_predict, z, R):
         # 卡尔曼增益
@@ -441,6 +640,919 @@ class MassageRobot:
         force = rot_matrix.T @ temp_force
         torque = rot_matrix.T @ np.cross(temp_force,vector_p) + rot_matrix.T @ torque
         return np.concatenate([force, torque])
+    
+    @track_function("set_position",True)
+    def set_position(self,pose,is_wait = False):
+        self.logger.log_info(f"set postion:{pose}")
+        x,y,z = pose[0],pose[1],pose[2]
+        roll,pitch,yaw = pose[3],pose[4],pose[5]
+        pose_command = np.concatenate([[x,y,z], [roll,pitch,yaw]])
+        time.sleep(0.1)
+        code = self.arm.RunPoint_P_inPose_M_RAD(poses_list = pose_command)
+        time.sleep(0.1)
+        return code
+
+    @track_function("move_to_point",True)
+    def move_to_point(self, t, pose, is_interrupt=True, wait=True, timeout=0.5, interpolation:Literal["linear","circle","cloud_point"] ='linear', algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance",is_switch_controller = False):
+        """
+        移动到指定的点
+        param:
+            t: 时间或时间序列
+            pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿
+            is_interrupt: 是否允许中断
+            wait: 是否等待运动完成
+            timeout: 超时时间
+            interpolation: 插值方式
+            algorithm: 控制算法
+        return:
+            code: 0表示成功
+                1表示超时
+                2表示用户打断
+                3表示硬件错误
+                4表示参数错误
+        """
+        self.move_to_point_count += 1
+        self.logger.log_blue(f"move_to_point_count: {self.move_to_point_count}")
+        self.logger.log_yellow(f"move_to_point: {pose[-1]}")
+        # 在函数内部直接规划轨迹
+        traj = self.traj_generate(t, pose, None, interpolation=interpolation)
+        if traj is None:
+            self.logger.log_error("轨迹未生成或生成错误")
+            return 4
+
+        self.logger.log_yellow(f"traj_quat: {R.from_quat(traj['quat'][-1]).as_euler('xyz',degrees=False)}")
+        self.logger.log_yellow(f"traj_pos: {traj['pos'][-1]}")
+
+        time_seq = traj['t']
+        pos_traj = traj['pos']
+        quat_traj = traj['quat']
+
+        if is_switch_controller == True:
+            try:
+                self.controller_manager.switch_controller(algorithm)
+            except ValueError as error:
+                self.logger.log_error(error)
+                self.logger.log_error("切换到%s控制器失败" % algorithm)
+                return 4
+            self.logger.log_info("切换到%s控制器" % algorithm)
+
+        # self.logger.log_info("当前为%s控制器" % algorithm)
+        
+        for i in range(len(time_seq)):
+            if self.interrupt_event.is_set():
+                self.interrupt_event.clear()
+                self.logger.log_error("self.interrupt_event.clear()")
+                self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position)
+                self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation)
+                massage_wrench = np.array([0, 0, 0, 0, 0, 0])
+                self.arm_state.desired_wrench = massage_wrench
+                return 2
+            if self.pause_envent.is_set():
+                self.pause_envent.clear()
+                self.logger.log_error("self.pause_envent.clear()")
+                self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position)
+                self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation)
+                massage_wrench = np.array([0, 0, 0, 0, 0, 0])
+                self.arm_state.desired_wrench = massage_wrench
+                return 5
+            if self.skip_envent.is_set():
+                self.logger.log_error("self.skip_envent.clear()")
+                self.arm_state.desired_position = copy.deepcopy(self.arm_state.arm_position)
+                self.arm_state.desired_orientation = copy.deepcopy(self.arm_state.arm_orientation)
+                massage_wrench = np.array([0, 0, 0, 0, 0, 0])
+                self.arm_state.desired_wrench = massage_wrench 
+                return 6
+            if self.arm.is_exit or self.exit_event.is_set():
+                print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set())
+                return 3
+            if self.adjust_wrench_envent.is_set() and self.is_execute == True:
+                if self.adjust_wrench is not None:
+                    self.arm_state.desired_wrench = self.adjust_wrench
+                self.adjust_wrench_envent.clear()
+
+            self.arm_state.desired_position = pos_traj[i]
+            self.arm_state.desired_orientation = quat_traj[i]
+
+            if self.ion is not None:
+
+                act_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.arm_position
+                desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix().T @ self.arm_state.desired_position
+                if self.arm_state.positionerSensorData < 0.001:
+                    desired_position[2] = desired_position[2] - self.arm_state.desired_high
+                else:
+                    desired_position[2] = min(act_position[2] + self.arm_state.positionerSensorData - self.arm_state.desired_high , desired_position[2] - self.arm_state.desired_high)
+                self.arm_state.desired_position = R.from_quat(self.arm_state.arm_orientation).as_matrix() @ desired_position
+            self.command_rate.precise_sleep()
+
+        start = time.time()
+
+        while wait:
+            if self.interrupt_event.is_set():
+                self.interrupt_event.clear()
+                return 2
+            if self.pause_envent.is_set():
+                self.pause_envent.clear()
+                return 5
+            if self.skip_envent.is_set():
+                # self.skip_envent.clear()
+                return 6
+            if self.arm.is_exit or self.exit_event.is_set():
+                print("robot_hardware_error:",self.arm.is_exit,self.exit_event.is_set())
+                return 3
+            if time.time() - start > timeout:
+                return 1
+            self.command_rate.precise_sleep()
+
+        return 0
+
+    @track_function("apply_force",True)
+    def apply_force(self, t, wrench, interpolation:Literal["linear","oscillation"] = "linear", algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance"):
+        """
+        施加力
+        param:
+            t: 时间或时间序列
+            wrench(nx6): 力矩或力矩序列,当输入为None时,表示力为[0,0,0,0,0,0]
+            is_interrupt: 是否允许中断
+            wait: 是否等待运动完成
+            timeout: 超时时间
+            interpolation: 插值方式
+            algorithm: 控制算法
+        return:
+            code: 0表示成功
+                1表示超时
+                2表示用户打断
+                3表示硬件错误
+                4表示参数错误
+        """
+
+        if t==0:
+            self.arm_state.desired_wrench = wrench
+            return 0
+
+        # 在函数内部直接规划力矩轨迹
+        traj = self.wrench_traj_generate(t, wrench, interpolation=interpolation)
+        # # 在函数内部直接规划力矩轨迹
+        # traj = self.traj_generate(t, None, wrench, interpolation=interpolation)
+        if traj is None:
+            return 4
+
+        time_seq = traj['t']
+        wrench_traj = traj['wrench']
+
+        # try:
+        #     self.controller_manager.switch_controller(algorithm)
+        # except ValueError as error:
+        #     self.logger.log_error(error)
+        #     return 4
+
+        self.logger.log_info("切换到%s控制器" % algorithm)
+
+        for i in range(len(time_seq)):
+            if self.interrupt_event.is_set():
+                self.interrupt_event.clear()
+                return 2
+            if self.pause_envent.is_set():
+                self.pause_envent.clear()
+                return 5
+            if self.skip_envent.is_set():
+                self.skip_envent.clear()
+                return 6
+            if self.arm.is_exit or self.exit_event.is_set():
+                return 3
+            if self.adjust_wrench_envent.is_set():
+                # if wrench_traj is not None:
+                #     wrench_traj[i:] = self.adjust_wrench
+                self.adjust_wrench_envent.clear()
+            # print(wrench_traj[i])
+            self.arm_state.desired_wrench = wrench_traj[i] if wrench_traj is not None else np.zeros(6, dtype=np.float64)
+            self.command_rate.precise_sleep()
+
+        return 0
+    
+    @track_function("resample_curve_strict",True)
+    def resample_curve_strict(self, points, num_resampled_points):
+        """
+        修正的曲线重新采样函数,移除重复点并确保累积弧长严格递增。
+        """
+        # 计算累积弧长
+        distances = np.sqrt(np.sum(np.diff(points, axis=0)**2, axis=1))  # 计算相邻点之间的距离
+        cumulative_length = np.insert(np.cumsum(distances), 0, 0)  # 计算累积弧长
+
+        # 确保累积弧长严格递增(去除重复值)
+        unique_indices = np.where(np.diff(cumulative_length, prepend=-np.inf) > 0)[0]
+        cumulative_length = cumulative_length[unique_indices]
+        points = points[unique_indices]
+
+        # 生成均匀间隔的新弧长
+        target_lengths = np.linspace(0, cumulative_length[-1], num_resampled_points)
+
+        # 在累积弧长上进行插值计算新点位置
+        x_interp = interp1d(cumulative_length, points[:, 0], kind='linear', fill_value="extrapolate")  
+        y_interp = interp1d(cumulative_length, points[:, 1], kind='linear', fill_value="extrapolate")
+
+        new_x = x_interp(target_lengths)
+        new_y = y_interp(target_lengths)
+
+        return np.column_stack((np.round(new_x).astype(int), np.round(new_y).astype(int)))
+    
+    @track_function("generate_in_spiral_cloudPoint",True)
+    def generate_in_spiral_cloudPoint(self, start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'):
+        """
+        绘制由内向外的螺旋线。
+        
+        参数:
+        - start: 起点坐标 (x, y)
+        - end: 终点坐标 (x, y)
+        - num_cycle_factor: 螺旋的圈数
+        - points: 螺旋线上的点的数量
+        """
+        # 计算起点和终点之间的距离
+        dx = end[0] - start[0]
+        dy = end[1] - start[1]
+        distance = np.sqrt(dx**2 + dy**2)
+        if width is None:
+            width = self.width_default
+        if num_cycle_factor is None:
+            num_cycle_factor = 5
+
+        num_cycle_factor = np.round(num_cycle_factor).astype(int)
+        if distance < 2:
+            print("距离过短,无法绘制螺旋线")
+            return 0
+        if width <= 2:
+            width = 2
+        if num_cycle_factor < 1:
+            print("圈数必须为大于1的整数")
+            num_cycle_factor = 1
+        angle = np.arctan2(dy, dx)  # 计算从起点到终点的角度
+
+        points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int)
+        
+        # 螺旋的角度范围
+        theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points)
+        
+        # 螺旋的半径随着角度增大而线性增长
+        r = np.linspace(0, distance, points)
+
+        tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor)))
+        
+        # 计算螺旋的x, y坐标
+        if direction == "CCW":
+            # 反向旋转时,使用负的角度来反向螺旋
+            x = r * np.cos(-theta)  # 逆时针旋转
+            y = r * np.sin(-theta)*tempK
+        else:
+            # 正常旋转
+            x = r * np.cos(theta)
+            y = r * np.sin(theta)*tempK
+
+        # 旋转螺旋线以对齐起点和终点的角度
+        x_rotated = x * np.cos(angle) - y * np.sin(angle)
+        y_rotated = x * np.sin(angle) + y * np.cos(angle)
+
+        # 平移螺旋线,使其从起点开始
+        x_final = x_rotated + start[0] - x_rotated[0]
+        y_final = y_rotated + start[1] - y_rotated[0]
+
+        # return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1)))
+        result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1), 
+                                np.round(y_final).astype(int).reshape(-1, 1)))
+
+        return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points))
+
+    @track_function("generate_out_spiral_cloudPoint",True)
+    def generate_out_spiral_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'):
+        """
+        绘制由外向内的螺旋线。
+        
+        参数:
+        - start: 起点坐标 (x, y)
+        - end: 终点坐标 (x, y)
+        - num_cycle_factor: 螺旋的圈数
+        - points: 螺旋线上的点的数量
+        """
+        # 计算起点和终点之间的距离
+        dx = end[0] - start[0]
+        dy = end[1] - start[1]
+        angle = np.arctan2(dy, dx)  # 计算从起点到终点的角度
+        distance = np.sqrt(dx**2 + dy**2)
+        if width is None:
+            width = self.width_default
+        if num_cycle_factor is None:    
+            num_cycle_factor = 5
+
+        num_cycle_factor = np.round(num_cycle_factor).astype(int)
+        if distance < 2:
+            print("距离过短,无法绘制螺旋线")
+            return 0
+        if width <= 2:
+            width = 2
+        if num_cycle_factor < 1:
+            print("圈数必须为大于1的整数")
+            num_cycle_factor = 1
+
+        points = np.round(num_cycle_factor *2*np.pi/(np.pi/9)).astype(int)
+        
+        # 螺旋的角度范围
+        theta = np.linspace(0, 2 * np.pi * num_cycle_factor, points)
+        
+        # 螺旋的半径随着角度增大而线性增长
+        r = np.linspace(distance, 0, points)
+
+        tempK = np.abs(width/(distance*((2*num_cycle_factor-0.75)/num_cycle_factor)))
+
+        # 计算螺旋的x, y坐标
+        if direction == "CCW":
+            # 反向旋转时,使用负的角度来反向螺旋
+            x = r * np.cos(-theta)  # 逆时针旋转
+            y = r * np.sin(-theta)*tempK
+        else:
+            # 正常旋转
+            x = r * np.cos(theta)
+            y = r * np.sin(theta)*tempK
+
+        # 旋转螺旋线以对齐起点和终点的角度
+        x = x * np.cos(np.pi) - y * np.sin(np.pi)
+        y = x * np.sin(np.pi) + y * np.cos(np.pi)
+
+        # 旋转螺旋线以对齐起点和终点的角度
+        x_rotated = x * np.cos(angle) - y * np.sin(angle)
+        y_rotated = x * np.sin(angle) + y * np.cos(angle)
+
+        # 平移螺旋线,使其从起点开始
+        x_final = x_rotated + start[0] - x_rotated[0]
+        y_final = y_rotated + start[1] - y_rotated[0]
+
+        # return np.hstack((np.round(x_final).astype(int).reshape(-1, 1), np.round(y_final).astype(int).reshape(-1, 1)))
+        result_points = np.hstack((np.round(x_final).astype(int).reshape(-1, 1), 
+                                np.round(y_final).astype(int).reshape(-1, 1)))
+
+        return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points))
+    
+    @track_function("generate_prolateCycloid_cloudPoint",True)
+    def generate_prolateCycloid_cloudPoint(self, start, end, width=None, num_cycle_factor=None, direction: Literal["CW", "CCW"] = 'CW'):
+        # Parameters
+        step_degree = np.pi / 12
+        a_prolate = 4.5
+        radius = 30.0
+        distance = np.linalg.norm(np.array(end) - np.array(start))
+        angle = np.arctan2(end[1] - start[1], end[0] - start[0])
+
+        if width is None:
+            width = self.width_default
+        if num_cycle_factor is None:
+            num_cycle_factor = 5
+        num_cycle_factor = np.round(num_cycle_factor).astype(int)
+        if distance < 2:
+            print("距离过短,无法绘制Cycloid线")
+            return 0
+        if width <= 2:
+            width = 2
+        if num_cycle_factor < 1:
+            print("圈数必须为大于1的整数")
+            num_cycle_factor = 1
+
+        r_cycloid = radius / (2 * a_prolate)
+        num_cycles = int(num_cycle_factor)
+        temp_k = distance / (r_cycloid * (np.pi + 2 * a_prolate + 2 * np.pi * num_cycles))
+
+        # Generate the prolate cycloid
+        theta_prolate = np.linspace(0.5 * np.pi, num_cycles * 2 * np.pi + 1.5 * np.pi,
+                                    round((num_cycles * 2 * np.pi + 1.5 * np.pi) / step_degree))
+        if direction == 'CW':
+            theta_prolate = np.flip(theta_prolate)
+
+        x_prolate = r_cycloid * (theta_prolate - a_prolate * np.sin(theta_prolate)) * temp_k
+        y_prolate = r_cycloid * (1 - a_prolate * np.cos(theta_prolate))
+
+        # Scale y-prolate based on width
+        y_prolate = (y_prolate / 30) * width
+
+        # Rotation of cycloid
+        if direction == 'CW':
+            x_prolate, y_prolate = -x_prolate, -y_prolate
+
+        x_rotated = x_prolate * np.cos(angle) - y_prolate * np.sin(angle)
+        y_rotated = x_prolate * np.sin(angle) + y_prolate * np.cos(angle)
+
+        # Translate to starting point
+        x_final = x_rotated + start[0] - x_rotated[0]
+        y_final = y_rotated + start[1] - y_rotated[0]
+
+        return np.column_stack((np.round(x_final).astype(int), np.round(y_final).astype(int)))
+    
+    @track_function("generate_ellipse_cloudPoint",True)
+    def generate_ellipse_cloudPoint(self,start, end, num_cycle_factor=None, width=None, direction:Literal["CW","CCW"] ='CW'):
+        # 计算椭圆的中心(两个端点的中点)
+        center = (start + end) / 2
+        if num_cycle_factor is None:
+            num_cycle_factor = 1
+
+        num_cycle_factor = np.round(num_cycle_factor).astype(int)
+
+        distance = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2)
+
+        if width is None:
+            width = distance/2 
+
+
+        if distance < 2:
+            print("距离过短,无法绘制椭圆")
+            return 0
+        if width <= 2:
+            print("椭圆半径过小,无法绘制椭圆")
+            width = self.width_default
+        if num_cycle_factor < 1:
+            print("圈数必须为大于1的整数")
+            num_cycle_factor = 1
+
+        # 计算两个端点的距离,即长半轴长度
+        a = np.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2) / 2
+
+        # 假设短半轴长度为常量值radius
+        b = width / 2
+
+        # 计算旋转角度(由端点的方向确定)
+        angle = np.arctan2(end[1] - start[1], end[0] - start[0])
+        points = np.round(((num_cycle_factor + 1) * np.pi * 2)/(np.pi/9)).astype(int)
+
+        # 绘制椭圆的参数化公式
+        theta = np.linspace(np.pi,  (2+num_cycle_factor*2)* np.pi, points)
+
+        # 计算螺旋的x, y坐标
+        if direction == "CCW":
+            # 反向旋转时,使用负的角度来反向螺旋
+            x_ellipse = a * np.cos(-theta) #顺时针
+            y_ellipse = b * np.sin(-theta)
+        else:
+            # 正常旋转
+            x_ellipse = a * np.cos(theta)
+            y_ellipse = b * np.sin(theta)
+
+        # 绕中心旋转椭圆
+        x_rot = center[0] + x_ellipse * np.cos(angle) - y_ellipse * np.sin(angle)
+        y_rot = center[1] + x_ellipse * np.sin(angle) + y_ellipse * np.cos(angle)
+
+        # 返回椭圆点云,保留整数值
+        return np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1)))
+
+    @track_function("generate_Lemniscate_cloudPoint",True)
+    def generate_Lemniscate_cloudPoint(self, start, end, width = None,num_cycle_factor = None, direction:Literal["CW","CCW"] ='CW'):
+        """
+        生成从start到end的8字形插补路径
+        
+        :param start: 起点坐标 (x0, y0)
+        :param end: 终点坐标 (x1, y1)
+        :param num_points: 插补点的数量
+        :return: 插补路径的坐标列表
+        """
+        # 起点和终点的坐标
+        x1, y1 = start
+        x0, y0 = end
+        step_degree=np.pi/12
+
+        # 计算轨迹的缩放因子和旋转角度
+        distance = np.linalg.norm([x1 - x0, y1 - y0])
+
+        if width is None:
+            width = self.width_default
+        if num_cycle_factor is None:
+            num_cycle_factor = 1
+        if distance < 2:
+            print("距离过短,无法绘制螺旋线")
+            return 0
+        if width <= 2:
+            width = 2
+        if num_cycle_factor < 1:
+            print("圈数必须为大于1的整数")
+            num_cycle_factor = 1
+
+        a = distance  / 2  # 轨迹的宽度一半
+        b = width/2
+        angle = np.arctan2(y1 - y0, x1 - x0)
+
+        num_points = np.abs((2*num_cycle_factor+1)*np.pi/step_degree).astype(int)
+        
+        # 生成标准的8字形轨迹 (Gerono卵形线)
+        t_values = np.linspace(0.5*np.pi, (2*num_cycle_factor + 1.5)* np.pi, num_points)
+
+        x_curve = a * np.sin(t_values)
+        if direction == 'CCW':
+            y_curve = -b * np.sin(2 * t_values)
+        else:
+            y_curve = b * np.sin(2 * t_values)
+        
+        # 构建旋转矩阵
+        rotation_matrix = np.array([[np.cos(angle), -np.sin(angle)],
+                                    [np.sin(angle), np.cos(angle)]])
+        
+        # 旋转并平移轨迹
+        curve_points = np.vstack((x_curve, y_curve)).T
+        rotated_points = curve_points @ rotation_matrix.T
+        shifted_points = rotated_points + [(x0 + x1) / 2, (y0 + y1) / 2]
+        
+        # 将插值结果四舍五入为整数
+        return np.round(shifted_points).astype(int)
+    
+    @track_function("generate_circle_cloudPoint",True)
+    def generate_circle_cloudPoint(self, start_point, center, step_degree=np.pi/12, num_cycle_factor = 3):
+        """
+        center: 圆心坐标,形如 (x_c, y_c)
+        start_point: 起始点坐标,形如 (x_0, y_0)
+        radius: 圆的半径
+        delta_theta: 每次插补的角度增量
+        num_turns: 绕圈的次数
+        """
+        # 确定总共需要生成的插补点数
+        num_points = int((2 * np.pi * num_cycle_factor) / step_degree)
+        
+        # 圆心
+        x_c, y_c = center
+
+        radius =  np.linalg.norm(np.array(start_point) - np.array(center))  # 半径
+        
+        # 计算起始点的初始角度
+        x_0, y_0 = start_point
+        theta_0 = np.arctan2(y_0 - y_c, x_0 - x_c)
+        
+        # 初始化存储插补点的列表
+        circle_points = []
+        
+        # 生成插补点
+        for i in range(num_points):
+            # 当前角度
+            theta_i = theta_0 + i * step_degree
+            
+            # 计算插补点的坐标
+            x_i = x_c + radius * np.cos(theta_i)
+            y_i = y_c + radius * np.sin(theta_i)
+            
+            # 将点添加到列表中
+
+            circle_points.append((np.round(x_i).astype(int), np.round(y_i).astype(int)))
+
+        circle_points.append((np.round(x_0).astype(int), np.round(y_0).astype(int)))
+        
+        return circle_points
+
+    @track_function("generate_point_cloudPoint",True)
+    def generate_point_cloudPoint(self, point, num_cycle_factor=None, width=None, direction: Literal["CW", "CCW"] = 'CW'):
+        if num_cycle_factor is None:
+            num_cycle_factor = 12
+        if num_cycle_factor < 1:
+            print("圈数必须为大于1的整数")
+            num_cycle_factor = 1
+        num_cycle_factor = np.abs(np.round(num_cycle_factor).astype(int))
+        if width is None:
+            width = 20
+        if width < 2:
+            print("宽度必须为大于2的整数")
+            width = 2
+        width = np.abs(np.round(width).astype(int))
+
+        radius = width/2
+
+        points = np.round((num_cycle_factor * np.pi * 2)/(np.pi/9)).astype(int)
+        thetas = np.linspace(0,  num_cycle_factor  * 2 * np.pi, points)
+        radiuss = np.ones_like(thetas) * radius
+
+        for i in range(len(thetas)):
+            if thetas[i] <= np.pi/2:
+                radiuss[i] = np.sin(thetas[i]) * radius
+            if thetas[i] >= num_cycle_factor * np.pi * 2 - np.pi / 2:
+                radiuss[i] = np.sin(num_cycle_factor * np.pi * 2 - thetas[i]) * radius
+
+                
+
+        if direction == "CCW":
+            x_ellipse = radiuss * np.cos(-thetas)
+            y_ellipse = radiuss * np.sin(-thetas)
+        else:
+            x_ellipse = radiuss * np.cos(thetas)
+            y_ellipse = radiuss * np.sin(thetas)
+        x_rot = point[0] + x_ellipse 
+        y_rot = point[1] + y_ellipse
+        result_points = np.hstack((np.round(x_rot).astype(int).reshape(-1, 1), np.round(y_rot).astype(int).reshape(-1, 1)))
+        return self.resample_curve_strict(points=result_points, num_resampled_points=len(result_points))
+    
+    @track_function("generate_line_cloudPoint",True)
+    def generate_line_cloudPoint(self,start, end, step_distance = 5):
+        """
+        生成起点和终点之间的线性插值点
+
+        参数:
+        start: 起点 (标量或数组,例如 [x1, y1, z1])
+        end: 终点 (标量或数组,例如 [x2, y2, z2])
+        num_points: 插值点的数量(包括起点和终点)
+
+        返回:
+        包含线性插值点的数组
+        """
+        start = np.array(start)
+        end = np.array(end)
+        distance = np.linalg.norm(end - start)
+        num_points = int(distance / step_distance)
+
+        if num_points <=2:
+            num_points = 2
+        # 使用 numpy 的 linspace 在起点和终点之间生成线性插值点
+        interpolated_points = np.linspace(start, end, num_points)
+        
+        # 将插值结果四舍五入为整数
+        return np.round(interpolated_points).astype(int)
+
+    @track_function("generate_repeat_line_cloudPoint",True)
+    def generate_repeat_line_cloudPoint(self, start_point, end_point, num_cycle_factor = 5):
+        """
+        start_point: 起始点坐标,形如 (x_0, y_0)
+        end_point: 终点坐标,形如 (x_0, y_0)
+        num_cycle_factor: 重复的次数
+        """
+        
+        # 初始化存储插补点的列表
+        result_points = []
+        # 生成插补点
+        for i in range(num_cycle_factor):
+            # 将点添加到列表中
+            result_points.append(start_point)
+            result_points.append(end_point)
+        return np.round(np.array(result_points)).astype(int)
+
+    @track_function("generate_finger_circle_cloudPoint",True)
+    def generate_finger_circle_cloudPoint(self, center_point, num_cycle_factor=5, radius=30,deviation=15):
+        """
+        生成一指禅绕球心的圆弧轨迹
+        center_point: 圆心坐标 [x, y, z, roll, pitch, yaw]
+        num_cycle_factor: 轨迹的重复次数
+        radius: 圆的半径
+        """
+        result_points = []
+        result_points.append(copy.deepcopy(center_point))
+        deviation = deviation * np.pi/180
+
+        totalCounts = int(360 / radius)
+        shift = 1
+
+
+        for i in range(num_cycle_factor):
+            for countR in range(0, totalCounts):
+                # 当前旋转矩阵
+                temp_pose = copy.deepcopy(center_point)
+                rotation = R.from_euler('xyz', temp_pose[3:], degrees=False)
+                current_rotation_matrix = rotation.as_matrix()
+                
+
+                if i == 0:
+                    shift = countR/(90/radius) if countR/(90/radius) < 1 else 1
+                if i == num_cycle_factor-1:
+                    shift = (totalCounts-countR)/(90/radius) if (totalCounts-countR)/(90/radius) < 1 else 1
+
+
+                # 生成增量旋转矩阵
+                increment_rotation= R.from_euler('xyz', [deviation*shift,0,countR * np.pi*2 /totalCounts], degrees=False).as_matrix()
+
+                # 更新旋转矩阵
+                updated_rotation_matrix = current_rotation_matrix @ increment_rotation
+
+                # 将旋转矩阵转换回欧拉角
+                temp_pose[3:] = R.from_matrix(updated_rotation_matrix).as_euler('xyz', degrees=False)
+
+                # 添加当前点到结果列表
+                result_points.append(copy.deepcopy(temp_pose))
+
+        result_points.append(copy.deepcopy(center_point))
+        return result_points
+    
+    def traj_generate(self, t: Union[int, float, List[float]], pose = None, wrench = None, interpolation: Literal["linear", "cubic","circle","cloud_point"] = 'linear',**kwargs):
+        """
+        轨迹生成
+        param:
+            t: 时间或时间序列
+            pose(nx6): 世界坐标系下的位姿或位姿序列,当输入为None时,表示保持当前位姿
+            wrench(nx6): 力矩或力矩序列,当输入为None时,表示保持当前的力
+            interpolation: 插值方式
+        return:
+            traj: 生成的轨迹 -> dict
+        """
+        # 确保时间输入是列表
+        if isinstance(t, (int, float)):
+            t = [t]
+        elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t):
+            self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)")
+            return None
+        # 检查时间点数量
+        if len(t) < 1:
+            self.logger.log_error("len(t) < 1")
+            return None
+        # 时间序列
+        time_points = [0.0] + t
+        time_points = np.array(time_points)
+        dt = self.command_rate.to_sec()
+
+        for i in range(len(time_points)):
+            time_points[i] = np.round(time_points[i] / dt) * dt
+        if np.abs(time_points[-1]) < dt:
+            time_points[-1] = dt
+        # 由于浮点数精度问题,time_step要乘0.9
+        time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt)
+        pos_traj = None
+        quat_traj = None
+        wrench_traj = None
+        pos_vel = None
+        angular_vel = None
+        # 位置轨迹与速度轨迹生成
+        if pose is not None:
+            if interpolation != 'cloud_point':
+                pose = np.array(pose).reshape(-1, 6)
+                if pose.shape[0] != len(t):
+                    self.logger.log_error("pose.shape[0] != len(t)")
+                    print(pose.shape[0],len(t))
+                    return None
+                pose_matrix = self.convert_to_7d(pose)
+            else:
+                pose = np.array(pose)
+            try:
+                # 插值
+                if interpolation == 'linear':
+                    pos_traj, quat_traj = linear_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt)
+                elif interpolation == 'cubic':
+                    pos_traj, quat_traj = spline_interpolate(positions=pose_matrix[:, :3], quaternions=pose_matrix[:, 3:], time_points=time_points, time_step=dt)
+                elif interpolation == 'circle':
+                    if self.ion is not None:
+                        pos_traj = circle_trajectory(center=pose, radius=0.07,time_points=time_points,time_step=dt,**kwargs)
+                    else:
+                        pos_traj = circle_trajectory(center=pose,time_points=time_points,time_step=dt,**kwargs)
+                    quat_traj = np.tile(R.from_euler('xyz', np.array(pose[0][3:])).as_quat(), (pos_traj.shape[0], 1))
+                elif interpolation == 'cloud_point':
+                    pos_traj, quat_traj = cloud_point_interpolate(positions=pose[:, :3], quaternions=pose[:, 3:], time_points=time_points, time_step=dt)
+                    # self.logger.log_yellow(f'{pos_traj[0]} {pos_traj[-1]}')
+            except ValueError as error:
+                self.logger.log_error(error)
+                return None
+        else:
+            self.logger.log_error("未输入位置进行规划")
+
+        # TODO 力轨迹生成 改为线性增加 
+        if wrench is not None:
+            wrench = np.array(wrench).reshape(-1, 6)
+            if wrench.shape[0] != len(t):
+                return None
+            indices = np.searchsorted(time_points, time_seq, side='left') - 1
+            indices = np.clip(indices, 0, len(wrench) - 1)
+            wrench_traj = wrench[indices]
+        # 构建返回数据字典
+        traj = {
+            't': time_seq,
+            'pos': pos_traj,
+            'quat': quat_traj,
+            'pos_vel': pos_vel,
+            'angular_vel': angular_vel,
+            'wrench': wrench_traj
+        }
+        return traj
+    
+
+    def wrench_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation"] = 'linear',**kwargs):
+        """
+        力/力矩轨迹生成
+        param:
+            t: 时间或时间序列
+            wrench(nx6): 力矩或力矩序列
+            interpolation: 插值方式
+        return:
+            traj: 生成的轨迹 -> dict
+        """
+        # 确保时间输入是列表
+        if isinstance(t, (int, float)):
+            t = [t]
+        elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t):
+            self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)")
+            return None
+        # 检查时间点数量
+        if len(t) < 1:
+            self.logger.log_error("len(t) < 1")
+            return None
+        
+        if wrench is None:
+            self.logger.log_error("未输入力矩进行规划")
+            return None
+        # 时间序列
+        time_points = [0.0] + t
+        time_points = np.array(time_points)
+        dt = self.command_rate.to_sec()
+
+        for i in range(len(time_points)):
+            time_points[i] = np.round(time_points[i] / dt) * dt
+        if np.abs(time_points[-1]) < dt:
+            time_points[-1] = dt
+        # 由于浮点数精度问题,time_step要乘0.9
+        time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt)
+
+        wrench_traj = None
+
+
+
+
+        # TODO 力轨迹生成 改为线性增加 
+        if wrench is not None:
+            wrench = np.array(wrench).reshape(-1, 6)
+
+
+            if wrench.shape[0] != len(t):
+                return None
+            if interpolation == 'oscillation':
+                wrench_traj = oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
+            else:
+                wrench_traj = linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
+
+                # indices = np.searchsorted(time_points, time_seq, side='left') - 1
+
+
+                # indices = np.clip(indices, 0, len(wrench) - 1)
+
+                # wrench_traj = wrench[indices]
+
+
+        # 构建返回数据字典
+        traj = {
+            't': time_seq,
+            'wrench': wrench_traj
+        }
+        return traj
+    
+    def force_traj_generate(self, t: Union[int, float, List[float]], wrench = None, interpolation: Literal["linear", "oscillation", "auto"] = "auto",**kwargs):
+        """
+        力/力矩轨迹生成
+        param:
+            t: 时间或时间序列
+            wrench(nx6): 力矩或力矩序列
+            interpolation: 插值方式
+        return:
+            traj: 生成的轨迹 -> dict
+        """
+        # 确保时间输入是列表
+        if isinstance(t, (int, float)):
+            t = [t]
+        elif not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t):
+            self.logger.log_error("not isinstance(t, list) or any(not isinstance(i, (int, float)) for i in t)")
+            return None
+        # 检查时间点数量
+        if len(t) < 1:
+            self.logger.log_error("len(t) < 1")
+            return None
+        
+        if wrench is None:
+            self.logger.log_error("未输入力矩进行规划")
+            return None
+        # 时间序列
+        time_points = [0.0] + t
+        time_points = np.array(time_points)
+        dt = self.command_rate.to_sec()
+
+        for i in range(len(time_points)):
+            time_points[i] = np.round(time_points[i] / dt) * dt
+        if np.abs(time_points[-1]) < dt:
+            time_points[-1] = dt
+        # 由于浮点数精度问题,time_step要乘0.9
+        time_seq = np.arange(time_points[0], time_points[-1] + dt * 0.9, dt)
+
+        wrench_traj = None
+
+
+
+
+        # TODO 力轨迹生成 改为线性增加 
+        if wrench is not None:
+            wrench = np.array(wrench).reshape(-1, 6)
+
+
+            if wrench.shape[0] != len(t):
+                return None
+            if interpolation == 'oscillation':
+                wrench_traj = self.force_plan.oscillation_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
+            elif interpolation == "auto":
+                # target_wrench = []
+                # target_wrench.append(wrench)
+                wrench_traj = self.force_plan.S_shaped_wrench_interpolate(start=self.arm_state.desired_wrench, wrench=wrench, time_points=time_points, time_step=dt)
+            else:
+                wrench_traj = self.force_plan.linear_wrench_interpolate(wrench=wrench, time_points=time_points, time_step=dt)
+
+                # indices = np.searchsorted(time_points, time_seq, side='left') - 1
+
+
+                # indices = np.clip(indices, 0, len(wrench) - 1)
+
+                # wrench_traj = wrench[indices]
+
+
+        # 构建返回数据字典
+        traj = {
+            't': time_seq,
+            'wrench': wrench_traj
+        }
+        return traj
+    
+
 
 if __name__ == "__main__":
     waypoints = [
diff --git a/Massage/MassageControl/algorithms/arm_state.py b/Massage/MassageControl/algorithms/arm_state.py
index bdcf7e8..4fd4147 100644
--- a/Massage/MassageControl/algorithms/arm_state.py
+++ b/Massage/MassageControl/algorithms/arm_state.py
@@ -14,6 +14,10 @@ class ArmState:
         self.last_arm_orientation = np.array([0.0,0.0,0.0,1.0]) # [qx, qy, qz, qw]
         self.last_external_wrench_base = np.zeros(6,dtype=np.float64)
         self.last_external_wrench_tcp = np.zeros(6,dtype=np.float64)
+
+        # 传感器数据
+        self.positionerSensorData = 0.0
+        self.desired_high = 0.06
         
         # 目标状态
         self.desired_position = np.zeros(3,dtype=np.float64)
diff --git a/Massage/MassageControl/hardware/dobot_nova5.py b/Massage/MassageControl/hardware/dobot_nova5.py
index acca5c1..236ef1f 100644
--- a/Massage/MassageControl/hardware/dobot_nova5.py
+++ b/Massage/MassageControl/hardware/dobot_nova5.py
@@ -5,6 +5,7 @@ except:
 import threading
 from time import sleep,time
 import re
+from tools.log import CustomLogger
 import copy
 import numpy as np
 from scipy.spatial.transform import Rotation as R
@@ -35,8 +36,8 @@ import math
 '''
 
 class dobot_nova5:
-    def __init__(self,ip):
-        self.ip = ip
+    def __init__(self,arm_ip = '192.168.5.1'):
+        self.ip = arm_ip ## 机械臂IP地址
         self.dashboardPort = 29999 ## 一发一收-设置-控制及运动端口号
         self.feedFourPort = 30004 ## 第四实时反馈(8ms)反馈数据端口号
         self.dashboard = None ## 初始化控制及运动端口实例
@@ -60,6 +61,8 @@ class dobot_nova5:
                 self.ToolValue = -1
                 '''
         self.feedbackData = feedbackItem() # 自定义反馈数据结构体
+        self.logger = CustomLogger('Dobot_nova5',True)
+
 
     def parseResultId(self,valueRecv):
         # 解析返回值
@@ -170,12 +173,14 @@ class dobot_nova5:
                     self.dashboard.Stop()
                     self.dashboard.EmergencyStop(mode=1)
                     print("当前工作停止,机械臂处于急停状态")
+                    return -1
                 if self.feedbackData.robotMode == 10: # 10 当前工程暂停
                     print("机械臂为暂停状态")
                     res = self.dashboard.Continue()
                     code = self.parseResultId(res)[0]
                     if code == 0:
                         print("机械臂继续已暂停的运动指令")
+                    return -1
                 if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
                     print("该次运动完成")
                     break
diff --git a/Massage/MassageControl/tools/decorator_tools.py b/Massage/MassageControl/tools/decorator_tools.py
new file mode 100755
index 0000000..da0ac03
--- /dev/null
+++ b/Massage/MassageControl/tools/decorator_tools.py
@@ -0,0 +1,29 @@
+from functools import wraps
+
+def custom_decorator(before=None, after=None):
+    """
+    创建一个自定义装饰器,可以在函数调用之前和之后执行自定义操作。
+    
+    :param before: 在函数调用之前执行的操作,接受同样的参数。
+    :param after: 在函数调用之后执行的操作,接受同样的参数和返回值。
+    :return: 一个装饰器。
+    """
+    def decorator(func):
+        @wraps(func)
+        def wrapper(self, *args, **kwargs):
+            if before:
+                before(self, *args, **kwargs)
+            result = func(self, *args, **kwargs)
+            if after:
+                after(self, result, *args, **kwargs)
+            return result
+        return wrapper
+    return decorator
+
+def apply_decorators(cls):
+    for attr_name, attr_value in cls.__dict__.items():
+        if callable(attr_value) and hasattr(attr_value, '_decorator'):
+            decorated_func = attr_value._decorator(attr_value)
+            setattr(cls, attr_name, decorated_func)
+    return cls
+
diff --git a/Massage/MassageControl/tools/serial_tools.py b/Massage/MassageControl/tools/serial_tools.py
new file mode 100755
index 0000000..f25c484
--- /dev/null
+++ b/Massage/MassageControl/tools/serial_tools.py
@@ -0,0 +1,94 @@
+import serial.tools.list_ports as list_ports
+import subprocess
+
+def find_serial_by_serial_number(serial_number):
+    ports = list_ports.comports()
+    for port in ports:
+        if port.serial_number == serial_number:
+            return port.device
+    return None
+
+def find_serial_by_location(location):
+    ports = list_ports.comports()
+    for port in ports:
+        if port.location == location:
+            return port.device
+    return None
+
+def list_usb_ports_with_details():
+    ports = list_ports.comports()
+    usb_ports = {}
+
+    for port in ports:
+        if "ttyUSB" in port.device:
+            usb_ports[port.device] = {
+                'serial_number': port.serial_number,
+                'vid': port.vid,
+                'pid': port.pid,
+                'location': port.location,
+                'description': port.description,
+                'manufacturer': port.manufacturer
+            }
+
+    return usb_ports
+
+# 维护 socat 进程的字典,用于跟踪每个虚拟串口的进程
+socat_processes = {}
+
+def start_virtual_serial(remote_ip, remote_port, device_name):
+    """
+    启动一个 socat 进程来创建虚拟串口。
+    :param remote_ip: 远程主机 IP 地址
+    :param remote_port: 远程主机端口
+    :param device_name: 虚拟串口的设备名称
+    """
+    # 构建 socat 命令
+    socat_command = f'socat PTY,raw,echo=0,link={device_name} TCP:{remote_ip}:{remote_port}'
+    # 启动 socat 进程
+    process = subprocess.Popen(socat_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    # 将进程存储到字典中
+    socat_processes[device_name] = process
+    print(f"Started socat process for {device_name} on {remote_ip}:{remote_port}")
+
+def stop_virtual_serial(device_name):
+    """
+    停止一个 socat 进程。
+    :param device_name: 虚拟串口的设备名称
+    """
+    # 检查进程是否在字典中
+    if device_name in socat_processes:
+        # 获取进程对象
+        process = socat_processes[device_name]
+        # 使用 pkill 终止相应的 socat 进程
+        subprocess.call(['pkill', '-f', f'{device_name}'])
+        # 终止进程
+        process.terminate()
+        process.wait()
+        # 移除已终止的进程
+        del socat_processes[device_name]
+        print(f"Stopped socat process for {device_name}")
+    else:
+        print(f"No running socat process found for {device_name}")
+
+if __name__ == "__main__":
+    usb_ports_with_details = list_usb_ports_with_details()
+
+    if usb_ports_with_details:
+        print("USB Ports and their Details:")
+        for device, details in usb_ports_with_details.items():
+            print(f"Device: {device}, Serial Number: {details['serial_number']}, VID: {details['vid']}, PID: {details['pid']}, Location: {details['location']}, Description: {details['description']}, Manufacturer: {details['manufacturer']}")
+    else:
+        print("No USB ports found.")
+
+    # serial_number = "1234567890"  # Replace with your serial number
+    # port = find_serial_by_serial_number(serial_number)
+    # if port:
+    #     print(f"Serial number {serial_number} found on port {port}")
+    # else:
+    #     print(f"Serial number {serial_number} not found")
+
+    port = find_serial_by_location("1-8")
+    if port:
+        print(f"Port found: {port}")
+    else:
+        print("Port not found")
\ No newline at end of file