MassageRobot_Dobot/Massage/MassageControl/tools/replay_trajectory_from_csv.py
LiangShiyun 3981c448c4 stash
2025-06-03 21:22:04 +08:00

328 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# cython: language_level=3
import time
import csv
import atexit
from pathlib import Path
import sys
import ctypes
import threading
import pygame # 添加pygame导入
sys.path.append(str(Path(__file__).resolve().parent.parent))
from hardware.aubo_C5 import AuboC5
from hardware.force_sensor_aubo import XjcSensor
# 初始化libc
try:
libc = ctypes.CDLL("libc.so.6")
except Exception as e:
print(f"Failed to load libc: {e}")
libc = None
class JointTrajectoryPlayer:
def __init__(self, csv_path: str, arm: AuboC5 = None, sensor: XjcSensor = None):
"""
初始化轨迹回放器
:param csv_path: 轨迹CSV文件路径
:param arm: 可选,机械臂控制器
:param sensor: 可选,力传感器
"""
self.csv_path = csv_path
self.arm = arm
self.sensor = sensor
self.file = None
self.reader = None
self.header = None
self.keep_playing = True
self.play_thread = None
def __enter__(self):
"""上下文管理器入口,打开文件"""
self.file = open(self.csv_path, 'r')
self.reader = csv.reader(self.file)
self.header = next(self.reader) # 读取表头
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器退出,关闭文件"""
if self.file:
self.file.close()
def read_next_joint_angles(self):
"""
读取下一行关节角度
:return: (timestamp, [j1,j2,j3,j4,j5,j6]) 或 None(文件结束)
"""
try:
row = next(self.reader)
timestamp = float(row[0])
angles = [float(x) for x in row[1:7]] # 读取6个关节角度
return timestamp, angles
except (StopIteration, ValueError, IndexError):
return None
def playback(self, speed_factor: float = 1.0):
"""
回放轨迹
:param speed_factor: 回放速度因子(1.0为原速)
"""
while self.keep_playing: # 外部可以通过设置keep_playing为False来停止播放
# 每次循环开始时重新打开文件和读取器
self.file = open(self.csv_path, 'r')
self.reader = csv.reader(self.file)
self.header = next(self.reader) # 读取表头
start_time = time.time()
first_timestamp = None
while self.keep_playing: # 内部循环,播放单次轨迹
data = self.read_next_joint_angles()
if data is None:
break
timestamp, angles = data
# 初始化第一个时间戳
if first_timestamp is None:
first_timestamp = timestamp
continue
usleep(15000)
# 执行运动
if self.arm:
self.arm.mc.servoJoint(angles, 0, 0, 0.15, 0.1, 200)
print(f"时间: {timestamp:.3f}s, 角度: {angles}")
# 关闭当前文件
self.file.close()
return 0
def start(self, speed_factor: float = 1.0):
"""启动轨迹播放,返回是否成功启动"""
if self.play_thread and self.play_thread.is_alive():
return False # 已经在运行中
self.keep_playing = True
self.play_thread = threading.Thread(target=self.playback, kwargs={'speed_factor': speed_factor})
self.play_thread.daemon = True
self.play_thread.start()
return True
def stop(self):
"""停止轨迹播放,返回是否成功停止"""
if not self.play_thread or not self.play_thread.is_alive():
return False # 未在运行或已停止
self.keep_playing = False
return True
class RobotDanceController:
"""机器人舞蹈控制器,单例模式"""
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(RobotDanceController, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.arm = None
self.player = None
self.is_running = False
self._initialized = True
self.music_playing = False
def initialize(self):
"""初始化机械臂和轨迹播放器"""
try:
# 初始化机械臂
self.arm = AuboC5()
self.arm.init()
# 设置CSV路径
script_dir = Path(__file__).resolve().parent
csv_path = script_dir / "joint_processed_0.025s_6.csv"
# 创建轨迹播放器
self.player = JointTrajectoryPlayer(str(csv_path), self.arm)
return True
except Exception as e:
print(f"初始化失败: {e}")
return False
def play_music(self):
"""播放背景音乐,从头开始,循环播放"""
try:
# 初始化pygame混音器
pygame.mixer.init()
# 获取音乐文件路径
script_dir = Path(__file__).resolve().parent
# music_path = script_dir / "demo_music.mp3"
music_path = script_dir / "demo_music1.mp3"
# 加载并播放音乐从头开始设置循环播放loops=-1表示无限循环
pygame.mixer.music.load(str(music_path))
pygame.mixer.music.play(loops=-1)
self.music_playing = True
print("背景音乐已开始播放(循环模式)")
return True
except Exception as e:
print(f"播放音乐失败: {e}")
return False
def stop_music(self):
"""停止背景音乐"""
if self.music_playing:
try:
pygame.mixer.music.stop()
pygame.mixer.quit()
self.music_playing = False
print("背景音乐已停止")
except Exception as e:
print(f"停止音乐失败: {e}")
def start_demo(self):
"""开始演示,返回是否成功启动"""
if self.is_running:
return False
# 如果未初始化,先初始化
if not self.arm or not self.player:
if not self.initialize():
return False
try:
# 移动到舞蹈准备姿势
self.arm.move_joint([1.571,0.578,2.35,0.205,1.569,0.0])
time.sleep(2)
# 先播放背景音乐
self.play_music()
# 创建一个线程等待14秒后启动机器人运动
def start_robot_movement_at_music_mark():
try:
start_time = time.time()
# wait_time = 8.0 # 8秒
wait_time = 4.0 # 4秒
print(f"等待音乐播放到{wait_time}秒...")
time.sleep(wait_time)
# 检查是否仍在运行状态(可能用户已经停止了演示)
if self.is_running:
print(f"音乐已播放{wait_time}秒,开始机器人运动")
# 启动轨迹播放器
# 启动伺服
self.arm.enable_servo()
time.sleep(1)
self.arm.enable_servo()
time.sleep(1)
self.arm.enable_servo()
time.sleep(1)
self.player.start()
except Exception as e:
print(f"启动机器人运动失败: {e}")
# 设置为正在运行状态,这样线程中可以检查状态
self.is_running = True
# 启动等待线程
threading.Thread(target=start_robot_movement_at_music_mark, daemon=True).start()
return True
except Exception as e:
# 发生错误时,确保停止音乐
self.stop_music()
self.is_running = False
print(f"启动演示失败: {e}")
return False
def stop_demo(self):
"""停止演示,返回是否成功停止"""
if not self.is_running:
return False
try:
# 停止背景音乐
self.stop_music()
# 停止播放器
if self.player:
self.player.stop()
# 关闭伺服
if self.arm:
self.arm.disable_servo()
time.sleep(1)
self.arm.disable_servo()
time.sleep(1)
self.arm.disable_servo()
time.sleep(1)
# 回到初始位置
self.arm.move_joint(self.arm.off_pos)
self.is_running = False
return True
except Exception as e:
print(f"停止演示失败: {e}")
# 即使发生异常,也将状态设为未运行
self.is_running = False
return False
def cleanup(self):
"""清理资源"""
try:
if self.is_running:
self.stop_demo()
else:
# 确保音乐停止
self.stop_music()
if self.arm:
self.arm.power_off()
return True
except Exception as e:
print(f"清理资源失败: {e}")
return False
def usleep(microseconds):
"""Linux 下的微秒级休眠(比 time.sleep 更精确)"""
if libc and hasattr(libc, 'usleep'):
libc.usleep(microseconds)
else:
# 如果libc不可用使用time.sleep作为备选
time.sleep(microseconds / 1000000.0)
if __name__ == '__main__':
# 作为独立脚本运行时提供简单的CLI接口
controller = RobotDanceController()
try:
print("开始机器人舞蹈演示...")
if controller.start_demo():
print("演示已开始按Ctrl+C停止...")
# 等待用户按Ctrl+C
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n接收到停止信号,正在停止演示...")
finally:
controller.stop_demo()
controller.cleanup()
print("演示已停止,资源已清理。")