# cython: language_level=3 import time import csv import atexit from pathlib import Path import sys import ctypes import threading import pygame # 添加pygame导入 sys.path.append(str(Path(__file__).resolve().parent.parent)) from hardware.aubo_C5 import AuboC5 from hardware.force_sensor_aubo import XjcSensor # 初始化libc try: libc = ctypes.CDLL("libc.so.6") except Exception as e: print(f"Failed to load libc: {e}") libc = None class JointTrajectoryPlayer: def __init__(self, csv_path: str, arm: AuboC5 = None, sensor: XjcSensor = None): """ 初始化轨迹回放器 :param csv_path: 轨迹CSV文件路径 :param arm: 可选,机械臂控制器 :param sensor: 可选,力传感器 """ self.csv_path = csv_path self.arm = arm self.sensor = sensor self.file = None self.reader = None self.header = None self.keep_playing = True self.play_thread = None def __enter__(self): """上下文管理器入口,打开文件""" self.file = open(self.csv_path, 'r') self.reader = csv.reader(self.file) self.header = next(self.reader) # 读取表头 return self def __exit__(self, exc_type, exc_val, exc_tb): """上下文管理器退出,关闭文件""" if self.file: self.file.close() def read_next_joint_angles(self): """ 读取下一行关节角度 :return: (timestamp, [j1,j2,j3,j4,j5,j6]) 或 None(文件结束) """ try: row = next(self.reader) timestamp = float(row[0]) angles = [float(x) for x in row[1:7]] # 读取6个关节角度 return timestamp, angles except (StopIteration, ValueError, IndexError): return None def playback(self, speed_factor: float = 1.0): """ 回放轨迹 :param speed_factor: 回放速度因子(1.0为原速) """ while self.keep_playing: # 外部可以通过设置keep_playing为False来停止播放 # 每次循环开始时重新打开文件和读取器 self.file = open(self.csv_path, 'r') self.reader = csv.reader(self.file) self.header = next(self.reader) # 读取表头 start_time = time.time() first_timestamp = None while self.keep_playing: # 内部循环,播放单次轨迹 data = self.read_next_joint_angles() if data is None: break timestamp, angles = data # 初始化第一个时间戳 if first_timestamp is None: first_timestamp = timestamp continue usleep(15000) # 执行运动 if self.arm: self.arm.mc.servoJoint(angles, 0, 0, 0.15, 0.1, 200) print(f"时间: {timestamp:.3f}s, 角度: {angles}") # 关闭当前文件 self.file.close() return 0 def start(self, speed_factor: float = 1.0): """启动轨迹播放,返回是否成功启动""" if self.play_thread and self.play_thread.is_alive(): return False # 已经在运行中 self.keep_playing = True self.play_thread = threading.Thread(target=self.playback, kwargs={'speed_factor': speed_factor}) self.play_thread.daemon = True self.play_thread.start() return True def stop(self): """停止轨迹播放,返回是否成功停止""" if not self.play_thread or not self.play_thread.is_alive(): return False # 未在运行或已停止 self.keep_playing = False return True class RobotDanceController: """机器人舞蹈控制器,单例模式""" _instance = None def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super(RobotDanceController, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self.arm = None self.player = None self.is_running = False self._initialized = True self.music_playing = False def initialize(self): """初始化机械臂和轨迹播放器""" try: # 初始化机械臂 self.arm = AuboC5() self.arm.init() # 设置CSV路径 script_dir = Path(__file__).resolve().parent csv_path = script_dir / "joint_processed_0.025s_6.csv" # 创建轨迹播放器 self.player = JointTrajectoryPlayer(str(csv_path), self.arm) return True except Exception as e: print(f"初始化失败: {e}") return False def play_music(self): """播放背景音乐,从头开始,循环播放""" try: # 初始化pygame混音器 pygame.mixer.init() # 获取音乐文件路径 script_dir = Path(__file__).resolve().parent # music_path = script_dir / "demo_music.mp3" music_path = script_dir / "demo_music1.mp3" # 加载并播放音乐从头开始,设置循环播放(loops=-1表示无限循环) pygame.mixer.music.load(str(music_path)) pygame.mixer.music.play(loops=-1) self.music_playing = True print("背景音乐已开始播放(循环模式)") return True except Exception as e: print(f"播放音乐失败: {e}") return False def stop_music(self): """停止背景音乐""" if self.music_playing: try: pygame.mixer.music.stop() pygame.mixer.quit() self.music_playing = False print("背景音乐已停止") except Exception as e: print(f"停止音乐失败: {e}") def start_demo(self): """开始演示,返回是否成功启动""" if self.is_running: return False # 如果未初始化,先初始化 if not self.arm or not self.player: if not self.initialize(): return False try: # 移动到舞蹈准备姿势 self.arm.move_joint([1.571,0.578,2.35,0.205,1.569,0.0]) time.sleep(2) # 先播放背景音乐 self.play_music() # 创建一个线程等待14秒后启动机器人运动 def start_robot_movement_at_music_mark(): try: start_time = time.time() # wait_time = 8.0 # 8秒 wait_time = 4.0 # 4秒 print(f"等待音乐播放到{wait_time}秒...") time.sleep(wait_time) # 检查是否仍在运行状态(可能用户已经停止了演示) if self.is_running: print(f"音乐已播放{wait_time}秒,开始机器人运动") # 启动轨迹播放器 # 启动伺服 self.arm.enable_servo() time.sleep(1) self.arm.enable_servo() time.sleep(1) self.arm.enable_servo() time.sleep(1) self.player.start() except Exception as e: print(f"启动机器人运动失败: {e}") # 设置为正在运行状态,这样线程中可以检查状态 self.is_running = True # 启动等待线程 threading.Thread(target=start_robot_movement_at_music_mark, daemon=True).start() return True except Exception as e: # 发生错误时,确保停止音乐 self.stop_music() self.is_running = False print(f"启动演示失败: {e}") return False def stop_demo(self): """停止演示,返回是否成功停止""" if not self.is_running: return False try: # 停止背景音乐 self.stop_music() # 停止播放器 if self.player: self.player.stop() # 关闭伺服 if self.arm: self.arm.disable_servo() time.sleep(1) self.arm.disable_servo() time.sleep(1) self.arm.disable_servo() time.sleep(1) # 回到初始位置 self.arm.move_joint(self.arm.off_pos) self.is_running = False return True except Exception as e: print(f"停止演示失败: {e}") # 即使发生异常,也将状态设为未运行 self.is_running = False return False def cleanup(self): """清理资源""" try: if self.is_running: self.stop_demo() else: # 确保音乐停止 self.stop_music() if self.arm: self.arm.power_off() return True except Exception as e: print(f"清理资源失败: {e}") return False def usleep(microseconds): """Linux 下的微秒级休眠(比 time.sleep 更精确)""" if libc and hasattr(libc, 'usleep'): libc.usleep(microseconds) else: # 如果libc不可用,使用time.sleep作为备选 time.sleep(microseconds / 1000000.0) if __name__ == '__main__': # 作为独立脚本运行时,提供简单的CLI接口 controller = RobotDanceController() try: print("开始机器人舞蹈演示...") if controller.start_demo(): print("演示已开始,按Ctrl+C停止...") # 等待用户按Ctrl+C while True: time.sleep(1) except KeyboardInterrupt: print("\n接收到停止信号,正在停止演示...") finally: controller.stop_demo() controller.cleanup() print("演示已停止,资源已清理。")