2025-05-27 15:46:31 +08:00

3567 lines
121 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import (
Flask,
render_template,
jsonify,
redirect,
url_for,
request,
Response,
render_template_string,
session,
send_from_directory
)
from flask_socketio import SocketIO
from flask_cors import CORS
from flask_compress import Compress
from flask_sslify import SSLify
from zeroconf import Zeroconf, ServiceInfo
import atexit
import uuid
import socket
import netifaces
import time
from qq_music import (
fetch_latest_music,
get_song_list,
fetch_album_image,
get_music_info,
player,
)
import threading
from threading import Lock
import asyncio
import websockets
import json
import argparse
import librosa
import numpy as np
import subprocess
import signal
import os
import sys
import re
from datetime import datetime, timedelta
import paramiko
import requests
from bs4 import BeautifulSoup, NavigableString
import shutil
from urllib.parse import urljoin, urlparse, parse_qs
from requests.auth import HTTPBasicAuth
from tools.ssh_tools import execute_command_on_remote, is_host_down, ReverseSSH
import logging
from hw_obs import HW_OBS
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from force_sensor_aubo import XjcSensor
from aubo_C5_UI import AuboC5
from Massage.MassageControl.tools.auto_visual_calibration import Calibration
from Massage.MassageControl.tools.replay_trajectory_from_csv import RobotDanceController
import fitz # PyMuPDF
from io import BytesIO
from tools.wifi_tools import WifiManager
from tools import volume_control # 从 tools 文件夹导入 volume_control 模块
from tools import license_module
import math
import aiohttp
import serial
import serial.tools.list_ports
import yaml
from power_board import PowerBoard
import cv2
from pyzbar.pyzbar import decode
from PIL import Image
import io
from tools.deep_thought import set_deep_thought_status, get_deep_thought_status
from tools.ai_search import set_ai_search_status, get_ai_search_status
from tools.version_control import *
from VortXDB.client import VTXClient
from tools.log_utils import read_log_file
from modules.thermal.thermal_routes import thermal_bp, init_thermal_socketio
from modules.common.common_routes import common_bp
from modules.vtxdb.vtxdb_routes import vtxdb_bp
vtxdb = VTXClient(use_logger=False)  # module-wide VTXClient instance
DEBUG_MODE = False

# ---- logging configuration ----
log_file = "../log/UI-next-app.log"
# Folder that receives a timestamped copy of the previous run's log.
backup_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../UILog"))
# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(backup_folder, exist_ok=True)
# logging.FileHandler does not create missing parent directories, so make
# sure the log directory exists before the handler opens the file.
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# Keep the previous log: the FileHandler below truncates it (mode="w").
if os.path.exists(log_file):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = os.path.join(backup_folder, f"UI-next-app_{timestamp}.log")
    shutil.copy2(log_file, backup_file)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(log_file, mode="w"),  # overwrite: each run starts with an empty log
        logging.StreamHandler(sys.stdout),
    ],
)
# File-like adapter that forwards written text to a logger.
class LoggerWriter:
    """Minimal text-stream stand-in that logs whatever is written to it.

    Intended to replace ``sys.stdout`` / ``sys.stderr`` so that ``print``
    output ends up in the logging handlers.

    Args:
        logger: The ``logging.Logger`` that receives the messages.
        level: Numeric logging level used for every message.
    """

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        """Log *message* (whitespace-stripped) unless it is blank.

        Returns:
            The number of characters accepted, as the text-stream ``write``
            protocol expects.
        """
        text = message.strip()
        if text:
            self.logger.log(self.level, text)
        return len(message)

    def flush(self):
        """Do nothing; the logging handlers manage their own buffering."""

    def isatty(self):
        """Report that this stream is not a terminal.

        Code that probes ``sys.stdout.isatty()`` would otherwise fail with
        ``AttributeError`` once the standard streams are replaced.
        """
        return False
# Redirect standard output and standard error into the logging module.
sys.stdout = LoggerWriter(logging.getLogger(), logging.INFO)
sys.stderr = LoggerWriter(logging.getLogger(), logging.ERROR)
if getattr(sys, "frozen", False):
    # Frozen bundle: templates/static live under the unpacked _MEIPASS folder.
    template_folder = os.path.join(sys._MEIPASS, "templates")
    static_folder = os.path.join(sys._MEIPASS, "static")
    app = Flask(__name__, template_folder=template_folder, static_folder=static_folder)
else:
    app = Flask(__name__)
CORS(app)
# Session-signing key. NOTE(review): the built-in default is a short,
# publicly visible string; set FLASK_SECRET_KEY in the environment to
# override it. Without the variable the behaviour is unchanged.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "jsfb")
app.config["SECRET_KEY"] = app.secret_key
app.config["COMPRESS_MIN_SIZE"] = 50
app.config["COMPRESS_LEVEL"] = 8
app.config["COMPRESS_ALGORITHM"] = 'gzip'
# Enable response compression.
Compress(app)
app.register_blueprint(thermal_bp)
app.register_blueprint(common_bp)
app.register_blueprint(vtxdb_bp)
socketio = SocketIO(app, cors_allowed_origins="*")
# The thermal module needs the SocketIO instance once it exists.
init_thermal_socketio(socketio)
def signal_handler(signal, frame):
    """Stop the power-board thread (if running) and exit the process.

    Registered for SIGINT and SIGTERM. Both arguments are supplied by the
    signal machinery and are not used.
    """
    global power_board_ip, power_board_thread, power_board_thread_running
    print("Shutting Down")
    if not power_board_thread_running:
        sys.exit(0)
    # Clearing the flag asks the worker loop to stop.
    power_board_thread_running = False
    worker = power_board_thread
    if worker:
        try:
            worker.join(timeout=2.0)  # bounded wait so shutdown cannot hang forever
        except Exception as e:
            print(f"Error stopping power board thread during shutdown: {e}")
    power_board_thread = None
    power_board_ip = None
    sys.exit(0)
# Run signal_handler on SIGINT (Ctrl-C) and on SIGTERM.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
current_mode = "manual_mode"  # manual_mode / smart_mode / handheld_mode
latest_music = []  # cache filled by get_music_data on first use
all_playlists = []  # cache filled by get_playlists on first use
@app.route("/get_music_data", methods=["GET"])
def get_music_data():
    """Return the latest-music list as JSON, fetching it while the cache is empty."""
    global latest_music
    latest_music = latest_music or fetch_latest_music()
    return jsonify(latest_music)
def load_playlist_ids():
    """Load the list of playlist IDs from the YAML config file.

    If the file does not exist it is created with the default IDs. Any
    failure (directory cannot be created, unreadable file, invalid YAML),
    an empty/non-mapping document, or a missing/empty ``playlist_ids``
    entry yields the default list, so callers always get a usable list.

    Returns:
        The configured playlist IDs, or the built-in defaults.
    """
    config_path = "/home/jsfb/jsfb_ws/global_config/music.yaml"
    default_playlist_ids = ["9126599100", "8429112432", "7299191148", "7132357466", "1137267511", "7284914844"]
    try:
        # Inside the try block: this runs at import time and a permission
        # error here must not take the whole application down.
        os.makedirs(os.path.dirname(config_path), exist_ok=True)
        # First run: write the defaults so the file can be edited later.
        if not os.path.exists(config_path):
            config = {'playlist_ids': default_playlist_ids}
            with open(config_path, 'w', encoding='utf-8') as f:
                yaml.dump(config, f, allow_unicode=True)
            return default_playlist_ids
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
    except Exception as e:
        print(f"Error loading playlist IDs from config: {e}")
        return default_playlist_ids
    # safe_load returns None for an empty file and may return a non-mapping.
    if not isinstance(config, dict):
        return default_playlist_ids
    return config.get('playlist_ids') or default_playlist_ids
def save_playlist_ids(playlist_ids):
    """Write *playlist_ids* to the YAML config file.

    Returns:
        True when the file was written, False when any step failed.
    """
    config_path = "/home/jsfb/jsfb_ws/global_config/music.yaml"
    payload = {'playlist_ids': playlist_ids}
    try:
        # Create the parent directory on demand.
        os.makedirs(os.path.dirname(config_path), exist_ok=True)
        with open(config_path, 'w', encoding='utf-8') as stream:
            yaml.dump(payload, stream, allow_unicode=True)
    except Exception as e:
        print(f"Error saving playlist IDs to config: {e}")
        return False
    else:
        return True
# Load the playlist IDs from the config file when the module is imported.
categoryIDs = load_playlist_ids()
@app.route("/get_playlists", methods=["GET"])
def get_playlists():
    """Return every configured playlist as JSON, fetching them on first use.

    The playlists are collected in a local list and published to the
    module-level cache only once all of them were fetched. A failure part
    way through therefore cannot leave a partial cache that later requests
    would treat as complete.
    """
    global all_playlists
    if not all_playlists:
        fetched = []
        for categoryID in categoryIDs:
            diss_info, music_info_list = get_song_list(categoryID)
            diss_info['categoryId'] = categoryID  # tag the playlist with its ID
            fetched.append(
                {"diss_info": diss_info, "music_info_list": music_info_list}
            )
            print(diss_info)
        # Replace the contents in place so the cache object stays the same.
        all_playlists[:] = fetched
    return jsonify(all_playlists)
@app.route("/search", methods=["POST"])
def search():
    """Search for music matching the ``term`` field of the JSON body.

    Returns:
        The search results as JSON, or a 400 error when the body is not a
        JSON object or carries no search term.
    """
    payload = request.get_json(silent=True)
    search_term = payload.get("term") if isinstance(payload, dict) else None
    if not search_term:
        return jsonify({"error": "Search term is required"}), 400
    print(search_term)
    search_results = get_music_info(search_term)
    return jsonify(search_results)
@app.route("/get_album_image", methods=["POST"])
def get_album_image():
    """Look up the album image URL for the ``singer_name`` in the JSON body.

    Responds 400 without a singer name, 404 when no image was found and
    200 with the URL otherwise.
    """
    singer_name = request.get_json().get("singer_name")
    if not singer_name:
        return jsonify({"error": "Singer name is required"}), 400
    album_img_url, error = fetch_album_image(singer_name)
    if not album_img_url:
        return jsonify({"error": error}), 404
    return jsonify({"album_img_url": album_img_url}), 200
@app.route("/song_clicked", methods=["POST"])
def song_clicked():
    """Play the clicked song, appending it to the playlist when it is new.

    The JSON body carries ``songmid`` and, for songs not yet in the
    playlist, ``album_img_url`` / ``music_name`` / ``singer_name``.
    Playback state is only changed once a playable URL was obtained, so a
    failed lookup never hands ``None`` to the player.
    """
    global playlist, current_song_index
    data = request.json
    songmid = data.get("songmid")
    print(f"Song clicked with songmid: {songmid}")

    # Try the lookup by song id first, then fall back to the song page.
    music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{songmid}.html"
    music_url = player.fetch_music_url_by_mid(songmid)
    if not music_url:
        music_url = player.fetch_music_url(music_page)
    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})

    song_in_playlist = next(
        (song for song in playlist if song["songmid"] == songmid), None
    )
    if song_in_playlist:
        # Already queued: jump to it.
        current_song_index = playlist.index(song_in_playlist)
        current_song.update(
            {
                "album_img_url": song_in_playlist["album_img_url"],
                "music_name": song_in_playlist["music_name"],
                "singer_name": song_in_playlist["singer_name"],
                "is_playing": True,
            }
        )
        print(current_song)
        socketio.sleep(0.5)
        player.play_music(music_url)
        return jsonify({"status": "success", "message": "Song added to playlist"})

    # New song: append it and make it the current one.
    song_info = {
        "album_img_url": data.get("album_img_url"),
        "music_name": data.get("music_name"),
        "singer_name": data.get("singer_name"),
        "songmid": songmid,
    }
    playlist.append(song_info)
    current_song_index = len(playlist) - 1
    current_song.update(
        {
            "album_img_url": song_info["album_img_url"],
            "music_name": song_info["music_name"],
            "singer_name": song_info["singer_name"],
            "is_playing": True,
        }
    )
    player.play_music(music_url)
    print(playlist)
    return jsonify({"status": "success", "message": "Music started"})
@app.route("/playlist_update", methods=["POST"])
def playlist_update():
    """Replace the playlist with the one in the JSON body and play its first song."""
    global playlist, current_song_index
    incoming = request.json.get("playlist", [])
    if not incoming:
        return jsonify({"status": "error", "message": "Playlist is empty"})

    # Keep only the four fields the player needs from each entry.
    wanted_keys = ("album_img_url", "music_name", "singer_name", "songmid")
    playlist = [{key: entry.get(key) for key in wanted_keys} for entry in incoming]

    # The first song of the new list becomes the current one.
    current_song_index = 0
    first_song = playlist[current_song_index]
    current_song.update(
        {
            "album_img_url": first_song["album_img_url"],
            "music_name": first_song["music_name"],
            "singer_name": first_song["singer_name"],
            "is_playing": True,
        }
    )

    # Lookup by song id first, song page as the fallback.
    music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{first_song['songmid']}.html"
    music_url = (
        player.fetch_music_url_by_mid(first_song["songmid"])
        or player.fetch_music_url(music_page)
    )
    if not music_url:
        return jsonify(
            {"status": "error", "message": "Failed to fetch music URL for first song"}
        )
    socketio.sleep(0.5)
    player.play_music(music_url)
    print(playlist)
    return jsonify(
        {"status": "success", "message": "Playlist updated and playing first song"}
    )
@app.route("/pause_music", methods=["POST"])
def pause_music():
    """Pause the player and acknowledge with a success JSON payload."""
    player.pause_music()
    reply = {"status": "success", "message": "Music paused"}
    return jsonify(reply)
@app.route("/resume_music", methods=["POST"])
def resume_music():
    """Resume playback, or start the current playlist entry if nothing is loaded.

    Returns:
        A success payload, or an error payload when no playable URL could
        be obtained for the current playlist entry (in which case the
        player and the displayed song are left untouched).
    """
    if player.is_playing:
        player.resume_music()
        return jsonify({"status": "success", "message": "Music resumed"})

    # Nothing loaded in the player: start the current playlist entry.
    next_song = playlist[current_song_index]
    music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{next_song['songmid']}.html"
    music_url = player.fetch_music_url_by_mid(next_song["songmid"])
    if not music_url:
        music_url = player.fetch_music_url(music_page)
    if not music_url:
        # Same error payload as the previous/next-song handlers.
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})
    current_song.update(
        {
            "album_img_url": next_song["album_img_url"],
            "music_name": next_song["music_name"],
            "singer_name": next_song["singer_name"],
            "is_playing": True,
        }
    )
    print(current_song)
    socketio.sleep(0.5)
    player.play_music(music_url)
    return jsonify({"status": "success", "message": "Music resumed"})
@app.route("/stop_music", methods=["POST"])
def stop_music():
    """Stop the player and acknowledge with a success JSON payload."""
    player.stop_music()
    reply = {"status": "success", "message": "Music stopped"}
    return jsonify(reply)
@app.route("/play_last_song", methods=["POST"])
def play_last_song():
    """Step back one playlist entry (wrapping to the end) and play it."""
    global current_song_index
    # From the first entry, wrap around to the last one.
    current_song_index = (
        current_song_index - 1 if current_song_index > 0 else len(playlist) - 1
    )
    print(current_song_index)
    target = playlist[current_song_index]
    current_song.update(
        {
            "album_img_url": target["album_img_url"],
            "music_name": target["music_name"],
            "singer_name": target["singer_name"],
            "is_playing": True,
        }
    )
    # Lookup by song id first, song page as the fallback.
    music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{target['songmid']}.html"
    music_url = (
        player.fetch_music_url_by_mid(target["songmid"])
        or player.fetch_music_url(music_page)
    )
    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})
    player.play_music(music_url)
    return jsonify({"status": "success", "message": "Playing last song"})
@app.route("/play_next_song", methods=["POST"])
def play_next_song():
    """Advance to the next playlist entry (wrapping to the start) and play it."""
    global current_song_index
    current_song_index = (current_song_index + 1) % len(playlist)
    print(current_song_index)
    target = playlist[current_song_index]
    current_song.update(
        {
            "album_img_url": target["album_img_url"],
            "music_name": target["music_name"],
            "singer_name": target["singer_name"],
            "is_playing": True,
        }
    )
    # Lookup by song id first, song page as the fallback.
    music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{target['songmid']}.html"
    music_url = (
        player.fetch_music_url_by_mid(target["songmid"])
        or player.fetch_music_url(music_page)
    )
    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})
    player.play_music(music_url)
    return jsonify({"status": "success", "message": "Playing next song"})
@socketio.on("seek_music")
def handle_seek_music(position):
    """Socket.IO handler: log *position* and pass it to the player's seek."""
    print(position)
    player.seek_music(position)
def generate_playlist(music_info_list):
    """Convert raw song records into the playlist entries used by the player.

    Args:
        music_info_list: Iterable of sequences indexed as
            ``[0]`` music name, ``[1]`` singer name, ``[2]`` songmid,
            ``[3]`` album id. ``None`` is treated like an empty list so a
            failed fetch produces an empty playlist instead of an error.

    Returns:
        A list of dicts with ``music_name``, ``singer_name``, ``songmid``
        and ``album_img_url``; the image URL is ``""`` when the album id is
        empty.
    """
    if not music_info_list:
        return []
    cover_url = "https://robotstorm.tech/ygtimg/music/photo_new/T002R180x180M000{}.jpg"
    return [
        {
            "music_name": song[0],
            "singer_name": song[1],
            "songmid": song[2],
            "album_img_url": cover_url.format(song[3]) if song[3] else "",
        }
        for song in music_info_list
    ]
# Module-level state: metadata and playing flag of the current song.
current_song = {
    "album_img_url": "https://via.placeholder.com/300x300",
    "music_name": None,
    "singer_name": None,
    "is_playing": False,  # playback state
}
# Module-level playlist and the index of the current song within it.
playlist = []
current_song_index = 0
# Seed the playlist from the first configured category. This runs at import
# time, so a failed fetch (or an empty category list) must not prevent the
# application from starting; the placeholder entry below is used instead.
try:
    diss_info, music_info_list = get_song_list(categoryIDs[0])
    playlist = generate_playlist(music_info_list)
except Exception as e:
    print(f"Error loading initial playlist: {e}")
    playlist = []
print("playlist:", playlist)
if not playlist:
    playlist = [
        {
            "album_img_url": "https://via.placeholder.com/300x300",
            "music_name": "默认音乐",
            "singer_name": "未知艺术家",
            "songmid": "000000",
        }
    ]
current_song.update(
    {
        "album_img_url": playlist[0]["album_img_url"],
        "music_name": playlist[0]["music_name"],
        "singer_name": playlist[0]["singer_name"],
        "is_playing": False,  # nothing is playing right after start-up
    }
)
# Lock held while the background task reads and updates the shared music state.
thread_lock = Lock()
# Periodic task: once per second, push the current song info to the front end.
def music_background_thread():
    """Broadcast playback state every second and auto-advance the playlist.

    Runs forever. Each cycle emits ``current_song``; while the player
    reports that it is playing it also emits ``music_status`` with the
    position and length, and switches to the next playlist entry when the
    reported position is negative.
    """
    global playlist, current_song_index
    while True:
        socketio.sleep(1)  # one-second interval
        with thread_lock:
            current_song["is_playing"] = not player.is_paused and player.is_playing
            socketio.emit("current_song", current_song, namespace="/")
            if player.is_playing:
                current_position = player.get_current_position()
                music_length = player.get_music_length()
                # print(current_position)
                # NOTE(review): a negative position is treated as "track
                # finished" — confirm against the player implementation.
                if current_position < 0:
                    # Switch to the next song (wrapping at the end).
                    current_song_index = (current_song_index + 1) % len(playlist)
                    next_song = playlist[current_song_index]
                    current_song.update(
                        {
                            "album_img_url": next_song["album_img_url"],
                            "music_name": next_song["music_name"],
                            "singer_name": next_song["singer_name"],
                            "is_playing": True,
                        }
                    )
                    print(current_song)
                    # Fetch and play the next song.
                    # music_page = (
                    # f"https://y.qq.com/n/yqq/song/{next_song['songmid']}.html"
                    # )
                    music_page = (
                        f"https://robotstorm.tech/yqq/n/yqq/song/{next_song['songmid']}.html"
                    )
                    music_url = player.fetch_music_url_by_mid(next_song["songmid"])
                    if not music_url:
                        music_url = player.fetch_music_url(music_page)
                    socketio.sleep(0.15)
                    # NOTE(review): music_url is not checked here; if both
                    # lookups fail, play_music receives a falsy value.
                    player.play_music(music_url)
                socketio.emit(
                    "music_status",
                    {"position": current_position, "length": music_length},
                )
@app.route("/set_current_song", methods=["POST"])
def set_current_song():
    """Overwrite the current-song metadata with fields from the JSON body."""
    global current_song
    body = request.json
    fields = {key: body.get(key) for key in ("album_img_url", "music_name", "singer_name")}
    fields["is_playing"] = body.get("is_playing", False)  # playback flag defaults to False
    current_song.update(fields)
    return jsonify({"status": "success"})
@app.route("/get_current_song", methods=["GET"])
def get_current_song():
    """Return the current song, or an error payload when no song name is set."""
    if not current_song["music_name"]:
        return jsonify({"status": "error", "message": "No song is currently set"})
    return jsonify({"status": "success", "data": current_song})
ai_chat_list = []  # chat history served by get_history


async def send_message(message, port=8766):
    """Send *message* to the local WebSocket server on *port* and return its reply.

    Opens a connection to ``ws://localhost:<port>``, sends one message,
    waits for one response and closes the connection.
    """
    uri = f"ws://localhost:{port}"
    async with websockets.connect(uri) as connection:
        await connection.send(message)
        return await connection.recv()
@socketio.on("send_message")
def handle_message(data):
    """Socket.IO handler: record a user chat message and forward it over WebSocket.

    Args:
        data: Event payload; expected to be a mapping with a ``message`` key.

    Returns:
        ``None`` on success, or an ``{"error": ...}`` dict (usable as the
        Socket.IO acknowledgement) when no message was supplied. A Flask
        response tuple is not a meaningful return value for an event handler.
    """
    message = data.get("message") if isinstance(data, dict) else None
    if not message:
        return {"error": "Message not provided"}
    socketio.emit("add_message", {"sender": "user", "message": message})
    ai_chat_list.append({"sender": "user", "message": message})
    # asyncio.run creates a fresh event loop and closes it afterwards, so no
    # loop object is leaked per message.
    asyncio.run(send_message(f"'user_input': \"{message}\""))
last_ai_message_time = None  # time.time() of the most recent AI chunk


@app.route("/ai_respon", methods=["POST"])
def ai_response():
    """Record a chunk of AI output and broadcast it to the front end.

    Form fields: ``message`` (required) and ``isReasoning`` (``"true"`` /
    ``"false"``, default false). A chunk arriving within 5 seconds of the
    previous one is appended to the latest AI entry of the history;
    otherwise a new entry is started.
    """
    global last_ai_message_time
    message = request.form.get("message")
    is_reasoning = request.form.get("isReasoning", "false").lower() == "true"  # defaults to false
    if not message:
        return jsonify({"status": "error", "message": "No message provided"})

    now = time.time()
    # Field that receives this chunk, and the one that stays empty.
    if is_reasoning:
        target_field, other_field = "reasoning_content", "content"
    else:
        target_field, other_field = "content", "reasoning_content"

    within_window = bool(last_ai_message_time) and (now - last_ai_message_time) <= 5
    if within_window and ai_chat_list and ai_chat_list[-1]["sender"] == "ai":
        # Continue the most recent AI entry.
        ai_chat_list[-1]["message"][target_field] += message
    else:
        # First chunk, stale window, or the last entry is not from the AI.
        ai_chat_list.append(
            {"sender": "ai", "message": {target_field: message, other_field: ""}}
        )

    last_ai_message_time = now
    socketio.emit("add_message", {"sender": "ai", "message": message, "isReasoning": is_reasoning})
    return jsonify({"status": "success", "message": "Received ai message"})
@app.route("/user_input", methods=["POST"])
def user_input():
    """Record the posted ``message`` form field as a user chat entry and broadcast it."""
    message = request.form.get("message")
    if not message:
        return jsonify({"status": "error", "message": "No message provided"})
    entry = {"sender": "user", "message": message}
    socketio.emit("add_message", {"sender": "user", "message": message})
    ai_chat_list.append(entry)
    return jsonify({"status": "success", "message": "Received user message"})
@app.route("/get_history", methods=["GET"])
def get_history():
    """Return the accumulated chat history as JSON."""
    history = ai_chat_list
    return jsonify(history)
def process_audio_and_emit(path):
    """Emit ``mouth_movement`` values sampled from the audio file at *path*.

    The file is loaded at 8000 Hz. About every 0.1 s the sample at the
    elapsed wall-clock position is read, scaled by 5, made absolute and
    emitted as a string. A final ``"0.0"`` is always emitted at the end.
    """
    samples, _rate = librosa.load(path, sr=8000)
    began = time.time()
    total = len(samples)
    try:
        for _ in range(int(total / 800)):
            # Sample index matching the time elapsed since the start.
            cursor = int((time.time() - began) * 8000) + 1
            if cursor < total:
                amplitude = abs(samples[cursor] * 5)
                socketio.emit("mouth_movement", {"value": str(amplitude)})
            time.sleep(0.1)
    except Exception as e:
        print(f"Error in mouth movement emission: {e}")
    # Reset the mouth to the closed position.
    socketio.emit("mouth_movement", {"value": str(0.0)})
@app.route("/lip_sync", methods=["POST"])
def lip_sync():
    """Start lip-sync emission for the audio file named by the ``path`` form field.

    The audio is processed on a separate thread so the request returns
    immediately. Any error is reported in the JSON response.
    """
    try:
        audio_path = request.form.get("path")
        print(audio_path)
        if audio_path.startswith("pre_mp3"):
            audio_path = f"../{audio_path}"
        # Process audio and mouth data off the request thread.
        worker = threading.Thread(target=process_audio_and_emit, args=(audio_path,))
        worker.start()
        return jsonify({"status": "success"})
    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"status": "error", "message": str(e)})
import traceback
# Shared massage state. update_status overwrites entries from POSTed form
# fields and broadcasts the dict; get_status resets it to these defaults
# when the status request fails.
massage_status = {
    "progress": 0,
    "force": '',
    "temperature": '',
    "gear": '',
    "shake": '',
    "is_massaging": False,
    "current_task": "",
    "task_time": "",
    "current_head": "",
    "body_part": "back",
    "massage_service_started": False,
    "press": '',
    "frequency": '',
    "speed": '',
    "direction": '',
    "high": '',
    "massage_state": '',
    "is_acupoint": False,
    "is_pause": False,
    "manual_stage": 0,
    "start_pos": "",
    "end_pos": "",
    "massage_path": ""
}
# Shared stone state; update_status overwrites both entries when a
# "temper_head" form field is posted and broadcasts the dict.
stone_status ={
    "temper_head": 40,
    "temperature": 1
}
@app.route("/update_massage_status", methods=["POST"])
def update_status():
    """Apply a posted status update and broadcast the result over Socket.IO.

    When the form carries ``temper_head`` the request is handled as a stone
    update (``update_stone_status`` event). Otherwise every non-empty form
    field is copied into ``massage_status`` and the whole dict is emitted
    as ``update_massage_status``.

    Returns:
        ``{"status": "success"}``, or ``{"status": "error", "message": ...}``
        when any exception is raised while processing.
    """
    global stored_command
    global display_image_url
    global display_image_type
    try:
        print(f"update_massage_status{request.form}")
        temper_head = request.form.get("temper_head")
        if temper_head:
            temperature = request.form.get("temperature")
            stone_status["temper_head"] = temper_head
            stone_status["temperature"] = temperature
            socketio.emit("update_stone_status", stone_status)
        else:
            is_massaging = request.form.get("is_massaging")
            is_pause = request.form.get("is_pause")
            if is_massaging and (is_massaging == "False" or is_massaging == "false"):
                if is_pause == "False" or is_pause == "false":
                    if stored_command:  # a command is stored and is_massaging is false
                        print(f"Re-running stored command after 5 seconds delay...")
                        def run_async_in_thread():
                            time.sleep(5)  # wait 5 seconds so the previous command can finish
                            asyncio.run(send_message(stored_command, 8765))  # run the coroutine in this new thread
                        # Execute on a new thread.
                        threading.Thread(target=run_async_in_thread).start()
            # Read the POSTed form fields.
            progress = request.form.get("progress")
            force = request.form.get("force")
            temperature = request.form.get("temperature")
            gear = request.form.get("gear")
            shake = request.form.get("shake")
            is_massaging = request.form.get("is_massaging")
            current_task = request.form.get("current_task")
            task_time = request.form.get("task_time")
            body_part = request.form.get("body_part")
            current_head = request.form.get("current_head")
            massage_service_started = request.form.get("massage_service_started")
            press = request.form.get("press")
            frequency = request.form.get("frequency")
            speed = request.form.get("speed")
            is_pause = request.form.get("is_pause")
            direction = request.form.get("direction")
            high = request.form.get("high")
            massage_state = request.form.get("massage_state")
            is_acupoint = request.form.get("is_acupoint")
            manual_stage = request.form.get("manual_stage")
            start_pos = request.form.get("start_pos")
            end_pos = request.form.get("end_pos")
            massage_path = request.form.get("massage_path")
            # Only overwrite entries whose form field is non-empty.
            if progress:
                massage_status["progress"] = progress
            # Force is only accepted while massaging and when it is not "0".
            if force and force != "0" and (is_massaging == "True" or is_massaging == "true" ):
                print(force)
                massage_status["force"] = force
            if temperature:
                massage_status["temperature"] = temperature
            if gear:
                massage_status["gear"] = gear
            if shake:
                massage_status["shake"] = shake
            if is_massaging:
                # Convert the textual flag to a bool; any other text is stored as-is.
                if is_massaging == "False" or is_massaging == "false":
                    is_massaging = False
                    display_image_url = "static/images/smart_mode/back.png"
                    socketio.emit("change_image", {"path": display_image_url, "type": display_image_type})
                if is_massaging == "True" or is_massaging == "true":
                    is_massaging = True
                    socketio.emit("change_image", {"path": display_image_url, "type": display_image_type})
                massage_status["is_massaging"] = is_massaging
            if current_task:
                # Task name -> index sent with the "highlightPlane" event.
                task_to_index = {
                    "lung": 0,
                    "heart": 1,
                    "hepatobiliary": 2,
                    "spleen": 3,
                    "kidney": 4,
                    "lung_left": 0,
                    "heart_left": 1,
                    "hepatobiliary_left": 2,
                    "spleen_left": 3,
                    "kidney_left": 4,
                    "lung_right": 0,
                    "heart_right": 1,
                    "hepatobiliary_right": 2,
                    "spleen_right": 3,
                    "kidney_right": 4,
                }
                massage_status["current_task"] = current_task
                # Look up the matching index.
                index = task_to_index.get(current_task, None)
                print(current_task, index)
                if index is not None:
                    # Send the index to the front end.
                    socketio.emit("highlightPlane", index)
                else:
                    socketio.emit("highlightPlane", None)
            # NOTE(review): this branch cannot run — it is only reached when
            # current_task is falsy, and "-1" is a truthy string.
            elif current_task == "-1":
                socketio.emit("highlightPlane", None)
            if task_time is not None:
                # The literal text "null" clears the task time.
                if task_time == "null":
                    task_time = ""
                massage_status["task_time"] = task_time
            if current_head:
                massage_status["current_head"] = current_head
            if body_part:
                massage_status["body_part"] = body_part
            if press:
                massage_status["press"] = press
            if frequency:
                massage_status["frequency"] = frequency
            if is_pause:
                if is_pause == "False" or is_pause == "false":
                    is_pause = False
                if is_pause == "True" or is_pause == "true":
                    is_pause = True
                massage_status["is_pause"] = is_pause
            if speed:
                massage_status["speed"] = speed
            if direction:
                massage_status["direction"] = direction
            if high:
                massage_status["high"] = high
            if massage_service_started:
                print(massage_service_started)
                if massage_service_started == "False" or massage_service_started == "false":
                    massage_service_started = False
                if massage_service_started == "True" or massage_service_started == "true":
                    massage_service_started = True
                massage_status["massage_service_started"] = massage_service_started
            if massage_state:
                massage_status["massage_state"] = massage_state
            if is_acupoint:
                if is_acupoint == "False" or is_acupoint == "false":
                    is_acupoint = False
                if is_acupoint == "True" or is_acupoint == "true":
                    is_acupoint = True
                massage_status["is_acupoint"] = is_acupoint
            if manual_stage:
                massage_status["manual_stage"] = int(manual_stage)
            if start_pos:
                massage_status["start_pos"] = start_pos
            if end_pos:
                massage_status["end_pos"] = end_pos
            if massage_path:
                massage_status["massage_path"] = massage_path
            print(massage_status)
            # Broadcast the updated state over SocketIO.
            socketio.emit("update_massage_status", massage_status)
        return jsonify({"status": "success"})
    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"status": "error", "message": str(e)})
def is_service_running(service_name):
    """Return True when ``systemctl is-active`` reports *service_name* as active.

    Args:
        service_name: Name of the systemd unit to query.

    Returns:
        True only if the command succeeds and prints ``active``. False when
        the unit is not active, the command fails, or ``/bin/systemctl``
        cannot be executed at all (for example on a host without systemd).
    """
    try:
        result = subprocess.run(
            ["/bin/systemctl", "is-active", service_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: non-zero exit status (unit not active).
        # OSError: the systemctl binary is missing or not executable.
        return False
    return result.stdout.strip() == "active"
@app.route("/get_status", methods=["GET"])
def get_status():
    """Request a status refresh on port 8765 and return ``massage_status``.

    If the request fails for any reason, ``massage_status`` is reset to its
    default values before it is returned. The current display image is
    re-sent to the front end in either case.
    """
    global display_image_url
    global display_image_type
    try:
        # asyncio.run creates a fresh event loop and closes it afterwards,
        # so this endpoint does not leak one loop per request.
        asyncio.run(send_message("get_status", 8765))
    except Exception as e:
        print("error ! get_status error")
        massage_status.update({
            "progress": 0,
            "force": '',
            "temperature": '',
            "gear": '',
            "shake": '',
            "is_massaging": False,
            "current_task": "",
            "task_time": "",
            "current_head": "",
            "body_part": "back",
            "massage_service_started": False,
            "press": '',
            "frequency": '',
            "speed": '',
            "direction": '',
            "high": '',
            "massage_state": '',
            "is_acupoint": False,
            "is_pause": False,
            "manual_stage": 0,
            "start_pos": "",
            "end_pos": "",
            "massage_path": ""
        })
        print(f"Error: {e}")
    # Re-send the current display image over Socket.IO.
    socketio.emit("change_image", {"path": display_image_url, "type": display_image_type})
    print(massage_status)
    return jsonify(massage_status)
######
last_command_time = 0
# Module-level variable holding the stored command (None when nothing is stored).
stored_command = None
# Remaining countdown value; access is guarded by countdown_lock.
countdown_value = 0
countdown_lock = threading.Lock()
def start_countdown():
    """Count down from 120, emitting ``countdown_update`` once per second.

    The loop ends as soon as the shared value is zero or below (it can be
    zeroed from elsewhere); the current display image is then re-sent.
    """
    global countdown_value, display_image_type, display_image_url
    with countdown_lock:
        countdown_value = 120  # starting value
    finished = False
    while not finished:
        with countdown_lock:
            if countdown_value > 0:
                countdown_value -= 1
                print(f"Countdown: {countdown_value}")
                socketio.emit("countdown_update", {"countdown": countdown_value, "message": "is_running"})
            else:
                finished = True
        if not finished:
            time.sleep(1)
    socketio.emit("change_image", {"path": display_image_url, "type": display_image_type})
async def send_to_port(command, port):
    """Send *command* to the given port and log the outcome.

    Args:
        command: Command string to send.
        port: Target port number.

    Returns:
        True when the command was sent, False when sending failed for any
        reason (the failure is printed, never raised).
    """
    # This coroutine already runs inside an event loop, so it must not
    # create, install or close another loop of its own.
    try:
        print(f"[DEBUG] 准备发送命令到端口 {port}: {command}")
        await send_message(command, port)
        print(f"[SUCCESS] 命令已发送到端口 {port}: {command}")
        return True
    except ConnectionError as e:
        print(f"[ERROR] 连接失败 (端口 {port}): {str(e)}")
        return False
    except asyncio.TimeoutError:
        print(f"[ERROR] 发送超时 (端口 {port})")
        return False
    except Exception as e:
        print(f"[CRITICAL] 未知错误: {type(e).__name__}: {str(e)}")
        return False
@socketio.on("send_command")
def send_command(data):
    """Socket.IO handler: forward a command string to port 8765.

    Commands containing ``begin`` are split on ``:``. With at least five
    parts, ``UI_start`` is sent to port 8766 unless there are exactly six
    parts and the sixth is ``manual``; the command is stored when the
    fifth part is ``"3"`` and cleared otherwise. ``manual_start`` also
    sends ``UI_start``; ``stop`` clears the stored command, the display
    image and the countdown.
    """
    global stored_command, display_image_url, display_image_type, countdown_value

    if "begin" in data:
        parts = data.split(":")
        if len(parts) >= 5:  # parts[4] (mode_real) must exist
            mode_real = parts[4]
            # Only an explicit sixth field equal to 'manual' suppresses UI_start.
            manual_use_mode = len(parts) == 6 and parts[5] == 'manual'
            if not manual_use_mode:
                asyncio.run(send_to_port("UI_start", 8766))
            # Remember the command only when mode_real is "3".
            stored_command = data if mode_real == "3" else None
            print(f"Stored command: {stored_command}")

    if "manual_start" in data:
        asyncio.run(send_to_port("UI_start", 8766))

    if "stop" in data:
        stored_command = None
        display_image_url = 'static/images/smart_mode/back.png'
        display_image_type = 0
        countdown_value = 0

    # Every command is forwarded to port 8765.
    asyncio.run(send_to_port(data, 8765))
@app.route("/get_countdown", methods=["GET"])
def get_countdown():
    """Return the current countdown value as JSON, read under the countdown lock."""
    with countdown_lock:
        payload = {"countdown": countdown_value}
        return jsonify(payload)
@app.route('/get_command', methods=['GET'])
def get_command():
    """Return the stored command (or null) in a success JSON payload."""
    payload = {"status": "success", "command": stored_command}
    return jsonify(payload)
@socketio.on("change_awaken")
def change_awaken(command):
    """Socket.IO handler: forward *command* to the WebSocket server on port 8766.

    Failures are printed rather than raised.
    """
    try:
        # asyncio.run closes the event loop it creates, so no loop object is
        # leaked per event.
        asyncio.run(send_message(command, 8766))
    except Exception as e:
        print(f"Error: {e}")
@app.route("/get_awaken", methods=["GET"])
def get_awaken():
    """Return the text before the first ``;`` of the hot-word keyword file.

    Responds 404 when the file is missing, 500 on any other error, and 200
    with ``{"awaken": ...}`` otherwise (a fixed placeholder text is used
    when the content does not start with such a field).
    """
    file_path = '../Language/Hotword_awaker/resource/keyword-nhxd.txt'  # relative path
    try:
        print("File path:", os.path.abspath(file_path))
        if not os.path.exists(file_path):
            return jsonify({"error": "File not found"}), 404
        with open(file_path, 'r', encoding='utf-8') as handle:
            content = handle.read()
        # Capture everything before the first semicolon.
        found = re.match(r"([^\;]+);", content)
        awaken = found.group(1) if found else "No valid string found"
        return jsonify({"awaken": awaken}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/get_serial_number', methods=['GET'])
def get_serial_number():
    """Return the device serial number derived from the host name.

    The serial number is the host name with a leading ``jsfb-`` removed.
    Responds 500 with the error text when anything raises.
    """
    try:
        serial_number = socket.gethostname().removeprefix('jsfb-')
        payload = {"serial_number": serial_number}
        return jsonify(payload)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# @app.before_request
# def before_request():
# if request.url.startswith('https://'):
# url = request.url.replace('https://', 'http://', 1)
# code = 301 # redirect permanentlyvideo
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE")
return response
@app.route("/")
def index():
    """Render the login page."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("login.html", time=now)
@app.route("/home")
def home():
    """Render the home page."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("home.html", time=now)
@app.route("/health")
def health():
    """Render the page that matches the module-level ``current_mode``.

    ``manual_mode`` and ``handheld_mode`` have dedicated templates; every
    other value renders the smart-mode page.
    """
    if current_mode == "manual_mode":
        template = "select_program.html"
    elif current_mode == "handheld_mode":
        template = "handheld_mode.html"
    else:
        template = "smart_mode.html"
    return render_template(template, time=int(time.time()))
@app.route("/setting")
def setting():
    """Render the settings page."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("setting.html", time=now)
@app.route("/developer")
def developer():
    """Render the developer page."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("developer.html", time=now)
@app.route("/learning")
def learning():
    """Render the learning page."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("learning.html", time=now)
@app.route("/help")
def help():
    """Render the help page."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("help.html", time=now)
@app.route("/ai_ball")
def ai_ball():
    """Render the AI ball page (``ai_ball_v2.html``)."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("ai_ball_v2.html", time=now)
@app.route("/model")
def model():
    """Render the 3D model page (``3d_model_v6_local.html``)."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("3d_model_v6_local.html", time=now)
@app.route("/ai_chatbot")
def ai_chatbot():
    """Render the AI chatbot page."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("ai_chatbot.html", time=now)
@app.route("/dynamic_function")
def dynamic_function():
    """Render the dynamic-function page, forwarding the ``mode`` query argument."""
    requested_mode = request.args.get('mode', default=None)
    return render_template(
        "dynamic_function_v2.html",
        time=int(time.time()),
        mode=requested_mode,
    )
@app.route("/three_d_model")
def three_d_model():
    """Render the ``three_d_model.html`` page."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("three_d_model.html", time=now)
@app.route("/control_panel")
def control_panel():
    """Render the control panel page, forwarding the ``mode`` query argument."""
    requested_mode = request.args.get('mode', default=None)
    return render_template(
        "control_panel_v2.html",
        time=int(time.time()),
        mode=requested_mode,
    )
@app.route("/music")
def music():
    """Render the full-screen music player page."""
    now = int(time.time())  # handed to the template as ``time``
    return render_template("full_music_player.html", time=now)
@app.route("/switch_mode/<mode>")
def switch_mode(mode):
    """Store ``mode`` in the module-level ``current_mode`` and redirect to ``/health``."""
    global current_mode
    current_mode = mode
    target = url_for("health")
    return redirect(target)
@app.route("/get-ip")
def get_ip():
    """Return the part of the request's host before the first ``:`` as ``ip``."""
    host, _, _ = request.host.partition(":")
    return jsonify({"ip": host})
@app.route("/login", methods=["POST"])
def login():
    """Check the posted JSON credentials.

    Expects a JSON object with ``username`` and ``password`` and answers
    ``{"success": true}`` or ``{"success": false}``. A missing or malformed
    body, or a missing field, is treated as a failed login instead of
    raising inside the handler.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify(success=False)
    username = data.get("username")
    password = data.get("password")
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration.
    if username == "jsfb" and password == "123456":
        return jsonify(success=True)
    return jsonify(success=False)
# Power-board state shared by send_heartbeat(), the /power_board route and
# system_shutdown().
powerboard = None  # assigned a PowerBoard instance in initialize_app()
power_board_ip = None  # destination IP for heartbeat datagrams; None when unset
power_board_thread = None  # thread running send_heartbeat, or None
power_board_thread_running = False  # loop flag polled by send_heartbeat
def send_heartbeat():
    """Send an ``ALIVE`` UDP datagram to the power board about once a second.

    Runs until the module-level ``power_board_thread_running`` flag is
    cleared. Nothing is sent while ``power_board_ip`` is unset. The socket is
    always closed on exit. (A previously drafted ``SHUTDOWN`` notification in
    the cleanup path is disabled.)
    """
    heartbeat_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # NOTE(review): printed once at start-up, before any datagram is sent.
    print(f"Sent heartbeat to {power_board_ip}:3660")
    try:
        # Timeout keeps a blocked send from stalling the loop.
        heartbeat_socket.settimeout(1.0)
        while power_board_thread_running:
            try:
                if power_board_ip:
                    heartbeat_socket.sendto(b"ALIVE", (power_board_ip, 3660))
            except socket.timeout:
                # Retry immediately, skipping the one-second pause.
                print("socket timeout")
                continue
            except Exception as send_error:
                print(f"Error sending heartbeat: {send_error}")
            time.sleep(1)
    except Exception as thread_error:
        print(f"Error in heartbeat thread: {thread_error}")
    finally:
        heartbeat_socket.close()
@app.route("/power_board", methods=["POST"])
def power_board():
    """Start or stop the power-board heartbeat based on the posted ``ip``.

    An empty or missing ``ip`` stops the heartbeat thread (waiting up to two
    seconds for it) and clears the stored address. Otherwise the address is
    stored and a daemon thread running ``send_heartbeat`` is started unless
    one is already alive.
    """
    global power_board_ip, power_board_thread, power_board_thread_running
    ip = request.form.get("ip")

    if ip:
        power_board_ip = ip
        print("power_board ip: ", power_board_ip)
        thread_alive = bool(power_board_thread) and power_board_thread.is_alive()
        if not thread_alive:
            power_board_thread_running = True
            # Daemon thread: ends automatically when the main program exits.
            power_board_thread = threading.Thread(target=send_heartbeat, daemon=True)
            power_board_thread.start()
        return jsonify({"status": "success", "message": "Heartbeat started"})

    # Empty IP: stop the running heartbeat thread, if any.
    if power_board_thread_running:
        power_board_thread_running = False
        if power_board_thread:
            try:
                # Bounded wait so the request cannot block forever.
                power_board_thread.join(timeout=2.0)
            except Exception as join_error:
                print(f"Error stopping power board thread: {join_error}")
        power_board_thread = None
    power_board_ip = None
    return jsonify({"status": "success", "message": "Heartbeat stopped"})
@app.route("/massage_control", methods=["POST"])
def massage_control():
    """Start, stop or shut down the massage service.

    Reads ``action`` from the form data:

    * ``start``: sends ``connect_arm`` to port 8766, then restarts the
      ``massage`` systemd unit.
    * ``stop``: sends ``disconnect_arm``, resets the displayed image, pushes
      an idle ``update_massage_status`` to clients, then stops the unit.
    * ``shutdown``: delegates to ``system_shutdown()``.
    * any other value is passed to ``systemctl`` as the unit action.

    Returns 400 when no action is supplied. For non-shutdown actions the
    response is ``{"status": "success"}`` even if ``systemctl`` fails; the
    failure is only printed (unchanged from the original behaviour).
    """
    global display_image_type, display_image_url

    def _send_to_arm(command):
        """Run ``send_message(command, 8766)`` on a private loop and close it."""
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            loop.run_until_complete(send_message(command, 8766))
        finally:
            loop.close()
            asyncio.set_event_loop(None)

    action = request.form.get("action")
    print("massage_control action: ", action)
    sudo_password = "jsfb"  # NOTE(review): sudo password is hard-coded in source
    if not action:
        return jsonify({"status": "error", "message": "Invalid action"}), 400

    if action == "start":
        action = "restart"
        try:
            _send_to_arm("connect_arm")
        except Exception as e:
            print(f"Error: {e}")

    if action == "stop":
        try:
            _send_to_arm("disconnect_arm")
            # Reset the display and tell the front end the session is idle.
            display_image_type = 0
            display_image_url = 'static/images/smart_mode/back.png'
            socketio.emit("update_massage_status", {
                "progress": 0,
                "force": '',
                "temperature": '',
                "gear": '',
                "shake": '',
                "is_massaging": False,
                "current_task": "",
                "task_time": "",
                "current_head": "",
                "body_part": "back",
                "massage_service_started": False,
                "press": '',
                "frequency": '',
                "speed": '',
                "direction": '',
                "massage_state": '',
                "is_acupoint": False,
                "is_pause": False,
                "manual_stage": 0,
                "start_pos": "",
                "end_pos": "",
                "massage_path": ""
            })
        except Exception as e:
            print(f"Error: {e}")

    # Local shutdown is handled by the shared shutdown routine.
    if action == "shutdown":
        if system_shutdown():
            return jsonify({
                "status": "success",
                "message": "Remote and local systems shutdown successfully."
            })
        return jsonify({
            "status": "error",
            "message": "Failed to execute shutdown process"
        }), 500

    # Every other action goes to systemctl. The request-supplied value is
    # passed as a single argv element (no shell), so it cannot inject extra
    # commands; the sudo password is fed on stdin for ``sudo -S``.
    try:
        result = subprocess.run(
            ["/usr/bin/sudo", "-S", "/bin/systemctl", action, "massage"],
            input=f"{sudo_password}\n".encode(),
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        print(result.stdout.decode())
        print(f"Service {action} successfully.")
    except subprocess.CalledProcessError as e:
        print(f"Error when {action} service: {e.stderr.decode()}")
    return jsonify({"status": "success"})
# Basic information for the file server; this client is passed to the
# remote-version and download helpers used by the routes in this file.
obs = HW_OBS()
@app.route("/get_versions", methods=["GET"])
def get_versions():
    """Return the result of ``get_local_versions()`` unchanged."""
    response = get_local_versions()
    return response
@app.route("/get_remote_versions", methods=["GET"])
def get_remote_versions_route():
    """Return the result of ``get_remote_versions(obs)`` unchanged."""
    response = get_remote_versions(obs)
    return response
@app.route("/download_package", methods=["POST"])
def download_package():
    """Start downloading the package named by the JSON field ``version``."""
    payload = request.get_json()
    requested_version = payload.get("version")
    return start_download_package(requested_version, obs, socketio)
@app.route("/get_current_version", methods=["GET"])
def get_current_version_route():
    """Return the result of ``get_current_version`` called with this file's path."""
    this_file = __file__
    return get_current_version(this_file)
@app.route("/switch_version", methods=["POST"])
def switch_version_route():
    """Switch to the version named by the JSON field ``version``."""
    payload = request.json
    requested_version = payload.get("version")
    return switch_version(requested_version)
# VortXDB version-control routes
@app.route("/get_vtx_versions", methods=["GET"])
def get_vtx_versions_route():
    """Return the result of ``get_vtx_local_versions()`` unchanged."""
    response = get_vtx_local_versions()
    return response
@app.route("/get_vtx_remote_versions", methods=["GET"])
def get_vtx_remote_versions_route():
    """Return the result of ``get_vtx_remote_versions(obs)`` unchanged."""
    response = get_vtx_remote_versions(obs)
    return response
@app.route("/download_vtx_package", methods=["POST"])
def download_vtx_package():
    """Start downloading the VortXDB package named by the JSON field ``version``."""
    payload = request.get_json()
    requested_version = payload.get("version")
    return start_vtx_download_package(requested_version, obs, socketio)
@app.route("/get_vtx_current_version", methods=["GET"])
def get_vtx_current_version_route():
    """Return the result of ``get_vtx_current_version(vtxdb)`` unchanged."""
    response = get_vtx_current_version(vtxdb)
    return response
@app.route("/switch_vtx_version", methods=["POST"])
def switch_vtx_version_route():
    """Switch VortXDB to the version named by the JSON field ``version``."""
    payload = request.json
    requested_version = payload.get("version")
    return switch_vtx_version(requested_version)
@app.route("/sync_user_data", methods=["POST"])
def sync_user_data_route():
    """Call ``sync_user_data`` with the JSON fields ``version`` and ``current_version``."""
    payload = request.json
    source_version = payload.get("version")
    current_version = payload.get("current_version")
    return sync_user_data(source_version, current_version)
@app.route("/on_message", methods=["POST"])
def on_message():
    """Broadcast a posted form ``message`` to clients as an ``on_message`` event.

    Responds 400 (without emitting) when the message is empty after
    stripping whitespace, and 500 when anything raises.
    """
    try:
        text = request.form.get("message", "").strip()
        if text:
            # Push the message to connected clients over Socket.IO.
            socketio.emit("on_message", {"message": text})
            success_body = {
                "status": "success",
                "message": "Popup triggered with message: " + text,
            }
            return jsonify(success_body)
        empty_body = {
            "status": "error",
            "message": "Message cannot be empty, no popup triggered",
        }
        return jsonify(empty_body), 400
    except Exception as e:
        failure_body = {"status": "error", "message": "An error occurred: " + str(e)}
        return jsonify(failure_body), 500
# Path of the "static" folder under the application root
STATIC_FOLDER = os.path.join(app.root_path, "static")
display_image_url = "static/images/smart_mode/back.png"  # URL of the image currently selected for display
display_image_type = 0  # type value stored alongside the displayed image
# Route that receives POST requests to change the displayed image
@app.route("/change_image", methods=["POST"])
def change_image():
    """Copy an image into ``static/images/tmp_images`` and announce it to clients.

    Form fields: ``path`` (an existing file path on this machine) and an
    optional ``type``. On success the module-level ``display_image_url`` and
    ``display_image_type`` are updated and a ``change_image`` event is
    emitted. Responds 400 for a missing or non-existent path and 500 when
    the directory cannot be created or the copy fails.
    """
    global display_image_url, display_image_type

    image_path = request.form.get("path")
    image_type = request.form.get("type", None)
    if not (image_path and os.path.exists(image_path)):
        return jsonify({"error": "Invalid image path"}), 400

    filename = os.path.basename(image_path)
    target_dir = os.path.join(STATIC_FOLDER, "images", "tmp_images")

    # Create the destination directory (and parents) on first use.
    if not os.path.exists(target_dir):
        try:
            os.makedirs(target_dir)
        except Exception as e:
            return jsonify({"error": f"Failed to create directory: {str(e)}"}), 500

    try:
        shutil.copy(image_path, os.path.join(target_dir, filename))
    except Exception as e:
        return jsonify({"error": f"Failed to copy file: {str(e)}"}), 500

    file_url = f"static/images/tmp_images/{filename}"
    display_image_url = file_url
    display_image_type = image_type

    # Tell connected clients which image to show.
    socketio.emit("change_image", {"path": file_url, "type": image_type})
    return jsonify({"url": file_url, "type": image_type}), 200
# Remote server connection details
# NOTE(review): a plaintext root password is committed to source here.
SERVER_IP = "8.138.8.114" # replace with your server IP
SERVER_USER = "root" # replace with the root user
SERVER_PASSWORD = "JuShenFengBao209" # replace with the root user's password
REMOTE_PATH = "/var/www/html/devices"
reverse_ssh = ReverseSSH(SERVER_IP, SERVER_USER, SERVER_PASSWORD)
# Open the reverse tunnel
@app.route("/start_tunnel", methods=["GET"])
def start_tunnel():
    """Start the reverse SSH tunnel and return its result as JSON."""
    outcome = reverse_ssh.start_tunnel()
    return jsonify(outcome)
# Close the reverse tunnel
@app.route("/stop_tunnel", methods=["GET"])
def stop_tunnel():
    """Stop the reverse SSH tunnel and return its result as JSON."""
    outcome = reverse_ssh.stop_tunnel()
    return jsonify(outcome)
# Route that runs a language-specific setup script chosen by the request
@app.route("/run-script", methods=["POST"])
def run_script():
    """Launch the setup script for the requested language in a new terminal.

    Expects JSON ``{"script_type": "zh" | "jp" | "en" | "ko"}``. The script
    is looked up in the parent directory of this file's directory and run
    through ``gnome-terminal``.

    Responds 400 for an unknown ``script_type``, 404 when the script file is
    missing and 500 when anything raises.
    """
    import shlex

    # One table instead of four copy-pasted branches.
    scripts = {
        "zh": "setup.sh",
        "jp": "setup_JP.sh",
        "en": "setup_EN.sh",
        "ko": "setup_KO.sh",
    }
    print("run-script")
    try:
        data = request.json
        script_type = data.get("script_type")

        # Parent of the directory that contains this file.
        base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        print("动态目录:", base_path)

        script_name = scripts.get(script_type) if isinstance(script_type, str) else None
        if script_name is None:
            print("Invalid script type provided, 400")
            return jsonify({"error": "Invalid script type provided"}), 400

        script_path = os.path.normpath(os.path.join(base_path, script_name))
        if not os.path.exists(script_path):
            print(f"Script not found: {script_path}, 404")
            return jsonify({"error": f"Script not found: {script_path}"}), 404

        # Quote the directory so a path containing spaces or shell
        # metacharacters cannot break the ``cd``.
        shell_line = f"cd {shlex.quote(base_path)} && bash {script_name}; exit"
        subprocess.Popen(
            ["gnome-terminal", "--", "bash", "-c", shell_line],
            cwd=base_path,
        )
        return jsonify(
            {
                "message": "Script executed successfully",
            }
        )
    except Exception as e:
        print(f"Error running script: {e}, 500")
        return jsonify({"error": str(e)}), 500
@app.route('/set_zero', methods=['POST'])
def set_zero():
    """Zero the force sensor, retrying up to three times.

    Skipped (200) when ``massage.service`` is running. On each attempt
    active transmission is disabled before ``set_zero()`` is called; a
    return value of 0 counts as success. After three failures a
    notification is posted to ``/on_message`` and 500 is returned.
    """
    try:
        if is_service_running("massage.service"):
            return jsonify({"message": "Service is already running, no need to set zero"}), 200

        sensor = XjcSensor()
        max_try = 3  # maximum number of attempts
        delay = 0.5  # pause after a failed attempt, in seconds
        for attempt in range(max_try):
            sensor.disable_active_transmission()
            time.sleep(0.5)
            if sensor.set_zero() == 0:
                return jsonify({"message": "Set zero success"}), 200
            print(f"Set zero attempt {attempt + 1} failed, retrying...")
            time.sleep(delay)

        print("Set zero failed after multiple attempts.")
        try:
            # This request goes back to this same server; without a timeout
            # it can block the handler indefinitely.
            requests.post(
                "http://127.0.0.1:5000/on_message",
                data={"message": "传感器初始化失败"},
                timeout=5,
            )
        except requests.RequestException as notify_error:
            print(f"Failed to send set-zero notification: {notify_error}")
        return jsonify({"message": "Set zero failed after multiple attempts"}), 500
    except Exception as e:
        print(f"Error in /set_zero: {e}")
        return jsonify({"message": "Set zero failed", "error": str(e)}), 500
# Global holding the AuboC5 instance; None until /power_toggle or /pack_mode creates one
aubo_c5 = None
@app.route('/power_toggle', methods=['POST'])
def power_toggle():
    """Power the Aubo C5 arm on or off via the form field ``power_state``.

    * ``power_on`` creates the shared ``AuboC5`` instance if none exists and
      posts a notification to ``/on_message``.
    * ``power_off`` calls ``power_off()`` on the existing instance, waits
      five seconds and drops the instance.

    Responds 400 when ``power_state`` is missing or unknown, when
    ``massage.service`` is running, or when powering off with no instance;
    500 when anything raises.
    """
    global aubo_c5
    try:
        power_state = request.form.get('power_state')
        if not power_state:
            return jsonify({"error": "Missing 'power_state' parameter"}), 400

        # The original attached the "Invalid power_state" text to this
        # branch and "command is error" to the unknown-state branch; each
        # message now describes its own condition.
        if is_service_running("massage.service"):
            return jsonify({"error": "massage.service is running; power control is unavailable."}), 400

        if power_state == 'power_on':
            if aubo_c5 is None:
                aubo_c5 = AuboC5()
                try:
                    # Best effort: a failed popup must not turn a successful
                    # power-on into an error response.
                    requests.post(
                        "http://127.0.0.1:5000/on_message",
                        data={"message": "上电成功"},
                        timeout=5,
                    )
                except requests.RequestException as notify_error:
                    print(f"Failed to send power-on notification: {notify_error}")
            return jsonify({"message": f"Aubo_C5 {power_state} successfully."})

        if power_state == 'power_off':
            if aubo_c5 is None:
                return jsonify({"error": "Aubo_C5 is not powered on."}), 400
            aubo_c5.power_off()
            # Blocks this request for five seconds before dropping the instance.
            time.sleep(5)
            aubo_c5 = None
            print("Aubo_C5 instance has been deleted after 5 seconds.")
            return jsonify({"message": "AuboC5 powered off successfully."})

        return jsonify({"error": "Invalid power_state value. Use 'power_on' or 'power_off'."}), 400
    except Exception as e:
        print(f"Error in /power_toggle: {e}")
        return jsonify({"error": f"An error occurred: {str(e)}"}), 500
@app.route('/pack_mode', methods=['POST'])
def pack_mode():
    """Move the Aubo C5 arm to its packing position.

    Creates the shared ``AuboC5`` instance when none exists, calls
    ``pack()`` and posts a confirmation prompt to ``/on_message``. Responds
    500 when creating the instance or ``pack()`` raises.
    """
    global aubo_c5
    try:
        if aubo_c5 is None:
            aubo_c5 = AuboC5()
        aubo_c5.pack()
        try:
            # Best effort with a timeout: a failed popup must not report the
            # completed move as an error or block the handler.
            requests.post(
                "http://127.0.0.1:5000/on_message",
                data={"message": "请确认是否到达打包位置，如有异常，请上电后手动调整"},
                timeout=5,
            )
        except requests.RequestException as notify_error:
            print(f"Failed to send pack-mode notification: {notify_error}")
        return jsonify({"message": "Aubo_C5 mover to pack pos successfully."})
    except Exception as e:
        print(f"Error in /pack_mode: {e}")
        return jsonify({"error": f"An error occurred: {str(e)}"}), 500
@app.route("/reset-language", methods=["POST"])
def reset_language():
    """Run ``restart_language.sh`` in a new terminal window.

    The script is looked up in the parent directory of this file's
    directory. Responds 404 when it is missing and 500 when anything raises.
    """
    import shlex

    print("reset-language")
    try:
        # Parent of the directory that contains this file.
        base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        print("动态目录:", base_path)

        script_path = os.path.normpath(os.path.join(base_path, "restart_language.sh"))
        if not os.path.exists(script_path):
            print(f"Script not found: {script_path}, 404")
            return jsonify({"error": f"Script not found: {script_path}"}), 404

        # Quote the directory so a path containing spaces or shell
        # metacharacters cannot break the ``cd``.
        shell_line = f"cd {shlex.quote(base_path)} && bash restart_language.sh; exit"
        subprocess.Popen(
            ["gnome-terminal", "--", "bash", "-c", shell_line],
            cwd=base_path,
        )
        return jsonify({
            "message": "Restart language script executed successfully in new terminal"
        })
    except Exception as e:
        print(f"Error running script: {e}, 500")
        return jsonify({"error": str(e)}), 500
# Initialise the WifiManager object used by the Wi-Fi routes
wifi_manager = WifiManager()
@app.route('/scan_wifi', methods=['GET'])
def scan_wifi():
    """Scan for Wi-Fi networks and return the scan result.

    Responds 500 with the error text when the scan raises.
    """
    try:
        found_networks = wifi_manager.scan_wifi()
        body = {"status": "success", "wifi_networks": found_networks}
        return jsonify(body), 200
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/connect_wifi', methods=['POST'])
def connect_wifi():
    """Connect to the Wi-Fi network given in the JSON body.

    The body must contain non-empty ``ssid`` and ``password``; otherwise 400
    is returned. Responds 500 with the error text when anything raises.
    """
    try:
        payload = request.get_json()
        ssid = payload.get('ssid')
        password = payload.get('password')
        if ssid and password:
            outcome = wifi_manager.connect_wifi(ssid, password)
            return jsonify({"status": "success", "message": outcome}), 200
        return jsonify({"status": "error", "message": "SSID 和密码是必需的"}), 400
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/disconnect_wifi', methods=['POST'])
def disconnect_wifi():
    """Disconnect the current Wi-Fi connection.

    Responds 500 with the error text when the call raises.
    """
    try:
        outcome = wifi_manager.disconnect_wifi()
        body = {"status": "success", "message": outcome}
        return jsonify(body), 200
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/get_current_connection', methods=['GET'])
def get_current_connection():
    """Return information about the currently connected Wi-Fi network.

    Responds 500 with the error text when the call raises.
    """
    try:
        info = wifi_manager.get_current_connection()
        body = {"status": "success", "connection_info": info}
        return jsonify(body), 200
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/is_connected', methods=['GET'])
def is_connected():
    """Report whether Wi-Fi is currently connected.

    Responds 500 with the error text when the call raises.
    """
    try:
        state = wifi_manager.is_connected()
        body = {"status": "success", "is_connected": state}
        return jsonify(body), 200
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
# Write the device info file and upload it to the server
def write_and_upload_info(ngrok_url, reverse_ssh_port):
    """Write a device info file and upload it to the server over SFTP.

    The file ``../tmp/<hostname>.txt`` records the host name, the ngrok URL,
    the current local time and the reverse-SSH command, and is uploaded to
    ``REMOTE_PATH`` on ``SERVER_IP``.

    Args:
        ngrok_url: public ngrok URL to record.
        reverse_ssh_port: server-side port of the reverse SSH tunnel.

    Raises:
        Exception: whatever writing the file, connecting or uploading raises.
    """
    hostname = socket.gethostname()
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Command an operator can use to reach this device through the tunnel.
    reverse_ssh_command = f"ssh jsfb@{SERVER_IP} -p {reverse_ssh_port}"
    file_content = (
        f"Hostname: {hostname}\n"
        f"Ngrok URL: {ngrok_url}\n"
        f"Online Time: {timestamp}\n"
        f"SSH Command: {reverse_ssh_command}"
    )
    file_path = f"../tmp/{hostname}.txt"
    # Make sure the target directory exists before writing.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w") as file:
        file.write(file_content)

    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    sftp = None
    try:
        ssh.connect(SERVER_IP, username=SERVER_USER, password=SERVER_PASSWORD)
        sftp = ssh.open_sftp()
        sftp.put(file_path, f"{REMOTE_PATH}/{hostname}.txt")
        print(f"File {file_path} uploaded to {REMOTE_PATH}/{hostname}.txt on server.")
    finally:
        # ``sftp`` stays None when connect()/open_sftp() fails; guarding here
        # keeps the original exception instead of raising a NameError.
        if sftp is not None:
            sftp.close()
        ssh.close()
# Start the ngrok tunnel
def start_ngrok(port, max_retries=3, retry_interval=2):
    """Start an ngrok HTTP tunnel for ``port`` and return its public URL.

    Args:
        port: local port to expose.
        max_retries: number of launch attempts.
        retry_interval: seconds to wait after each launch and after each failure.

    Returns:
        The public URL on success, or the string ``"Error"`` after all
        attempts have failed.
    """
    global ngrok_process
    for attempt in range(max_retries):
        print(f"Attempting to start ngrok (Attempt {attempt + 1}/{max_retries})")
        try:
            # NOTE(review): stdout/stderr are piped but never read in this function.
            ngrok_process = subprocess.Popen(
                ["/usr/local/bin/ngrok", "http", str(port)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError as launch_error:
            # Missing or non-executable binary counts as a failed attempt.
            print(f"Failed to launch ngrok: {launch_error}")
            time.sleep(retry_interval)
            continue
        time.sleep(retry_interval)  # give ngrok time to come up
        try:
            ngrok_url = get_ngrok_url()
            print(f"Ngrok started successfully with URL: {ngrok_url}")
            return ngrok_url
        except Exception as e:
            print(f"Failed to get ngrok URL: {e}")
            ngrok_process.terminate()
            # Reap the child so failed attempts do not leave zombies behind.
            try:
                ngrok_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                ngrok_process.kill()
                ngrok_process.wait()
            time.sleep(retry_interval)
    print("Failed to start ngrok after multiple attempts")
    return "Error"
# Fetch the public ngrok URL
def get_ngrok_url():
    """Return the public URL of the first tunnel reported by the local ngrok API.

    Raises:
        RuntimeError: if the API cannot be reached, answers with an error
            status, or its reply holds no tunnel with a ``public_url``.
    """
    try:
        response = requests.get(
            "http://localhost:4040/api/tunnels", timeout=5
        )  # local ngrok API
        response.raise_for_status()
        data = response.json()
        return data["tunnels"][0]["public_url"]
    except requests.RequestException as e:
        raise RuntimeError(f"Error fetching ngrok URL: {e}") from e
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # An empty tunnel list or unexpected payload used to escape as a
        # bare IndexError/KeyError; report it the same way as other failures.
        raise RuntimeError(f"Unexpected ngrok API response: {e!r}") from e
# Socket.IO handler: start the background task when a client connects
@socketio.on("connect", namespace="/")
def connect():
    """Start ``music_background_thread`` as a Socket.IO background task."""
    background_job = music_background_thread
    socketio.start_background_task(background_job)
# Zeroconf instance used by register_service() and closed in cleanup()
zeroconf = Zeroconf()
# Records whether the mDNS service has already been registered
service_registered = False
def get_all_local_ips():
    """Return every IPv4 address found on the local interfaces, unfiltered.

    Returns ``["127.0.0.1"]`` when ``DEBUG_MODE`` is set or when no IPv4
    address is found. The interfaces are enumerated (and printed) in both
    cases.
    """
    interface_names = netifaces.interfaces()
    print("可用网络接口:", interface_names)
    collected = []
    for interface in interface_names:
        addrs = netifaces.ifaddresses(interface)
        print(f"接口 {interface} 的地址信息: {addrs}")
        # Only the IPv4 (AF_INET) entries are of interest.
        for addr_info in addrs.get(netifaces.AF_INET, ()):
            print(f"接口 {interface} 的 IPv4 地址信息: {addr_info}")
            ip_addr = addr_info.get("addr")
            if ip_addr:
                collected.append(ip_addr)
    if DEBUG_MODE:
        return ["127.0.0.1"]  # debugging override
    return collected or ["127.0.0.1"]
def register_service(local_ips):
    """Register this device as an mDNS ``_http._tcp`` service.

    One service is registered for each address in ``local_ips`` that does
    not start with ``127.`` or ``192.168.100``. Does nothing when
    ``service_registered`` is already set.

    Args:
        local_ips: iterable of dotted IPv4 address strings.
    """
    global service_registered
    print(local_ips)
    if service_registered:
        return

    hostname = socket.gethostname()
    skipped_prefixes = ("127.", "192.168.100")
    for ip in local_ips:
        if ip.startswith(skipped_prefixes):
            continue
        info = ServiceInfo(
            "_http._tcp.local.",  # service type
            f"LL-X1-{hostname}._http._tcp.local.",  # service name built from the host name
            addresses=[socket.inet_aton(ip)],  # address packed as bytes
            port=5000,  # port the Flask app listens on
            properties={},  # optional extra service info
            server=f"{hostname}.local.",  # host name
        )
        zeroconf.register_service(info, allow_name_change=True)
        print(f"mDNS 服务已注册IP 地址：{ip}")
        service_registered = True
# On application exit: unregister the zeroconf service and stop ngrok
def cleanup():
    """Stop the reverse tunnel, terminate ngrok if it is running and close zeroconf."""
    print("注销 mDNS 服务并关闭 ngrok...")
    reverse_ssh.stop_tunnel()
    running_ngrok = ngrok_process
    if running_ngrok:
        running_ngrok.terminate()
    zeroconf.close()  # unregisters the zeroconf service
def fetch_music_list_with_retry():
    """Keep fetching the music playlists until one attempt succeeds.

    Each attempt clears ``all_playlists`` and refills it from every id in
    ``categoryIDs``. On success ``playlist`` is rebuilt from the first
    category and ``current_song`` is set to its first track (not playing).
    A failed or empty attempt is retried after five seconds.
    """
    global all_playlists, playlist, current_song
    while True:
        category_id = None  # category being fetched, for the error message
        try:
            all_playlists.clear()
            for category_id in categoryIDs:
                diss_info, music_info_list = get_song_list(category_id)
                all_playlists.append(
                    {"diss_info": diss_info, "music_info_list": music_info_list}
                )
                print(diss_info)
            if all_playlists:
                playlist = generate_playlist(all_playlists[0]["music_info_list"])
                first_track = playlist[0]
                current_song.update(
                    {
                        "album_img_url": first_track["album_img_url"],
                        "music_name": first_track["music_name"],
                        "singer_name": first_track["singer_name"],
                        "is_playing": False,
                    }
                )
                print("音乐列表更新成功！")
                break
        except Exception as e:
            print(f"获取音乐列表失败，重试中... 错误信息: {e}")
            # ``category_id`` is pre-bound so this line cannot itself raise
            # when the failure happens before the first category is fetched.
            print(f"Failed to fetch playlist for category {category_id}: {e}")
        # Back off on every unsuccessful pass (including an empty result) so
        # the loop never spins without waiting.
        time.sleep(5)
def initialize_app(port):
    """Run the start-up steps needed before Flask-SocketIO is served.

    Polls the local addresses up to ten times (three seconds apart) until
    more than one is found, registers the mDNS service, then creates the
    ``PowerBoard`` monitor with ``system_shutdown`` as its callback.

    Music prefetching and the ngrok / reverse-SSH start-up that used to live
    here are currently disabled.

    Args:
        port: not used by the current body.
    """
    global playlist, current_song, powerboard

    max_retries = 10
    for _ in range(max_retries):
        local_ips = get_all_local_ips()
        # More than one address: register straight away.
        if len(local_ips) > 1:
            register_service(local_ips)
            print("Service registered successfully with local IPs:", local_ips)
            break
        print(f"Retrying... Current IP list length: {len(local_ips)}")
        time.sleep(3)

    # Still a single address after all retries: register with it anyway.
    if len(local_ips) == 1:
        print(f"Warning: Only one local IP found after {max_retries} retries. Registering with this IP.")
        register_service(local_ips)

    # Create the PowerBoard monitor and hook up the shutdown callback.
    powerboard = PowerBoard()
    powerboard.set_callback(system_shutdown)
    powerboard.start_monitoring()
def system_shutdown():
    """Run the shutdown sequence: remote host, power board, then this machine.

    Steps:
        1. If the remote host is up, send it ``shutdown -h now`` over SSH and
           check that it went down.
        2. Stop the heartbeat thread.
        3. Send ``SHUTDOWN`` to the power board over UDP (three times) and
           ``UI-SHUTDOWN`` through the ``PowerBoard`` object when present.
        4. Shut down the local system via ``sudo``.

    Returns:
        bool: True when the local shutdown command was issued; False when
        the remote host stayed up or any step raised.
    """
    global powerboard, power_board_ip, power_board_thread, power_board_thread_running

    remote_host = "192.168.100.20"  # remote host IP address
    remote_username = "root"  # remote host user name
    remote_password = "bestcobot"  # remote host password  # NOTE(review): hard-coded credential
    remote_port = 8822  # remote host SSH port
    sudo_password = "jsfb"  # NOTE(review): hard-coded sudo password
    try:
        # 1. Shut the remote host down first, then check it is gone.
        if not is_host_down(remote_host, remote_port):
            print("Remote host is alive, processing shutdown")
            execute_command_on_remote(
                remote_host,
                remote_username,
                remote_password,
                "shutdown -h now",
                port=remote_port,
            )
            print("Waiting for remote host to shutdown...")
            if not is_host_down(remote_host, remote_port):
                print("Remote host did not shutdown in time")
                return False

        # 2. Stop the heartbeat thread.
        if power_board_thread_running:
            power_board_thread_running = False
            if power_board_thread:
                try:
                    power_board_thread.join(timeout=2.0)
                except Exception as e:
                    print(f"Error stopping power board thread during shutdown: {e}")
            power_board_thread = None

        # 3. Tell the power board to shut down. The socket is closed by the
        #    context manager (the original never closed it).
        if power_board_ip:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
                udp_socket.settimeout(1.0)
                for _ in range(3):
                    try:
                        udp_socket.sendto(b"SHUTDOWN", (power_board_ip, 3660))
                        print(f"Sent shutdown signal to {power_board_ip}:3660")
                        time.sleep(0.1)
                    except Exception as e:
                        print(f"Error sending shutdown signal: {e}")
            power_board_ip = None
        # ``powerboard`` stays None until initialize_app() has run; skipping
        # it must not abort the rest of the shutdown.
        if powerboard is not None:
            powerboard.send_cmd("UI-SHUTDOWN")

        # 4. Shut down the local system.
        command = f'echo {sudo_password} | /bin/sh -c "/usr/bin/sudo -S /sbin/shutdown now"'
        subprocess.run(
            command,
            shell=True,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        print("System is shutting down.")
        return True
    except Exception as e:
        print(f"Error during shutdown process: {e}")
        return False
# Executed when the module is loaded: runs the start-up steps defined in initialize_app().
initialize_app(5000)
def check_network():
    """Report whether a TCP connection to 8.8.8.8:53 can be opened within 5 seconds.

    Returns:
        bool: True when the connection succeeds, False on any ``OSError``.
    """
    try:
        # The context manager closes the probe socket; the original left it
        # open on every successful check.
        with socket.create_connection(("8.8.8.8", 53), timeout=5):
            return True
    except OSError:
        return False
# Global holding the device location; filled in by /update_device_location
device_location = {
'latitude': None,
'longitude': None,
'accuracy': None,
'timestamp': None  # set to a "%Y-%m-%d %H:%M:%S" string on update
}
@app.route('/update_device_location', methods=['POST'])
def update_device_location():
    """Store the posted device location, report it, and refresh weather data.

    Example request body::

        {
            "latitude": 35.123456,
            "longitude": 139.789012,
            "accuracy": 10.5
        }

    The location is saved in ``device_location``, sent to the IoT platform
    together with host and version information (failures there are only
    printed), and the weather cache is refreshed when
    ``should_update_weather`` says so.

    Responds 400 when the body is not a JSON object containing ``latitude``
    and ``longitude``, and 500 when anything else raises.
    """
    try:
        # silent=True yields None for a missing or malformed body, so that
        # case gets the 400 below instead of a 500 from the handler.
        location_data = request.get_json(silent=True)
        print(f"Received location data: {location_data}")

        required_fields = ('latitude', 'longitude')
        if not isinstance(location_data, dict) or any(
            field not in location_data for field in required_fields
        ):
            return jsonify({
                'status': 'error',
                'message': 'Missing required location data fields'
            }), 400

        # Update the shared location record in place.
        device_location.update({
            'latitude': location_data.get('latitude'),
            'longitude': location_data.get('longitude'),
            'accuracy': location_data.get('accuracy'),
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        })
        print(f"Device Location Updated: {device_location}")

        # Host name doubles as the device name.
        hostname = socket.gethostname()

        # First locally known version, or "default" when none is listed.
        version_data = get_local_versions().json
        versions = version_data.get('versions', [])
        software_version = versions[0] if versions else "default"
        vtxdb_version = vtxdb.get_version()
        iot_config = vtxdb.get("robot_config","IoT")

        iot_data = {
            "device_name": hostname,
            "device_id": f"{iot_config['product_id']}_{hostname}",
            "longitude": location_data.get('longitude'),
            "latitude": location_data.get('latitude'),
            "produce_name": "RS-LL-X1",
            "product_id": iot_config['product_id'],
            "software_version": software_version,
            "vtxdb_version": vtxdb_version,
            "timestamp": device_location['timestamp']
        }

        # Best effort: an unreachable IoT platform does not fail the request.
        try:
            iot_response = requests.post(
                "http://app.robotstorm.tech:8080/iot/location",
                json=iot_data,
                timeout=5
            )
            print(f"IoT platform response: {iot_response.status_code}")
        except Exception as iot_error:
            print(f"Error sending data to IoT platform: {iot_error}")

        # Refresh the weather cache when needed.
        weather_updated = False
        if should_update_weather(device_location['latitude'], device_location['longitude']):
            try:
                weather_results = asyncio.run(get_all_weather_data(
                    device_location['latitude'],
                    device_location['longitude']
                ))
                if all(weather_results.values()):
                    update_weather_cache(
                        weather_results,
                        device_location['latitude'],
                        device_location['longitude']
                    )
                    weather_updated = True
                    print("Weather data updated successfully")
            except Exception as weather_error:
                print(f"Error updating weather data: {weather_error}")

        return jsonify({
            'status': 'success',
            'message': 'Location data updated successfully' + (' and weather data refreshed' if weather_updated else ''),
            'data': {
                'location': device_location,
                'weather_updated': weather_updated
            }
        }), 200
    except Exception as e:
        print(f"Error processing location data: {e}")
        return jsonify({
            'status': 'error',
            'message': f'Failed to process location data: {str(e)}'
        }), 500
# Coordinate-system conversion helpers
def wgs84_to_gcj02(lat, lng):
    """Convert a WGS84 coordinate to the GCJ02 coordinate system.

    Args:
        lat: latitude in degrees.
        lng: longitude in degrees.

    Returns:
        ``(gcj_lat, gcj_lng)``, or ``(None, None)`` when either input is
        falsy (``None`` or ``0``).
    """
    if not lat or not lng:
        return None, None

    semi_major = 6378245.0  # semi-major axis
    ecc_sq = 0.00669342162296594323  # eccentricity squared
    pi = math.pi
    sin = math.sin

    def _lat_offset(x, y):
        total = -100.0 + 2.0 * x + 3.0 * y + 0.2 * y * y + 0.1 * x * y + 0.2 * math.sqrt(abs(x))
        total += (20.0 * sin(6.0 * x * pi) + 20.0 * sin(2.0 * x * pi)) * 2.0 / 3.0
        total += (20.0 * sin(y * pi) + 40.0 * sin(y / 3.0 * pi)) * 2.0 / 3.0
        total += (160.0 * sin(y / 12.0 * pi) + 320 * sin(y * pi / 30.0)) * 2.0 / 3.0
        return total

    def _lng_offset(x, y):
        total = 300.0 + x + 2.0 * y + 0.1 * x * x + 0.1 * x * y + 0.1 * math.sqrt(abs(x))
        total += (20.0 * sin(6.0 * x * pi) + 20.0 * sin(2.0 * x * pi)) * 2.0 / 3.0
        total += (20.0 * sin(x * pi) + 40.0 * sin(x / 3.0 * pi)) * 2.0 / 3.0
        total += (150.0 * sin(x / 12.0 * pi) + 300.0 * sin(x / 30.0 * pi)) * 2.0 / 3.0
        return total

    # Offsets are computed relative to the reference point (35 N, 105 E).
    rel_lng = lng - 105.0
    rel_lat = lat - 35.0
    d_lat = _lat_offset(rel_lng, rel_lat)
    d_lng = _lng_offset(rel_lng, rel_lat)

    rad_lat = lat / 180.0 * pi
    magic = sin(rad_lat)
    magic = 1 - ecc_sq * magic * magic
    sqrt_magic = math.sqrt(magic)
    d_lat = (d_lat * 180.0) / ((semi_major * (1 - ecc_sq)) / (magic * sqrt_magic) * pi)
    d_lng = (d_lng * 180.0) / (semi_major / sqrt_magic * math.cos(rad_lat) * pi)
    return lat + d_lat, lng + d_lng
def gcj02_to_bd09(lat, lng):
    """
    Convert a GCJ02 coordinate to the BD09 coordinate system.

    Args:
        lat: GCJ02 latitude in degrees, or None when unknown.
        lng: GCJ02 longitude in degrees, or None when unknown.

    Returns:
        A ``(bd_lat, bd_lng)`` tuple in degrees, or ``(None, None)`` when
        either input is None.
    """
    # Only None means "missing": 0.0 is a valid coordinate value.
    if lat is None or lng is None:
        return None, None

    x_pi = math.pi * 3000.0 / 180.0
    # Polar form of the point with small sinusoidal perturbations, then the
    # fixed (0.0065, 0.006) shift.
    z = math.sqrt(lng * lng + lat * lat) + 0.00002 * math.sin(lat * x_pi)
    theta = math.atan2(lat, lng) + 0.000003 * math.cos(lng * x_pi)
    bd_lng = z * math.cos(theta) + 0.0065
    bd_lat = z * math.sin(theta) + 0.006
    return bd_lat, bd_lng
@app.route('/get_device_location', methods=['GET'])
def get_device_location():
    """
    Return the stored device location, optionally converted to another
    coordinate system.

    Query parameter ``coordinate_system`` (case-insensitive):
        - wgs84 (default): the stored coordinates, unchanged
        - gcj02: converted with wgs84_to_gcj02
        - bd09: converted with wgs84_to_gcj02, then gcj02_to_bd09
    Any other value is served as WGS84.

    Examples:
        GET /get_device_location
        GET /get_device_location?coordinate_system=wgs84
        GET /get_device_location?coordinate_system=gcj02
        GET /get_device_location?coordinate_system=bd09

    Responds 404 when no latitude/longitude is stored and 500 on any
    unexpected error.
    """
    try:
        requested = request.args.get('coordinate_system', 'wgs84').lower()

        # Nothing to report until both coordinates have been stored.
        if not (device_location['latitude'] and device_location['longitude']):
            return jsonify({
                'status': 'error',
                'message': 'No location data available'
            }), 404

        source_lat = device_location['latitude']
        source_lng = device_location['longitude']

        if requested == 'bd09':
            # BD09 is derived from GCJ02, so convert in two steps.
            lat, lng = gcj02_to_bd09(*wgs84_to_gcj02(source_lat, source_lng))
            label = 'BD09'
        elif requested == 'gcj02':
            lat, lng = wgs84_to_gcj02(source_lat, source_lng)
            label = 'GCJ02'
        else:
            lat, lng = source_lat, source_lng
            label = 'WGS84'

        payload = {
            'latitude': lat,
            'longitude': lng,
            'accuracy': device_location['accuracy'],
            'timestamp': device_location['timestamp'],
            'coordinate_system': label
        }
        return jsonify({'status': 'success', 'data': payload}), 200
    except Exception as err:
        print(f"Error getting location data: {err}")
        return jsonify({
            'status': 'error',
            'message': f'Failed to get location data: {str(err)}'
        }), 500
# QWeather API configuration
# NOTE(review): an API key is committed in source. It can be overridden with
# the QWEATHER_API_KEY environment variable; the literal below is only the
# fallback that preserves existing behaviour.
QWEATHER_API_KEY = os.environ.get("QWEATHER_API_KEY") or "79b35822391e4b999dbda0f90dfc1dcf"
QWEATHER_API_BASE = "https://devapi.qweather.com/v7"  # free-tier weather API
QWEATHER_GEO_API_BASE = "https://geoapi.qweather.com/v2"  # GeoAPI endpoint

# Module-level cache of the most recently fetched weather data
weather_data = {
    'now': None,          # current conditions
    'hourly': None,       # hourly forecast
    'daily': None,        # daily forecast
    'last_update': None,  # datetime of the last successful refresh
    'location': None,     # coordinates used for the last request
    'geo_info': None      # geo lookup result
}
async def fetch_weather_data(session, url):
    """
    Fetch one URL and return its decoded JSON body.

    Args:
        session: object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``json()``.
        url: fully-built request URL.

    Returns:
        The decoded JSON payload, or None if any exception occurred (the
        error is printed).
    """
    try:
        async with session.get(url) as resp:
            payload = await resp.json()
    except Exception as err:
        print(f"Error fetching weather data: {err}")
        return None
    return payload
async def get_all_weather_data(latitude, longitude):
    """
    Concurrently fetch current, 24-hour and 7-day grid weather plus the geo
    lookup for one coordinate.

    Returns:
        dict with keys ``now``, ``hourly``, ``daily`` and ``geo``; each value
        is whatever the corresponding fetch helper returned (None on failure).
    """
    query = f"location={longitude},{latitude}&key={QWEATHER_API_KEY}"
    endpoints = [
        f"{QWEATHER_API_BASE}/grid-weather/{suffix}?{query}"
        for suffix in ("now", "24h", "7d")
    ]

    async with aiohttp.ClientSession() as http:
        # Three weather requests followed by the geo lookup, all in flight
        # together; gather() preserves this ordering in its result.
        pending = [fetch_weather_data(http, endpoint) for endpoint in endpoints]
        pending.append(fetch_geo_info(http, latitude, longitude))
        now, hourly, daily, geo = await asyncio.gather(*pending)

        return {
            'now': now,
            'hourly': hourly,
            'daily': daily,
            'geo': geo
        }
def update_weather_cache(weather_results, latitude, longitude):
    """
    Store a fresh set of weather results in the module-level ``weather_data``
    cache, stamping it with the current time and the requested coordinates.
    """
    # Build the whole snapshot first so a missing key leaves the cache intact.
    snapshot = {key: weather_results[key] for key in ('now', 'hourly', 'daily')}
    snapshot['geo_info'] = weather_results['geo']
    snapshot['last_update'] = datetime.now()
    snapshot['location'] = {'latitude': latitude, 'longitude': longitude}
    weather_data.update(snapshot)
def should_update_weather(latitude=None, longitude=None, force_update=False):
    """
    Decide whether the cached weather data needs to be refreshed.

    Returns True when a refresh is forced, when nothing has been cached yet,
    when the cache is older than one hour, or when either coordinate moved by
    more than 0.01 degrees from the cached location.
    """
    if force_update or not weather_data['last_update']:
        return True

    # The cache is considered stale after one hour.
    age = datetime.now() - weather_data['last_update']
    is_stale = age > timedelta(hours=1)

    has_moved = False
    if latitude and longitude and weather_data['location']:
        cached = weather_data['location']
        pairs = ((latitude, cached['latitude']), (longitude, cached['longitude']))
        has_moved = any(abs(new - old) > 0.01 for new, old in pairs)

    return is_stale or has_moved
async def fetch_geo_info(session, latitude, longitude):
    """
    Look up geo information for a coordinate via the GeoAPI city lookup.

    Returns:
        The decoded JSON payload, or None if any exception occurred (the
        error is printed).
    """
    try:
        lookup_url = (
            f"{QWEATHER_GEO_API_BASE}/city/lookup"
            f"?location={longitude},{latitude}&key={QWEATHER_API_KEY}"
        )
        async with session.get(lookup_url) as resp:
            payload = await resp.json()
    except Exception as err:
        print(f"Error fetching geo info: {err}")
        return None
    return payload
@app.route('/get_weather', methods=['GET'])
def get_weather():
    """
    Return weather information for the stored device location.

    Optional query parameters:
        - force_update: "true" forces a refresh of the cached weather data
        - coordinate_system: wgs84 (default) / gcj02 / bd09

    Examples:
        GET /get_weather
        GET /get_weather?force_update=true
        GET /get_weather?coordinate_system=gcj02

    Responds 404 when no location is stored, 500 when any upstream fetch
    returned nothing or an unexpected error occurred.
    """
    try:
        wants_refresh = request.args.get('force_update', '').lower() == 'true'
        system = request.args.get('coordinate_system', 'wgs84').lower()

        if not (device_location['latitude'] and device_location['longitude']):
            return jsonify({
                'status': 'error',
                'message': 'No location data available'
            }), 404

        # Convert the stored coordinates into the requested system.
        if system == 'bd09':
            lat, lng = gcj02_to_bd09(
                *wgs84_to_gcj02(device_location['latitude'], device_location['longitude'])
            )
        elif system == 'gcj02':
            lat, lng = wgs84_to_gcj02(device_location['latitude'], device_location['longitude'])
        else:
            lat, lng = device_location['latitude'], device_location['longitude']

        if should_update_weather(lat, lng, wants_refresh):
            fetched = asyncio.run(get_all_weather_data(lat, lng))
            # Every one of the four payloads must be present to accept the batch.
            if not all(fetched.values()):
                return jsonify({
                    'status': 'error',
                    'message': 'Failed to fetch weather data'
                }), 500
            update_weather_cache(fetched, lat, lng)

        refreshed_at = weather_data['last_update']
        body = {
            'now': weather_data['now'],
            'hourly': weather_data['hourly'],
            'daily': weather_data['daily'],
            'geo_info': weather_data['geo_info'],
            'last_update': refreshed_at.strftime("%Y-%m-%d %H:%M:%S") if refreshed_at else None,
            'location': {
                'latitude': lat,
                'longitude': lng,
                'coordinate_system': system.upper()
            }
        }
        return jsonify({'status': 'success', 'data': body}), 200
    except Exception as err:
        print(f"Error getting weather data: {err}")
        return jsonify({
            'status': 'error',
            'message': f'Failed to get weather data: {str(err)}'
        }), 500
def _parse_single_byte_range(range_header, size):
    """
    Resolve a ``Range`` header against a resource of ``size`` bytes.

    Only the first range of a ``bytes=`` header is honoured. Returns an
    inclusive ``(start, end)`` tuple, or None when the header is malformed or
    cannot be satisfied.
    """
    unit, _, spec = range_header.partition("=")
    if unit.strip().lower() != "bytes" or not spec:
        return None

    first_start, dash, first_end = spec.split(",")[0].strip().partition("-")
    if not dash:
        return None

    try:
        if first_start == "":
            # Suffix form "bytes=-N": the last N bytes of the resource.
            suffix_length = int(first_end)
            if suffix_length <= 0:
                return None
            start = max(size - suffix_length, 0)
            end = size - 1
        else:
            start = int(first_start)
            end = int(first_end) if first_end else size - 1
    except ValueError:
        return None

    # Never promise more bytes than the file actually holds.
    end = min(end, size - 1)
    if start < 0 or start > end:
        return None
    return start, end


@app.route("/get_demo_video")
def get_demo_video():
    """
    Serve the demo video, honouring a single HTTP byte-range request.

    Without a ``Range`` header the whole file is returned. With one, the
    requested slice is returned as 206 Partial Content; a malformed or
    unsatisfiable range yields 416 with ``Content-Range: bytes */<size>``.
    """
    path = "static/images/video/demo.m4v"
    range_header = request.headers.get("Range", None)
    if not range_header:
        return Response(open(path, "rb"), mimetype="video/mp4")

    size = os.path.getsize(path)
    resolved = _parse_single_byte_range(range_header, size)
    if resolved is None:
        return Response(
            status=416,
            headers={"Content-Range": f"bytes */{size}", "Accept-Ranges": "bytes"},
        )

    start, end = resolved
    length = end - start + 1
    with open(path, "rb") as f:
        f.seek(start)
        data = f.read(length)

    headers = {
        "Content-Range": f"bytes {start}-{end}/{size}",
        "Accept-Ranges": "bytes",
        "Content-Length": str(length),
        "Content-Type": "video/mp4",
    }
    return Response(data, status=206, headers=headers)
@app.route('/get_volume', methods=['GET'])
def get_volume():
    """
    Return the current volume as reported by volume_control.

    Responds 200 with ``current_volume``, or 500 with an error message when
    the volume could not be read (None).
    """
    print("访问了 /get_volume 路由")  # debug trace
    level = volume_control.get_current_volume()
    if level is None:
        return jsonify({"error": "无法获取当前音量"}), 500
    return jsonify({"current_volume": level}), 200
@app.route('/adjust_volume', methods=['POST'])
def adjust_volume():
    """
    Apply a volume adjustment instruction from the JSON request body.

    Body fields:
        - adjust_volumn_result: the instruction passed to volume_control
          (required; 400 when absent)
        - notify_frontend: whether to emit a ``volume_adjusted`` socket event
          (defaults to True)
    """
    try:
        instruction = request.json.get('adjust_volumn_result')
        should_notify = request.json.get('notify_frontend', True)

        if instruction is None:
            return jsonify({"error": "没有提供调整指令"}), 400

        volume_control.adjust_volume(instruction)

        summary = f"音量已调整为 {instruction}"
        if should_notify:
            socketio.emit('volume_adjusted', {'message': summary})
        return jsonify({"status": "success", "message": summary}), 200
    except Exception as err:
        return jsonify({"error": str(err)}), 500
def send_audio_command(command, port=8766):
    """
    Send a text command to the audio service via ``send_message``.

    Args:
        command: command string to send.
        port: target port (default 8766).

    Errors are printed rather than raised, so callers are never interrupted
    by a failed send.
    """
    try:
        print(f"正在发送命令: {command} 到端口: {port}")
        # asyncio.run creates a fresh event loop, runs the coroutine to
        # completion and closes the loop afterwards. The previous version
        # created a new loop on every call and never closed it.
        asyncio.run(send_message(command, port))
        print(f"命令 {command} 成功发送到端口 {port}")
    except Exception as e:
        print(f"发送命令时发生错误: {e}")
@socketio.on("speech_audio_control")
def speech_audio_control(data):
    """
    Relay a speech-audio start/cancel request to the audio service.

    "start" takes precedence over "cancel"; payloads containing neither are
    ignored.
    """
    routes = (
        ("start", "speech_audio_start"),
        ("cancel", "speech_audio_cancel"),
    )
    for keyword, command in routes:
        if keyword in data:
            send_audio_command(command)
            break
@app.route('/upload_speak', methods=['POST'])
def upload_speak():
    """
    Accept an uploaded audio clip, store it under ``../tmp`` and notify the
    audio service.

    Any existing ``speak_*.mp3`` files in the directory are removed first
    (best effort). The new file is saved as ``speak_<timestamp>.mp3`` and its
    absolute path is sent in a ``speech_audio_stop:`` command.
    """
    try:
        if 'audio' not in request.files:
            return jsonify({'error': 'No audio file provided'}), 400

        upload = request.files['audio']
        if upload.filename == '':
            return jsonify({'error': 'No selected file'}), 400

        # Make sure the ../tmp directory next to this package exists.
        tmp_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'tmp'))
        os.makedirs(tmp_dir, exist_ok=True)

        # Remove earlier uploads; a failed removal is reported but not fatal.
        stale_names = [
            name for name in os.listdir(tmp_dir)
            if name.startswith("speak_") and name.endswith(".mp3")
        ]
        for name in stale_names:
            stale_path = os.path.join(tmp_dir, name)
            try:
                os.remove(stale_path)
                print(f"Deleted old audio file: {stale_path}")
            except Exception as remove_err:
                print(f"Error deleting old audio file {stale_path}: {remove_err}")

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"speak_{stamp}.mp3"
        file_path = os.path.join(tmp_dir, filename)

        upload.save(file_path)
        print(f"Audio file saved to: {file_path}")

        # Tell the audio service (default port 8766) where the new file is.
        send_audio_command(f"speech_audio_stop:{file_path}")

        return jsonify({'message': 'Audio file uploaded successfully', 'filename': filename}), 200
    except Exception as err:
        print(f"Error uploading audio file: {err}")
        return jsonify({'error': str(err)}), 500
@app.route('/heartbeat')
def heartbeat():
    """Liveness probe: always answers with the plain text ``OK``."""
    return "OK"
@app.route("/upload_playlist_image", methods=["POST"])
def upload_playlist_image():
    """
    Add a playlist from an uploaded QR-code image.

    The image is decoded, the QR URL is followed (with QQ Music domains
    rewritten to the robotstorm.tech proxy paths), and the playlist id is
    taken from the ``id`` query parameter or, failing that, the first
    all-digit path segment. A new id is inserted at the front of
    ``categoryIDs``, persisted, and the playlist cache is cleared.

    Responds 400 when the image, QR code or playlist id is missing and 500
    when saving fails or any other error occurs.
    """
    global categoryIDs, all_playlists
    try:
        if 'image' not in request.files:
            return jsonify({"status": "error", "message": "No image file uploaded"}), 400

        image_file = request.files['image']
        # Use the BytesIO name imported at the top of the file instead of
        # depending on a separate ``import io``.
        image_stream = BytesIO(image_file.read())
        image = Image.open(image_stream)

        cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
        decoded_objects = decode(cv_image)
        if not decoded_objects:
            return jsonify({"status": "error", "message": "No QR code found in image"}), 400

        qr_data = decoded_objects[0].data.decode('utf-8')
        print("Original QR URL:", qr_data)

        # Rewrite the domain of the original QR URL.
        if qr_data.startswith('https://c6.y.qq.com/'):
            qr_data = qr_data.replace('https://c6.y.qq.com/', 'https://robotstorm.tech/c6yqq/')

        headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive',
        }
        # A timeout keeps a stalled remote server from blocking this request
        # handler indefinitely; a timeout surfaces as the 500 branch below.
        response = requests.get(qr_data, headers=headers, allow_redirects=True, timeout=10)
        final_url = response.url

        # Rewrite the domain of the post-redirect URL.
        if final_url.startswith('https://y.qq.com/'):
            final_url = final_url.replace('https://y.qq.com/', 'https://robotstorm.tech/yqq/')
        print("Final URL:", final_url)

        parsed_url = urlparse(final_url)
        playlist_id = parse_qs(parsed_url.query).get('id', [None])[0]
        if not playlist_id:
            # Fall back to the first purely numeric path segment.
            playlist_id = next(
                (part for part in parsed_url.path.split('/') if part.isdigit()),
                None,
            )
        if not playlist_id:
            return jsonify({"status": "error", "message": "No playlist ID found in QR code"}), 400
        print("Found playlist ID:", playlist_id)

        if playlist_id not in categoryIDs:
            categoryIDs.insert(0, playlist_id)
            # Persist the updated id list to the configuration file.
            if not save_playlist_ids(categoryIDs):
                return jsonify({"status": "error", "message": "Failed to save playlist configuration"}), 500
            # Clear the playlists cache to force a refresh.
            all_playlists = []

        return jsonify({"status": "success", "message": "Playlist added successfully"})
    except Exception as e:
        print(f"Error processing playlist image: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route("/delete_playlist", methods=["POST"])
def delete_playlist():
    """
    Remove a playlist id from ``categoryIDs``, persist the change and drop the
    matching entry from the ``all_playlists`` cache.

    Expects JSON ``{"categoryId": ...}``. Responds 400 when the id is missing,
    404 when it is unknown, 500 when saving fails or on any other error.
    """
    global categoryIDs, all_playlists
    try:
        payload = request.get_json()
        target_id = payload.get('categoryId')
        if not target_id:
            return jsonify({"status": "error", "message": "未提供歌单ID"}), 400

        if target_id not in categoryIDs:
            return jsonify({"status": "error", "message": "歌单不存在"}), 404

        categoryIDs.remove(target_id)
        # Persist the updated id list to the configuration file.
        if not save_playlist_ids(categoryIDs):
            return jsonify({"status": "error", "message": "Failed to save playlist configuration"}), 500

        # Keep only cached playlists that do not belong to the removed id.
        all_playlists = [
            entry for entry in all_playlists
            if entry['diss_info'].get('categoryId') != target_id
        ]
        return jsonify({"status": "success", "message": "歌单删除成功"})
    except Exception as err:
        print(f"Error deleting playlist: {err}")
        return jsonify({"status": "error", "message": str(err)}), 500
@app.route('/api/license/use', methods=['POST'])
def use_license():
    """
    Invoke ``license_module.use_license`` and relay its result.

    Responds 500 when the result contains an ``error`` key, otherwise 200.
    """
    outcome = license_module.use_license()
    http_status = 500 if 'error' in outcome else 200
    return jsonify(outcome), http_status
@app.route('/api/license/check', methods=['GET'])
def check_license():
    """
    Invoke ``license_module.check_license`` and relay its result.

    Responds 500 when the result contains an ``error`` key, otherwise 200.
    """
    outcome = license_module.check_license()
    http_status = 500 if 'error' in outcome else 200
    return jsonify(outcome), http_status
@app.route('/api/license/info', methods=['GET'])
def get_license_info():
    """
    Invoke ``license_module.get_license_info`` and relay its result.

    Responds 500 when the result contains an ``error`` key, otherwise 200.
    """
    outcome = license_module.get_license_info()
    http_status = 500 if 'error' in outcome else 200
    return jsonify(outcome), http_status
# Endpoint: set the deep-thought status
@app.route("/set_deep_thought", methods=["POST"])
def set_deep_thought():
    """
    Set the deep-thought flag from the JSON body's ``status`` field
    (defaults to False when absent).
    """
    try:
        requested = request.json.get("status", False)
        set_deep_thought_status(requested)
        reply = {"status": "success", "message": f"Deep thought status set to {requested}"}
        return jsonify(reply), 200
    except Exception as err:
        return jsonify({"status": "error", "message": str(err)}), 500
# Endpoint: read the deep-thought status
@app.route("/get_deep_thought", methods=["GET"])
def get_deep_thought():
    """Report the current deep-thought flag as ``deep_thought_active``."""
    try:
        reply = {"status": "success", "deep_thought_active": get_deep_thought_status()}
        return jsonify(reply), 200
    except Exception as err:
        return jsonify({"status": "error", "message": str(err)}), 500
# Endpoint: set the AI (web) search status
@app.route("/set_ai_search", methods=["POST"])
def set_ai_search():
    """
    Set the AI-search flag from the JSON body's ``status`` field
    (defaults to False when absent).

    Responds 200 on success, 500 with the error text on any exception.
    """
    try:
        data = request.json
        status = data.get("status", False)
        set_ai_search_status(status)
        # The message previously said "Deep thought status", copied from the
        # sibling endpoint.
        return jsonify({"status": "success", "message": f"AI search status set to {status}"}), 200
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
# Endpoint: read the AI (web) search status
@app.route("/get_ai_search", methods=["GET"])
def get_ai_search():
    """Report the current AI-search flag as ``ai_search_active``."""
    try:
        reply = {"status": "success", "ai_search_active": get_ai_search_status()}
        return jsonify(reply), 200
    except Exception as err:
        return jsonify({"status": "error", "message": str(err)}), 500
@app.route("/massage_plans")
def massage_plans():
    """
    Render the massage-plan page, seeding it with the optional
    ``choose_task`` and ``body_part`` query parameters (empty by default).
    """
    template_context = {
        "initial_choose_task": request.args.get('choose_task', ''),
        "initial_body_part": request.args.get('body_part', ''),
    }
    return render_template("massage_plan.html", **template_context)
@app.route("/get_massage_plan", methods=["GET"])
def get_massage_plan():
    """
    Return massage plans from vtxdb.

    Query parameters (all optional):
        - plan_name: when given, the lookup result is returned as-is
        - choose_task / body_part: when given without plan_name, only plans
          whose corresponding fields equal every supplied value are returned
    With no parameters all plans are returned.
    """
    try:
        plan_name = request.args.get('plan_name', '')
        choose_task = request.args.get('choose_task', '')
        body_part = request.args.get('body_part', '')

        stored = vtxdb.get("massage_plan", plan_name)

        # A named lookup, or no filter at all, returns the lookup unchanged.
        if plan_name or not (choose_task or body_part):
            return jsonify({"status": "success", "data": stored})

        def _matches(plan):
            # Each supplied criterion must match; absent criteria are ignored.
            if choose_task and plan.get('choose_task') != choose_task:
                return False
            if body_part and plan.get('body_part') != body_part:
                return False
            return True

        selected = {name: plan for name, plan in stored.items() if _matches(plan)}
        return jsonify({"status": "success", "data": selected})
    except Exception as err:
        return jsonify({"status": "error", "message": str(err)}), 500
@app.route("/set_massage_plan", methods=["POST", "DELETE"])
def set_massage_plan():
    """
    Save (POST) or delete (DELETE) a massage plan in vtxdb.

    DELETE needs ``plan_name``; the plan must exist as a dict and carry a
    truthy ``can_delete`` (400 / 404 / 403 otherwise).
    POST needs ``plan_name`` and ``plan_data``; dict plan data is marked
    ``can_delete = True`` before being stored.
    """
    try:
        body = request.json
        plan_name = body.get('plan_name')

        if request.method != "DELETE":
            # POST: create or overwrite a plan.
            plan_data = body.get('plan_data')
            if not (plan_name and plan_data):
                return jsonify({"status": "error", "message": "缺少必要参数"}), 400
            # Plans saved through this endpoint are always deletable.
            if isinstance(plan_data, dict):
                plan_data['can_delete'] = True
            vtxdb.set("massage_plan", plan_name, plan_data)
            return jsonify({"status": "success", "message": "保存成功"})

        if not plan_name:
            return jsonify({"status": "error", "message": "缺少疗程名称"}), 400

        # Look the plan up first to check that deletion is permitted.
        existing = vtxdb.get("massage_plan", plan_name)
        if not existing or not isinstance(existing, dict):
            return jsonify({"status": "error", "message": "疗程不存在"}), 404
        if not existing.get('can_delete', False):
            return jsonify({"status": "error", "message": "该疗程不允许删除"}), 403

        vtxdb.delete("massage_plan", plan_name)
        return jsonify({"status": "success", "message": "删除成功"})
    except Exception as err:
        return jsonify({"status": "error", "message": str(err)}), 500
@app.route("/develop_login", methods=["POST"])
def develop_login():
    """
    Check the developer password supplied in the JSON body.

    Returns ``{"success": true}`` when the ``password`` field matches,
    ``{"success": false}`` otherwise, including when the field is missing or
    is not a string (previously a missing field raised KeyError).
    """
    import hmac  # local import: only needed for the constant-time comparison

    data = request.json
    supplied = data.get("password") if isinstance(data, dict) else None
    if not isinstance(supplied, str):
        return jsonify(success=False)

    # NOTE(review): the expected password is hard-coded in source; consider
    # moving it to configuration.
    # compare_digest avoids leaking the match position through timing.
    matches = hmac.compare_digest(supplied.encode("utf-8"), b"cd123456")
    return jsonify(success=matches)
@app.route("/plans_edit", methods=["POST"])
def plans_edit():
    """
    Check the plan-editing password supplied in the JSON body.

    Returns ``{"success": true}`` when the ``password`` field matches,
    ``{"success": false}`` otherwise, including when the field is missing or
    is not a string (previously a missing field raised KeyError).
    """
    import hmac  # local import: only needed for the constant-time comparison

    data = request.json
    supplied = data.get("password") if isinstance(data, dict) else None
    if not isinstance(supplied, str):
        return jsonify(success=False)

    # NOTE(review): the expected password is hard-coded in source; consider
    # moving it to configuration.
    # compare_digest avoids leaking the match position through timing.
    matches = hmac.compare_digest(supplied.encode("utf-8"), b"robotstorm")
    return jsonify(success=matches)
@app.route("/get_log", methods=["GET"])
def get_log():
    """
    Return a page of log entries via ``read_log_file``.

    Query parameters:
        - type (required): one of ui / language / massage
        - keyword (optional): filter keyword, default ""
        - page (optional): page number, default 1
        - pageSize (optional): entries per page, default 400

    Responds 400 for an unknown log type or non-integer paging values.
    """
    log_type = request.args.get("type")
    keyword = request.args.get("keyword", "")

    # Validate the log type before touching the paging values.
    if log_type not in ["ui", "language", "massage"]:
        return jsonify({"error": "无效的日志类型"}), 400

    # Non-numeric paging values are a client error, not a server crash.
    try:
        page = int(request.args.get("page", 1))
        page_size = int(request.args.get("pageSize", 400))
    except ValueError:
        return jsonify({"error": "无效的分页参数"}), 400

    result = read_log_file(log_type, keyword, page, page_size)
    return jsonify(result)
# Registry of calibration task states, looked up by task id in the
# status/cancel endpoints.
# NOTE(review): nothing in the visible code adds entries to this dict —
# confirm where tasks are registered.
calibration_tasks = {}
def run_calibration():
    """
    Run the automatic calibration in the background, reporting progress to
    the frontend through ``calibration_status`` socket events.

    Emits running -> collecting -> calibrating -> completed (with results),
    or a single ``error`` event if anything raises.
    """
    def _report(status, message, **extra):
        # Every progress update shares the same event name and payload shape.
        socketio.emit('calibration_status', {'status': status, 'message': message, **extra})

    try:
        _report('running', '开始自动校准...')

        # Calibration board: 6 x 3 grid, 0.030 m squares.
        board_size = [6, 3]
        square_size = 0.030
        calib = Calibration(board_size, square_size, "/home/jsfb/Documents/")

        _report('collecting', '正在收集标定数据...')
        calib.collect_data()

        _report('calibrating', '正在计算标定参数...')
        rotation, translation, intrinsics = calib.calibrate()

        for title, value in (
            ("旋转矩阵:", rotation),
            ("平移向量:", translation),
            ("内参矩阵:", intrinsics),
        ):
            print(title)
            print(value)

        # Convert array-like results into JSON-serialisable lists.
        if hasattr(rotation, 'tolist'):
            rotation_matrix = rotation.tolist()
        else:
            rotation_matrix = rotation
        if hasattr(translation, 'tolist'):
            translation_vector = translation.flatten().tolist()
        else:
            translation_vector = translation

        _report('completed', '校准完成', results={
            'rotation_matrix': rotation_matrix,
            'translation_vector': translation_vector,
            'intrinsics': intrinsics  # passed through unchanged
        })
    except Exception as err:
        error_message = str(err)
        print(f"校准过程出错: {error_message}")
        _report('error', f'校准失败: {error_message}')
@app.route("/start_calibration", methods=["POST"])
def start_calibration():
    """Start ``run_calibration`` on a daemon thread and return immediately."""
    try:
        worker = threading.Thread(target=run_calibration, daemon=True)
        worker.start()
    except Exception as err:
        return jsonify({
            'status': 'error',
            'message': str(err)
        }), 500
    return jsonify({
        'status': 'success',
        'message': '校准任务已启动'
    })
@app.route("/get_calibration_status/<task_id>", methods=["GET"])
def get_calibration_status(task_id):
    """
    Report the status of a calibration task.

    Responds 404 for an unknown task id. The reply always carries ``status``
    and ``started_at``; ``results`` is added for completed tasks and
    ``error`` for failed ones, when present on the task record.
    """
    if task_id not in calibration_tasks:
        return jsonify({
            'status': 'error',
            'message': '任务不存在'
        }), 404

    record = calibration_tasks[task_id]
    state = record['status']
    reply = {'status': state, 'started_at': record['started_at']}

    if state == 'completed' and 'results' in record:
        reply['results'] = record['results']
    elif state == 'error' and 'error' in record:
        reply['error'] = record['error']

    return jsonify(reply)
@app.route("/cancel_calibration/<task_id>", methods=["POST"])
def cancel_calibration(task_id):
    """
    Cancel a calibration task by removing it from ``calibration_tasks``.

    Responds 404 for an unknown task id and 400 when the task has already
    completed.
    """
    def _failure(message, code):
        return jsonify({'status': 'error', 'message': message}), code

    if task_id not in calibration_tasks:
        return _failure('任务不存在', 404)
    if calibration_tasks[task_id]['status'] == 'completed':
        return _failure('任务已完成,无法取消', 400)

    # Cancelling simply drops the task record.
    calibration_tasks.pop(task_id)
    return jsonify({
        'status': 'success',
        'message': '校准任务已取消'
    })
@app.route("/save_calibration", methods=["POST"])
def save_calibration():
    """
    Persist calibration results from the JSON body into vtxdb.

    Requires non-empty ``rotation_matrix``, ``translation_vector`` and
    ``intrinsics`` (400 otherwise); they are written to the
    ``camera.camera_matrix``, ``camera.camera_trans`` and
    ``camera.intrinsics`` keys of ``robot_config`` respectively.
    """
    try:
        body = request.get_json()
        rotation_matrix = body.get('rotation_matrix')
        translation_vector = body.get('translation_vector')
        intrinsics = body.get('intrinsics')

        if not all((rotation_matrix, translation_vector, intrinsics)):
            return jsonify({
                'status': 'error',
                'message': '缺少必要的标定数据'
            }), 400

        writes = (
            ("camera.camera_matrix", rotation_matrix),
            ("camera.camera_trans", translation_vector),
            ("camera.intrinsics", intrinsics),
        )
        for config_key, value in writes:
            vtxdb.set("robot_config", config_key, value)

        return jsonify({
            'status': 'success',
            'message': '标定结果已保存'
        })
    except Exception as err:
        print(f"保存标定结果时出错: {str(err)}")
        return jsonify({
            'status': 'error',
            'message': str(err)
        }), 500
@app.route("/massage_plan_visualization")
def massage_plan_visualization():
    """
    Render the plan visualisation page for a body part (default ``back``).

    ``waist`` and ``shoulder`` are rendered as ``back``: they share the same
    image and points.
    """
    requested_part = request.args.get('body_part', 'back')
    shown_part = 'back' if requested_part in ['waist', 'shoulder'] else requested_part
    return render_template("massage_plan_visualization.html", body_part=shown_part)
@app.route("/massage_plan_control")
def massage_plan_control():
    """Render the plan control page for a body part (default ``back``)."""
    return render_template(
        "massage_plan_control.html",
        body_part=request.args.get('body_part', 'back'),
    )
# Massage heads selected by default
@app.route("/get_massage_heads", methods=["GET"])
def get_massage_heads():
    """
    Return the massage-head configuration from vtxdb.

    When nothing is stored yet, the default configuration below is written to
    vtxdb and returned.
    """
    try:
        stored = vtxdb.get("system_config", "massage_heads")
        if stored:
            heads = stored
        else:
            # First use: seed the store with the default set.
            heads = {
                'thermotherapy': {'display': True, 'name': '深部热疗'},
                'shockwave': {'display': True, 'name': '点阵按摩'},
                'ball': {'display': False, 'name': '全能滚珠'},
                'finger': {'display': True, 'name': '指疗通络'},
                'roller': {'display': True, 'name': '滚滚刺疗'},
                'stone': {'display': True, 'name': '温砭舒揉'},
                'ion': {'display': False, 'name': '离子光灸'},
                "heat": {'display': False, 'name': '能量热疗'},
                "spheres": {'display': False, 'name': '天球滚捏'},
            }
            vtxdb.set("system_config", "massage_heads", heads)
        return jsonify({
            "status": "success",
            "data": heads
        })
    except Exception as err:
        return jsonify({
            "status": "error",
            "message": f"获取按摩头配置失败: {str(err)}"
        }), 500
@app.route("/set_massage_heads", methods=["POST"])
def set_massage_heads():
    """
    Save a massage-head configuration to vtxdb.

    The JSON body must contain ``heads_config`` with at least the seven
    required head keys; otherwise 400 is returned.
    """
    def _bad_request(message):
        return jsonify({"status": "error", "message": message}), 400

    try:
        heads_config = request.json.get('heads_config')
        if not heads_config:
            return _bad_request("缺少按摩头配置参数")

        # The configuration must mention every required head.
        required = ('thermotherapy', 'shockwave', 'ball',
                    'finger', 'roller', 'stone', 'ion')
        absent = [head for head in required if head not in heads_config]
        if absent:
            return _bad_request("按摩头配置不完整")

        vtxdb.set("system_config", "massage_heads", heads_config)
        return jsonify({
            "status": "success",
            "message": "按摩头配置保存成功"
        })
    except Exception as err:
        return jsonify({
            "status": "error",
            "message": f"保存按摩头配置失败: {str(err)}"
        }), 500
@app.route("/get_jump_mode", methods=["GET"])
def get_jump_mode():
    """
    Return the jump-mode configuration from vtxdb.

    When nothing is stored yet, ``{"enabled": True}`` is written to vtxdb and
    returned.
    """
    try:
        config = vtxdb.get("system_config", "jump_mode")
        if not config:
            # Jump mode is on by default.
            config = {"enabled": True}
            vtxdb.set("system_config", "jump_mode", config)
        return jsonify({
            "status": "success",
            "data": config
        })
    except Exception as err:
        return jsonify({
            "status": "error",
            "message": f"获取跳跃模式失败: {str(err)}"
        }), 500
@app.route("/set_jump_mode", methods=["POST"])
def set_jump_mode():
    """
    Update the ``enabled`` flag of the jump-mode configuration in vtxdb.

    The JSON body must contain ``enabled`` (400 when absent); its truthiness
    is stored. The updated configuration is returned.
    """
    try:
        requested = request.json.get("enabled")
        if requested is None:
            return jsonify({
                "status": "error",
                "message": "缺少enabled参数"
            }), 400

        # Start from the stored configuration, or the default if none exists.
        config = vtxdb.get("system_config", "jump_mode") or {"enabled": True}
        config["enabled"] = bool(requested)
        vtxdb.set("system_config", "jump_mode", config)

        return jsonify({
            "status": "success",
            "message": "跳跃模式设置成功",
            "data": config
        })
    except Exception as err:
        return jsonify({
            "status": "error",
            "message": f"设置跳跃模式失败: {str(err)}"
        }), 500
if __name__ == "__main__":
    # Parse command-line options; unrecognised arguments are tolerated.
    parser = argparse.ArgumentParser(description="Run Flask-SocketIO server.")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    args, unknown = parser.parse_known_args()

    DEBUG_MODE = args.debug

    # Supervisor loop: if the server raises, log the error, wait five seconds
    # and start it again.
    while True:
        try:
            socketio.run(
                app,
                debug=DEBUG_MODE,
                host="0.0.0.0",
                port=5000,
                allow_unsafe_werkzeug=True,
            )
        except Exception as err:
            print(f"Error: {err}", file=sys.stderr)
            print("Restarting server in 5 seconds...")
            time.sleep(5)

    # Historical notes kept from earlier commented-out variants:
    # - A startup gate polled check_network() every 5 seconds for up to
    #   300 seconds before calling socketio.run(app, host="0.0.0.0", port=5000),
    #   proceeding without network on timeout.
    # - A single, non-looping socketio.run(...) call with the same arguments.
    # - When launched under gunicorn:
    #   socketio.run(app, debug=DEBUG_MODE, host="0.0.0.0", port=5000)