diff --git a/Massage/MassageControl/MassageRobot_nova5.py b/Massage/MassageControl/MassageRobot_nova5.py index 994e136..bf7a7d7 100644 --- a/Massage/MassageControl/MassageRobot_nova5.py +++ b/Massage/MassageControl/MassageRobot_nova5.py @@ -17,6 +17,7 @@ from scipy.spatial.transform import Rotation as R import ctypes libc = ctypes.CDLL("libc.so.6") import atexit +from tools.Trajectory import TrajectoryInterpolator class MassageRobot: def __init__(self,arm_config_path,is_log=False): self.logger = CustomLogger( @@ -112,7 +113,12 @@ class MassageRobot: # 机械臂初始化,适配中间层 self.arm.init() - + # REF TRAJ + self.xr = [] + self.vr = [] + self.ar = [] + self.ts = [] # time sequence + self.dt = [] # derivate of time # --- self.last_time = -1 self.cur_time = -1 @@ -201,14 +207,83 @@ class MassageRobot: self.exit_event.set() # 控制退出while self.sensor_rate.sleep() # 控制频率 + def arm_command_loop_traj(self): + self.logger.log_info("机械臂控制线程启动") + # 离线轨迹启动时间记录 + self.traj_start_time = time.time() + while (not self.arm.is_exit) and (not self.exit_event.is_set()): + try: + if not self.is_waitting: + process_start_time = time.time() + control_cycle = self.control_rate.to_sec() + t_now = time.time() - self.traj_start_time + i = int(t_now/self.dt) + if i >= len(self.ts) - 1: + i = len(self.ts) - 2 + alpha = (t_now - self.ts[i])/self.dt + + pos = (1 - alpha) * self.xr[i][:3] + alpha * self.xr[i+1][:3] + r1 = R.from_quat(self.xr[i][3:]) + r2 = R.from_quat(self.xr[i+1][3:]) + slerp_rot = R.slerp([0, 1], R.concatenate([r1, r2])) + rot_interp = slerp_rot(alpha).as_quat() + + xr = np.concatenate([pos, rot_interp]) + # xr = (1-alpha) * self.xr[i] + alpha * self.xr[i+1] + vr = (1 - alpha) * self.vr[i] + alpha * self.vr[i + 1] + ar = (1 - alpha) * self.ar[i] + alpha * self.ar[i + 1] + + self.arm_state.desired_position = xr[:3] + self.arm_state.desired_orientation = xr[3:] + self.arm_state.desired_twist = vr + self.arm_state.desired_acc = ar + + self.controller_manager.step_traj(control_cycle) + # self.controller_manager.step(control_cycle) + command_pose = np.concatenate([ + self.arm_state.arm_position_command * 1000, # 转毫米 + self.arm_state.arm_orientation_command + ]) + ## 输入滤波 + status, pose_processed = self.arm.process_command( + command_pose[:3], + command_pose[3:] + ) + # print(f"pose_processed:{pose_processed}") + + tcp_command = ( + f"ServoP({pose_processed[0]:.3f},{pose_processed[1]:.3f}," + f"{pose_processed[2]:.3f},{pose_processed[3]:.3f}," + f"{pose_processed[4]:.3f},{pose_processed[5]:.3f}," + "t=0.02, aheadtime=20, gain=200)" + ).encode() + + run_time = time.time() - process_start_time + sleep_duration = self.control_rate.to_sec() - run_time + b2 =time.time() + if sleep_duration > 0: + time.sleep(sleep_duration) + print(f"real sleep:{time.time()-b2}") + self.arm.dashboard.socket_dobot.sendall(tcp_command) + if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停 + print("机械臂为暂停状态") + res = self.arm.dashboard.Continue() + code = self.arm.parseResultId(res)[0] + if code == 0: + print("机械臂继续已暂停的运动指令") + + except Exception as e: + self.logger.log_error(f"机械臂控制失败:{e}") + self.exit_event.set() + # print(f"循环周期时间: {time.time()-process_start_time:.4f} 秒",run_time,sleep_duration) + def arm_command_loop(self): self.logger.log_info("机械臂控制线程启动") while (not self.arm.is_exit) and (not self.exit_event.is_set()): try: if not self.is_waitting: process_start_time = time.time() - - control_cycle = self.control_rate.to_sec() + control_cycle = self.control_rate.to_sec() self.controller_manager.step(control_cycle) command_pose = np.concatenate([ self.arm_state.arm_position_command * 1000, # 转毫米 @@ -281,13 +356,13 @@ class MassageRobot: self.arm_state.arm_orientation_command = quat_rot ## POSITION CONTROLLER TEST ## - delta_pos = np.array([0.1,0.1,-0.1],dtype=np.float64) - # delta_quat = R.from_euler('xyz', [0, 0, 0], degrees=True).as_quat() - self.arm_state.desired_position = position + delta_pos - self.arm_state.desired_orientation = quat_rot - + # delta_pos = np.array([0.1,0.1,-0.1],dtype=np.float64) + # # delta_quat = R.from_euler('xyz', [0, 0, 0], degrees=True).as_quat() + # self.arm_state.desired_position = position + delta_pos + # self.arm_state.desired_orientation = quat_rot # 线程启动 self.arm_control_thread.start() ## 赋初始值后控制线程 + self.logger.log_info("MassageRobot启动") time.sleep(0.5) @@ -361,7 +436,34 @@ class MassageRobot: return np.concatenate([force, torque]) if __name__ == "__main__": + waypoints = [ + {"time": 0.0, "position": [0, 0, 0], "velocity": [0, 0, 0], + "acceleration": [0, 0, 0], + "orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()}, + {"time": 2.0, "position": [1, 2, 0], "velocity": [0, 0, 0], + "acceleration": [0, 0, 0], + "orientation": R.from_euler("xyz", [0, 45, 90], degrees=True).as_quat()}, + ] + myInterpolate = TrajectoryInterpolator(waypoints) + ts = np.linspace(0,2,100) robot = MassageRobot(arm_config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml") + xr_list, vr_list, ar_list = [], [], [] + for t in ts: + xr, vr, ar = myInterpolate.interpolate() + xr_list.append(xr) + vr_list.append(vr) + ar_list.append(ar) + xr_array = np.array(xr_list) + vr_array = np.array(vr_list) + ar_array = np.array(ar_list) + + robot.xr = xr_array + robot.vr = vr_array + robot.ar = ar_array + robot.ts = ts + robot.dt = ts[1] - ts[0] + + ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0] robot.current_head = 'finger_head' diff --git a/Massage/MassageControl/algorithms/admittance_controller.py b/Massage/MassageControl/algorithms/admittance_controller.py index 9443c75..7c5f3a7 100644 --- a/Massage/MassageControl/algorithms/admittance_controller.py +++ b/Massage/MassageControl/algorithms/admittance_controller.py @@ -8,6 +8,7 @@ sys.path.append(str(Path(__file__).resolve().parent.parent)) import time from tools.yaml_operator import read_yaml import random + class AdmittanceController(BaseController): def __init__(self, name, state:ArmState,config_path) -> None: super().__init__(name, state) @@ -76,48 +77,42 @@ class AdmittanceController(BaseController): # 更新位置 self.state.arm_position_command = self.state.arm_position + delta_pose[:3] - - - # # 计算误差 位置直接作差,姿态误差以旋转向量表示 - # temp_pose_error = self.state.arm_position - self.state.desired_position - # if self.state.desired_orientation.dot(self.state.arm_orientation) < 0: - # self.state.arm_orientation = -self.state.arm_orientation - # self.state.pose_error[:3] = R.from_quat(self.state.arm_orientation).as_matrix().T @ temp_pose_error - # rot_err_mat = R.from_quat(self.state.arm_orientation).as_matrix().T @ R.from_quat(self.state.desired_orientation).as_matrix() - # rot_err_rotvex = R.from_matrix(rot_err_mat).as_rotvec(degrees=False) - # self.state.pose_error[3:] = -rot_err_rotvex - # wrench_err = self.state.external_wrench_tcp - self.state.desired_wrench - # self.state.arm_desired_acc = self.M_inv @ (wrench_err - self.D @ (self.state.arm_desired_twist -self.state.desired_twist) - self.K @ self.state.pose_error) - # self.clip_command(self.state.arm_desired_acc,"acc") - # self.state.arm_desired_twist = self.state.arm_desired_acc * dt + self.state.arm_desired_twist - # self.clip_command(self.state.arm_desired_twist,"vel") - # delta_pose = self.state.arm_desired_twist * dt - # delta_pose[:3] = R.from_quat(self.state.arm_orientation).as_matrix() @ delta_pose[:3] - # self.clip_command(delta_pose,"pose") - # # testlsy - # delta_ori_mat = R.from_rotvec(delta_pose[3:]).as_matrix() - # ## 关闭姿态转动 - # # random_array = np.random.rand(3) # 生成长度为 3 的数组 - # # delta_ori_mat = R.from_rotvec(random_array/100000).as_matrix() - # arm_ori_mat = delta_ori_mat @ R.from_quat(self.state.arm_orientation).as_matrix() - # arm_ori_mat = R.from_quat(self.state.arm_orientation).as_matrix() @ delta_ori_mat - # self.state.arm_orientation_command = R.from_matrix(arm_ori_mat).as_quat() - # # 归一化四元数 - # self.state.arm_orientation_command /= np.linalg.norm(self.state.arm_orientation_command) - # self.state.arm_position_command = self.state.arm_position + delta_pose[:3] - # # if time.time() - self.laset_print_time > 2: - # # print("-------------admittance_1-------------------------------") - # # print("arm_position:",self.state.arm_position) - # # print("desired_position:",self.state.desired_position) - # # print(f'wrench_err: {wrench_err} ||| external_wrench_tcp: {self.state.external_wrench_tcp} ||| desired_wrench: {self.state.desired_wrench}') - # # print("self.state.arm_desired_acc:",self.state.arm_desired_acc) - # # print("arm_orientation",R.from_quat(self.state.arm_orientation).as_euler('xyz',degrees=True)) - # # print("desired_orientation",R.from_quat(self.state.desired_orientation).as_euler('xyz',degrees=True)) - # # print("arm_position_command",self.state.arm_position_command) - # # print("arm_orientation_command",R.from_quat(self.state.arm_orientation_command).as_euler('xyz',degrees=True)) - # # print("delta_pose:",delta_pose) - # # print("delta_ori_mat",delta_ori_mat) - # # self.laset_print_time = time.time() + def step_traj(self,dt): + # 方向统一 + if self.state.desired_orientation.dot(self.state.arm_orientation) < 0: + self.state.arm_orientation = -self.state.arm_orientation + # 缓存常用计算 + arm_ori_quat = R.from_quat(self.state.arm_orientation) + arm_ori_mat = arm_ori_quat.as_matrix() + # 位置误差 + temp_pose_error = self.state.arm_position - self.state.desired_position + self.state.desired_twist[:3] * dt + self.state.pose_error[:3] = arm_ori_mat.T @ temp_pose_error + # 姿态误差(四元数) + rot_err_quat = (R.from_quat(self.state.desired_twist[3:] * dt)).inv() * arm_ori_quat.inv() * R.from_quat(self.state.desired_orientation) + self.state.pose_error[3:] = -rot_err_quat.as_rotvec(degrees=False) + # 期望加速度 + wrench_err = self.state.external_wrench_tcp - self.state.desired_wrench + D_vel = self.D @ (self.state.arm_desired_twist - self.state.desired_twist + self.state.desired_acc*dt) + K_pose = self.K @ self.state.pose_error + self.state.arm_desired_acc = self.M_inv @ (wrench_err - D_vel - K_pose) + self.state.desired_acc + self.clip_command(self.state.arm_desired_acc, "acc") + ## 更新速度和位姿 + self.state.arm_desired_twist += self.state.arm_desired_acc * dt + self.clip_command(self.state.arm_desired_twist, "vel") + # 计算位姿变化 + delta_pose = np.concatenate([ + arm_ori_mat @ (self.state.arm_desired_twist[:3] * dt), + self.state.arm_desired_twist[3:] * dt + ]) + self.clip_command(delta_pose, "pose") + # 更新四元数 + delta_ori_quat = R.from_rotvec(delta_pose[3:]).as_quat() + arm_ori_quat_new = arm_ori_quat * R.from_quat(delta_ori_quat) + self.state.arm_orientation_command = arm_ori_quat_new.as_quat() + # 归一化四元数 + self.state.arm_orientation_command /= np.linalg.norm(self.state.arm_orientation_command) + # 更新位置 + self.state.arm_position_command = self.state.arm_position + delta_pose[:3] diff --git a/Massage/MassageControl/algorithms/arm_state.py b/Massage/MassageControl/algorithms/arm_state.py index 3d28154..bdcf7e8 100644 --- a/Massage/MassageControl/algorithms/arm_state.py +++ b/Massage/MassageControl/algorithms/arm_state.py @@ -20,6 +20,7 @@ class ArmState: self.desired_orientation = np.array([0.0,0,0,1]) # [qx, qy, qz, qw] self.desired_wrench = np.zeros(6,dtype=np.float64) self.desired_twist = np.zeros(6,dtype=np.float64) + self.desired_acc = np.zeros(6,dtype=np.float64) # 导纳计算过程变量 self.arm_desired_twist = np.zeros(6,dtype=np.float64) diff --git a/Massage/MassageControl/algorithms/controller_manager.py b/Massage/MassageControl/algorithms/controller_manager.py index 9d755db..3e67a7f 100644 --- a/Massage/MassageControl/algorithms/controller_manager.py +++ b/Massage/MassageControl/algorithms/controller_manager.py @@ -39,4 +39,8 @@ class ControllerManager: def step(self,dt): if self.current_controller: - self.current_controller.step(dt) \ No newline at end of file + self.current_controller.step(dt) + + def step_traj(self,dt): + if self.current_controller: + self.current_controller.step_traj(dt) \ No newline at end of file diff --git a/Massage/MassageControl/tools/Trajectory.py b/Massage/MassageControl/tools/Trajectory.py new file mode 100644 index 0000000..7602fc2 --- /dev/null +++ b/Massage/MassageControl/tools/Trajectory.py @@ -0,0 +1,184 @@ +import numpy as np +from scipy.spatial.transform import Rotation as R, Slerp + +class TrajectoryInterpolator: + def __init__(self, waypoints): + self.waypoints = sorted(waypoints, key=lambda x: x["time"]) + self.times = [wp["time"] for wp in self.waypoints] + self._compute_position_coeffs() + self._prepare_slerp() + + def _compute_position_coeffs(self): + self.coeffs_pos = [] + for i in range(len(self.waypoints) - 1): + wp0 = self.waypoints[i] + wp1 = self.waypoints[i + 1] + T = wp1["time"] - wp0["time"] + + p0 = np.array(wp0["position"]) + v0 = np.array(wp0.get("velocity", np.zeros(3))) + a0 = np.array(wp0.get("acceleration", np.zeros(3))) + + p1 = np.array(wp1["position"]) + v1 = np.array(wp1.get("velocity", np.zeros(3))) + a1 = np.array(wp1.get("acceleration", np.zeros(3))) + + # 系数矩阵 A(以 tau = 0~T) + A = np.array([ + [1, 0, 0, 0, 0, 0], + [0, 1, 0, 0, 0, 0], + [0, 0, 2, 0, 0, 0], + [1, T, T**2, T**3, T**4, T**5], + [0, 1, 2*T, 3*T**2,4*T**3, 5*T**4], + [0, 0, 2, 6*T, 12*T**2, 20*T**3] + ]) + + b = np.vstack([p0, v0, a0, p1, v1, a1]) # shape (6,3) + coeffs = np.linalg.solve(A, b) # shape (6,3), 每列是一个维度的系数 + self.coeffs_pos.append((coeffs, T)) + + def _prepare_slerp(self): + self.rot_slerps = [] + for i in range(len(self.waypoints) - 1): + t0 = self.waypoints[i]["time"] + t1 = self.waypoints[i + 1]["time"] + rot0 = R.from_quat(self.waypoints[i]["orientation"]) + rot1 = R.from_quat(self.waypoints[i + 1]["orientation"]) + self.rot_slerps.append(Slerp([0, 1], R.concatenate([rot0, rot1]))) + + def interpolate(self, t): + if t <= self.times[0]: + return self._format_output(self.waypoints[0]) + elif t >= self.times[-1]: + return self._format_output(self.waypoints[-1]) + + i = np.searchsorted(self.times, t, side='right') - 1 + tau = t - self.times[i] + coeffs, T = self.coeffs_pos[i] + alpha = tau / T + + # 每个维度分开处理 + pos = np.array([np.polyval(coeffs[:, dim][::-1], tau) for dim in range(3)]) + vel = np.array([np.polyval(np.polyder(coeffs[:, dim][::-1], 1), tau) for dim in range(3)]) + acc = np.array([np.polyval(np.polyder(coeffs[:, dim][::-1], 2), tau) for dim in range(3)]) + + # 姿态 slerp + ori = self.rot_slerps[i]([alpha])[0].as_quat() + + ang_vel, ang_acc = self._estimate_angular_derivatives(t) + + x_r = np.concatenate([pos, ori]) + v_r = np.concatenate([vel, ang_vel]) + a_r = np.concatenate([acc, ang_acc]) + return x_r, v_r, a_r + + def _format_output(self, wp): + pos = np.array(wp["position"]) + ori = np.array(wp["orientation"]) + vel = np.array(wp.get("velocity", np.zeros(3))) + acc = np.array(wp.get("acceleration", np.zeros(3))) + x_r = np.concatenate([pos, ori]) + v_r = np.concatenate([vel, np.zeros(3)]) + a_r = np.concatenate([acc, np.zeros(3)]) + return x_r, v_r, a_r + def _quat_diff_to_angular_velocity(self, q1, q0, dt): + """计算从 q0 到 q1 的角速度,单位 rad/s""" + dq = R.from_quat(q1) * R.from_quat(q0).inv() + angle = dq.magnitude() + axis = dq.as_rotvec() + if dt == 0 or angle == 0: + return np.zeros(3) + return axis / dt # rotvec 本身是 axis * angle + + def _estimate_angular_derivatives(self, t, delta=0.03): + """数值估计角速度和角加速度,避免递归调用""" + if t <= self.times[0] or t >= self.times[-1]: + return np.zeros(3), np.zeros(3) + + i = np.searchsorted(self.times, t, side='right') - 1 + t0, t1 = self.times[i], self.times[i + 1] + T = t1 - t0 + alpha = (t - t0) / T + + alpha_prev = max(0.0, (t - delta - t0) / T) + alpha_next = min(1.0, (t + delta - t0) / T) + + q_prev = self.rot_slerps[i]([alpha_prev])[0].as_quat() + q_now = self.rot_slerps[i]([alpha])[0].as_quat() + q_next = self.rot_slerps[i]([alpha_next])[0].as_quat() + + w1 = self._quat_diff_to_angular_velocity(q_now, q_prev, delta) + w2 = self._quat_diff_to_angular_velocity(q_next, q_now, delta) + w = w1 + a = (w2 - w1) / (2 * delta) + return w, a + + + +if __name__ == "__main__": + import matplotlib.pyplot as plt + + waypoints = [ + {"time": 0.0, "position": [0, 0, 0], "velocity": [0, 0, 0], + "acceleration": [0, 0, 0], + "orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()}, + {"time": 2.0, "position": [1, 2, 0], "velocity": [0, 0, 0], + "acceleration": [0, 0, 0], + "orientation": R.from_euler("xyz", [0, 45, 90], degrees=True).as_quat()}, + ] + + interpolator = TrajectoryInterpolator(waypoints) + ts = np.linspace(0, 2, 100) + + positions = [] + velocities = [] + accelerations = [] + orientations = [] + angular_velocities = [] + angular_accelerations = [] + + for t in ts: + x_r, v_r, a_r = interpolator.interpolate(t) + positions.append(x_r[:3]) + orientations.append(R.from_quat(x_r[3:]).as_euler('xyz', degrees=True)) + velocities.append(v_r[:3]) + angular_velocities.append(v_r[3:]) + accelerations.append(a_r[:3]) + angular_accelerations.append(a_r[3:]) + + positions = np.array(positions) + orientations = np.array(orientations) + velocities = np.array(velocities) + angular_velocities = np.array(angular_velocities) + accelerations = np.array(accelerations) + angular_accelerations = np.array(angular_accelerations) + + fig, axs = plt.subplots(6, 1, figsize=(10, 16), sharex=True) + + axs[0].plot(ts, positions) + axs[0].set_ylabel("Position [m]") + axs[0].legend(['x', 'y', 'z']) + + axs[1].plot(ts, velocities) + axs[1].set_ylabel("Velocity [m/s]") + axs[1].legend(['vx', 'vy', 'vz']) + + axs[2].plot(ts, accelerations) + axs[2].set_ylabel("Acceleration [m/s²]") + axs[2].legend(['ax', 'ay', 'az']) + + axs[3].plot(ts, orientations) + axs[3].set_ylabel("Orientation [deg]") + axs[3].legend(['roll', 'pitch', 'yaw']) + + axs[4].plot(ts, angular_velocities) + axs[4].set_ylabel("Ang Vel [rad/s]") + axs[4].legend(['wx', 'wy', 'wz']) + + axs[5].plot(ts, angular_accelerations) + axs[5].set_ylabel("Ang Acc [rad/s²]") + axs[5].set_xlabel("Time [s]") + axs[5].legend(['awx', 'awy', 'awz']) + + plt.tight_layout() + plt.show()