# NOTE: the original import order is preserved on purpose. ``sys.path`` is
# extended part-way through this block and a star import follows, so moving
# lines around could change which names are resolved.
from flask import (
    Flask,
    render_template,
    jsonify,
    redirect,
    url_for,
    request,
    Response,
    render_template_string,
    session,
    send_from_directory,
)
from flask_socketio import SocketIO
from flask_cors import CORS
from flask_compress import Compress
from flask_sslify import SSLify
from zeroconf import Zeroconf, ServiceInfo
import atexit
import hmac
import uuid
import socket
import netifaces
import time
from qq_music import (
    fetch_latest_music,
    get_song_list,
    fetch_album_image,
    get_music_info,
    player,
)
import threading
from threading import Lock
import asyncio
import websockets
import json
import argparse
import librosa
import numpy as np
import subprocess
import signal
import os
import sys
import re
from datetime import datetime, timedelta
import paramiko
import requests
from bs4 import BeautifulSoup, NavigableString
import shutil
from urllib.parse import urljoin, urlparse, parse_qs
from requests.auth import HTTPBasicAuth
from tools.ssh_tools import execute_command_on_remote, is_host_down, ReverseSSH
import logging
from hw_obs import HW_OBS

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from force_sensor_aubo import XjcSensor
from aubo_C5_UI import AuboC5
from Massage.MassageControl.tools.auto_visual_calibration import Calibration
from Massage.MassageControl.tools.replay_trajectory_from_csv import RobotDanceController
import fitz  # PyMuPDF
from io import BytesIO
from tools.wifi_tools import WifiManager
from tools import volume_control  # volume_control module from the tools package
from tools import license_module
import math
import aiohttp
import serial
import serial.tools.list_ports
import yaml
from power_board import PowerBoard
import cv2
from pyzbar.pyzbar import decode
from PIL import Image
import io
from tools.deep_thought import set_deep_thought_status, get_deep_thought_status
from tools.ai_search import set_ai_search_status, get_ai_search_status
from tools.version_control import *
from VortXDB.client import VTXClient
from tools.log_utils import read_log_file
from modules.thermal.thermal_routes import thermal_bp, init_thermal_socketio
from modules.common.common_routes import common_bp
from modules.vtxdb.vtxdb_routes import vtxdb_bp

# Shared VTXClient instance.
vtxdb = VTXClient(use_logger=False)

DEBUG_MODE = False

# ---------------------------------------------------------------------------
# Logging configuration
# ---------------------------------------------------------------------------
log_file = "../log/UI-next-app.log"

# Folder that receives a timestamped copy of the previous run's log.
backup_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../UILog"))
os.makedirs(backup_folder, exist_ok=True)

# logging.FileHandler does not create missing parent directories, so make
# sure the log directory exists before the handler opens the file.
os.makedirs(os.path.dirname(log_file), exist_ok=True)

# Back up the existing log, because the handler below truncates it.
if os.path.exists(log_file):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = os.path.join(backup_folder, f"UI-next-app_{timestamp}.log")
    shutil.copy2(log_file, backup_file)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(log_file, mode="w"),  # overwrite: each run starts a fresh log
        logging.StreamHandler(sys.stdout),
    ],
)


class LoggerWriter:
    """Minimal file-like object that forwards written text to a logger.

    Used below to route ``print`` output and tracebacks into the log file.
    """

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        """Log *message* at the configured level, ignoring blank writes."""
        text = message.strip()
        if text != "":
            self.logger.log(self.level, text)

    def flush(self):
        """Do nothing; present only to satisfy the file-like interface."""

    def isatty(self):
        """Report a non-interactive stream to callers that probe ``sys.stdout``."""
        return False


# Redirect standard output and standard error into the logging system.
sys.stdout = LoggerWriter(logging.getLogger(), logging.INFO)
sys.stderr = LoggerWriter(logging.getLogger(), logging.ERROR)

if getattr(sys, "frozen", False):
    # Frozen bundle: templates and static files live under sys._MEIPASS.
    template_folder = os.path.join(sys._MEIPASS, "templates")
    static_folder = os.path.join(sys._MEIPASS, "static")
    app = Flask(__name__, template_folder=template_folder, static_folder=static_folder)
else:
    app = Flask(__name__)

CORS(app)

# SECURITY: the session key used to be hard-coded only. It can now be
# overridden through the environment; the old value remains the fallback so
# existing deployments keep working.
_secret_key = os.environ.get("UI_SECRET_KEY", "jsfb")
app.secret_key = _secret_key
app.config["SECRET_KEY"] = _secret_key

# Response compression.
app.config["COMPRESS_MIN_SIZE"] = 50
app.config["COMPRESS_LEVEL"] = 8
app.config["COMPRESS_ALGORITHM"] = 'gzip'
Compress(app)

app.register_blueprint(thermal_bp)
app.register_blueprint(common_bp)
app.register_blueprint(vtxdb_bp)

socketio = SocketIO(app, cors_allowed_origins="*")
# The thermal module needs the Socket.IO instance once it has been created.
init_thermal_socketio(socketio)


def signal_handler(signal, frame):
    """Stop the power-board heartbeat thread and exit on SIGINT/SIGTERM.

    The parameter names are kept as-is for compatibility; ``signal`` shadows
    the module of the same name inside this function only.
    """
    global power_board_ip, power_board_thread, power_board_thread_running
    print("Shutting Down")
    if power_board_thread_running:
        power_board_thread_running = False
        if power_board_thread:
            try:
                # Bounded wait so shutdown cannot block forever.
                power_board_thread.join(timeout=2.0)
            except Exception as e:
                print(f"Error stopping power board thread during shutdown: {e}")
        power_board_thread = None
        power_board_ip = None
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

current_mode = "manual_mode"  # manual_mode / smart_mode / handheld_mode

latest_music = []
all_playlists = []

MUSIC_CONFIG_PATH = "/home/jsfb/jsfb_ws/global_config/music.yaml"
DEFAULT_PLAYLIST_IDS = (
    "9126599100",
    "8429112432",
    "7299191148",
    "7132357466",
    "1137267511",
    "7284914844",
)


@app.route("/get_music_data", methods=["GET"])
def get_music_data():
    """Return the latest-music list, fetching it on first use."""
    global latest_music
    if not latest_music:
        latest_music = fetch_latest_music()
    return jsonify(latest_music)


def load_playlist_ids():
    """Load the playlist ID list from the YAML config file.

    When the file is missing it is created with the default IDs. The defaults
    are also returned when the file is unreadable, is not a mapping, or holds
    no usable ``playlist_ids`` list, so callers always get a non-empty list.
    """
    defaults = list(DEFAULT_PLAYLIST_IDS)
    try:
        # Inside the try block: this runs at import time and must not crash
        # the application when the directory cannot be created.
        os.makedirs(os.path.dirname(MUSIC_CONFIG_PATH), exist_ok=True)

        if not os.path.exists(MUSIC_CONFIG_PATH):
            with open(MUSIC_CONFIG_PATH, 'w', encoding='utf-8') as f:
                yaml.dump({'playlist_ids': defaults}, f, allow_unicode=True)
            return defaults

        with open(MUSIC_CONFIG_PATH, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
    except Exception as e:
        print(f"Error loading playlist IDs from config: {e}")
        return defaults

    # safe_load returns None for an empty file and any YAML type otherwise.
    if not isinstance(config, dict):
        return defaults
    playlist_ids = config.get('playlist_ids')
    if not isinstance(playlist_ids, list) or not playlist_ids:
        return defaults
    return playlist_ids


def save_playlist_ids(playlist_ids):
    """Write *playlist_ids* to the YAML config file.

    Returns:
        True when the file was written, False when any error occurred.
    """
    try:
        os.makedirs(os.path.dirname(MUSIC_CONFIG_PATH), exist_ok=True)
        with open(MUSIC_CONFIG_PATH, 'w', encoding='utf-8') as f:
            yaml.dump({'playlist_ids': playlist_ids}, f, allow_unicode=True)
        return True
    except Exception as e:
        print(f"Error saving playlist IDs to config: {e}")
        return False
# Playlist IDs are read from the config file once, at import time.
categoryIDs = load_playlist_ids()

_SONG_FIELDS = ("album_img_url", "music_name", "singer_name", "songmid")


def _song_page_url(songmid):
    """Return the proxied song page URL for *songmid*."""
    return f"https://robotstorm.tech/yqq/n/yqq/song/{songmid}.html"


def _resolve_music_url(songmid):
    """Return a stream URL for *songmid*, or a falsy value when none is found.

    The lookup by song mid is tried first; the song page is only consulted
    when that lookup yields nothing.
    """
    return player.fetch_music_url_by_mid(songmid) or player.fetch_music_url(
        _song_page_url(songmid)
    )


def _song_entry(raw):
    """Build a playlist entry from a client-supplied dict (missing keys -> None)."""
    return {field: raw.get(field) for field in _SONG_FIELDS}


def _mark_now_playing(song):
    """Copy *song*'s display fields into ``current_song`` and flag it as playing."""
    current_song.update(
        {
            "album_img_url": song["album_img_url"],
            "music_name": song["music_name"],
            "singer_name": song["singer_name"],
            "is_playing": True,
        }
    )


def _play_song_at_current_index(success_message):
    """Start the song at ``current_song_index`` and return the JSON response."""
    song = playlist[current_song_index]
    _mark_now_playing(song)
    music_url = _resolve_music_url(song["songmid"])
    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})
    player.play_music(music_url)
    return jsonify({"status": "success", "message": success_message})


@app.route("/get_playlists", methods=["GET"])
def get_playlists():
    """Return every configured playlist, fetching them on first use."""
    if not all_playlists:
        # Collect into a local list first so a failure half-way through does
        # not leave a partial result cached for all later requests.
        loaded = []
        for category_id in categoryIDs:
            diss_info, music_info_list = get_song_list(category_id)
            diss_info['categoryId'] = category_id
            loaded.append({"diss_info": diss_info, "music_info_list": music_info_list})
            print(diss_info)
        all_playlists.extend(loaded)
    return jsonify(all_playlists)


@app.route("/search", methods=["POST"])
def search():
    """Search for music matching the ``term`` field of the JSON body."""
    payload = request.get_json(silent=True)
    search_term = payload.get("term") if isinstance(payload, dict) else None
    print(search_term)
    if not search_term:
        return jsonify({"error": "Search term is required"}), 400
    return jsonify(get_music_info(search_term))


@app.route("/get_album_image", methods=["POST"])
def get_album_image():
    """Return the album image URL for the ``singer_name`` in the JSON body."""
    payload = request.get_json(silent=True)
    singer_name = payload.get("singer_name") if isinstance(payload, dict) else None
    if not singer_name:
        return jsonify({"error": "Singer name is required"}), 400

    album_img_url, error = fetch_album_image(singer_name)
    if album_img_url:
        return jsonify({"album_img_url": album_img_url}), 200
    return jsonify({"error": error}), 404


@app.route("/song_clicked", methods=["POST"])
def song_clicked():
    """Play the clicked song, appending it to the playlist when it is new."""
    global playlist, current_song_index
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    songmid = data.get("songmid")
    print(f"Song clicked with songmid: {songmid}")

    # The URL is resolved before any state changes so that a failed lookup
    # leaves the player and playlist untouched.
    music_url = _resolve_music_url(songmid)
    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})

    known_song = next((song for song in playlist if song["songmid"] == songmid), None)
    if known_song:
        current_song_index = playlist.index(known_song)
        _mark_now_playing(known_song)
        print(current_song)
        socketio.sleep(0.5)
        player.play_music(music_url)
        return jsonify({"status": "success", "message": "Song added to playlist"})

    # Not in the playlist yet: append it and play it.
    song_info = _song_entry(data)
    playlist.append(song_info)
    current_song_index = len(playlist) - 1
    _mark_now_playing(song_info)
    player.play_music(music_url)
    print(playlist)
    return jsonify({"status": "success", "message": "Music started"})


@app.route("/playlist_update", methods=["POST"])
def playlist_update():
    """Replace the playlist with the posted one and play its first song."""
    global playlist, current_song_index
    payload = request.get_json(silent=True)
    new_playlist = payload.get("playlist", []) if isinstance(payload, dict) else []
    if not new_playlist:
        return jsonify({"status": "error", "message": "Playlist is empty"})

    playlist = [_song_entry(song) for song in new_playlist]

    # The first song of the new list becomes the current one.
    current_song_index = 0
    first_song = playlist[current_song_index]
    _mark_now_playing(first_song)

    music_url = _resolve_music_url(first_song["songmid"])
    if not music_url:
        return jsonify(
            {"status": "error", "message": "Failed to fetch music URL for first song"}
        )
    socketio.sleep(0.5)
    player.play_music(music_url)
    print(playlist)
    return jsonify(
        {"status": "success", "message": "Playlist updated and playing first song"}
    )


@app.route("/pause_music", methods=["POST"])
def pause_music():
    """Pause playback."""
    player.pause_music()
    return jsonify({"status": "success", "message": "Music paused"})


@app.route("/resume_music", methods=["POST"])
def resume_music():
    """Resume playback, or start the current playlist entry if nothing is loaded."""
    if player.is_playing:
        player.resume_music()
        return jsonify({"status": "success", "message": "Music resumed"})

    song = playlist[current_song_index]
    _mark_now_playing(song)
    print(current_song)
    music_url = _resolve_music_url(song["songmid"])
    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})
    socketio.sleep(0.5)
    player.play_music(music_url)
    return jsonify({"status": "success", "message": "Music resumed"})


@app.route("/stop_music", methods=["POST"])
def stop_music():
    """Stop playback."""
    player.stop_music()
    return jsonify({"status": "success", "message": "Music stopped"})


@app.route("/play_last_song", methods=["POST"])
def play_last_song():
    """Play the previous playlist entry, wrapping around at the start."""
    global current_song_index
    if not playlist:
        return jsonify({"status": "error", "message": "Playlist is empty"})
    if current_song_index > 0:
        current_song_index -= 1
    else:
        current_song_index = len(playlist) - 1
    print(current_song_index)
    return _play_song_at_current_index("Playing last song")


@app.route("/play_next_song", methods=["POST"])
def play_next_song():
    """Play the next playlist entry, wrapping around at the end."""
    global current_song_index
    if not playlist:
        # Guards the modulo below against an empty playlist.
        return jsonify({"status": "error", "message": "Playlist is empty"})
    current_song_index = (current_song_index + 1) % len(playlist)
    print(current_song_index)
    return _play_song_at_current_index("Playing next song")


@socketio.on("seek_music")
def handle_seek_music(position):
    """Seek the player to *position* (passed through to ``player.seek_music``)."""
    print(position)
    player.seek_music(position)


def generate_playlist(music_info_list):
    """Convert ``(name, singer, songmid, album_id)`` rows into playlist dicts.

    The album image URL is empty when the row has no album identifier.
    """
    return [
        {
            "music_name": song[0],
            "singer_name": song[1],
            "songmid": song[2],
            "album_img_url": (
                f"https://robotstorm.tech/ygtimg/music/photo_new/T002R180x180M000{song[3]}.jpg"
                if song[3]
                else ""
            ),
        }
        for song in music_info_list
    ]
# Song information and play state shared with the front end.
current_song = {
    "album_img_url": "https://via.placeholder.com/300x300",
    "music_name": None,
    "singer_name": None,
    "is_playing": False,
}

# Current playlist and the index of the song being played.
playlist = []
current_song_index = 0

# Seed the playlist from the first configured playlist ID. This runs at import
# time, so a network or config failure must not stop the app from starting.
try:
    diss_info, music_info_list = get_song_list(categoryIDs[0])
    playlist = generate_playlist(music_info_list)
except Exception as e:
    print(f"Error loading initial playlist: {e}")
    playlist = []
print("playlist:", playlist)

if not playlist:
    # Placeholder entry so index-based playback code always has a song.
    playlist = [
        {
            "album_img_url": "https://via.placeholder.com/300x300",
            "music_name": "默认音乐",
            "singer_name": "未知艺术家",
            "songmid": "000000",
        }
    ]

current_song.update(
    {
        "album_img_url": playlist[0]["album_img_url"],
        "music_name": playlist[0]["music_name"],
        "singer_name": playlist[0]["singer_name"],
        "is_playing": False,
    }
)

# Lock held by the background task while it reads and updates playback state.
thread_lock = Lock()


def music_background_thread():
    """Broadcast playback state every second and advance to the next song.

    Emits ``current_song`` on every tick and ``music_status`` while the player
    reports that it is playing. A negative position from the player is treated
    as "song finished" and triggers the next playlist entry.

    NOTE(review): the original indentation was lost; the whole tick is assumed
    to run under ``thread_lock`` and ``music_status`` is assumed to be emitted
    only while playing. Confirm against the original file.
    """
    global current_song_index
    while True:
        socketio.sleep(1)
        with thread_lock:
            current_song["is_playing"] = not player.is_paused and player.is_playing
            socketio.emit("current_song", current_song, namespace="/")
            if not player.is_playing:
                continue

            current_position = player.get_current_position()
            music_length = player.get_music_length()
            if current_position < 0:
                current_song_index = (current_song_index + 1) % len(playlist)
                next_song = playlist[current_song_index]
                current_song.update(
                    {
                        "album_img_url": next_song["album_img_url"],
                        "music_name": next_song["music_name"],
                        "singer_name": next_song["singer_name"],
                        "is_playing": True,
                    }
                )
                print(current_song)
                # A failed lookup must not kill this long-running task.
                try:
                    music_page = (
                        f"https://robotstorm.tech/yqq/n/yqq/song/{next_song['songmid']}.html"
                    )
                    music_url = player.fetch_music_url_by_mid(next_song["songmid"])
                    if not music_url:
                        music_url = player.fetch_music_url(music_page)
                    socketio.sleep(0.15)
                    if music_url:
                        player.play_music(music_url)
                    else:
                        print(f"Failed to fetch music URL for {next_song['songmid']}")
                except Exception as e:
                    print(f"Error switching to next song: {e}")
            socketio.emit(
                "music_status",
                {"position": current_position, "length": music_length},
            )
@app.route("/set_current_song", methods=["POST"])
def set_current_song():
    """Overwrite the displayed song information from the JSON body."""
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}
    current_song.update(
        {
            "album_img_url": data.get("album_img_url"),
            "music_name": data.get("music_name"),
            "singer_name": data.get("singer_name"),
            "is_playing": data.get("is_playing", False),
        }
    )
    return jsonify({"status": "success"})


@app.route("/get_current_song", methods=["GET"])
def get_current_song():
    """Return the current song, or an error when no song name is set."""
    if current_song["music_name"]:
        return jsonify({"status": "success", "data": current_song})
    return jsonify({"status": "error", "message": "No song is currently set"})


# Chat history shown by the AI chat page (user and AI entries in order).
ai_chat_list = []


async def send_message(message, port=8766):
    """Send *message* to the local WebSocket server on *port* and return its reply."""
    uri = "ws://localhost:" + str(port)
    async with websockets.connect(uri) as websocket:
        await websocket.send(message)
        response = await websocket.recv()
        return response


@socketio.on("send_message")
def handle_message(data):
    """Record a user chat message and forward it to the local WebSocket server.

    Returns a plain dict on invalid input: the return value of a Socket.IO
    handler is sent back as the acknowledgement, so a Flask response tuple is
    not usable here.
    """
    message = data.get("message") if isinstance(data, dict) else None
    if not message:
        return {"error": "Message not provided"}

    socketio.emit("add_message", {"sender": "user", "message": message})
    ai_chat_list.append({"sender": "user", "message": message})
    try:
        # asyncio.run closes its loop afterwards; a hand-made loop that is
        # never closed would leak on every message.
        asyncio.run(send_message(f"'user_input': \"{message}\""))
    except Exception as e:
        print(f"Error: {e}")


# Timestamp of the most recent AI fragment; used to merge streamed chunks.
last_ai_message_time = None


@app.route("/ai_respon", methods=["POST"])
def ai_response():
    """Store and broadcast a fragment of an AI reply.

    Fragments arriving within 5 seconds of the previous one are concatenated
    onto the latest AI entry; otherwise a new entry is started. The
    ``isReasoning`` form field selects which text field receives the fragment.
    """
    global last_ai_message_time
    message = request.form.get("message")
    is_reasoning = request.form.get("isReasoning", "false").lower() == "true"
    if not message:
        return jsonify({"status": "error", "message": "No message provided"})

    current_time = time.time()
    field = "reasoning_content" if is_reasoning else "content"
    is_continuation = (
        bool(last_ai_message_time)
        and (current_time - last_ai_message_time) <= 5
        and bool(ai_chat_list)
        and ai_chat_list[-1]["sender"] == "ai"
    )
    if is_continuation:
        ai_chat_list[-1]["message"][field] += message
    else:
        entry = {"content": "", "reasoning_content": ""}
        entry[field] = message
        ai_chat_list.append({"sender": "ai", "message": entry})

    last_ai_message_time = current_time
    socketio.emit(
        "add_message",
        {"sender": "ai", "message": message, "isReasoning": is_reasoning},
    )
    return jsonify({"status": "success", "message": "Received ai message"})
@app.route("/user_input", methods=["POST"])
def user_input():
    """Record a user message posted as form data and broadcast it."""
    message = request.form.get("message")
    if not message:
        return jsonify({"status": "error", "message": "No message provided"})
    socketio.emit("add_message", {"sender": "user", "message": message})
    ai_chat_list.append({"sender": "user", "message": message})
    return jsonify({"status": "success", "message": "Received user message"})


@app.route("/get_history", methods=["GET"])
def get_history():
    """Return the full chat history."""
    return jsonify(ai_chat_list)


def process_audio_and_emit(path):
    """Emit ``mouth_movement`` values derived from the audio file at *path*.

    The file is loaded at 8 kHz. About every 0.1 s the sample at the current
    wall-clock offset is scaled by 5, made absolute and emitted as a string.
    A final value of ``0.0`` is always emitted, even when loading fails.
    """
    try:
        samples, _sample_rate = librosa.load(path, sr=8000)
        started = time.time()
        for _ in range(int(len(samples) / 800)):
            index = int((time.time() - started) * 8000) + 1
            if index >= len(samples):
                # Wall-clock time has passed the end of the audio.
                break
            new_value = abs(samples[index] * 5)
            socketio.emit("mouth_movement", {"value": str(new_value)})
            time.sleep(0.1)
    except Exception as e:
        print(f"Error in mouth movement emission: {e}")
    finally:
        # Reset the mouth, including when the audio could not be loaded.
        socketio.emit("mouth_movement", {"value": str(0.0)})
@app.route("/lip_sync", methods=["POST"])
def lip_sync():
    """Start lip-sync emission for the audio file named by the ``path`` field."""
    try:
        path = request.form.get("path")
        print(path)
        if not path:
            return jsonify({"status": "error", "message": "No path provided"})
        if path.startswith("pre_mp3"):
            path = "../" + path
        # Audio processing runs in its own thread so the request returns at once.
        thread = threading.Thread(target=process_audio_and_emit, args=(path,))
        thread.start()
        return jsonify({"status": "success"})
    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"status": "error", "message": str(e)})


import traceback

massage_status = {
    "progress": 0,
    "force": '',
    "temperature": '',
    "gear": '',
    "shake": '',
    "is_massaging": False,
    "current_task": "",
    "task_time": "",
    "current_head": "",
    "body_part": "back",
    "massage_service_started": False,
    "press": '',
    "frequency": '',
    "speed": '',
    "direction": '',
    "high": '',
    "massage_state": '',
    "is_acupoint": False,
    "is_pause": False,
    "manual_stage": 0,
    "start_pos": "",
    "end_pos": "",
    "massage_path": "",
}

stone_status = {
    "temper_head": 40,
    "temperature": 1,
}

_TRUE_STRINGS = ("True", "true")
_FALSE_STRINGS = ("False", "false")

# Form fields copied into massage_status unchanged whenever they are non-empty.
_PLAIN_STATUS_FIELDS = (
    "progress",
    "temperature",
    "gear",
    "shake",
    "current_head",
    "body_part",
    "press",
    "frequency",
    "speed",
    "direction",
    "high",
    "massage_state",
    "start_pos",
    "end_pos",
    "massage_path",
)

# Task name -> plane index highlighted in the front end; the "_left" and
# "_right" variants map to the same index as the bare name.
_TASK_TO_PLANE_INDEX = {
    f"{organ}{suffix}": index
    for index, organ in enumerate(
        ("lung", "heart", "hepatobiliary", "spleen", "kidney")
    )
    for suffix in ("", "_left", "_right")
}


def _coerce_flag(raw):
    """Map "True"/"true"/"False"/"false" to bool; return anything else unchanged."""
    if raw in _TRUE_STRINGS:
        return True
    if raw in _FALSE_STRINGS:
        return False
    return raw


def _replay_stored_command():
    """Re-send the stored command after a 5 second delay, if one is still stored."""
    time.sleep(5)  # give the previous run time to finish
    command = stored_command
    if not command:
        # The command was cleared during the delay (for example by "stop").
        return
    try:
        asyncio.run(send_message(command, 8765))
    except Exception as e:
        print(f"Error: {e}")


@app.route("/update_massage_status", methods=["POST"])
def update_status():
    """Merge posted form fields into the shared status and broadcast it.

    A request carrying ``temper_head`` updates ``stone_status`` only. Any
    other request updates ``massage_status`` field by field (empty fields are
    ignored) and may re-run the stored command when massaging has just ended.

    NOTE(review): the original indentation was lost. The massage-status
    broadcast is assumed to belong to the non-stone branch only; confirm
    against the original file.
    """
    global display_image_url
    try:
        form = request.form
        print(f"update_massage_status{form}")

        temper_head = form.get("temper_head")
        if temper_head:
            stone_status["temper_head"] = temper_head
            stone_status["temperature"] = form.get("temperature")
            socketio.emit("update_stone_status", stone_status)
            return jsonify({"status": "success"})

        is_massaging = form.get("is_massaging")
        is_pause = form.get("is_pause")

        # Massage finished (not merely paused) while a command is stored:
        # run the stored command again in the background.
        if is_massaging in _FALSE_STRINGS and is_pause in _FALSE_STRINGS and stored_command:
            print(f"Re-running stored command after 5 seconds delay...")
            threading.Thread(target=_replay_stored_command).start()

        for field in _PLAIN_STATUS_FIELDS:
            value = form.get(field)
            if value:
                massage_status[field] = value

        # Force is only recorded while massaging, and "0" is ignored.
        force = form.get("force")
        if force and force != "0" and is_massaging in _TRUE_STRINGS:
            print(force)
            massage_status["force"] = force

        if is_massaging:
            if is_massaging in _FALSE_STRINGS:
                display_image_url = "static/images/smart_mode/back.png"
            if is_massaging in _TRUE_STRINGS or is_massaging in _FALSE_STRINGS:
                socketio.emit(
                    "change_image",
                    {"path": display_image_url, "type": display_image_type},
                )
            massage_status["is_massaging"] = _coerce_flag(is_massaging)

        current_task = form.get("current_task")
        if current_task:
            massage_status["current_task"] = current_task
            index = _TASK_TO_PLANE_INDEX.get(current_task)
            print(current_task, index)
            # Unknown tasks (including "-1") send None, which clears the highlight.
            socketio.emit("highlightPlane", index)

        task_time = form.get("task_time")
        if task_time is not None:
            massage_status["task_time"] = "" if task_time == "null" else task_time

        if is_pause:
            massage_status["is_pause"] = _coerce_flag(is_pause)

        massage_service_started = form.get("massage_service_started")
        if massage_service_started:
            print(massage_service_started)
            massage_status["massage_service_started"] = _coerce_flag(
                massage_service_started
            )

        is_acupoint = form.get("is_acupoint")
        if is_acupoint:
            massage_status["is_acupoint"] = _coerce_flag(is_acupoint)

        manual_stage = form.get("manual_stage")
        if manual_stage:
            massage_status["manual_stage"] = int(manual_stage)

        print(massage_status)
        socketio.emit("update_massage_status", massage_status)
        return jsonify({"status": "success"})
    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"status": "error", "message": str(e)})
def is_service_running(service_name):
    """Return True when ``systemctl is-active`` reports *service_name* as active.

    Any failure (non-zero exit status, or ``systemctl`` missing or not
    executable) is reported as False.
    """
    try:
        result = subprocess.run(
            ["/bin/systemctl", "is-active", service_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except (subprocess.CalledProcessError, OSError):
        return False
    return result.stdout.strip() == "active"


@app.route("/get_status", methods=["GET"])
def get_status():
    """Ask the service on port 8765 for its status and return ``massage_status``.

    When the request fails, ``massage_status`` is reset to its idle defaults.

    NOTE(review): the original indentation was lost; the ``change_image``
    broadcast is assumed to run on both the success and the failure path.
    """
    try:
        # asyncio.run closes the loop it creates, so nothing leaks per request.
        asyncio.run(send_message("get_status", 8765))
    except Exception as e:
        print("error ! get_status error")
        massage_status.update(
            {
                "progress": 0,
                "force": '',
                "temperature": '',
                "gear": '',
                "shake": '',
                "is_massaging": False,
                "current_task": "",
                "task_time": "",
                "current_head": "",
                "body_part": "back",
                "massage_service_started": False,
                "press": '',
                "frequency": '',
                "speed": '',
                "direction": '',
                "high": '',
                "massage_state": '',
                "is_acupoint": False,
                "is_pause": False,
                "manual_stage": 0,
                "start_pos": "",
                "end_pos": "",
                "massage_path": "",
            }
        )
        print(f"Error: {e}")

    # Push the current display image to the front end.
    file_url = display_image_url
    socketio.emit("change_image", {"path": file_url, "type": display_image_type})

    print(massage_status)
    return jsonify(massage_status)


last_command_time = 0

# Last "begin" command with mode 3, kept so it can be re-run automatically.
stored_command = None

countdown_value = 0
countdown_lock = threading.Lock()
def start_countdown():
    """Count down from 120, emitting ``countdown_update`` once per second.

    The loop also ends early when another caller sets ``countdown_value`` to
    zero. ``change_image`` is emitted once the countdown is over.
    """
    global countdown_value
    with countdown_lock:
        countdown_value = 120

    while True:
        with countdown_lock:
            if countdown_value <= 0:
                break
            countdown_value -= 1
            remaining = countdown_value  # snapshot taken under the lock
        print(f"Countdown: {remaining}")
        socketio.emit("countdown_update", {"countdown": remaining, "message": "is_running"})
        time.sleep(1)

    socketio.emit("change_image", {"path": display_image_url, "type": display_image_type})


async def send_to_port(command, port):
    """Send *command* to the WebSocket server on *port*, logging the outcome.

    Args:
        command: Command string to send.
        port: Target port number.

    Returns:
        True when the command was sent and answered, False on any error.
    """
    # No event loop is created here: this coroutine already runs inside one.
    # Installing and then closing a second loop would leave the thread with a
    # closed loop registered as its current loop.
    try:
        print(f"[DEBUG] 准备发送命令到端口 {port}: {command}")
        await send_message(command, port)
        print(f"[SUCCESS] 命令已发送到端口 {port}: {command}")
        return True
    except ConnectionError as e:
        print(f"[ERROR] 连接失败 (端口 {port}): {str(e)}")
        return False
    except asyncio.TimeoutError:
        print(f"[ERROR] 发送超时 (端口 {port})")
        return False
    except Exception as e:
        print(f"[CRITICAL] 未知错误: {type(e).__name__}: {str(e)}")
        return False


@socketio.on("send_command")
def send_command(data):
    """Forward a UI command string to port 8765, with side effects by keyword.

    * ``begin`` with at least five ``:``-separated parts: send ``UI_start`` to
      port 8766 unless a sixth part equals ``manual``; remember the command
      when the fifth part (the mode) is ``"3"``, otherwise forget it.
    * ``manual_start``: send ``UI_start`` to port 8766.
    * ``stop``: forget the stored command, reset the display image and the
      countdown.
    """
    global stored_command
    global display_image_url
    global display_image_type
    global countdown_value

    if not isinstance(data, str):
        # The substring tests below only make sense for text commands.
        print(f"Ignoring non-string command: {data!r}")
        return

    if "begin" in data:
        parts = data.split(":")
        if len(parts) >= 5:
            mode_real = parts[4]
            # UI_start is skipped only when an explicit sixth part says "manual".
            if len(parts) != 6 or parts[5] != 'manual':
                asyncio.run(send_to_port("UI_start", 8766))
            stored_command = data if mode_real == "3" else None
            print(f"Stored command: {stored_command}")

    if "manual_start" in data:
        asyncio.run(send_to_port("UI_start", 8766))

    if "stop" in data:
        stored_command = None
        display_image_url = 'static/images/smart_mode/back.png'
        display_image_type = 0
        countdown_value = 0

    # Every command is forwarded to port 8765.
    asyncio.run(send_to_port(data, 8765))
@app.route("/get_countdown", methods=["GET"])
def get_countdown():
    """Return the current countdown value."""
    with countdown_lock:
        return jsonify({"countdown": countdown_value})


@app.route('/get_command', methods=['GET'])
def get_command():
    """Return the stored command (``null`` when none is stored)."""
    return jsonify({
        "status": "success",
        "command": stored_command
    })


@socketio.on("change_awaken")
def change_awaken(command):
    """Forward *command* to the WebSocket server on port 8766."""
    try:
        # asyncio.run closes the loop it creates, so nothing leaks per event.
        asyncio.run(send_message(command, 8766))
    except Exception as e:
        print(f"Error: {e}")


@app.route("/get_awaken", methods=["GET"])
def get_awaken():
    """Return the text before the first ``;`` in the hot-word keyword file."""
    try:
        file_path = '../Language/Hotword_awaker/resource/keyword-nhxd.txt'  # relative path
        print("File path:", os.path.abspath(file_path))

        if not os.path.exists(file_path):
            return jsonify({"error": "File not found"}), 404

        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()

        # Capture everything up to (not including) the first semicolon.
        match = re.match(r"([^\;]+);", content)
        awaken = match.group(1) if match else "No valid string found"
        return jsonify({"awaken": awaken}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/get_serial_number', methods=['GET'])
def get_serial_number():
    """Return the host name with its ``jsfb-`` prefix removed as the serial number."""
    try:
        host_name = socket.gethostname()
        serial_number = host_name.removeprefix('jsfb-')
        return jsonify({"serial_number": serial_number})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# Disabled HTTPS -> HTTP redirect hook, kept for reference:
# @app.before_request
# def before_request():
#     if request.url.startswith('https://'):
#         url = request.url.replace('https://', 'http://', 1)
#         code = 301  # redirect permanently
#
# NOTE(review): the text between the disabled hook above and the statements
# below is missing from this copy of the file. They look like the tail of a
# response hook whose ``def`` line is not visible; a bare ``return`` at module
# level is a syntax error, so they stay commented out until the header is
# restored from the original file.
#     response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
#     response.headers.add("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE")
#     return response


@app.route("/")
def index():
    """Render the login page."""
    return render_template("login.html", time=int(time.time()))


@app.route("/home")
def home():
    """Render the home page."""
    return render_template("home.html", time=int(time.time()))


@app.route("/health")
def health():
    """Render the page that matches the current operating mode."""
    if current_mode == "manual_mode":
        template = "select_program.html"
    elif current_mode == "handheld_mode":
        template = "handheld_mode.html"
    else:
        template = "smart_mode.html"
    return render_template(template, time=int(time.time()))


@app.route("/setting")
def setting():
    """Render the settings page."""
    return render_template("setting.html", time=int(time.time()))


@app.route("/developer")
def developer():
    """Render the developer page."""
    return render_template("developer.html", time=int(time.time()))


@app.route("/learning")
def learning():
    """Render the learning page."""
    return render_template("learning.html", time=int(time.time()))


@app.route("/help")
def help():
    """Render the help page.

    The name shadows the ``help`` builtin but is kept because Flask uses the
    function name as the endpoint name.
    """
    return render_template("help.html", time=int(time.time()))


@app.route("/ai_ball")
def ai_ball():
    """Render the AI ball page."""
    return render_template("ai_ball_v2.html", time=int(time.time()))


@app.route("/model")
def model():
    """Render the 3D model page."""
    return render_template("3d_model_v6_local.html", time=int(time.time()))
@app.route("/ai_chatbot")
def ai_chatbot():
    """Render the AI chatbot page."""
    return render_template("ai_chatbot.html", time=int(time.time()))


@app.route("/dynamic_function")
def dynamic_function():
    """Render the dynamic-function page, passing through the ``mode`` query arg."""
    mode = request.args.get('mode', default=None)
    return render_template("dynamic_function_v2.html", time=int(time.time()), mode=mode)


@app.route("/three_d_model")
def three_d_model():
    """Render the 3D model page."""
    return render_template("three_d_model.html", time=int(time.time()))


@app.route("/control_panel")
def control_panel():
    """Render the control panel page, passing through the ``mode`` query arg."""
    mode = request.args.get('mode', default=None)
    return render_template("control_panel_v2.html", time=int(time.time()), mode=mode)


@app.route("/music")
def music():
    """Render the full-screen music player page."""
    return render_template("full_music_player.html", time=int(time.time()))


# The rule needs the <mode> variable: the view takes ``mode`` as an argument,
# and without it in the rule Flask calls switch_mode() with no arguments and
# every request fails with TypeError.
@app.route("/switch_mode/<mode>")
def switch_mode(mode):
    """Store ``mode`` in the module-level ``current_mode`` and redirect to /health."""
    global current_mode
    current_mode = mode
    return redirect(url_for("health"))


@app.route("/get-ip")
def get_ip():
    """Return the host part of the request's ``Host`` header as ``{"ip": host}``."""
    host = request.host.split(":")[0]
    return jsonify({"ip": host})


@app.route("/login", methods=["POST"])
def login():
    """Check the posted JSON credentials and return ``{"success": bool}``.

    A missing, malformed or non-object body is treated as a failed login
    rather than raising.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get("username")
    password = data.get("password")

    # NOTE(review): credentials are hard-coded; move them out of the source.
    if username == "jsfb" and password == "123456":
        return jsonify(success=True)
    return jsonify(success=False)


# Power-board state shared by send_heartbeat(), power_board() and other code.
powerboard = None
power_board_ip = None
power_board_thread = None
power_board_thread_running = False


def send_heartbeat():
    """Send a UDP ``ALIVE`` datagram to the power board once per second.

    Runs until ``power_board_thread_running`` becomes false. The target is
    re-read from ``power_board_ip`` on every iteration; nothing is sent while
    it is unset. The socket is always closed on exit.
    """
    print(f"Sent heartbeat to {power_board_ip}:3660")
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
            # Bound the blocking time of socket operations.
            udp_socket.settimeout(1.0)
            while power_board_thread_running:
                # Snapshot the address: another thread may reset it to None
                # between the check and the send.
                target_ip = power_board_ip
                try:
                    if target_ip:
                        udp_socket.sendto(b"ALIVE", (target_ip, 3660))
                except socket.timeout:
                    print(f"socket timeout")
                    continue
                except Exception as e:
                    print(f"Error sending heartbeat: {e}")
                time.sleep(1)
    except Exception as e:
        print(f"Error in heartbeat thread: {e}")


@app.route("/power_board", methods=["POST"])
def power_board():
    """Start or stop the power-board heartbeat thread.

    Form field ``ip``: a non-empty value sets the heartbeat target and starts
    the thread if it is not running; an empty or missing value stops it.
    """
    global power_board_ip, power_board_thread, power_board_thread_running

    ip = request.form.get("ip")

    # Empty IP: stop the running thread, if any.
    if not ip:
        if power_board_thread_running:
            power_board_thread_running = False
            if power_board_thread:
                try:
                    # Bounded join so a stuck thread cannot hang the request.
                    power_board_thread.join(timeout=2.0)
                except Exception as e:
                    print(f"Error stopping power board thread: {e}")
            power_board_thread = None
        power_board_ip = None
        return jsonify({"status": "success", "message": "Heartbeat stopped"})

    power_board_ip = ip
    print("power_board ip: ", power_board_ip)

    # Start a worker only when none exists or the previous one has exited.
    if not power_board_thread or not power_board_thread.is_alive():
        power_board_thread_running = True
        power_board_thread = threading.Thread(target=send_heartbeat)
        power_board_thread.daemon = True  # do not block interpreter exit
        power_board_thread.start()

    return jsonify({"status": "success", "message": "Heartbeat started"})


@app.route("/massage_control", methods=["POST"])
def massage_control():
    """Control the ``massage`` systemd service and the arm connection.

    Form field ``action``:
      * ``start``    - sends ``connect_arm`` and then restarts the service;
      * ``stop``     - sends ``disconnect_arm``, resets the displayed image,
                       broadcasts an idle status, then stops the service;
      * ``shutdown`` - runs ``system_shutdown()``;
      * other values are passed to ``systemctl <action> massage``.

    Returns 400 when ``action`` is missing, 500 when shutdown fails, and
    ``{"status": "success"}`` otherwise. A failing ``systemctl`` call is only
    logged, as before.
    """
    global display_image_type, display_image_url

    action = request.form.get("action")
    print("massage_control action: ", action)
    sudo_password = "jsfb"  # NOTE(review): hard-coded sudo password

    if not action:
        return jsonify({"status": "error", "message": "Invalid action"}), 400

    if action == "start":
        action = "restart"
        try:
            # asyncio.run() closes its loop; the old new_event_loop() code
            # leaked one loop per request.
            asyncio.run(send_message("connect_arm", 8766))
        except Exception as e:
            print(f"Error: {e}")

    if action == "stop":
        try:
            asyncio.run(send_message("disconnect_arm", 8766))

            # Reset the displayed image and tell the front end nothing runs.
            display_image_type = 0
            display_image_url = 'static/images/smart_mode/back.png'
            socketio.emit("update_massage_status", {
                "progress": 0,
                "force": '',
                "temperature": '',
                "gear": '',
                "shake": '',
                "is_massaging": False,
                "current_task": "",
                "task_time": "",
                "current_head": "",
                "body_part": "back",
                "massage_service_started": False,
                "press": '',
                "frequency": '',
                "speed": '',
                "direction": '',
                "massage_state": '',
                "is_acupoint": False,
                "is_pause": False,
                "manual_stage": 0,
                "start_pos": "",
                "end_pos": "",
                "massage_path": ""
            })
        except Exception as e:
            print(f"Error: {e}")

    if action == "shutdown":
        if system_shutdown():
            return jsonify({
                "status": "success",
                "message": "Remote and local systems shutdown successfully."
            })
        return jsonify({
            "status": "error",
            "message": "Failed to execute shutdown process"
        }), 500

    # ``action`` comes straight from the request, so it must never reach a
    # shell. Pass it as one argv element and feed the password on stdin,
    # which ``sudo -S`` reads.
    try:
        result = subprocess.run(
            ["/usr/bin/sudo", "-S", "/bin/systemctl", action, "massage"],
            input=f"{sudo_password}\n".encode(),
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        print(result.stdout.decode())
        print(f"Service {action} successfully.")
    except subprocess.CalledProcessError as e:
        print(f"Error when {action} service: {e.stderr.decode()}")
    except OSError as e:
        # e.g. sudo binary missing; the shell version reported this as a
        # non-zero exit status.
        print(f"Error when {action} service: {e}")

    return jsonify({"status": "success"})


# File-server (object storage) client passed to the version helpers below.
obs = HW_OBS()


@app.route("/get_versions", methods=["GET"])
def get_versions():
    """Delegate to ``get_local_versions()``."""
    return get_local_versions()


@app.route("/get_remote_versions", methods=["GET"])
def get_remote_versions_route():
    """Delegate to ``get_remote_versions(obs)``."""
    return get_remote_versions(obs)


@app.route("/download_package", methods=["POST"])
def download_package():
    """Start downloading the package named by the JSON field ``version``."""
    data = request.get_json()
    version = data.get("version")
    return start_download_package(version, obs, socketio)


@app.route("/get_current_version", methods=["GET"])
def get_current_version_route():
    """Delegate to ``get_current_version(__file__)``."""
    return get_current_version(__file__)


@app.route("/switch_version", methods=["POST"])
def switch_version_route():
    """Switch to the version named by the JSON field ``version``."""
    data = request.json
    version = data.get("version")
    return switch_version(version)


# VortXDB version-control routes
@app.route("/get_vtx_versions", methods=["GET"]) def get_vtx_versions_route(): return get_vtx_local_versions() @app.route("/get_vtx_remote_versions", methods=["GET"]) def get_vtx_remote_versions_route(): return get_vtx_remote_versions(obs) @app.route("/download_vtx_package", methods=["POST"]) def download_vtx_package(): data = request.get_json() version = data.get("version") return start_vtx_download_package(version, obs, socketio) @app.route("/get_vtx_current_version", methods=["GET"]) def get_vtx_current_version_route(): return get_vtx_current_version(vtxdb) @app.route("/switch_vtx_version", methods=["POST"]) def switch_vtx_version_route(): data = request.json version = data.get("version") return switch_vtx_version(version) @app.route("/sync_user_data", methods=["POST"]) def sync_user_data_route(): data = request.json source_version = data.get("version") current_version = data.get("current_version") return sync_user_data(source_version, current_version) @app.route("/on_message", methods=["POST"]) def on_message(): try: # 获取 POST 请求中的 form 数据 message = request.form.get( "message", "" ).strip() # 从表单数据中获取 'message' 字段,并去掉多余的空白字符 if not message: # 如果 message 为空或只包含空格,返回错误响应,但不触发弹窗 return ( jsonify( { "status": "error", "message": "Message cannot be empty, no popup triggered", } ), 400, ) # 使用 Socket.IO 向客户端发送消息 socketio.emit("on_message", {"message": message}) # 返回成功的响应 return jsonify( {"status": "success", "message": "Popup triggered with message: " + message} ) except Exception as e: # 捕获所有异常并返回服务器错误 return ( jsonify({"status": "error", "message": "An error occurred: " + str(e)}), 500, ) # 定义 static 文件夹的路径 STATIC_FOLDER = os.path.join(app.root_path, "static") display_image_url = "static/images/smart_mode/back.png" display_image_type = 0 # 定义接收 POST 请求的路由 @app.route("/change_image", methods=["POST"]) def change_image(): global display_image_url global display_image_type # 从请求中获取图片路径 image_path = request.form.get("path") image_type = request.form.get("type", None) # 
检查路径是否存在 if not image_path or not os.path.exists(image_path): return jsonify({"error": "Invalid image path"}), 400 # 获取文件名 filename = os.path.basename(image_path) # 定义目标目录路径(static/images/tmp_images/) target_dir = os.path.join(STATIC_FOLDER, "images", "tmp_images") # 如果目标目录不存在,则创建它 if not os.path.exists(target_dir): try: os.makedirs(target_dir) # 创建目录(包括父目录) except Exception as e: return jsonify({"error": f"Failed to create directory: {str(e)}"}), 500 # 定义目标路径,将文件复制到 static 目录下 target_path = os.path.join(target_dir, filename) try: # 复制文件到 static 目录 shutil.copy(image_path, target_path) except Exception as e: return jsonify({"error": f"Failed to copy file: {str(e)}"}), 500 # 生成文件的 URL file_url = "static/images/tmp_images/" + filename # file_url = url_for('static', filename='images/' + filename, _external=True) display_image_url = file_url display_image_type = image_type # 使用 Socket.IO 发送 URL 给前端 socketio.emit("change_image", {"path": file_url, "type": image_type}) # 返回图片的 URL return jsonify({"url": file_url, "type": image_type}), 200 # 服务器相关信息 SERVER_IP = "8.138.8.114" # 替换为您的服务器 IP SERVER_USER = "root" # 替换为 root 用户 SERVER_PASSWORD = "JuShenFengBao209" # 替换为 root 用户密码 REMOTE_PATH = "/var/www/html/devices" reverse_ssh = ReverseSSH(SERVER_IP, SERVER_USER, SERVER_PASSWORD) # 开启反向隧道 @app.route("/start_tunnel", methods=["GET"]) def start_tunnel(): result = reverse_ssh.start_tunnel() return jsonify(result) # 关闭反向隧道 @app.route("/stop_tunnel", methods=["GET"]) def stop_tunnel(): result = reverse_ssh.stop_tunnel() return jsonify(result) # 定义一个路由,根据请求参数执行不同的 shell 脚本 @app.route("/run-script", methods=["POST"]) def run_script(): print("run-script") try: # 从前端获取 JSON 格式的请求体 data = request.json script_type = data.get("script_type") # base_dir = "/home/jsfb/jsfb_ws" # 获取当前脚本执行的绝对路径 # base_path = os.path.dirname(os.path.abspath(__file__)) # base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 获取当前脚本执行的绝对路径 current_path = os.path.abspath(__file__) # 
获取当前脚本所在目录的上一级目录 base_path = os.path.dirname(os.path.dirname(current_path)) print("动态目录:", base_path) # base_path = os.path.join(base_dir, "MassageRobot_aubo") # print(f"Base path: {base_path}") # script_path = os.path.join(base_path, f"../my_script.sh") # 根据请求参数执行不同的 shell 脚本 if script_type == "zh": script_path = os.path.join(base_path, "setup.sh") # 确保路径是规范化的(处理相对路径) script_path = os.path.normpath(script_path) # 检查脚本路径是否存在 if not os.path.exists(script_path): print(f"Script not found: {script_path}, 404") return jsonify({"error": f"Script not found: {script_path}"}), 404 # 执行 shell 脚本 # subprocess.Popen( # ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup.sh; exec bash"], # cwd=base_path # ) subprocess.Popen( ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup.sh; exit"], cwd=base_path ) elif script_type == "jp": script_path = os.path.join(base_path, "setup_JP.sh") # 确保路径是规范化的(处理相对路径) script_path = os.path.normpath(script_path) # 检查脚本路径是否存在 if not os.path.exists(script_path): print(f"Script not found: {script_path}, 404") return jsonify({"error": f"Script not found: {script_path}"}), 404 # 执行 shell 脚本 # subprocess.Popen( # ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_JP.sh; exec bash"], # cwd=base_path # ) subprocess.Popen( ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_JP.sh; exit"], cwd=base_path ) elif script_type == "en": script_path = os.path.join(base_path, "setup_EN.sh") # 确保路径是规范化的(处理相对路径) script_path = os.path.normpath(script_path) # 检查脚本路径是否存在 if not os.path.exists(script_path): print(f"Script not found: {script_path}, 404") return jsonify({"error": f"Script not found: {script_path}"}), 404 # 执行 shell 脚本 # subprocess.Popen( # ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_EN.sh; exec bash"], # cwd=base_path # ) subprocess.Popen( ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_EN.sh; exit"], cwd=base_path ) elif script_type == 
"ko": script_path = os.path.join(base_path, "setup_KO.sh") # 确保路径是规范化的(处理相对路径) script_path = os.path.normpath(script_path) # 检查脚本路径是否存在 if not os.path.exists(script_path): print(f"Script not found: {script_path}, 404") return jsonify({"error": f"Script not found: {script_path}"}), 404 # 执行 shell 脚本 # subprocess.Popen( # ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_KO.sh; exec bash"], # cwd=base_path # ) subprocess.Popen( ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_KO.sh; exit"], cwd=base_path ) else: print("Invalid script type provided, 400") return jsonify({"error": "Invalid script type provided"}), 400 # 返回脚本执行结果 return jsonify( { "message": "Script executed successfully", } ) except Exception as e: print(f"Error running script: {e}, 500") return jsonify({"error": str(e)}), 500 @app.route('/set_zero', methods=['POST']) def set_zero(): try: # 检查服务是否运行 if not is_service_running("massage.service"): sensor = XjcSensor() max_try = 3 # 设置最大重试次数 delay = 0.5 # 每次重试前的延迟时间 # 尝试调用 set_zero 方法 for attempt in range(max_try): sensor.disable_active_transmission() time.sleep(0.5) result = sensor.set_zero() if result == 0: # 设置成功,返回成功信息 return jsonify({"message": "Set zero success"}), 200 else: # 设置失败,等待并重试 print(f"Set zero attempt {attempt + 1} failed, retrying...") time.sleep(delay) # 如果多次尝试后失败,返回错误信息 print("Set zero failed after multiple attempts.") requests.post("http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"}) return jsonify({"message": "Set zero failed after multiple attempts"}), 500 else: return jsonify({"message": "Service is already running, no need to set zero"}), 200 except Exception as e: # 捕获异常并返回错误信息 print(f"Error in /set_zero: {e}") return jsonify({"message": "Set zero failed", "error": str(e)}), 500 # 全局变量,用于存储 AuboC5 实例 aubo_c5 = None @app.route('/power_toggle', methods=['POST']) def power_toggle(): global aubo_c5 try: # 获取 power_state 参数 power_state = request.form.get('power_state') # 检查 
power_state 是否有效 if not power_state: return jsonify({"error": "Missing 'power_state' parameter"}), 400 if not is_service_running("massage.service"): # power_on 操作 if power_state == 'power_on': if aubo_c5 is None: # 如果 Aubo_C5 实例还没有创建 aubo_c5 = AuboC5() requests.post("http://127.0.0.1:5000/on_message", data={"message": "上电成功"}) # power_off 操作 elif power_state == 'power_off': if aubo_c5 is not None: # 如果 Aubo_C5 实例已创建 aubo_c5.power_off() # 延迟 5 秒后删除实例 time.sleep(5) # 阻塞主线程 5 秒 aubo_c5 = None # 删除实例 print("Aubo_C5 instance has been deleted after 5 seconds.") # requests.post("http://127.0.0.1:5000/on_message", data={"message": "断电成功"}) return jsonify({"message": "AuboC5 powered off successfully."}) else: return jsonify({"error": "Aubo_C5 is not powered on."}), 400 else: return jsonify({"error": "command is error"}), 400 else: return jsonify({"error": "Invalid power_state value. Use 'power_on' or 'power_off'."}), 400 return jsonify({"message": f"Aubo_C5 {power_state} successfully."}) except Exception as e: print(f"Error in /power_toggle: {e}") return jsonify({"error": f"An error occurred: {str(e)}"}), 500 @app.route('/pack_mode', methods=['POST']) def pack_mode(): global aubo_c5 try: if aubo_c5 is None: # 如果 Aubo_C5 实例还没有创建 aubo_c5 = AuboC5() aubo_c5.pack() requests.post("http://127.0.0.1:5000/on_message", data={"message": "请确认是否到达打包位置,如有异常,请上电后手动调整"}) return jsonify({"message": f"Aubo_C5 mover to pack pos successfully."}) except Exception as e: print(f"Error in /pack_mode: {e}") return jsonify({"error": f"An error occurred: {str(e)}"}), 500 @app.route("/reset-language", methods=["POST"]) def reset_language(): print("reset-language") try: # 获取当前脚本执行的绝对路径 current_path = os.path.abspath(__file__) # 获取当前脚本所在目录的上一级目录 base_path = os.path.dirname(os.path.dirname(current_path)) print("动态目录:", base_path) # 构造 restart_language.sh 脚本路径 script_path = os.path.join(base_path, "restart_language.sh") # 确保路径是规范化的(处理相对路径) script_path = os.path.normpath(script_path) # 检查脚本路径是否存在 if not 
os.path.exists(script_path): print(f"Script not found: {script_path}, 404") return jsonify({"error": f"Script not found: {script_path}"}), 404 # 执行 restart_language.sh 脚本 subprocess.Popen( ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash restart_language.sh; exit"], cwd=base_path ) # 返回成功响应 return jsonify({ "message": "Restart language script executed successfully in new terminal" }) except Exception as e: print(f"Error running script: {e}, 500") return jsonify({"error": str(e)}), 500 # 初始化 WifiManager 对象 wifi_manager = WifiManager() @app.route('/scan_wifi', methods=['GET']) def scan_wifi(): """ 扫描 Wi-Fi 网络并返回扫描结果 """ try: wifi_networks = wifi_manager.scan_wifi() return jsonify({"status": "success", "wifi_networks": wifi_networks}), 200 except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 @app.route('/connect_wifi', methods=['POST']) def connect_wifi(): """ 连接到指定的 Wi-Fi 网络 请求体应包含 SSID 和 password """ try: data = request.get_json() ssid = data.get('ssid') password = data.get('password') if not ssid or not password: return jsonify({"status": "error", "message": "SSID 和密码是必需的"}), 400 response = wifi_manager.connect_wifi(ssid, password) return jsonify({"status": "success", "message": response}), 200 except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 @app.route('/disconnect_wifi', methods=['POST']) def disconnect_wifi(): """ 断开当前的 Wi-Fi 连接 """ try: response = wifi_manager.disconnect_wifi() return jsonify({"status": "success", "message": response}), 200 except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 @app.route('/get_current_connection', methods=['GET']) def get_current_connection(): """ 获取当前连接的 Wi-Fi 信息 """ try: connection_info = wifi_manager.get_current_connection() return jsonify({"status": "success", "connection_info": connection_info}), 200 except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 @app.route('/is_connected', 
methods=['GET']) def is_connected(): """ 判断是否已连接 Wi-Fi """ try: connected = wifi_manager.is_connected() return jsonify({"status": "success", "is_connected": connected}), 200 except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 # 写入文件并上传到服务器 def write_and_upload_info(ngrok_url, reverse_ssh_port): hostname = socket.gethostname() timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 生成用于连接的反向 SSH 命令 reverse_ssh_command = f"ssh jsfb@{SERVER_IP} -p {reverse_ssh_port}" # 文件内容,包括主机名、ngrok URL、在线时间和 SSH 命令 file_content = ( f"Hostname: {hostname}\n" f"Ngrok URL: {ngrok_url}\n" f"Online Time: {timestamp}\n" f"SSH Command: {reverse_ssh_command}" ) file_path = f"../tmp/{hostname}.txt" # 写入文件 with open(file_path, "w") as file: file.write(file_content) # 使用 paramiko 通过 SFTP 上传文件 ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect(SERVER_IP, username=SERVER_USER, password=SERVER_PASSWORD) sftp = ssh.open_sftp() sftp.put(file_path, f"{REMOTE_PATH}/{hostname}.txt") print(f"File {file_path} uploaded to {REMOTE_PATH}/{hostname}.txt on server.") finally: sftp.close() ssh.close() # 启动 ngrok 隧道并写入/上传文件 def start_ngrok(port, max_retries=3, retry_interval=2): global ngrok_process for attempt in range(max_retries): print(f"Attempting to start ngrok (Attempt {attempt + 1}/{max_retries})") ngrok_process = subprocess.Popen( ["/usr/local/bin/ngrok", "http", str(port)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) time.sleep(retry_interval) # 等待 ngrok 启动 # 检查 ngrok 是否启动成功 try: ngrok_url = get_ngrok_url() print(f"Ngrok started successfully with URL: {ngrok_url}") # write_and_upload_info(ngrok_url) # 写入和上传信息文件 return ngrok_url except Exception as e: print(f"Failed to get ngrok URL: {e}") ngrok_process.terminate() time.sleep(retry_interval) print("Failed to start ngrok after multiple attempts") return "Error" # 获取 ngrok 公共 URL def get_ngrok_url(): try: response = requests.get( 
"http://localhost:4040/api/tunnels", timeout=5 ) # ngrok 本地 API response.raise_for_status() # 检查是否请求成功 data = response.json() return data["tunnels"][0]["public_url"] except requests.RequestException as e: raise RuntimeError(f"Error fetching ngrok URL: {e}") # Socket.IO 事件处理函数,当客户端连接时启动定时任务 @socketio.on("connect", namespace="/") def connect(): socketio.start_background_task(music_background_thread) zeroconf = Zeroconf() # 变量用于存储是否已经注册了服务 service_registered = False def get_all_local_ips(): """获取本机的所有 IPv4 地址,不进行任何筛选""" interfaces = netifaces.interfaces() # 获取所有网络接口 print("可用网络接口:", interfaces) local_ips = [] for interface in interfaces: addrs = netifaces.ifaddresses(interface) # 获取接口的地址信息 print(f"接口 {interface} 的地址信息: {addrs}") # 检查 IPv4 地址 (AF_INET) if netifaces.AF_INET in addrs: for addr_info in addrs[netifaces.AF_INET]: print(f"接口 {interface} 的 IPv4 地址信息: {addr_info}") ip_addr = addr_info.get("addr") # 直接收集所有的 IPv4 地址 if ip_addr: local_ips.append(ip_addr) if DEBUG_MODE: return ["127.0.0.1"] # 调试用 # 如果没有找到 IPv4 地址,返回一个回环地址列表 return local_ips if local_ips else ["127.0.0.1"] def register_service(local_ips): print(local_ips) global service_registered if service_registered: return # # 确保服务名称唯一,使用 uuid 生成一个唯一标识符 # unique_id = uuid.uuid4() hostname = socket.gethostname() # print(hostname) for ip in local_ips: if ip.startswith("127."): continue if ip.startswith("192.168.100"): continue # 将 IP 地址转换为字节格式 service_info = ServiceInfo( "_http._tcp.local.", # 服务类型 f"LL-X1-{hostname}._http._tcp.local.", # 确保服务名称唯一 addresses=[socket.inet_aton(ip)], # 使用局域网 IP 地址 port=5000, # Flask 监听的端口 properties={}, # 可选的额外服务信息 server=f"{hostname}.local.", # 主机名 ) zeroconf.register_service(service_info, allow_name_change=True) print(f"mDNS 服务已注册,IP 地址:{ip}") service_registered = True # 在应用退出时注销 zeroconf 服务并关闭 ngrok def cleanup(): global ngrok_process print("注销 mDNS 服务并关闭 ngrok...") reverse_ssh.stop_tunnel() if ngrok_process: ngrok_process.terminate() zeroconf.close() # 注销 zeroconf 服务 def 
fetch_music_list_with_retry(): """后台定时任务:不断尝试获取音乐列表,直到成功""" global all_playlists, playlist, current_song while True: try: # 清空现有的播放列表 all_playlists.clear() # 遍历分类获取音乐数据 for categoryID in categoryIDs: diss_info, music_info_list = get_song_list(categoryID) all_playlists.append( {"diss_info": diss_info, "music_info_list": music_info_list} ) print(diss_info) # 如果数据获取成功,更新播放列表 if all_playlists: playlist = generate_playlist(all_playlists[0]["music_info_list"]) current_song.update( { "album_img_url": playlist[0]["album_img_url"], "music_name": playlist[0]["music_name"], "singer_name": playlist[0]["singer_name"], "is_playing": False, } ) print("音乐列表更新成功!") break # 成功后退出循环 except Exception as e: print(f"获取音乐列表失败,重试中... 错误信息: {e}") print(f"Failed to fetch playlist for category {categoryID}: {e}") time.sleep(5) # 等待5秒后重试 def initialize_app(port): """运行 Flask-SocketIO 之前的初始化逻辑。""" global playlist, current_song, powerboard # local_ips = get_all_local_ips() # register_service(local_ips) retry_count = 0 max_retries = 10 while retry_count < max_retries: local_ips = get_all_local_ips() # 如果local_ips列表的长度大于2,则直接调用register_service if len(local_ips) > 1: register_service(local_ips) print("Service registered successfully with local IPs:", local_ips) break # 如果列表长度不足2,则进行重试 print(f"Retrying... Current IP list length: {len(local_ips)}") retry_count += 1 time.sleep(3) # 等待3秒后重试 # 如果重试超过5次仍然只有1个IP,执行register_service并打印警告 if len(local_ips) == 1: print(f"Warning: Only one local IP found after {max_retries} retries. 
Registering with this IP.") register_service(local_ips) # 初始化PowerBoard并设置回调 powerboard = PowerBoard() powerboard.set_callback(system_shutdown) powerboard.start_monitoring() # # 获取最新音乐数据 # global latest_music # try: # latest_music = fetch_latest_music() # except Exception as e: # print(f"Failed to fetch latest music data: {e}") # try: # # 尝试初始化音乐列表 # fetch_music_list_with_retry() # except Exception as e: # print(f"初始化音乐列表失败:{e}") # # 启动一个后台任务,不断尝试获取音乐数据 # socketio.start_background_task(fetch_music_list_with_retry) # try: # # 启动 ngrok 隧道并获取 URL # ngrok_url = '' # print("Ngrok URL:", ngrok_url) # result = reverse_ssh.start_tunnel() # print(result) # reverse_ssh_port = result["port"] # write_and_upload_info(ngrok_url,reverse_ssh_port) # 写入和上传信息文件 # except RuntimeError as e: # print("Error:", e) def system_shutdown(): """执行系统关机流程,包括远程主机关机、电源板关机和本地系统关机""" remote_host = "192.168.100.20" # 远程主机的 IP 地址 remote_username = "root" # 远程主机的用户名 remote_password = "bestcobot" # 远程主机的密码 remote_port = 8822 # 远程主机的 SSH 端口 sudo_password = "jsfb" try: # 1. 先执行远程主机关机命令 if not is_host_down(remote_host, 8822): print("Remote host is alive, processing shutdown") remote_command = "shutdown -h now" stdout, stderr = execute_command_on_remote( remote_host, remote_username, remote_password, remote_command, port=remote_port, ) # 2. 等待远程主机关机并检测 print("Waiting for remote host to shutdown...") if not is_host_down(remote_host, 8822): print("Remote host did not shutdown in time") return False # 3. 处理电源板关机 global powerboard, power_board_ip, power_board_thread, power_board_thread_running if power_board_thread_running: power_board_thread_running = False if power_board_thread: try: power_board_thread.join(timeout=2.0) except Exception as e: print(f"Error stopping power board thread during shutdown: {e}") power_board_thread = None # 4. 
发送关机信号到电源板 if power_board_ip: udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_socket.settimeout(1.0) for _ in range(3): try: udp_socket.sendto(b"SHUTDOWN", (power_board_ip, 3660)) print(f"Sent shutdown signal to {power_board_ip}:3660") time.sleep(0.1) except Exception as e: print(f"Error sending shutdown signal: {e}") power_board_ip = None powerboard.send_cmd("UI-SHUTDOWN") # 5. 执行本地系统关机 command = f'echo {sudo_password} | /bin/sh -c "/usr/bin/sudo -S /sbin/shutdown now"' result = subprocess.run( command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) print("System is shutting down.") return True except Exception as e: print(f"Error during shutdown process: {e}") return False initialize_app(5000) def check_network(): try: # 测试连接到公共 DNS 服务器 socket.create_connection(("8.8.8.8", 53), timeout=5) return True except OSError: return False # 全局变量用于存储设备位置信息 device_location = { 'latitude': None, 'longitude': None, 'accuracy': None, 'timestamp': None } @app.route('/update_device_location', methods=['POST']) def update_device_location(): """ 更新设备位置信息并自动获取天气数据 请求体示例: { "latitude": 35.123456, "longitude": 139.789012, "accuracy": 10.5 } """ try: location_data = request.json print(f"Received location data: {location_data}") # 检查必要的位置数据是否存在 required_fields = ['latitude', 'longitude'] if not all(field in location_data for field in required_fields): return jsonify({ 'status': 'error', 'message': 'Missing required location data fields' }), 400 # 更新全局位置信息 global device_location device_location.update({ 'latitude': location_data.get('latitude'), 'longitude': location_data.get('longitude'), 'accuracy': location_data.get('accuracy'), 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) # 打印位置信息 print(f"Device Location Updated: {device_location}") # 获取主机名作为设备名称 hostname = socket.gethostname() # 获取软件版本信息 version_response = get_local_versions() version_data = version_response.json # 使用.json属性获取数据 versions = version_data.get('versions', []) 
software_version = versions[0] if versions else "default" # 使用第一个版本,如果没有则默认为1.0 vtxdb_version = vtxdb.get_version() iot_config = vtxdb.get("robot_config","IoT") # 准备发送到物联网平台的数据 iot_data = { "device_name": hostname, "device_id": f"{iot_config['product_id']}_{hostname}", "longitude": location_data.get('longitude'), "latitude": location_data.get('latitude'), "produce_name": "RS-LL-X1", "product_id": iot_config['product_id'], "software_version": software_version, "vtxdb_version": vtxdb_version, "timestamp": device_location['timestamp'] } # 发送位置信息到物联网平台 try: iot_response = requests.post( "http://app.robotstorm.tech:8080/iot/location", json=iot_data, timeout=5 # 设置5秒超时 ) print(f"IoT platform response: {iot_response.status_code}") except Exception as iot_error: print(f"Error sending data to IoT platform: {iot_error}") # 异步获取天气数据 weather_updated = False if should_update_weather(device_location['latitude'], device_location['longitude']): try: weather_results = asyncio.run(get_all_weather_data( device_location['latitude'], device_location['longitude'] )) if all(weather_results.values()): update_weather_cache( weather_results, device_location['latitude'], device_location['longitude'] ) weather_updated = True print("Weather data updated successfully") except Exception as weather_error: print(f"Error updating weather data: {weather_error}") return jsonify({ 'status': 'success', 'message': 'Location data updated successfully' + (' and weather data refreshed' if weather_updated else ''), 'data': { 'location': device_location, 'weather_updated': weather_updated } }), 200 except Exception as e: print(f"Error processing location data: {e}") return jsonify({ 'status': 'error', 'message': f'Failed to process location data: {str(e)}' }), 500 # 坐标系转换工具函数 def wgs84_to_gcj02(lat, lng): """ WGS84坐标系转GCJ02坐标系 """ if not lat or not lng: return None, None a = 6378245.0 # 长半轴 ee = 0.00669342162296594323 # 偏心率平方 def transform_lat(x, y): ret = -100.0 + 2.0 * x + 3.0 * y + 0.2 * y * y + 0.1 * x * 
y + 0.2 * math.sqrt(abs(x)) ret += (20.0 * math.sin(6.0 * x * math.pi) + 20.0 * math.sin(2.0 * x * math.pi)) * 2.0 / 3.0 ret += (20.0 * math.sin(y * math.pi) + 40.0 * math.sin(y / 3.0 * math.pi)) * 2.0 / 3.0 ret += (160.0 * math.sin(y / 12.0 * math.pi) + 320 * math.sin(y * math.pi / 30.0)) * 2.0 / 3.0 return ret def transform_lng(x, y): ret = 300.0 + x + 2.0 * y + 0.1 * x * x + 0.1 * x * y + 0.1 * math.sqrt(abs(x)) ret += (20.0 * math.sin(6.0 * x * math.pi) + 20.0 * math.sin(2.0 * x * math.pi)) * 2.0 / 3.0 ret += (20.0 * math.sin(x * math.pi) + 40.0 * math.sin(x / 3.0 * math.pi)) * 2.0 / 3.0 ret += (150.0 * math.sin(x / 12.0 * math.pi) + 300.0 * math.sin(x / 30.0 * math.pi)) * 2.0 / 3.0 return ret dLat = transform_lat(lng - 105.0, lat - 35.0) dLng = transform_lng(lng - 105.0, lat - 35.0) radLat = lat / 180.0 * math.pi magic = math.sin(radLat) magic = 1 - ee * magic * magic sqrtMagic = math.sqrt(magic) dLat = (dLat * 180.0) / ((a * (1 - ee)) / (magic * sqrtMagic) * math.pi) dLng = (dLng * 180.0) / (a / sqrtMagic * math.cos(radLat) * math.pi) gcj_lat = lat + dLat gcj_lng = lng + dLng return gcj_lat, gcj_lng def gcj02_to_bd09(lat, lng): """ GCJ02坐标系转BD09坐标系 """ if not lat or not lng: return None, None x_pi = math.pi * 3000.0 / 180.0 z = math.sqrt(lng * lng + lat * lat) + 0.00002 * math.sin(lat * x_pi) theta = math.atan2(lat, lng) + 0.000003 * math.cos(lng * x_pi) bd_lng = z * math.cos(theta) + 0.0065 bd_lat = z * math.sin(theta) + 0.006 return bd_lat, bd_lng @app.route('/get_device_location', methods=['GET']) def get_device_location(): """ 获取设备位置信息,支持不同坐标系 可选参数 coordinate_system: - wgs84 (默认): 原始GPS坐标 - gcj02: 国测局坐标系 - bd09: 百度坐标系 GET /get_device_location # 或 GET /get_device_location?coordinate_system=wgs84 GET /get_device_location?coordinate_system=gcj02 GET /get_device_location?coordinate_system=bd09 """ try: global device_location coordinate_system = request.args.get('coordinate_system', 'wgs84').lower() # 检查是否有位置信息 if not device_location['latitude'] or not 
device_location['longitude']: return jsonify({ 'status': 'error', 'message': 'No location data available' }), 404 # 获取原始WGS84坐标 wgs84_lat = device_location['latitude'] wgs84_lng = device_location['longitude'] # 根据请求的坐标系进行转换 if coordinate_system == 'gcj02': lat, lng = wgs84_to_gcj02(wgs84_lat, wgs84_lng) coordinate_type = 'GCJ02' elif coordinate_system == 'bd09': gcj02_lat, gcj02_lng = wgs84_to_gcj02(wgs84_lat, wgs84_lng) lat, lng = gcj02_to_bd09(gcj02_lat, gcj02_lng) coordinate_type = 'BD09' else: # wgs84 lat, lng = wgs84_lat, wgs84_lng coordinate_type = 'WGS84' return jsonify({ 'status': 'success', 'data': { 'latitude': lat, 'longitude': lng, 'accuracy': device_location['accuracy'], 'timestamp': device_location['timestamp'], 'coordinate_system': coordinate_type } }), 200 except Exception as e: print(f"Error getting location data: {e}") return jsonify({ 'status': 'error', 'message': f'Failed to get location data: {str(e)}' }), 500 # 和风天气API配置 QWEATHER_API_KEY = "79b35822391e4b999dbda0f90dfc1dcf" # 请替换为实际的API密钥 QWEATHER_API_BASE = "https://devapi.qweather.com/v7" # 免费api QWEATHER_GEO_API_BASE = "https://geoapi.qweather.com/v2" # GeoAPI接口 # 全局变量存储天气数据 weather_data = { 'now': None, # 实时天气 'hourly': None, # 逐小时预报 'daily': None, # 每日预报 'last_update': None, # 最后更新时间 'location': None, # 最后请求的位置 'geo_info': None # 地理位置信息 } async def fetch_weather_data(session, url): """异步获取天气数据""" try: async with session.get(url) as response: return await response.json() except Exception as e: print(f"Error fetching weather data: {e}") return None async def get_all_weather_data(latitude, longitude): """并发获取所有天气数据和地理信息""" base_params = f"location={longitude},{latitude}&key={QWEATHER_API_KEY}" urls = [ f"{QWEATHER_API_BASE}/grid-weather/now?{base_params}", f"{QWEATHER_API_BASE}/grid-weather/24h?{base_params}", f"{QWEATHER_API_BASE}/grid-weather/7d?{base_params}" ] async with aiohttp.ClientSession() as session: # 创建所有任务,包括天气数据和地理信息 weather_tasks = [fetch_weather_data(session, url) for url in 
urls] geo_task = fetch_geo_info(session, latitude, longitude) # 并发执行所有任务 all_tasks = weather_tasks + [geo_task] results = await asyncio.gather(*all_tasks) # 分离结果 weather_results = results[:3] geo_result = results[3] return { 'now': weather_results[0], 'hourly': weather_results[1], 'daily': weather_results[2], 'geo': geo_result } def update_weather_cache(weather_results, latitude, longitude): """更新天气数据缓存""" global weather_data weather_data.update({ 'now': weather_results['now'], 'hourly': weather_results['hourly'], 'daily': weather_results['daily'], 'geo_info': weather_results['geo'], 'last_update': datetime.now(), 'location': {'latitude': latitude, 'longitude': longitude} }) def should_update_weather(latitude=None, longitude=None, force_update=False): """检查是否需要更新天气数据""" if force_update: return True if not weather_data['last_update']: return True # 检查时间是否过期(1小时更新一次) time_expired = (datetime.now() - weather_data['last_update']) > timedelta(hours=1) # 检查位置是否改变(超过0.01度则认为位置改变) location_changed = False if latitude and longitude and weather_data['location']: old_lat = weather_data['location']['latitude'] old_lng = weather_data['location']['longitude'] location_changed = ( abs(latitude - old_lat) > 0.01 or abs(longitude - old_lng) > 0.01 ) return time_expired or location_changed async def fetch_geo_info(session, latitude, longitude): """异步获取地理位置信息""" try: url = f"{QWEATHER_GEO_API_BASE}/city/lookup?location={longitude},{latitude}&key={QWEATHER_API_KEY}" async with session.get(url) as response: return await response.json() except Exception as e: print(f"Error fetching geo info: {e}") return None @app.route('/get_weather', methods=['GET']) def get_weather(): """ 获取天气信息 可选参数: - force_update: 是否强制更新天气数据 - coordinate_system: 坐标系统(wgs84/gcj02/bd09) # 使用默认设置获取天气 GET /get_weather # 强制更新天气数据 GET /get_weather?force_update=true # 使用特定坐标系 GET /get_weather?coordinate_system=gcj02 """ try: force_update = request.args.get('force_update', '').lower() == 'true' coordinate_system = 
request.args.get('coordinate_system', 'wgs84').lower() # 获取当前位置 global device_location if not device_location['latitude'] or not device_location['longitude']: return jsonify({ 'status': 'error', 'message': 'No location data available' }), 404 # 根据坐标系统获取正确的经纬度 if coordinate_system == 'gcj02': lat, lng = wgs84_to_gcj02(device_location['latitude'], device_location['longitude']) elif coordinate_system == 'bd09': gcj02_lat, gcj02_lng = wgs84_to_gcj02(device_location['latitude'], device_location['longitude']) lat, lng = gcj02_to_bd09(gcj02_lat, gcj02_lng) else: # wgs84 lat, lng = device_location['latitude'], device_location['longitude'] # 检查是否需要更新天气数据 if should_update_weather(lat, lng, force_update): # 异步获取所有天气数据和地理信息 weather_results = asyncio.run(get_all_weather_data(lat, lng)) # 检查是否所有请求都成功 if not all(weather_results.values()): return jsonify({ 'status': 'error', 'message': 'Failed to fetch weather data' }), 500 # 更新缓存 update_weather_cache(weather_results, lat, lng) # 返回天气数据 return jsonify({ 'status': 'success', 'data': { 'now': weather_data['now'], 'hourly': weather_data['hourly'], 'daily': weather_data['daily'], 'geo_info': weather_data['geo_info'], 'last_update': weather_data['last_update'].strftime("%Y-%m-%d %H:%M:%S") if weather_data['last_update'] else None, 'location': { 'latitude': lat, 'longitude': lng, 'coordinate_system': coordinate_system.upper() } } }), 200 except Exception as e: print(f"Error getting weather data: {e}") return jsonify({ 'status': 'error', 'message': f'Failed to get weather data: {str(e)}' }), 500 @app.route("/get_demo_video") def get_demo_video(): path = "static/images/video/demo.m4v" range_header = request.headers.get("Range", None) if not range_header: return Response(open(path, "rb"), mimetype="video/mp4") # 解析 Range 请求头 size = os.path.getsize(path) byte_range = range_header.split("=")[1] start, end = byte_range.split("-") start = int(start) end = int(end) if end else size - 1 chunk_size = 1024 * 1024 # 1MB 片段 with open(path, "rb") as 
f: f.seek(start) data = f.read(end - start + 1) headers = { "Content-Range": f"bytes {start}-{end}/{size}", "Accept-Ranges": "bytes", "Content-Length": str(end - start + 1), "Content-Type": "video/mp4", } return Response(data, status=206, headers=headers) @app.route('/get_volume', methods=['GET']) def get_volume(): """ 获取当前音量并返回 """ print("访问了 /get_volume 路由") # 添加调试日志 current_volume = volume_control.get_current_volume() if current_volume is not None: return jsonify({"current_volume": current_volume}), 200 else: return jsonify({"error": "无法获取当前音量"}), 500 @app.route('/adjust_volume', methods=['POST']) def adjust_volume(): try: # 获取请求体中的指令和第二个参数(是否发送到前端) adjust_volumn_result = request.json.get('adjust_volumn_result') notify_frontend = request.json.get('notify_frontend', True) # 默认值为 True,如果没有传递则会发送通知 if adjust_volumn_result is None: return jsonify({"error": "没有提供调整指令"}), 400 # 调用 volume_control 模块进行音量调整 volume_control.adjust_volume(adjust_volumn_result) # 如果 `notify_frontend` 为 True,则发送通知到前端 if notify_frontend: socketio.emit('volume_adjusted', {'message': f"音量已调整为 {adjust_volumn_result}"}) return jsonify({"status": "success", "message": f"音量已调整为 {adjust_volumn_result}"}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 def send_audio_command(command, port=8766): try: # 输出调试信息 print(f"正在发送命令: {command} 到端口: {port}") # 创建新的事件循环 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # 确保任务已创建 task = asyncio.ensure_future(send_message(command, port)) # 等待任务完成 loop.run_until_complete(task) # 输出发送命令后的成功信息 print(f"命令 {command} 成功发送到端口 {port}") except Exception as e: print(f"发送命令时发生错误: {e}") @socketio.on("speech_audio_control") def speech_audio_control(data): if "start" in data: command = "speech_audio_start" send_audio_command(command) elif "cancel" in data: command = "speech_audio_cancel" send_audio_command(command) @app.route('/upload_speak', methods=['POST']) def upload_speak(): try: # 检查是否有文件上传 if 'audio' not in request.files: return 
jsonify({'error': 'No audio file provided'}), 400 audio_file = request.files['audio'] if audio_file.filename == '': return jsonify({'error': 'No selected file'}), 400 # 确保 ../tmp 目录存在 tmp_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'tmp')) os.makedirs(tmp_dir, exist_ok=True) # 删除旧的音频文件 for old_file in os.listdir(tmp_dir): if old_file.startswith("speak_") and old_file.endswith(".mp3"): old_file_path = os.path.join(tmp_dir, old_file) try: os.remove(old_file_path) print(f"Deleted old audio file: {old_file_path}") except Exception as e: print(f"Error deleting old audio file {old_file_path}: {e}") # 生成带时间戳的文件名 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"speak_{timestamp}.mp3" file_path = os.path.join(tmp_dir, filename) # 保存文件 audio_file.save(file_path) print(f"Audio file saved to: {file_path}") # 发送指令到端口 8766,告知新的音频文件上传并发送文件绝对路径 command = f"speech_audio_stop:{file_path}" send_audio_command(command) # 使用公共函数发送指令 return jsonify({'message': 'Audio file uploaded successfully', 'filename': filename}), 200 except Exception as e: print(f"Error uploading audio file: {e}") return jsonify({'error': str(e)}), 500 @app.route('/heartbeat') def heartbeat(): return 'OK' @app.route("/upload_playlist_image", methods=["POST"]) def upload_playlist_image(): try: if 'image' not in request.files: return jsonify({"status": "error", "message": "No image file uploaded"}), 400 image_file = request.files['image'] image_stream = io.BytesIO(image_file.read()) image = Image.open(image_stream) cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) decoded_objects = decode(cv_image) if not decoded_objects: return jsonify({"status": "error", "message": "No QR code found in image"}), 400 qr_data = decoded_objects[0].data.decode('utf-8') print("Original QR URL:", qr_data) # 替换原始QR码URL中的域名 if qr_data.startswith('https://c6.y.qq.com/'): qr_data = qr_data.replace('https://c6.y.qq.com/', 'https://robotstorm.tech/c6yqq/') headers = { 'User-Agent': 
'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Connection': 'keep-alive', } response = requests.get(qr_data, headers=headers, allow_redirects=True) final_url = response.url # 替换重定向后URL中的域名 if final_url.startswith('https://y.qq.com/'): final_url = final_url.replace('https://y.qq.com/', 'https://robotstorm.tech/yqq/') print("Final URL:", final_url) parsed_url = urlparse(final_url) query_params = parse_qs(parsed_url.query) playlist_id = query_params.get('id', [None])[0] if not playlist_id: path_parts = parsed_url.path.split('/') for part in path_parts: if part.isdigit(): playlist_id = part break if not playlist_id: return jsonify({"status": "error", "message": "No playlist ID found in QR code"}), 400 print("Found playlist ID:", playlist_id) global categoryIDs if playlist_id not in categoryIDs: categoryIDs.insert(0, playlist_id) # 保存更新后的歌单ID到配置文件 if not save_playlist_ids(categoryIDs): return jsonify({"status": "error", "message": "Failed to save playlist configuration"}), 500 # Clear the playlists cache to force refresh global all_playlists all_playlists = [] return jsonify({"status": "success", "message": "Playlist added successfully"}) except Exception as e: print(f"Error processing playlist image: {e}") return jsonify({"status": "error", "message": str(e)}), 500 @app.route("/delete_playlist", methods=["POST"]) def delete_playlist(): try: data = request.get_json() category_id = data.get('categoryId') if not category_id: return jsonify({"status": "error", "message": "未提供歌单ID"}), 400 global categoryIDs, all_playlists if category_id in categoryIDs: categoryIDs.remove(category_id) # 保存更新后的歌单ID到配置文件 if not save_playlist_ids(categoryIDs): return jsonify({"status": "error", "message": "Failed to save playlist configuration"}), 500 # 从 all_playlists 中删除对应的歌单 
all_playlists = [p for p in all_playlists if p['diss_info'].get('categoryId') != category_id] return jsonify({"status": "success", "message": "歌单删除成功"}) else: return jsonify({"status": "error", "message": "歌单不存在"}), 404 except Exception as e: print(f"Error deleting playlist: {e}") return jsonify({"status": "error", "message": str(e)}), 500 @app.route('/api/license/use', methods=['POST']) def use_license(): # 调用模块中的逻辑,获取返回结果 result = license_module.use_license() # 如果返回的是错误,返回500状态码 if 'error' in result: return jsonify(result), 500 else: return jsonify(result), 200 # 返回成功的数据 @app.route('/api/license/check', methods=['GET']) def check_license(): # 调用模块中的逻辑,获取返回结果 result = license_module.check_license() # 如果返回的是错误,返回500状态码 if 'error' in result: return jsonify(result), 500 else: return jsonify(result), 200 # 返回成功的数据 @app.route('/api/license/info', methods=['GET']) def get_license_info(): # 调用模块中的逻辑,获取返回结果 result = license_module.get_license_info() # 如果返回的是错误,返回500状态码 if 'error' in result: return jsonify(result), 500 else: return jsonify(result), 200 # 返回成功的数据 # 设置深度思考状态的接口 @app.route("/set_deep_thought", methods=["POST"]) def set_deep_thought(): try: # 获取请求数据,设定深度思考状态 data = request.json status = data.get("status", False) # 设置深度思考状态 set_deep_thought_status(status) return jsonify({"status": "success", "message": f"Deep thought status set to {status}"}), 200 except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 # 获取深度思考状态的接口 @app.route("/get_deep_thought", methods=["GET"]) def get_deep_thought(): try: # 获取深度思考状态 status = get_deep_thought_status() return jsonify({"status": "success", "deep_thought_active": status}), 200 except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 # 设置联网搜索状态的接口 @app.route("/set_ai_search", methods=["POST"]) def set_ai_search(): try: # 获取请求数据,设定联网搜索状态 data = request.json status = data.get("status", False) # 设置联网搜索状态 set_ai_search_status(status) return jsonify({"status": "success", "message": 
f"Deep thought status set to {status}"}), 200 except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 # 获取联网搜索状态的接口 @app.route("/get_ai_search", methods=["GET"]) def get_ai_search(): try: # 获取联网搜索状态 status = get_ai_search_status() return jsonify({"status": "success", "ai_search_active": status}), 200 except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 @app.route("/massage_plans") def massage_plans(): # 获取URL参数 choose_task = request.args.get('choose_task', '') body_part = request.args.get('body_part', '') # 将参数传递给模板 return render_template( "massage_plan.html", initial_choose_task=choose_task, initial_body_part=body_part ) @app.route("/get_massage_plan", methods=["GET"]) def get_massage_plan(): try: # 获取筛选参数 plan_name = request.args.get('plan_name', '') choose_task = request.args.get('choose_task', '') body_part = request.args.get('body_part', '') # 从vtxdb获取按摩计划数据 massage_plans = vtxdb.get("massage_plan", plan_name) # 如果指定了plan_name,直接返回结果 if plan_name: return jsonify({"status": "success", "data": massage_plans}) # 如果没有指定plan_name但指定了筛选条件,进行筛选 if choose_task or body_part: filtered_plans = {} for name, plan in massage_plans.items(): # 同时满足两个条件 if choose_task and body_part: if plan.get('choose_task') == choose_task and plan.get('body_part') == body_part: filtered_plans[name] = plan # 只满足choose_task条件 elif choose_task: if plan.get('choose_task') == choose_task: filtered_plans[name] = plan # 只满足body_part条件 elif body_part: if plan.get('body_part') == body_part: filtered_plans[name] = plan return jsonify({"status": "success", "data": filtered_plans}) # 如果没有任何筛选条件,返回所有计划 return jsonify({"status": "success", "data": massage_plans}) except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 @app.route("/set_massage_plan", methods=["POST", "DELETE"]) def set_massage_plan(): try: data = request.json plan_name = data.get('plan_name') if request.method == "DELETE": if not plan_name: return 
jsonify({"status": "error", "message": "缺少疗程名称"}), 400 # 先获取疗程信息,检查是否可以删除 plan_data = vtxdb.get("massage_plan", plan_name) if not plan_data or not isinstance(plan_data, dict): return jsonify({"status": "error", "message": "疗程不存在"}), 404 if not plan_data.get('can_delete', False): return jsonify({"status": "error", "message": "该疗程不允许删除"}), 403 # 执行删除操作 vtxdb.delete("massage_plan", plan_name) return jsonify({"status": "success", "message": "删除成功"}) else: # POST method plan_data = data.get('plan_data') if not plan_name or not plan_data: return jsonify({"status": "error", "message": "缺少必要参数"}), 400 # 确保复制的疗程可以删除 if isinstance(plan_data, dict): plan_data['can_delete'] = True # 保存到vtxdb vtxdb.set("massage_plan", plan_name, plan_data) return jsonify({"status": "success", "message": "保存成功"}) except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 @app.route("/develop_login", methods=["POST"]) def develop_login(): data = request.json password = data["password"] # 简单验证示例 if password == "cd123456": return jsonify(success=True) else: return jsonify(success=False) @app.route("/plans_edit", methods=["POST"]) def plans_edit(): data = request.json password = data["password"] # 简单验证示例 if password == "robotstorm": return jsonify(success=True) else: return jsonify(success=False) @app.route("/get_log", methods=["GET"]) def get_log(): log_type = request.args.get("type") # 必填参数:ui/language/massage keyword = request.args.get("keyword", "") # 可选参数:筛选关键字 page = int(request.args.get("page", 1)) page_size = int(request.args.get("pageSize", 400)) # 参数校验 if log_type not in ["ui", "language", "massage"]: return jsonify({"error": "无效的日志类型"}), 400 # 调用工具函数查询日志 result = read_log_file(log_type, keyword, page, page_size) return jsonify(result) # 存储校准任务的状态 calibration_tasks = {} def run_calibration(): """在后台运行校准任务""" try: # 通知前端开始校准 socketio.emit('calibration_status', { 'status': 'running', 'message': '开始自动校准...' 
}) # 设置标定板参数 calibration_board_size = [6, 3] # width=6, height=3 calibration_board_square_size = 0.030 # 单位:米 # 创建校准对象并执行校准 calib = Calibration(calibration_board_size, calibration_board_square_size,"/home/jsfb/Documents/") # 收集数据 socketio.emit('calibration_status', { 'status': 'collecting', 'message': '正在收集标定数据...' }) calib.collect_data() # 执行校准 socketio.emit('calibration_status', { 'status': 'calibrating', 'message': '正在计算标定参数...' }) r, t ,intrinsics= calib.calibrate() print("旋转矩阵:") print(r) print("平移向量:") print(t) print("内参矩阵:") print(intrinsics) # 转换结果为可序列化的格式 rotation_matrix = r.tolist() if hasattr(r, 'tolist') else r translation_vector = t.flatten().tolist() if hasattr(t, 'tolist') else t # 发送完成信号和结果 socketio.emit('calibration_status', { 'status': 'completed', 'message': '校准完成', 'results': { 'rotation_matrix': rotation_matrix, 'translation_vector': translation_vector, 'intrinsics': intrinsics # 直接使用字典格式的内参 } }) except Exception as e: error_message = str(e) print(f"校准过程出错: {error_message}") # 发送错误信息 socketio.emit('calibration_status', { 'status': 'error', 'message': f'校准失败: {error_message}' }) @app.route("/start_calibration", methods=["POST"]) def start_calibration(): """启动自动校准过程""" try: # 启动后台线程进行校准 thread = threading.Thread(target=run_calibration) thread.daemon = True thread.start() return jsonify({ 'status': 'success', 'message': '校准任务已启动' }) except Exception as e: return jsonify({ 'status': 'error', 'message': str(e) }), 500 @app.route("/get_calibration_status/", methods=["GET"]) def get_calibration_status(task_id): """获取校准任务的状态""" if task_id not in calibration_tasks: return jsonify({ 'status': 'error', 'message': '任务不存在' }), 404 task = calibration_tasks[task_id] response = { 'status': task['status'], 'started_at': task['started_at'] } if task['status'] == 'completed' and 'results' in task: response['results'] = task['results'] elif task['status'] == 'error' and 'error' in task: response['error'] = task['error'] return jsonify(response) 
# The URL must carry the task id: without the <task_id> converter Flask calls
# the view with no arguments and every request fails with a TypeError.
@app.route("/cancel_calibration/<task_id>", methods=["POST"])
def cancel_calibration(task_id):
    """Remove calibration task *task_id* from the task table.

    Returns 404 for an unknown task and 400 for one that already completed.
    """
    if task_id not in calibration_tasks:
        return jsonify({
            'status': 'error',
            'message': '任务不存在'
        }), 404

    if calibration_tasks[task_id]['status'] == 'completed':
        return jsonify({
            'status': 'error',
            'message': '任务已完成,无法取消'
        }), 400

    # Drop the task from the table.
    del calibration_tasks[task_id]

    return jsonify({
        'status': 'success',
        'message': '校准任务已取消'
    })


@app.route("/save_calibration", methods=["POST"])
def save_calibration():
    """Persist calibration results to the ``robot_config`` table in vtxdb.

    JSON body: ``rotation_matrix``, ``translation_vector`` and ``intrinsics``
    (all required).
    """
    try:
        # Treat a missing/non-JSON body as empty so the caller gets the 400.
        data = request.get_json(silent=True) or {}
        rotation_matrix = data.get('rotation_matrix')
        translation_vector = data.get('translation_vector')
        intrinsics = data.get('intrinsics')

        if not rotation_matrix or not translation_vector or not intrinsics:
            return jsonify({
                'status': 'error',
                'message': '缺少必要的标定数据'
            }), 400

        vtxdb.set("robot_config", "camera.camera_matrix", rotation_matrix)
        vtxdb.set("robot_config", "camera.camera_trans", translation_vector)
        vtxdb.set("robot_config", "camera.intrinsics", intrinsics)  # stored as received

        return jsonify({
            'status': 'success',
            'message': '标定结果已保存'
        })
    except Exception as e:
        print(f"保存标定结果时出错: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500


@app.route("/massage_plan_visualization")
def massage_plan_visualization():
    """Render the plan visualisation page for a body part (default ``back``)."""
    body_part = request.args.get('body_part', 'back')
    # waist, shoulder and back all share the same image and points.
    if body_part in ['waist', 'shoulder']:
        body_part = 'back'
    return render_template("massage_plan_visualization.html", body_part=body_part)


@app.route("/massage_plan_control")
def massage_plan_control():
    """Render the plan control page for a body part (default ``back``)."""
    body_part = request.args.get('body_part', 'back')
    return render_template("massage_plan_control.html", body_part=body_part)


# Default selection of massage heads
@app.route("/get_massage_heads", methods=["GET"])
def get_massage_heads():
    """Return the massage-head configuration, seeding defaults when absent."""
    try:
        heads_config = vtxdb.get("system_config", "massage_heads")

        # Nothing stored yet: write and return the default configuration.
        if not heads_config:
            default_heads = {
                'thermotherapy': {'display': True, 'name': '深部热疗'},
                'shockwave': {'display': True, 'name': '点阵按摩'},
                'ball': {'display': False, 'name': '全能滚珠'},
                'finger': {'display': True, 'name': '指疗通络'},
                'roller': {'display': True, 'name': '滚滚刺疗'},
                'stone': {'display': True, 'name': '温砭舒揉'},
                'ion': {'display': False, 'name': '离子光灸'},
                'heat': {'display': False, 'name': '能量热疗'},
                'spheres': {'display': False, 'name': '天球滚捏'},
            }
            vtxdb.set("system_config", "massage_heads", default_heads)
            heads_config = default_heads

        return jsonify({
            "status": "success",
            "data": heads_config
        })
    except Exception as e:
        return jsonify({
            "status": "error",
            "message": f"获取按摩头配置失败: {str(e)}"
        }), 500


@app.route("/set_massage_heads", methods=["POST"])
def set_massage_heads():
    """Validate and store the massage-head configuration.

    JSON body: ``{"heads_config": {...}}``; the config must contain every
    required head key.
    """
    try:
        data = request.get_json(silent=True) or {}
        heads_config = data.get('heads_config')

        if not heads_config:
            return jsonify({
                "status": "error",
                "message": "缺少按摩头配置参数"
            }), 400

        # The config must be a mapping that names every required head.
        required_fields = ['thermotherapy', 'shockwave', 'ball', 'finger', 'roller', 'stone', 'ion']
        if not isinstance(heads_config, dict) or not all(head in heads_config for head in required_fields):
            return jsonify({
                "status": "error",
                "message": "按摩头配置不完整"
            }), 400

        vtxdb.set("system_config", "massage_heads", heads_config)

        return jsonify({
            "status": "success",
            "message": "按摩头配置保存成功"
        })
    except Exception as e:
        return jsonify({
            "status": "error",
            "message": f"保存按摩头配置失败: {str(e)}"
        }), 500


@app.route("/get_jump_mode", methods=["GET"])
def get_jump_mode():
    """Return the jump-mode configuration, seeding the default when absent."""
    try:
        jump_config = vtxdb.get("system_config", "jump_mode")

        # Nothing stored yet: write and return the default configuration.
        if not jump_config:
            default_config = {
                "enabled": True,  # jump mode is on by default
            }
            vtxdb.set("system_config", "jump_mode", default_config)
            jump_config = default_config

        return jsonify({
            "status": "success",
            "data": jump_config
        })
    except Exception as e:
        return jsonify({
            "status": "error",
            "message": f"获取跳跃模式失败: {str(e)}"
        }), 500


@app.route("/set_jump_mode", methods=["POST"])
def set_jump_mode():
    """Enable or disable jump mode from the JSON ``enabled`` field."""
    try:
        data = request.get_json(silent=True) or {}
        enabled = data.get("enabled")

        if enabled is None:
            return jsonify({
                "status": "error",
                "message": "缺少enabled参数"
            }), 400

        # Start from the stored configuration, or the default when none exists.
        current_config = vtxdb.get("system_config", "jump_mode") or {
            "enabled": True,
        }

        current_config["enabled"] = bool(enabled)
        vtxdb.set("system_config", "jump_mode", current_config)

        return jsonify({
            "status": "success",
            "message": "跳跃模式设置成功",
            "data": current_config
        })
    except Exception as e:
        return jsonify({
            "status": "error",
            "message": f"设置跳跃模式失败: {str(e)}"
        }), 500


if __name__ == "__main__":
    # Parse command-line arguments; unknown arguments are ignored.
    parser = argparse.ArgumentParser(description="Run Flask-SocketIO server.")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    args, _ = parser.parse_known_args()

    DEBUG_MODE = args.debug

    while True:
        try:
            socketio.run(
                app, debug=DEBUG_MODE, host="0.0.0.0", port=5000, allow_unsafe_werkzeug=True
            )
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
        # Wait before restarting on every exit path so that a server which
        # returns immediately cannot spin in a tight loop.
        print("Restarting server in 5 seconds...")
        time.sleep(5)

    # MAX_WAIT_TIME = 300  # wait at most 5 minutes
    # elapsed_time = 0
    # while not check_network() and elapsed_time < MAX_WAIT_TIME:
    #     print(f"Network unavailable. Retrying in 5 seconds... ({elapsed_time}/{MAX_WAIT_TIME}s)")
    #     time.sleep(5)
    #     elapsed_time += 5
    # if elapsed_time >= MAX_WAIT_TIME:
    #     print("Network initialization failed. Proceeding without network...")
    # # Start the Flask-SocketIO service
    # socketio.run(app, host="0.0.0.0", port=5000)
    # # Run the Flask-SocketIO application
    # socketio.run(
    #     app, debug=DEBUG_MODE, host="0.0.0.0", port=5000, allow_unsafe_werkzeug=True
    # )
    # socketio.run(app, debug=DEBUG_MODE, host="0.0.0.0", port=5000)  # when launched under gunicorn