328 lines
11 KiB
Python
328 lines
11 KiB
Python
# cython: language_level=3
|
||
import time
|
||
import csv
|
||
import atexit
|
||
from pathlib import Path
|
||
import sys
|
||
import ctypes
|
||
import threading
|
||
import pygame # 添加pygame导入
|
||
|
||
sys.path.append(str(Path(__file__).resolve().parent.parent))
|
||
from hardware.aubo_C5 import AuboC5
|
||
from hardware.force_sensor_aubo import XjcSensor
|
||
|
||
# 初始化libc
|
||
try:
|
||
libc = ctypes.CDLL("libc.so.6")
|
||
except Exception as e:
|
||
print(f"Failed to load libc: {e}")
|
||
libc = None
|
||
|
||
class JointTrajectoryPlayer:
|
||
def __init__(self, csv_path: str, arm: AuboC5 = None, sensor: XjcSensor = None):
|
||
"""
|
||
初始化轨迹回放器
|
||
:param csv_path: 轨迹CSV文件路径
|
||
:param arm: 可选,机械臂控制器
|
||
:param sensor: 可选,力传感器
|
||
"""
|
||
self.csv_path = csv_path
|
||
self.arm = arm
|
||
self.sensor = sensor
|
||
self.file = None
|
||
self.reader = None
|
||
self.header = None
|
||
self.keep_playing = True
|
||
self.play_thread = None
|
||
|
||
def __enter__(self):
|
||
"""上下文管理器入口,打开文件"""
|
||
self.file = open(self.csv_path, 'r')
|
||
self.reader = csv.reader(self.file)
|
||
self.header = next(self.reader) # 读取表头
|
||
return self
|
||
|
||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
"""上下文管理器退出,关闭文件"""
|
||
if self.file:
|
||
self.file.close()
|
||
|
||
def read_next_joint_angles(self):
|
||
"""
|
||
读取下一行关节角度
|
||
:return: (timestamp, [j1,j2,j3,j4,j5,j6]) 或 None(文件结束)
|
||
"""
|
||
try:
|
||
row = next(self.reader)
|
||
timestamp = float(row[0])
|
||
angles = [float(x) for x in row[1:7]] # 读取6个关节角度
|
||
return timestamp, angles
|
||
except (StopIteration, ValueError, IndexError):
|
||
return None
|
||
|
||
def playback(self, speed_factor: float = 1.0):
|
||
"""
|
||
回放轨迹
|
||
:param speed_factor: 回放速度因子(1.0为原速)
|
||
"""
|
||
while self.keep_playing: # 外部可以通过设置keep_playing为False来停止播放
|
||
# 每次循环开始时重新打开文件和读取器
|
||
self.file = open(self.csv_path, 'r')
|
||
self.reader = csv.reader(self.file)
|
||
self.header = next(self.reader) # 读取表头
|
||
|
||
start_time = time.time()
|
||
first_timestamp = None
|
||
|
||
while self.keep_playing: # 内部循环,播放单次轨迹
|
||
data = self.read_next_joint_angles()
|
||
if data is None:
|
||
break
|
||
|
||
timestamp, angles = data
|
||
|
||
# 初始化第一个时间戳
|
||
if first_timestamp is None:
|
||
first_timestamp = timestamp
|
||
continue
|
||
|
||
usleep(15000)
|
||
|
||
# 执行运动
|
||
if self.arm:
|
||
self.arm.mc.servoJoint(angles, 0, 0, 0.15, 0.1, 200)
|
||
|
||
print(f"时间: {timestamp:.3f}s, 角度: {angles}")
|
||
|
||
# 关闭当前文件
|
||
self.file.close()
|
||
|
||
return 0
|
||
|
||
def start(self, speed_factor: float = 1.0):
|
||
"""启动轨迹播放,返回是否成功启动"""
|
||
if self.play_thread and self.play_thread.is_alive():
|
||
return False # 已经在运行中
|
||
|
||
self.keep_playing = True
|
||
self.play_thread = threading.Thread(target=self.playback, kwargs={'speed_factor': speed_factor})
|
||
self.play_thread.daemon = True
|
||
self.play_thread.start()
|
||
return True
|
||
|
||
def stop(self):
|
||
"""停止轨迹播放,返回是否成功停止"""
|
||
if not self.play_thread or not self.play_thread.is_alive():
|
||
return False # 未在运行或已停止
|
||
|
||
self.keep_playing = False
|
||
return True
|
||
|
||
|
||
class RobotDanceController:
|
||
"""机器人舞蹈控制器,单例模式"""
|
||
_instance = None
|
||
|
||
def __new__(cls, *args, **kwargs):
|
||
if cls._instance is None:
|
||
cls._instance = super(RobotDanceController, cls).__new__(cls)
|
||
cls._instance._initialized = False
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
if self._initialized:
|
||
return
|
||
|
||
self.arm = None
|
||
self.player = None
|
||
self.is_running = False
|
||
self._initialized = True
|
||
self.music_playing = False
|
||
|
||
def initialize(self):
|
||
"""初始化机械臂和轨迹播放器"""
|
||
try:
|
||
# 初始化机械臂
|
||
self.arm = AuboC5()
|
||
self.arm.init()
|
||
|
||
# 设置CSV路径
|
||
script_dir = Path(__file__).resolve().parent
|
||
csv_path = script_dir / "joint_processed_0.025s_6.csv"
|
||
|
||
# 创建轨迹播放器
|
||
self.player = JointTrajectoryPlayer(str(csv_path), self.arm)
|
||
|
||
return True
|
||
except Exception as e:
|
||
print(f"初始化失败: {e}")
|
||
return False
|
||
|
||
def play_music(self):
|
||
"""播放背景音乐,从头开始,循环播放"""
|
||
try:
|
||
# 初始化pygame混音器
|
||
pygame.mixer.init()
|
||
|
||
# 获取音乐文件路径
|
||
script_dir = Path(__file__).resolve().parent
|
||
# music_path = script_dir / "demo_music.mp3"
|
||
music_path = script_dir / "demo_music1.mp3"
|
||
|
||
# 加载并播放音乐从头开始,设置循环播放(loops=-1表示无限循环)
|
||
pygame.mixer.music.load(str(music_path))
|
||
pygame.mixer.music.play(loops=-1)
|
||
self.music_playing = True
|
||
print("背景音乐已开始播放(循环模式)")
|
||
|
||
return True
|
||
except Exception as e:
|
||
print(f"播放音乐失败: {e}")
|
||
return False
|
||
|
||
def stop_music(self):
|
||
"""停止背景音乐"""
|
||
if self.music_playing:
|
||
try:
|
||
pygame.mixer.music.stop()
|
||
pygame.mixer.quit()
|
||
self.music_playing = False
|
||
print("背景音乐已停止")
|
||
except Exception as e:
|
||
print(f"停止音乐失败: {e}")
|
||
|
||
def start_demo(self):
|
||
"""开始演示,返回是否成功启动"""
|
||
if self.is_running:
|
||
return False
|
||
|
||
# 如果未初始化,先初始化
|
||
if not self.arm or not self.player:
|
||
if not self.initialize():
|
||
return False
|
||
|
||
try:
|
||
# 移动到舞蹈准备姿势
|
||
self.arm.move_joint([1.571,0.578,2.35,0.205,1.569,0.0])
|
||
time.sleep(2)
|
||
|
||
# 先播放背景音乐
|
||
self.play_music()
|
||
|
||
# 创建一个线程等待14秒后启动机器人运动
|
||
def start_robot_movement_at_music_mark():
|
||
try:
|
||
start_time = time.time()
|
||
# wait_time = 8.0 # 8秒
|
||
wait_time = 4.0 # 4秒
|
||
print(f"等待音乐播放到{wait_time}秒...")
|
||
time.sleep(wait_time)
|
||
|
||
# 检查是否仍在运行状态(可能用户已经停止了演示)
|
||
if self.is_running:
|
||
print(f"音乐已播放{wait_time}秒,开始机器人运动")
|
||
# 启动轨迹播放器
|
||
# 启动伺服
|
||
self.arm.enable_servo()
|
||
time.sleep(1)
|
||
self.arm.enable_servo()
|
||
time.sleep(1)
|
||
self.arm.enable_servo()
|
||
time.sleep(1)
|
||
self.player.start()
|
||
except Exception as e:
|
||
print(f"启动机器人运动失败: {e}")
|
||
|
||
# 设置为正在运行状态,这样线程中可以检查状态
|
||
self.is_running = True
|
||
|
||
# 启动等待线程
|
||
threading.Thread(target=start_robot_movement_at_music_mark, daemon=True).start()
|
||
|
||
return True
|
||
except Exception as e:
|
||
# 发生错误时,确保停止音乐
|
||
self.stop_music()
|
||
self.is_running = False
|
||
print(f"启动演示失败: {e}")
|
||
return False
|
||
|
||
def stop_demo(self):
|
||
"""停止演示,返回是否成功停止"""
|
||
if not self.is_running:
|
||
return False
|
||
|
||
try:
|
||
# 停止背景音乐
|
||
self.stop_music()
|
||
|
||
# 停止播放器
|
||
if self.player:
|
||
self.player.stop()
|
||
|
||
# 关闭伺服
|
||
if self.arm:
|
||
self.arm.disable_servo()
|
||
time.sleep(1)
|
||
self.arm.disable_servo()
|
||
time.sleep(1)
|
||
self.arm.disable_servo()
|
||
time.sleep(1)
|
||
|
||
# 回到初始位置
|
||
self.arm.move_joint(self.arm.off_pos)
|
||
|
||
self.is_running = False
|
||
return True
|
||
except Exception as e:
|
||
print(f"停止演示失败: {e}")
|
||
# 即使发生异常,也将状态设为未运行
|
||
self.is_running = False
|
||
return False
|
||
|
||
def cleanup(self):
|
||
"""清理资源"""
|
||
try:
|
||
if self.is_running:
|
||
self.stop_demo()
|
||
else:
|
||
# 确保音乐停止
|
||
self.stop_music()
|
||
|
||
if self.arm:
|
||
self.arm.power_off()
|
||
|
||
return True
|
||
except Exception as e:
|
||
print(f"清理资源失败: {e}")
|
||
return False
|
||
|
||
|
||
def usleep(microseconds):
|
||
"""Linux 下的微秒级休眠(比 time.sleep 更精确)"""
|
||
if libc and hasattr(libc, 'usleep'):
|
||
libc.usleep(microseconds)
|
||
else:
|
||
# 如果libc不可用,使用time.sleep作为备选
|
||
time.sleep(microseconds / 1000000.0)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# 作为独立脚本运行时,提供简单的CLI接口
|
||
controller = RobotDanceController()
|
||
|
||
try:
|
||
print("开始机器人舞蹈演示...")
|
||
if controller.start_demo():
|
||
print("演示已开始,按Ctrl+C停止...")
|
||
|
||
# 等待用户按Ctrl+C
|
||
while True:
|
||
time.sleep(1)
|
||
except KeyboardInterrupt:
|
||
print("\n接收到停止信号,正在停止演示...")
|
||
finally:
|
||
controller.stop_demo()
|
||
controller.cleanup()
|
||
print("演示已停止,资源已清理。") |