3567 lines
121 KiB
Python
Executable File
3567 lines
121 KiB
Python
Executable File
from flask import (
|
||
Flask,
|
||
render_template,
|
||
jsonify,
|
||
redirect,
|
||
url_for,
|
||
request,
|
||
Response,
|
||
render_template_string,
|
||
session,
|
||
send_from_directory
|
||
)
|
||
from flask_socketio import SocketIO
|
||
from flask_cors import CORS
|
||
from flask_compress import Compress
|
||
from flask_sslify import SSLify
|
||
from zeroconf import Zeroconf, ServiceInfo
|
||
import atexit
|
||
import uuid
|
||
import socket
|
||
import netifaces
|
||
import time
|
||
from qq_music import (
|
||
fetch_latest_music,
|
||
get_song_list,
|
||
fetch_album_image,
|
||
get_music_info,
|
||
player,
|
||
)
|
||
import threading
|
||
from threading import Lock
|
||
import asyncio
|
||
import websockets
|
||
import json
|
||
import argparse
|
||
import librosa
|
||
import numpy as np
|
||
import subprocess
|
||
import signal
|
||
import os
|
||
import sys
|
||
import re
|
||
from datetime import datetime, timedelta
|
||
import paramiko
|
||
import requests
|
||
from bs4 import BeautifulSoup, NavigableString
|
||
import shutil
|
||
from urllib.parse import urljoin, urlparse, parse_qs
|
||
from requests.auth import HTTPBasicAuth
|
||
from tools.ssh_tools import execute_command_on_remote, is_host_down, ReverseSSH
|
||
import logging
|
||
from hw_obs import HW_OBS
|
||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
from force_sensor_aubo import XjcSensor
|
||
from aubo_C5_UI import AuboC5
|
||
from Massage.MassageControl.tools.auto_visual_calibration import Calibration
|
||
from Massage.MassageControl.tools.replay_trajectory_from_csv import RobotDanceController
|
||
import fitz # PyMuPDF
|
||
from io import BytesIO
|
||
from tools.wifi_tools import WifiManager
|
||
from tools import volume_control # 从 tools 文件夹导入 volume_control 模块
|
||
from tools import license_module
|
||
import math
|
||
import aiohttp
|
||
import serial
|
||
import serial.tools.list_ports
|
||
import yaml
|
||
from power_board import PowerBoard
|
||
import cv2
|
||
from pyzbar.pyzbar import decode
|
||
from PIL import Image
|
||
import io
|
||
from tools.deep_thought import set_deep_thought_status, get_deep_thought_status
|
||
from tools.ai_search import set_ai_search_status, get_ai_search_status
|
||
from tools.version_control import *
|
||
from VortXDB.client import VTXClient
|
||
from tools.log_utils import read_log_file
|
||
|
||
from modules.thermal.thermal_routes import thermal_bp, init_thermal_socketio
|
||
from modules.common.common_routes import common_bp
|
||
from modules.vtxdb.vtxdb_routes import vtxdb_bp
|
||
|
||
vtxdb = VTXClient(use_logger=False) # 创建VTXClient实例
|
||
|
||
DEBUG_MODE = False
|
||
|
||
# 配置 logging 模块
|
||
log_file = "../log/UI-next-app.log"
|
||
|
||
|
||
# 定义备份文件夹路径
|
||
backup_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../UILog"))
|
||
|
||
# 创建 UILog 文件夹(如果不存在)
|
||
if not os.path.exists(backup_folder):
|
||
os.makedirs(backup_folder) # 自动创建目录
|
||
|
||
# 备份日志文件
|
||
if os.path.exists(log_file):
|
||
# 生成带时间戳的备份文件名
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
backup_file = os.path.join(backup_folder, f"UI-next-app_{timestamp}.log")
|
||
shutil.copy2(log_file, backup_file)
|
||
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s - %(levelname)s - %(message)s",
|
||
handlers=[
|
||
# logging.FileHandler(log_file),
|
||
logging.FileHandler(log_file, mode="w"), # 覆盖模式,每次运行清空日志
|
||
logging.StreamHandler(sys.stdout),
|
||
],
|
||
)
|
||
|
||
|
||
# 定义日志记录器
|
||
class LoggerWriter:
|
||
def __init__(self, logger, level):
|
||
self.logger = logger
|
||
self.level = level
|
||
|
||
def write(self, message):
|
||
if message.strip() != "":
|
||
self.logger.log(self.level, message.strip())
|
||
|
||
def flush(self):
|
||
pass
|
||
|
||
|
||
# 重定向标准输出和标准错误
|
||
sys.stdout = LoggerWriter(logging.getLogger(), logging.INFO)
|
||
sys.stderr = LoggerWriter(logging.getLogger(), logging.ERROR)
|
||
|
||
if getattr(sys, "frozen", False):
|
||
template_folder = os.path.join(sys._MEIPASS, "templates")
|
||
static_folder = os.path.join(sys._MEIPASS, "static")
|
||
app = Flask(__name__, template_folder=template_folder, static_folder=static_folder)
|
||
else:
|
||
app = Flask(__name__)
|
||
|
||
# app = Flask(__name__)
|
||
# sslify = SSLify(app)
|
||
CORS(app)
|
||
app.secret_key = "jsfb" # 设置密钥,用于安全保护 session
|
||
app.config["SECRET_KEY"] = "jsfb"
|
||
app.config["COMPRESS_MIN_SIZE"] = 50
|
||
app.config["COMPRESS_LEVEL"] = 8
|
||
app.config["COMPRESS_ALGORITHM"] = 'gzip'
|
||
|
||
# 启用响应内容压缩
|
||
Compress(app)
|
||
|
||
app.register_blueprint(thermal_bp)
|
||
app.register_blueprint(common_bp)
|
||
app.register_blueprint(vtxdb_bp)
|
||
|
||
socketio = SocketIO(app, cors_allowed_origins="*")
|
||
|
||
# 在创建socketio实例后
|
||
init_thermal_socketio(socketio)
|
||
|
||
|
||
|
||
def signal_handler(signal, frame):
|
||
print("Shutting Down")
|
||
global power_board_ip, power_board_thread, power_board_thread_running
|
||
if power_board_thread_running:
|
||
power_board_thread_running = False
|
||
if power_board_thread:
|
||
try:
|
||
power_board_thread.join(timeout=2.0) # 设置超时时间,避免永久阻塞
|
||
except Exception as e:
|
||
print(f"Error stopping power board thread during shutdown: {e}")
|
||
power_board_thread = None
|
||
power_board_ip = None
|
||
sys.exit(0)
|
||
|
||
|
||
signal.signal(signal.SIGINT, signal_handler)
|
||
signal.signal(signal.SIGTERM, signal_handler)
|
||
|
||
current_mode = "manual_mode" # manual_mode / smart_mode / handheld_mode
|
||
|
||
latest_music = []
|
||
all_playlists = []
|
||
|
||
@app.route("/get_music_data", methods=["GET"])
|
||
def get_music_data():
|
||
global latest_music
|
||
if not latest_music:
|
||
latest_music = fetch_latest_music()
|
||
return jsonify(latest_music)
|
||
|
||
|
||
def load_playlist_ids():
|
||
"""从配置文件加载歌单ID列表"""
|
||
config_path = "/home/jsfb/jsfb_ws/global_config/music.yaml"
|
||
default_playlist_ids = ["9126599100", "8429112432", "7299191148", "7132357466", "1137267511", "7284914844"]
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname(config_path), exist_ok=True)
|
||
|
||
try:
|
||
# 如果文件不存在,创建文件并写入默认歌单
|
||
if not os.path.exists(config_path):
|
||
config = {'playlist_ids': default_playlist_ids}
|
||
with open(config_path, 'w', encoding='utf-8') as f:
|
||
yaml.dump(config, f, allow_unicode=True)
|
||
return default_playlist_ids
|
||
|
||
# 如果文件存在,读取配置
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
config = yaml.safe_load(f)
|
||
return config.get('playlist_ids', default_playlist_ids)
|
||
except Exception as e:
|
||
print(f"Error loading playlist IDs from config: {e}")
|
||
return default_playlist_ids
|
||
|
||
def save_playlist_ids(playlist_ids):
|
||
"""保存歌单ID列表到配置文件"""
|
||
config_path = "/home/jsfb/jsfb_ws/global_config/music.yaml"
|
||
|
||
try:
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname(config_path), exist_ok=True)
|
||
|
||
config = {'playlist_ids': playlist_ids}
|
||
with open(config_path, 'w', encoding='utf-8') as f:
|
||
yaml.dump(config, f, allow_unicode=True)
|
||
return True
|
||
except Exception as e:
|
||
print(f"Error saving playlist IDs to config: {e}")
|
||
return False
|
||
|
||
# 初始化时从配置文件加载歌单ID
|
||
categoryIDs = load_playlist_ids()
|
||
|
||
@app.route("/get_playlists", methods=["GET"])
|
||
def get_playlists():
|
||
# print("get_playlists")
|
||
global all_playlists
|
||
if not all_playlists:
|
||
for categoryID in categoryIDs:
|
||
# print(categoryID)
|
||
diss_info, music_info_list = get_song_list(categoryID)
|
||
diss_info['categoryId'] = categoryID # 添加 categoryId 到 diss_info
|
||
all_playlists.append(
|
||
{"diss_info": diss_info, "music_info_list": music_info_list}
|
||
)
|
||
print(diss_info)
|
||
return jsonify(all_playlists)
|
||
|
||
|
||
@app.route("/search", methods=["POST"])
|
||
def search():
|
||
# 获取前端发送过来的数据
|
||
search_term = request.json.get("term")
|
||
print(search_term)
|
||
search_results = get_music_info(search_term)
|
||
|
||
# 返回处理后的数据给前端
|
||
return jsonify(search_results)
|
||
|
||
|
||
@app.route("/get_album_image", methods=["POST"])
|
||
def get_album_image():
|
||
data = request.get_json()
|
||
singer_name = data.get("singer_name")
|
||
|
||
if not singer_name:
|
||
return jsonify({"error": "Singer name is required"}), 400
|
||
|
||
album_img_url, error = fetch_album_image(singer_name)
|
||
|
||
if album_img_url:
|
||
return jsonify({"album_img_url": album_img_url}), 200
|
||
else:
|
||
return jsonify({"error": error}), 404
|
||
|
||
|
||
@app.route("/song_clicked", methods=["POST"])
|
||
def song_clicked():
|
||
global playlist, current_song_index
|
||
data = request.json
|
||
songmid = data.get("songmid")
|
||
print(f"Song clicked with songmid: {songmid}")
|
||
# 查找歌曲是否已在播放列表中
|
||
song_in_playlist = next(
|
||
(song for song in playlist if song["songmid"] == songmid), None
|
||
)
|
||
|
||
if song_in_playlist:
|
||
current_song_index = playlist.index(song_in_playlist)
|
||
next_song = playlist[current_song_index]
|
||
current_song.update(
|
||
{
|
||
"album_img_url": next_song["album_img_url"],
|
||
"music_name": next_song["music_name"],
|
||
"singer_name": next_song["singer_name"],
|
||
"is_playing": True,
|
||
}
|
||
)
|
||
print(current_song)
|
||
# 获取并播放下一首歌曲
|
||
# music_page = f"https://y.qq.com/n/yqq/song/{next_song['songmid']}.html"
|
||
music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{next_song['songmid']}.html"
|
||
music_url = player.fetch_music_url_by_mid(next_song["songmid"])
|
||
if not music_url:
|
||
music_url = player.fetch_music_url(music_page)
|
||
socketio.sleep(0.5)
|
||
player.play_music(music_url)
|
||
else:
|
||
# 如果歌曲不在播放列表中,则添加到播放列表
|
||
# music_page = f"https://y.qq.com/n/yqq/song/{songmid}.html"
|
||
music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{songmid}.html"
|
||
music_url = player.fetch_music_url_by_mid(songmid)
|
||
if not music_url:
|
||
music_url = player.fetch_music_url(music_page)
|
||
|
||
if music_url:
|
||
# 获取歌曲信息并添加到播放列表
|
||
song_info = {
|
||
"album_img_url": data.get("album_img_url"),
|
||
"music_name": data.get("music_name"),
|
||
"singer_name": data.get("singer_name"),
|
||
"songmid": songmid,
|
||
}
|
||
playlist.append(song_info)
|
||
current_song_index = len(playlist) - 1
|
||
|
||
# 开始播放该歌曲
|
||
current_song.update(
|
||
{
|
||
"album_img_url": song_info["album_img_url"],
|
||
"music_name": song_info["music_name"],
|
||
"singer_name": song_info["singer_name"],
|
||
"is_playing": True,
|
||
}
|
||
)
|
||
player.play_music(music_url)
|
||
print(playlist)
|
||
|
||
return jsonify({"status": "success", "message": "Music started"})
|
||
else:
|
||
return jsonify({"status": "error", "message": "Failed to fetch music URL"})
|
||
|
||
return jsonify({"status": "success", "message": "Song added to playlist"})
|
||
|
||
|
||
@app.route("/playlist_update", methods=["POST"])
|
||
def playlist_update():
|
||
global playlist, current_song_index
|
||
data = request.json
|
||
new_playlist = data.get("playlist", [])
|
||
|
||
if not new_playlist:
|
||
return jsonify({"status": "error", "message": "Playlist is empty"})
|
||
|
||
# 清空当前播放列表并更新为新的播放列表
|
||
playlist = []
|
||
for song in new_playlist:
|
||
song_info = {
|
||
"album_img_url": song.get("album_img_url"),
|
||
"music_name": song.get("music_name"),
|
||
"singer_name": song.get("singer_name"),
|
||
"songmid": song.get("songmid"),
|
||
}
|
||
playlist.append(song_info)
|
||
|
||
# 设置当前播放的歌曲为新列表中的第一首
|
||
current_song_index = 0
|
||
first_song = playlist[current_song_index]
|
||
current_song.update(
|
||
{
|
||
"album_img_url": first_song["album_img_url"],
|
||
"music_name": first_song["music_name"],
|
||
"singer_name": first_song["singer_name"],
|
||
"is_playing": True,
|
||
}
|
||
)
|
||
|
||
# 获取并播放第一首歌
|
||
# music_page = f"https://y.qq.com/n/yqq/song/{first_song['songmid']}.html"
|
||
music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{first_song['songmid']}.html"
|
||
music_url = player.fetch_music_url_by_mid(first_song["songmid"])
|
||
if not music_url:
|
||
music_url = player.fetch_music_url(music_page)
|
||
|
||
if music_url:
|
||
socketio.sleep(0.5)
|
||
player.play_music(music_url)
|
||
print(playlist)
|
||
return jsonify(
|
||
{"status": "success", "message": "Playlist updated and playing first song"}
|
||
)
|
||
else:
|
||
return jsonify(
|
||
{"status": "error", "message": "Failed to fetch music URL for first song"}
|
||
)
|
||
|
||
|
||
@app.route("/pause_music", methods=["POST"])
|
||
def pause_music():
|
||
player.pause_music()
|
||
return jsonify({"status": "success", "message": "Music paused"})
|
||
|
||
|
||
@app.route("/resume_music", methods=["POST"])
|
||
def resume_music():
|
||
if player.is_playing:
|
||
player.resume_music()
|
||
else:
|
||
next_song = playlist[current_song_index]
|
||
current_song.update(
|
||
{
|
||
"album_img_url": next_song["album_img_url"],
|
||
"music_name": next_song["music_name"],
|
||
"singer_name": next_song["singer_name"],
|
||
"is_playing": True,
|
||
}
|
||
)
|
||
print(current_song)
|
||
# music_page = f"https://y.qq.com/n/yqq/song/{next_song['songmid']}.html"
|
||
music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{next_song['songmid']}.html"
|
||
music_url = player.fetch_music_url_by_mid(next_song["songmid"])
|
||
if not music_url:
|
||
music_url = player.fetch_music_url(music_page)
|
||
socketio.sleep(0.5)
|
||
player.play_music(music_url)
|
||
return jsonify({"status": "success", "message": "Music resumed"})
|
||
|
||
|
||
@app.route("/stop_music", methods=["POST"])
|
||
def stop_music():
|
||
player.stop_music()
|
||
return jsonify({"status": "success", "message": "Music stopped"})
|
||
|
||
|
||
@app.route("/play_last_song", methods=["POST"])
|
||
def play_last_song():
|
||
global current_song_index
|
||
if current_song_index > 0:
|
||
current_song_index -= 1
|
||
else:
|
||
current_song_index = len(playlist) - 1
|
||
print(current_song_index)
|
||
next_song = playlist[current_song_index]
|
||
current_song.update(
|
||
{
|
||
"album_img_url": next_song["album_img_url"],
|
||
"music_name": next_song["music_name"],
|
||
"singer_name": next_song["singer_name"],
|
||
"is_playing": True,
|
||
}
|
||
)
|
||
|
||
# music_page = f"https://y.qq.com/n/yqq/song/{next_song['songmid']}.html"
|
||
music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{next_song['songmid']}.html"
|
||
music_url = player.fetch_music_url_by_mid(next_song["songmid"])
|
||
if not music_url:
|
||
music_url = player.fetch_music_url(music_page)
|
||
if music_url:
|
||
player.play_music(music_url)
|
||
return jsonify({"status": "success", "message": "Playing last song"})
|
||
else:
|
||
return jsonify({"status": "error", "message": "Failed to fetch music URL"})
|
||
|
||
|
||
@app.route("/play_next_song", methods=["POST"])
|
||
def play_next_song():
|
||
global current_song_index
|
||
current_song_index = (current_song_index + 1) % len(playlist)
|
||
print(current_song_index)
|
||
|
||
next_song = playlist[current_song_index]
|
||
current_song.update(
|
||
{
|
||
"album_img_url": next_song["album_img_url"],
|
||
"music_name": next_song["music_name"],
|
||
"singer_name": next_song["singer_name"],
|
||
"is_playing": True,
|
||
}
|
||
)
|
||
|
||
# music_page = f"https://y.qq.com/n/yqq/song/{next_song['songmid']}.html"
|
||
music_page = f"https://robotstorm.tech/yqq/n/yqq/song/{next_song['songmid']}.html"
|
||
music_url = player.fetch_music_url_by_mid(next_song["songmid"])
|
||
if not music_url:
|
||
music_url = player.fetch_music_url(music_page)
|
||
if music_url:
|
||
player.play_music(music_url)
|
||
return jsonify({"status": "success", "message": "Playing next song"})
|
||
else:
|
||
return jsonify({"status": "error", "message": "Failed to fetch music URL"})
|
||
|
||
|
||
@socketio.on("seek_music")
|
||
def handle_seek_music(position):
|
||
print(position)
|
||
player.seek_music(position)
|
||
|
||
|
||
def generate_playlist(music_info_list):
|
||
# 使用列表推导式生成字典列表
|
||
# playlist = [
|
||
# {
|
||
# "music_name": song[0],
|
||
# "singer_name": song[1],
|
||
# "songmid": song[2],
|
||
# "album_img_url": (
|
||
# f"http://y.gtimg.cn/music/photo_new/T002R180x180M000{song[3]}.jpg"
|
||
# if song[3]
|
||
# else ""
|
||
# ),
|
||
# }
|
||
# for song in music_info_list
|
||
# ]
|
||
playlist = [
|
||
{
|
||
"music_name": song[0],
|
||
"singer_name": song[1],
|
||
"songmid": song[2],
|
||
"album_img_url": (
|
||
f"https://robotstorm.tech/ygtimg/music/photo_new/T002R180x180M000{song[3]}.jpg"
|
||
if song[3]
|
||
else ""
|
||
),
|
||
}
|
||
for song in music_info_list
|
||
]
|
||
return playlist
|
||
|
||
|
||
# 全局变量来存储歌曲信息和播放状态
|
||
current_song = {
|
||
"album_img_url": "https://via.placeholder.com/300x300",
|
||
"music_name": None,
|
||
"singer_name": None,
|
||
"is_playing": False, # 添加播放状态
|
||
}
|
||
|
||
# 全局变量来存储播放列表
|
||
# playlist = [{
|
||
# 'album_img_url': 'http://y.gtimg.cn/music/photo_new/T002R180x180M000001G7iIK00yQyr.jpg',
|
||
# 'music_name': '大鱼海棠主题曲——大鱼',
|
||
# 'singer_name': '凤凉柒',
|
||
# 'songmid': '000Qf8b01p0vIJ'}]
|
||
playlist = []
|
||
# Index to keep track of current song in playlist
|
||
current_song_index = 0
|
||
|
||
if len(playlist) > 0:
|
||
current_song.update(
|
||
{
|
||
"album_img_url": playlist[0]["album_img_url"],
|
||
"music_name": playlist[0]["music_name"],
|
||
"singer_name": playlist[0]["singer_name"],
|
||
"is_playing": False, # 或者根据实际情况设置
|
||
}
|
||
)
|
||
print("playlist: ", playlist)
|
||
else:
|
||
|
||
diss_info, music_info_list = get_song_list(categoryIDs[0])
|
||
# print(diss_info, music_info_list)
|
||
playlist = generate_playlist(music_info_list)
|
||
print("playlist:", playlist)
|
||
|
||
if not playlist:
|
||
playlist = [
|
||
{
|
||
"album_img_url": "https://via.placeholder.com/300x300",
|
||
"music_name": "默认音乐",
|
||
"singer_name": "未知艺术家",
|
||
"songmid": "000000",
|
||
}
|
||
]
|
||
|
||
current_song.update(
|
||
{
|
||
"album_img_url": playlist[0]["album_img_url"],
|
||
"music_name": playlist[0]["music_name"],
|
||
"singer_name": playlist[0]["singer_name"],
|
||
"is_playing": False, # 或者根据实际情况设置
|
||
}
|
||
)
|
||
|
||
|
||
# 创建线程锁,确保数据安全
|
||
thread_lock = Lock()
|
||
|
||
|
||
# 定时任务,每隔一秒发送当前歌曲信息到前端
|
||
def music_background_thread():
|
||
global playlist, current_song_index
|
||
while True:
|
||
socketio.sleep(1) # 间隔一秒钟
|
||
with thread_lock:
|
||
current_song["is_playing"] = not player.is_paused and player.is_playing
|
||
socketio.emit("current_song", current_song, namespace="/")
|
||
if player.is_playing:
|
||
current_position = player.get_current_position()
|
||
music_length = player.get_music_length()
|
||
# print(current_position)
|
||
|
||
if current_position < 0:
|
||
# 切换到下一首歌曲
|
||
current_song_index = (current_song_index + 1) % len(playlist)
|
||
next_song = playlist[current_song_index]
|
||
current_song.update(
|
||
{
|
||
"album_img_url": next_song["album_img_url"],
|
||
"music_name": next_song["music_name"],
|
||
"singer_name": next_song["singer_name"],
|
||
"is_playing": True,
|
||
}
|
||
)
|
||
print(current_song)
|
||
# 获取并播放下一首歌曲
|
||
# music_page = (
|
||
# f"https://y.qq.com/n/yqq/song/{next_song['songmid']}.html"
|
||
# )
|
||
music_page = (
|
||
f"https://robotstorm.tech/yqq/n/yqq/song/{next_song['songmid']}.html"
|
||
)
|
||
music_url = player.fetch_music_url_by_mid(next_song["songmid"])
|
||
if not music_url:
|
||
music_url = player.fetch_music_url(music_page)
|
||
socketio.sleep(0.15)
|
||
player.play_music(music_url)
|
||
|
||
socketio.emit(
|
||
"music_status",
|
||
{"position": current_position, "length": music_length},
|
||
)
|
||
|
||
|
||
@app.route("/set_current_song", methods=["POST"])
|
||
def set_current_song():
|
||
global current_song
|
||
data = request.json
|
||
current_song.update(
|
||
{
|
||
"album_img_url": data.get("album_img_url"),
|
||
"music_name": data.get("music_name"),
|
||
"singer_name": data.get("singer_name"),
|
||
"is_playing": data.get("is_playing", False), # 更新播放状态
|
||
}
|
||
)
|
||
return jsonify({"status": "success"})
|
||
|
||
|
||
@app.route("/get_current_song", methods=["GET"])
|
||
def get_current_song():
|
||
if current_song["music_name"]:
|
||
return jsonify({"status": "success", "data": current_song})
|
||
return jsonify({"status": "error", "message": "No song is currently set"})
|
||
|
||
|
||
ai_chat_list = []
|
||
|
||
|
||
async def send_message(message, port=8766):
|
||
uri = "ws://localhost:" + str(port)
|
||
# start_time = time.time()
|
||
async with websockets.connect(uri) as websocket:
|
||
# print(time.time() - start_time)
|
||
await websocket.send(message)
|
||
response = await websocket.recv()
|
||
return response
|
||
|
||
|
||
@socketio.on("send_message")
|
||
def handle_message(data):
|
||
message = data["message"]
|
||
if not message:
|
||
return jsonify({"error": "Message not provided"}), 400
|
||
socketio.emit("add_message", {"sender": "user", "message": message})
|
||
ai_chat_list.append({"sender": "user", "message": message})
|
||
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
task = asyncio.ensure_future(send_message(f"'user_input': \"{message}\""))
|
||
loop.run_until_complete(task)
|
||
|
||
|
||
last_ai_message_time = None
|
||
|
||
|
||
@app.route("/ai_respon", methods=["POST"])
|
||
def ai_response():
|
||
global last_ai_message_time
|
||
|
||
# 获取 POST 请求的数据
|
||
message = request.form.get("message")
|
||
is_reasoning = request.form.get("isReasoning", "false").lower() == "true" # 默认值为 false
|
||
if not message:
|
||
return jsonify({"status": "error", "message": "No message provided"})
|
||
|
||
current_time = time.time()
|
||
|
||
# 如果是首次或距离上次超过5秒,添加新消息
|
||
if not last_ai_message_time or (current_time - last_ai_message_time) > 5:
|
||
if is_reasoning:
|
||
ai_chat_list.append({"sender": "ai", "message": {"reasoning_content": message, "content": ""}})
|
||
else:
|
||
ai_chat_list.append({"sender": "ai", "message": {"content": message, "reasoning_content": ""}})
|
||
else:
|
||
# 在最近一条AI消息上叠加
|
||
if ai_chat_list and ai_chat_list[-1]["sender"] == "ai":
|
||
if is_reasoning:
|
||
ai_chat_list[-1]["message"]["reasoning_content"] += message
|
||
else:
|
||
ai_chat_list[-1]["message"]["content"] += message
|
||
else:
|
||
if is_reasoning:
|
||
ai_chat_list.append({"sender": "ai", "message": {"reasoning_content": message, "content": ""}})
|
||
else:
|
||
ai_chat_list.append({"sender": "ai", "message": {"content": message, "reasoning_content": ""}})
|
||
|
||
# 更新最后一次AI消息时间
|
||
last_ai_message_time = current_time
|
||
|
||
socketio.emit("add_message", {"sender": "ai", "message": message, "isReasoning": is_reasoning})
|
||
return jsonify({"status": "success", "message": "Received ai message"})
|
||
|
||
|
||
@app.route("/user_input", methods=["POST"])
|
||
def user_input():
|
||
# 获取 POST 请求的数据
|
||
message = request.form.get("message")
|
||
if not message:
|
||
return jsonify({"status": "error", "message": "No message provided"})
|
||
socketio.emit("add_message", {"sender": "user", "message": message})
|
||
ai_chat_list.append({"sender": "user", "message": message})
|
||
# 返回 JSON 格式的响应
|
||
return jsonify({"status": "success", "message": "Received user message"})
|
||
|
||
|
||
@app.route("/get_history", methods=["GET"])
|
||
def get_history():
|
||
return jsonify(ai_chat_list)
|
||
|
||
|
||
def process_audio_and_emit(path):
|
||
x, sr = librosa.load(path, sr=8000)
|
||
|
||
start_time = time.time()
|
||
|
||
# Emit mouth movement data through socketio
|
||
try:
|
||
for _ in range(int(len(x) / 800)):
|
||
index = int((time.time() - start_time) * 8000) + 1
|
||
if index < len(x):
|
||
# new_value = x[index]
|
||
new_value = abs(x[index] * 5)
|
||
socketio.emit("mouth_movement", {"value": str(new_value)})
|
||
time.sleep(0.1)
|
||
except Exception as e:
|
||
print(f"Error in mouth movement emission: {e}")
|
||
|
||
# Reset mouth movement value at the end
|
||
socketio.emit("mouth_movement", {"value": str(0.0)})
|
||
|
||
|
||
@app.route("/lip_sync", methods=["POST"])
|
||
def lip_sync():
|
||
try:
|
||
# 获取 POST 请求的数据
|
||
path = request.form.get("path")
|
||
print(path)
|
||
if path.startswith("pre_mp3"):
|
||
path = "../" + path
|
||
|
||
# 在单独的线程中处理音频和口型数据
|
||
thread = threading.Thread(target=process_audio_and_emit, args=(path,))
|
||
thread.start()
|
||
|
||
# 立即返回 JSON 格式的响应
|
||
return jsonify({"status": "success"})
|
||
|
||
except Exception as e:
|
||
print(f"Error: {e}")
|
||
return jsonify({"status": "error", "message": str(e)})
|
||
|
||
import traceback
|
||
|
||
massage_status = {
|
||
"progress": 0,
|
||
"force": '',
|
||
"temperature": '',
|
||
"gear": '',
|
||
"shake": '',
|
||
"is_massaging": False,
|
||
"current_task": "",
|
||
"task_time": "",
|
||
"current_head": "",
|
||
"body_part": "back",
|
||
"massage_service_started": False,
|
||
"press": '',
|
||
"frequency": '',
|
||
"speed": '',
|
||
"direction": '',
|
||
"high": '',
|
||
"massage_state": '',
|
||
"is_acupoint": False,
|
||
"is_pause": False,
|
||
"manual_stage": 0,
|
||
"start_pos": "",
|
||
"end_pos": "",
|
||
"massage_path": ""
|
||
}
|
||
|
||
stone_status ={
|
||
"temper_head": 40,
|
||
"temperature": 1
|
||
}
|
||
|
||
@app.route("/update_massage_status", methods=["POST"])
|
||
def update_status():
|
||
global stored_command
|
||
global display_image_url
|
||
global display_image_type
|
||
try:
|
||
print(f"update_massage_status{request.form}")
|
||
temper_head = request.form.get("temper_head")
|
||
if temper_head:
|
||
temperature = request.form.get("temperature")
|
||
stone_status["temper_head"] = temper_head
|
||
stone_status["temperature"] = temperature
|
||
socketio.emit("update_stone_status", stone_status)
|
||
|
||
else:
|
||
is_massaging = request.form.get("is_massaging")
|
||
is_pause = request.form.get("is_pause")
|
||
if is_massaging and (is_massaging == "False" or is_massaging == "false"):
|
||
if is_pause == "False" or is_pause == "false":
|
||
if stored_command: # 如果存储了命令,且 is_massaging 为 false
|
||
print(f"Re-running stored command after 5 seconds delay...")
|
||
def run_async_in_thread():
|
||
time.sleep(5) # 等待 5 秒,确保命令执行完毕
|
||
asyncio.run(send_message(stored_command, 8765)) # 在新线程中运行异步函数
|
||
|
||
# 在新的线程中执行
|
||
threading.Thread(target=run_async_in_thread).start()
|
||
|
||
# 获取 POST 请求的数据
|
||
progress = request.form.get("progress")
|
||
force = request.form.get("force")
|
||
temperature = request.form.get("temperature")
|
||
gear = request.form.get("gear")
|
||
shake = request.form.get("shake")
|
||
is_massaging = request.form.get("is_massaging")
|
||
current_task = request.form.get("current_task")
|
||
task_time = request.form.get("task_time")
|
||
body_part = request.form.get("body_part")
|
||
current_head = request.form.get("current_head")
|
||
massage_service_started = request.form.get("massage_service_started")
|
||
press = request.form.get("press")
|
||
frequency = request.form.get("frequency")
|
||
speed = request.form.get("speed")
|
||
is_pause = request.form.get("is_pause")
|
||
direction = request.form.get("direction")
|
||
high = request.form.get("high")
|
||
massage_state = request.form.get("massage_state")
|
||
is_acupoint = request.form.get("is_acupoint")
|
||
manual_stage = request.form.get("manual_stage")
|
||
start_pos = request.form.get("start_pos")
|
||
end_pos = request.form.get("end_pos")
|
||
massage_path = request.form.get("massage_path")
|
||
|
||
# 仅在非空时更新字典中的值
|
||
if progress:
|
||
massage_status["progress"] = progress
|
||
if force and force != "0" and (is_massaging == "True" or is_massaging == "true" ):
|
||
print(force)
|
||
massage_status["force"] = force
|
||
if temperature:
|
||
massage_status["temperature"] = temperature
|
||
if gear:
|
||
massage_status["gear"] = gear
|
||
if shake:
|
||
massage_status["shake"] = shake
|
||
if is_massaging:
|
||
if is_massaging == "False" or is_massaging == "false":
|
||
is_massaging = False
|
||
display_image_url = "static/images/smart_mode/back.png"
|
||
socketio.emit("change_image", {"path": display_image_url, "type": display_image_type})
|
||
if is_massaging == "True" or is_massaging == "true":
|
||
is_massaging = True
|
||
socketio.emit("change_image", {"path": display_image_url, "type": display_image_type})
|
||
massage_status["is_massaging"] = is_massaging
|
||
if current_task:
|
||
task_to_index = {
|
||
"lung": 0,
|
||
"heart": 1,
|
||
"hepatobiliary": 2,
|
||
"spleen": 3,
|
||
"kidney": 4,
|
||
"lung_left": 0,
|
||
"heart_left": 1,
|
||
"hepatobiliary_left": 2,
|
||
"spleen_left": 3,
|
||
"kidney_left": 4,
|
||
"lung_right": 0,
|
||
"heart_right": 1,
|
||
"hepatobiliary_right": 2,
|
||
"spleen_right": 3,
|
||
"kidney_right": 4,
|
||
}
|
||
massage_status["current_task"] = current_task
|
||
# 获取对应的索引
|
||
index = task_to_index.get(current_task, None)
|
||
print(current_task, index)
|
||
if index is not None:
|
||
# 发送索引到前端
|
||
socketio.emit("highlightPlane", index)
|
||
else:
|
||
socketio.emit("highlightPlane", None)
|
||
elif current_task == "-1":
|
||
socketio.emit("highlightPlane", None)
|
||
if task_time is not None:
|
||
if task_time == "null":
|
||
task_time = ""
|
||
massage_status["task_time"] = task_time
|
||
if current_head:
|
||
massage_status["current_head"] = current_head
|
||
if body_part:
|
||
massage_status["body_part"] = body_part
|
||
if press:
|
||
massage_status["press"] = press
|
||
if frequency:
|
||
massage_status["frequency"] = frequency
|
||
if is_pause:
|
||
if is_pause == "False" or is_pause == "false":
|
||
is_pause = False
|
||
if is_pause == "True" or is_pause == "true":
|
||
is_pause = True
|
||
massage_status["is_pause"] = is_pause
|
||
if speed:
|
||
massage_status["speed"] = speed
|
||
if direction:
|
||
massage_status["direction"] = direction
|
||
if high:
|
||
massage_status["high"] = high
|
||
if massage_service_started:
|
||
print(massage_service_started)
|
||
if massage_service_started == "False" or massage_service_started == "false":
|
||
massage_service_started = False
|
||
if massage_service_started == "True" or massage_service_started == "true":
|
||
massage_service_started = True
|
||
massage_status["massage_service_started"] = massage_service_started
|
||
if massage_state:
|
||
massage_status["massage_state"] = massage_state
|
||
if is_acupoint:
|
||
if is_acupoint == "False" or is_acupoint == "false":
|
||
is_acupoint = False
|
||
if is_acupoint == "True" or is_acupoint == "true":
|
||
is_acupoint = True
|
||
massage_status["is_acupoint"] = is_acupoint
|
||
if manual_stage:
|
||
massage_status["manual_stage"] = int(manual_stage)
|
||
if start_pos:
|
||
massage_status["start_pos"] = start_pos
|
||
if end_pos:
|
||
massage_status["end_pos"] = end_pos
|
||
if massage_path:
|
||
massage_status["massage_path"] = massage_path
|
||
|
||
print(massage_status)
|
||
|
||
# 通过 SocketIO 发送更新后的数据
|
||
socketio.emit("update_massage_status", massage_status)
|
||
|
||
return jsonify({"status": "success"})
|
||
|
||
except Exception as e:
|
||
print(f"Error: {e}")
|
||
return jsonify({"status": "error", "message": str(e)})
|
||
|
||
def is_service_running(service_name):
|
||
try:
|
||
# 使用 subprocess.run 执行 systemctl is-active 命令
|
||
result = subprocess.run(
|
||
["/bin/systemctl", "is-active", service_name],
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
check=True,
|
||
text=True,
|
||
)
|
||
# 如果返回的输出是 'active',则服务正在运行
|
||
return result.stdout.strip() == "active"
|
||
except subprocess.CalledProcessError:
|
||
# 如果服务未运行,或者其他错误,返回 False
|
||
return False
|
||
|
||
|
||
@app.route("/get_status", methods=["GET"])
|
||
def get_status():
|
||
global display_image_url
|
||
global display_image_type
|
||
try:
|
||
command = "get_status"
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
task = asyncio.ensure_future(send_message(command, 8765))
|
||
loop.run_until_complete(task)
|
||
except Exception as e:
|
||
print("error ! get_status error")
|
||
massage_status.update({
|
||
"progress": 0,
|
||
"force": '',
|
||
"temperature": '',
|
||
"gear": '',
|
||
"shake": '',
|
||
"is_massaging": False,
|
||
"current_task": "",
|
||
"task_time": "",
|
||
"current_head": "",
|
||
"body_part": "back",
|
||
"massage_service_started": False,
|
||
"press": '',
|
||
"frequency": '',
|
||
"speed": '',
|
||
"direction": '',
|
||
"high": '',
|
||
"massage_state": '',
|
||
"is_acupoint": False,
|
||
"is_pause": False,
|
||
"manual_stage": 0,
|
||
"start_pos": "",
|
||
"end_pos": "",
|
||
"massage_path": ""
|
||
})
|
||
print(f"Error: {e}")
|
||
|
||
file_url = display_image_url
|
||
# 使用 Socket.IO 发送 URL 给前端
|
||
socketio.emit("change_image", {"path": file_url, "type": display_image_type})
|
||
# try:
|
||
# command = 'get_status'
|
||
# loop = asyncio.new_event_loop()
|
||
# asyncio.set_event_loop(loop)
|
||
# task = asyncio.ensure_future(send_message(command,8766))
|
||
# loop.run_until_complete(task)
|
||
# except Exception as e:
|
||
|
||
# # print(f"Error: {e}")
|
||
print(massage_status)
|
||
return jsonify(massage_status)
|
||
|
||
|
||
######
|
||
last_command_time = 0
|
||
|
||
# 存储命令的全局变量
|
||
stored_command = None
|
||
|
||
countdown_value = 0
|
||
countdown_lock = threading.Lock()
|
||
|
||
def start_countdown():
|
||
global countdown_value, display_image_type, display_image_url
|
||
with countdown_lock:
|
||
countdown_value = 120 # 初始化为120
|
||
|
||
while True:
|
||
with countdown_lock:
|
||
if countdown_value <= 0:
|
||
break
|
||
countdown_value -= 1
|
||
print(f"Countdown: {countdown_value}")
|
||
|
||
socketio.emit("countdown_update", {"countdown": countdown_value, "message": "is_running"})
|
||
|
||
time.sleep(1)
|
||
|
||
# socketio.emit("countdown_update", {"countdown": countdown_value, "message": "倒计时结束"})
|
||
socketio.emit("change_image", {"path": display_image_url, "type": display_image_type})
|
||
|
||
async def send_to_port(command, port):
|
||
# try:
|
||
# loop = asyncio.new_event_loop()
|
||
# asyncio.set_event_loop(loop)
|
||
# await send_message(command, port)
|
||
# print(f"Sent to {port}: {command}")
|
||
# except Exception as e:
|
||
# print(f"Error sending to {port}: {e}")
|
||
|
||
"""
|
||
发送命令到指定端口,并打印详细的日志信息
|
||
参数:
|
||
command: 要发送的命令字符串
|
||
port: 目标端口号
|
||
"""
|
||
try:
|
||
# 创建新的事件循环(适用于非异步环境)
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
|
||
# 记录发送前的调试信息
|
||
print(f"[DEBUG] 准备发送命令到端口 {port}: {command}")
|
||
|
||
# 模拟发送操作(替换为实际发送逻辑)
|
||
await send_message(command, port) # 假设这是你的实际发送函数
|
||
|
||
# 成功日志
|
||
print(f"[SUCCESS] 命令已发送到端口 {port}: {command}")
|
||
return True
|
||
|
||
except ConnectionError as e:
|
||
print(f"[ERROR] 连接失败 (端口 {port}): {str(e)}")
|
||
return False
|
||
|
||
except asyncio.TimeoutError:
|
||
print(f"[ERROR] 发送超时 (端口 {port})")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"[CRITICAL] 未知错误: {type(e).__name__}: {str(e)}")
|
||
return False
|
||
finally:
|
||
if 'loop' in locals():
|
||
loop.close()
|
||
|
||
@socketio.on("send_command")
|
||
def send_command(data):
|
||
# global last_command_time
|
||
|
||
# current_time = time.time()
|
||
# if current_time - last_command_time < 1:
|
||
# # if less than 3 seconds passed since the last command, ignore this command
|
||
# return
|
||
# last_command_time = current_time
|
||
|
||
global stored_command
|
||
global display_image_url
|
||
global display_image_type
|
||
global countdown_value
|
||
|
||
if "begin" in data:
|
||
|
||
# 解析命令中的数据
|
||
parts = data.split(":")
|
||
if len(parts) >= 5: # 至少要有 5 部分(确保 mode_real 存在)
|
||
mode_real = parts[4]
|
||
|
||
# if parts[1] == "stone" and mode_real == "1":
|
||
# # 启动倒计时线程
|
||
# threading.Thread(target=start_countdown, daemon=True).start()
|
||
|
||
# 情况1:parts长度=6,解析use_mode
|
||
if len(parts) == 6:
|
||
use_mode = parts[5]
|
||
# 如果use_mode存在且不是manual,或者parts长度<6(没有use_mode),都发送UI_start
|
||
if use_mode != 'manual':
|
||
asyncio.run(send_to_port("UI_start", 8766))
|
||
# 情况2:parts长度<6(没有use_mode),直接发送UI_start
|
||
else:
|
||
asyncio.run(send_to_port("UI_start", 8766))
|
||
|
||
if mode_real == "3": # 如果 modeReal 为 3,则保存命令
|
||
stored_command = data
|
||
else: # 如果 modeReal 不是 3,则清空命令
|
||
stored_command = None
|
||
|
||
print(f"Stored command: {stored_command}")
|
||
|
||
if "manual_start" in data:
|
||
asyncio.run(send_to_port("UI_start", 8766))
|
||
|
||
if "stop" in data:
|
||
stored_command = None
|
||
display_image_url = 'static/images/smart_mode/back.png'
|
||
display_image_type = 0
|
||
countdown_value = 0
|
||
|
||
# if "pause" in data:
|
||
# stored_command = None
|
||
# display_image_url = 'static/images/smart_mode/back.png'
|
||
# display_image_type = 0
|
||
# countdown_value = 0
|
||
|
||
# 所有命令发送到 8765
|
||
asyncio.run(send_to_port(data, 8765))
|
||
|
||
@app.route("/get_countdown", methods=["GET"])
|
||
def get_countdown():
|
||
with countdown_lock:
|
||
return jsonify({"countdown": countdown_value})
|
||
|
||
@app.route('/get_command', methods=['GET'])
|
||
def get_command():
|
||
return jsonify({
|
||
"status": "success",
|
||
"command": stored_command
|
||
})
|
||
|
||
@socketio.on("change_awaken")
|
||
def change_awaken(command):
|
||
try:
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
task = asyncio.ensure_future(send_message(command, 8766))
|
||
loop.run_until_complete(task)
|
||
except Exception as e:
|
||
print(f"Error: {e}")
|
||
|
||
@app.route("/get_awaken", methods=["GET"])
|
||
def get_awaken():
|
||
try:
|
||
# 文件路径
|
||
file_path = '../Language/Hotword_awaker/resource/keyword-nhxd.txt' # 使用相对路径
|
||
print("File path:", os.path.abspath(file_path)) # 打印文件的绝对路径
|
||
|
||
# 检查文件是否存在
|
||
if not os.path.exists(file_path):
|
||
return jsonify({"error": "File not found"}), 404
|
||
|
||
# 读取文件内容
|
||
with open(file_path, 'r', encoding='utf-8') as file:
|
||
content = file.read()
|
||
|
||
# 使用正则表达式提取字符串
|
||
match = re.match(r"([^\;]+);", content) # 提取第一个分号前的字符串
|
||
if match:
|
||
awaken = match.group(1) # 获取匹配的部分
|
||
else:
|
||
awaken = "No valid string found"
|
||
|
||
# 返回提取的字符串
|
||
return jsonify({"awaken": awaken}), 200
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
|
||
@app.route('/get_serial_number', methods=['GET'])
|
||
def get_serial_number():
|
||
try:
|
||
# 获取当前服务器的主机名
|
||
host_name = socket.gethostname()
|
||
|
||
# 这里可以将主机名映射到序列号,或者你可以根据主机名生成某种序列号
|
||
# 例如,返回一个假设的序列号,这里简单返回主机名作为序列号
|
||
serial_number = host_name.removeprefix('jsfb-')
|
||
|
||
# 返回 JSON 格式的响应
|
||
return jsonify({"serial_number": serial_number})
|
||
|
||
except Exception as e:
|
||
# 如果发生错误,返回错误信息
|
||
return jsonify({"error": str(e)}), 500
|
||
|
||
# @app.before_request
|
||
# def before_request():
|
||
# if request.url.startswith('https://'):
|
||
# url = request.url.replace('https://', 'http://', 1)
|
||
# code = 301 # redirect permanentlyvideo
|
||
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
|
||
response.headers.add("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE")
|
||
return response
|
||
|
||
|
||
@app.route("/")
|
||
def index():
|
||
return render_template("login.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/home")
|
||
def home():
|
||
return render_template("home.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/health")
|
||
def health():
|
||
global current_mode
|
||
if current_mode == "manual_mode":
|
||
return render_template(
|
||
"select_program.html", time=int(time.time())
|
||
)
|
||
elif current_mode == "handheld_mode":
|
||
return render_template(
|
||
"handheld_mode.html", time=int(time.time())
|
||
)
|
||
else:
|
||
return render_template("smart_mode.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/setting")
|
||
def setting():
|
||
return render_template("setting.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/developer")
|
||
def developer():
|
||
return render_template("developer.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/learning")
|
||
def learning():
|
||
return render_template("learning.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/help")
|
||
def help():
|
||
return render_template("help.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/ai_ball")
|
||
def ai_ball():
|
||
return render_template("ai_ball_v2.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/model")
|
||
def model():
|
||
return render_template("3d_model_v6_local.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/ai_chatbot")
|
||
def ai_chatbot():
|
||
return render_template("ai_chatbot.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/dynamic_function")
|
||
def dynamic_function():
|
||
mode = request.args.get('mode', default=None)
|
||
return render_template("dynamic_function_v2.html", time=int(time.time()), mode=mode)
|
||
|
||
|
||
@app.route("/three_d_model")
|
||
def three_d_model():
|
||
return render_template("three_d_model.html", time=int(time.time()))
|
||
|
||
@app.route("/control_panel")
|
||
def control_panel():
|
||
mode = request.args.get('mode', default=None)
|
||
return render_template("control_panel_v2.html", time=int(time.time()), mode=mode)
|
||
|
||
|
||
@app.route("/music")
|
||
def music():
|
||
return render_template("full_music_player.html", time=int(time.time()))
|
||
|
||
|
||
@app.route("/switch_mode/<mode>")
|
||
def switch_mode(mode):
|
||
# session['current_mode'] = mode
|
||
global current_mode
|
||
current_mode = mode
|
||
return redirect(url_for("health"))
|
||
|
||
|
||
@app.route("/get-ip")
|
||
def get_ip():
|
||
# 获取服务器的IP
|
||
host = request.host.split(":")[0]
|
||
return jsonify({"ip": host})
|
||
|
||
|
||
@app.route("/login", methods=["POST"])
|
||
def login():
|
||
data = request.json
|
||
username = data["username"]
|
||
password = data["password"]
|
||
|
||
# 简单验证示例
|
||
if username == "jsfb" and password == "123456":
|
||
return jsonify(success=True)
|
||
else:
|
||
return jsonify(success=False)
|
||
|
||
powerboard = None
|
||
power_board_ip = None
|
||
power_board_thread = None
|
||
power_board_thread_running = False
|
||
|
||
def send_heartbeat():
|
||
global power_board_thread_running
|
||
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
print(f"Sent heartbeat to {power_board_ip}:3660")
|
||
try:
|
||
# 设置socket超时,避免阻塞
|
||
udp_socket.settimeout(1.0)
|
||
while power_board_thread_running:
|
||
try:
|
||
if power_board_ip:
|
||
udp_socket.sendto(b"ALIVE", (power_board_ip, 3660))
|
||
# print(f"Sent heartbeat to {power_board_ip}:3660")
|
||
except socket.timeout:
|
||
print(f"socket timeout")
|
||
continue
|
||
except Exception as e:
|
||
print(f"Error sending heartbeat: {e}")
|
||
time.sleep(1)
|
||
except Exception as e:
|
||
print(f"Error in heartbeat thread: {e}")
|
||
finally:
|
||
# try:
|
||
# # 发送关闭信号
|
||
# if power_board_ip:
|
||
# for _ in range(3): # 尝试发送3次,确保信号能够送达
|
||
# try:
|
||
# udp_socket.sendto(b"SHUTDOWN", (power_board_ip, 3660))
|
||
# print(f"Sent shutdown signal to {power_board_ip}:3660")
|
||
# time.sleep(0.1) # 短暂延时,避免发送过快
|
||
# except Exception as e:
|
||
# print(f"Error sending shutdown signal: {e}")
|
||
# finally:
|
||
# udp_socket.close()
|
||
udp_socket.close()
|
||
|
||
@app.route("/power_board", methods=["POST"])
|
||
def power_board():
|
||
global power_board_ip, power_board_thread, power_board_thread_running
|
||
ip = request.form.get("ip")
|
||
|
||
# 如果收到空IP,停止现有线程
|
||
if not ip:
|
||
if power_board_thread_running:
|
||
power_board_thread_running = False
|
||
if power_board_thread:
|
||
try:
|
||
power_board_thread.join(timeout=2.0) # 设置超时时间,避免永久阻塞
|
||
except Exception as e:
|
||
print(f"Error stopping power board thread: {e}")
|
||
power_board_thread = None
|
||
power_board_ip = None
|
||
return jsonify({"status": "success", "message": "Heartbeat stopped"})
|
||
|
||
# 如果收到新IP
|
||
power_board_ip = ip
|
||
print("power_board ip: ", power_board_ip)
|
||
|
||
# 如果线程不存在或已停止,创建新线程
|
||
if not power_board_thread or not power_board_thread.is_alive():
|
||
power_board_thread_running = True
|
||
power_board_thread = threading.Thread(target=send_heartbeat)
|
||
power_board_thread.daemon = True # 设置为守护线程,这样主程序退出时线程会自动结束
|
||
power_board_thread.start()
|
||
|
||
return jsonify({"status": "success", "message": "Heartbeat started"})
|
||
|
||
@app.route("/massage_control", methods=["POST"])
|
||
def massage_control():
|
||
action = request.form.get("action")
|
||
print("massage_control action: ", action)
|
||
sudo_password = "jsfb"
|
||
|
||
if action:
|
||
if action == "start":
|
||
action = "restart"
|
||
try:
|
||
command = "connect_arm"
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
task = asyncio.ensure_future(send_message(command, 8766))
|
||
loop.run_until_complete(task)
|
||
except Exception as e:
|
||
print(f"Error: {e}")
|
||
if action == "stop":
|
||
try:
|
||
command = "disconnect_arm"
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
task = asyncio.ensure_future(send_message(command, 8766))
|
||
loop.run_until_complete(task)
|
||
|
||
# 发送状态更新通知前端
|
||
global display_image_type
|
||
global display_image_url
|
||
display_image_type =0
|
||
display_image_url = 'static/images/smart_mode/back.png'
|
||
|
||
socketio.emit("update_massage_status", {
|
||
"progress": 0,
|
||
"force": '',
|
||
"temperature": '',
|
||
"gear": '',
|
||
"shake": '',
|
||
"is_massaging": False,
|
||
"current_task": "",
|
||
"task_time": "",
|
||
"current_head": "",
|
||
"body_part": "back",
|
||
"massage_service_started": False,
|
||
"press": '',
|
||
"frequency": '',
|
||
"speed": '',
|
||
"direction": '',
|
||
"massage_state": '',
|
||
"is_acupoint": False,
|
||
"is_pause": False,
|
||
"manual_stage": 0,
|
||
"start_pos": "",
|
||
"end_pos": "",
|
||
"massage_path": ""
|
||
})
|
||
except Exception as e:
|
||
print(f"Error: {e}")
|
||
|
||
# 处理本地关机
|
||
if action == "shutdown":
|
||
success = system_shutdown()
|
||
if success:
|
||
return jsonify({
|
||
"status": "success",
|
||
"message": "Remote and local systems shutdown successfully."
|
||
})
|
||
else:
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": "Failed to execute shutdown process"
|
||
}), 500
|
||
|
||
# 非 shutdown 操作
|
||
else:
|
||
command = f'echo {sudo_password} | /bin/sh -c "/usr/bin/sudo -S /bin/systemctl {action} massage"'
|
||
try:
|
||
result = subprocess.run(
|
||
command,
|
||
shell=True,
|
||
check=True,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
)
|
||
print(result.stdout.decode())
|
||
print(f"Service {action} successfully.")
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"Error when {action} service: {e.stderr.decode()}")
|
||
|
||
return jsonify({"status": "success"})
|
||
|
||
else:
|
||
return jsonify({"status": "error", "message": "Invalid action"}), 400
|
||
|
||
|
||
# 文件服务器的基本信息
|
||
obs = HW_OBS()
|
||
|
||
@app.route("/get_versions", methods=["GET"])
|
||
def get_versions():
|
||
return get_local_versions()
|
||
|
||
@app.route("/get_remote_versions", methods=["GET"])
|
||
def get_remote_versions_route():
|
||
return get_remote_versions(obs)
|
||
|
||
@app.route("/download_package", methods=["POST"])
|
||
def download_package():
|
||
data = request.get_json()
|
||
version = data.get("version")
|
||
return start_download_package(version, obs, socketio)
|
||
|
||
@app.route("/get_current_version", methods=["GET"])
|
||
def get_current_version_route():
|
||
return get_current_version(__file__)
|
||
|
||
@app.route("/switch_version", methods=["POST"])
|
||
def switch_version_route():
|
||
data = request.json
|
||
version = data.get("version")
|
||
return switch_version(version)
|
||
|
||
# VortXDB版本控制路由
|
||
@app.route("/get_vtx_versions", methods=["GET"])
|
||
def get_vtx_versions_route():
|
||
return get_vtx_local_versions()
|
||
|
||
@app.route("/get_vtx_remote_versions", methods=["GET"])
|
||
def get_vtx_remote_versions_route():
|
||
return get_vtx_remote_versions(obs)
|
||
|
||
@app.route("/download_vtx_package", methods=["POST"])
|
||
def download_vtx_package():
|
||
data = request.get_json()
|
||
version = data.get("version")
|
||
return start_vtx_download_package(version, obs, socketio)
|
||
|
||
@app.route("/get_vtx_current_version", methods=["GET"])
|
||
def get_vtx_current_version_route():
|
||
return get_vtx_current_version(vtxdb)
|
||
|
||
@app.route("/switch_vtx_version", methods=["POST"])
|
||
def switch_vtx_version_route():
|
||
data = request.json
|
||
version = data.get("version")
|
||
return switch_vtx_version(version)
|
||
|
||
@app.route("/sync_user_data", methods=["POST"])
|
||
def sync_user_data_route():
|
||
data = request.json
|
||
source_version = data.get("version")
|
||
current_version = data.get("current_version")
|
||
return sync_user_data(source_version, current_version)
|
||
|
||
@app.route("/on_message", methods=["POST"])
|
||
def on_message():
|
||
try:
|
||
# 获取 POST 请求中的 form 数据
|
||
message = request.form.get(
|
||
"message", ""
|
||
).strip() # 从表单数据中获取 'message' 字段,并去掉多余的空白字符
|
||
|
||
if not message:
|
||
# 如果 message 为空或只包含空格,返回错误响应,但不触发弹窗
|
||
return (
|
||
jsonify(
|
||
{
|
||
"status": "error",
|
||
"message": "Message cannot be empty, no popup triggered",
|
||
}
|
||
),
|
||
400,
|
||
)
|
||
|
||
# 使用 Socket.IO 向客户端发送消息
|
||
socketio.emit("on_message", {"message": message})
|
||
|
||
# 返回成功的响应
|
||
return jsonify(
|
||
{"status": "success", "message": "Popup triggered with message: " + message}
|
||
)
|
||
|
||
except Exception as e:
|
||
# 捕获所有异常并返回服务器错误
|
||
return (
|
||
jsonify({"status": "error", "message": "An error occurred: " + str(e)}),
|
||
500,
|
||
)
|
||
|
||
|
||
# 定义 static 文件夹的路径
|
||
STATIC_FOLDER = os.path.join(app.root_path, "static")
|
||
display_image_url = "static/images/smart_mode/back.png"
|
||
display_image_type = 0
|
||
|
||
|
||
# 定义接收 POST 请求的路由
|
||
@app.route("/change_image", methods=["POST"])
|
||
def change_image():
|
||
global display_image_url
|
||
global display_image_type
|
||
# 从请求中获取图片路径
|
||
image_path = request.form.get("path")
|
||
image_type = request.form.get("type", None)
|
||
|
||
# 检查路径是否存在
|
||
if not image_path or not os.path.exists(image_path):
|
||
return jsonify({"error": "Invalid image path"}), 400
|
||
|
||
# 获取文件名
|
||
filename = os.path.basename(image_path)
|
||
|
||
# 定义目标目录路径(static/images/tmp_images/)
|
||
target_dir = os.path.join(STATIC_FOLDER, "images", "tmp_images")
|
||
|
||
# 如果目标目录不存在,则创建它
|
||
if not os.path.exists(target_dir):
|
||
try:
|
||
os.makedirs(target_dir) # 创建目录(包括父目录)
|
||
except Exception as e:
|
||
return jsonify({"error": f"Failed to create directory: {str(e)}"}), 500
|
||
|
||
# 定义目标路径,将文件复制到 static 目录下
|
||
target_path = os.path.join(target_dir, filename)
|
||
|
||
try:
|
||
# 复制文件到 static 目录
|
||
shutil.copy(image_path, target_path)
|
||
except Exception as e:
|
||
return jsonify({"error": f"Failed to copy file: {str(e)}"}), 500
|
||
|
||
# 生成文件的 URL
|
||
file_url = "static/images/tmp_images/" + filename
|
||
# file_url = url_for('static', filename='images/' + filename, _external=True)
|
||
display_image_url = file_url
|
||
display_image_type = image_type
|
||
|
||
# 使用 Socket.IO 发送 URL 给前端
|
||
socketio.emit("change_image", {"path": file_url, "type": image_type})
|
||
|
||
# 返回图片的 URL
|
||
return jsonify({"url": file_url, "type": image_type}), 200
|
||
|
||
|
||
# 服务器相关信息
|
||
SERVER_IP = "8.138.8.114" # 替换为您的服务器 IP
|
||
SERVER_USER = "root" # 替换为 root 用户
|
||
SERVER_PASSWORD = "JuShenFengBao209" # 替换为 root 用户密码
|
||
REMOTE_PATH = "/var/www/html/devices"
|
||
|
||
reverse_ssh = ReverseSSH(SERVER_IP, SERVER_USER, SERVER_PASSWORD)
|
||
|
||
|
||
# 开启反向隧道
|
||
@app.route("/start_tunnel", methods=["GET"])
|
||
def start_tunnel():
|
||
result = reverse_ssh.start_tunnel()
|
||
return jsonify(result)
|
||
|
||
|
||
# 关闭反向隧道
|
||
@app.route("/stop_tunnel", methods=["GET"])
|
||
def stop_tunnel():
|
||
result = reverse_ssh.stop_tunnel()
|
||
return jsonify(result)
|
||
|
||
# 定义一个路由,根据请求参数执行不同的 shell 脚本
|
||
@app.route("/run-script", methods=["POST"])
|
||
def run_script():
|
||
print("run-script")
|
||
try:
|
||
# 从前端获取 JSON 格式的请求体
|
||
data = request.json
|
||
script_type = data.get("script_type")
|
||
|
||
# base_dir = "/home/jsfb/jsfb_ws"
|
||
|
||
# 获取当前脚本执行的绝对路径
|
||
# base_path = os.path.dirname(os.path.abspath(__file__))
|
||
# base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
# 获取当前脚本执行的绝对路径
|
||
current_path = os.path.abspath(__file__)
|
||
|
||
# 获取当前脚本所在目录的上一级目录
|
||
base_path = os.path.dirname(os.path.dirname(current_path))
|
||
|
||
print("动态目录:", base_path)
|
||
|
||
|
||
# base_path = os.path.join(base_dir, "MassageRobot_aubo")
|
||
# print(f"Base path: {base_path}")
|
||
# script_path = os.path.join(base_path, f"../my_script.sh")
|
||
# 根据请求参数执行不同的 shell 脚本
|
||
|
||
if script_type == "zh":
|
||
script_path = os.path.join(base_path, "setup.sh")
|
||
# 确保路径是规范化的(处理相对路径)
|
||
script_path = os.path.normpath(script_path)
|
||
|
||
# 检查脚本路径是否存在
|
||
if not os.path.exists(script_path):
|
||
print(f"Script not found: {script_path}, 404")
|
||
return jsonify({"error": f"Script not found: {script_path}"}), 404
|
||
|
||
# 执行 shell 脚本
|
||
# subprocess.Popen(
|
||
# ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup.sh; exec bash"],
|
||
# cwd=base_path
|
||
# )
|
||
subprocess.Popen(
|
||
["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup.sh; exit"],
|
||
cwd=base_path
|
||
)
|
||
elif script_type == "jp":
|
||
script_path = os.path.join(base_path, "setup_JP.sh")
|
||
# 确保路径是规范化的(处理相对路径)
|
||
script_path = os.path.normpath(script_path)
|
||
|
||
# 检查脚本路径是否存在
|
||
if not os.path.exists(script_path):
|
||
print(f"Script not found: {script_path}, 404")
|
||
return jsonify({"error": f"Script not found: {script_path}"}), 404
|
||
|
||
# 执行 shell 脚本
|
||
# subprocess.Popen(
|
||
# ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_JP.sh; exec bash"],
|
||
# cwd=base_path
|
||
# )
|
||
subprocess.Popen(
|
||
["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_JP.sh; exit"],
|
||
cwd=base_path
|
||
)
|
||
elif script_type == "en":
|
||
script_path = os.path.join(base_path, "setup_EN.sh")
|
||
# 确保路径是规范化的(处理相对路径)
|
||
script_path = os.path.normpath(script_path)
|
||
|
||
# 检查脚本路径是否存在
|
||
if not os.path.exists(script_path):
|
||
print(f"Script not found: {script_path}, 404")
|
||
return jsonify({"error": f"Script not found: {script_path}"}), 404
|
||
|
||
# 执行 shell 脚本
|
||
# subprocess.Popen(
|
||
# ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_EN.sh; exec bash"],
|
||
# cwd=base_path
|
||
# )
|
||
subprocess.Popen(
|
||
["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_EN.sh; exit"],
|
||
cwd=base_path
|
||
)
|
||
elif script_type == "ko":
|
||
script_path = os.path.join(base_path, "setup_KO.sh")
|
||
# 确保路径是规范化的(处理相对路径)
|
||
script_path = os.path.normpath(script_path)
|
||
|
||
# 检查脚本路径是否存在
|
||
if not os.path.exists(script_path):
|
||
print(f"Script not found: {script_path}, 404")
|
||
return jsonify({"error": f"Script not found: {script_path}"}), 404
|
||
|
||
# 执行 shell 脚本
|
||
# subprocess.Popen(
|
||
# ["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_KO.sh; exec bash"],
|
||
# cwd=base_path
|
||
# )
|
||
subprocess.Popen(
|
||
["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash setup_KO.sh; exit"],
|
||
cwd=base_path
|
||
)
|
||
|
||
|
||
else:
|
||
print("Invalid script type provided, 400")
|
||
return jsonify({"error": "Invalid script type provided"}), 400
|
||
|
||
# 返回脚本执行结果
|
||
return jsonify(
|
||
{
|
||
"message": "Script executed successfully",
|
||
}
|
||
)
|
||
except Exception as e:
|
||
print(f"Error running script: {e}, 500")
|
||
return jsonify({"error": str(e)}), 500
|
||
|
||
@app.route('/set_zero', methods=['POST'])
|
||
def set_zero():
|
||
try:
|
||
# 检查服务是否运行
|
||
if not is_service_running("massage.service"):
|
||
sensor = XjcSensor()
|
||
max_try = 3 # 设置最大重试次数
|
||
delay = 0.5 # 每次重试前的延迟时间
|
||
|
||
# 尝试调用 set_zero 方法
|
||
for attempt in range(max_try):
|
||
sensor.disable_active_transmission()
|
||
|
||
time.sleep(0.5)
|
||
|
||
result = sensor.set_zero()
|
||
|
||
if result == 0:
|
||
# 设置成功,返回成功信息
|
||
return jsonify({"message": "Set zero success"}), 200
|
||
else:
|
||
# 设置失败,等待并重试
|
||
print(f"Set zero attempt {attempt + 1} failed, retrying...")
|
||
time.sleep(delay)
|
||
|
||
# 如果多次尝试后失败,返回错误信息
|
||
print("Set zero failed after multiple attempts.")
|
||
requests.post("http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"})
|
||
return jsonify({"message": "Set zero failed after multiple attempts"}), 500
|
||
else:
|
||
return jsonify({"message": "Service is already running, no need to set zero"}), 200
|
||
except Exception as e:
|
||
# 捕获异常并返回错误信息
|
||
print(f"Error in /set_zero: {e}")
|
||
return jsonify({"message": "Set zero failed", "error": str(e)}), 500
|
||
|
||
# 全局变量,用于存储 AuboC5 实例
|
||
aubo_c5 = None
|
||
|
||
@app.route('/power_toggle', methods=['POST'])
|
||
def power_toggle():
|
||
global aubo_c5
|
||
|
||
try:
|
||
# 获取 power_state 参数
|
||
power_state = request.form.get('power_state')
|
||
|
||
# 检查 power_state 是否有效
|
||
if not power_state:
|
||
return jsonify({"error": "Missing 'power_state' parameter"}), 400
|
||
|
||
if not is_service_running("massage.service"):
|
||
# power_on 操作
|
||
if power_state == 'power_on':
|
||
if aubo_c5 is None: # 如果 Aubo_C5 实例还没有创建
|
||
aubo_c5 = AuboC5()
|
||
requests.post("http://127.0.0.1:5000/on_message", data={"message": "上电成功"})
|
||
# power_off 操作
|
||
elif power_state == 'power_off':
|
||
if aubo_c5 is not None: # 如果 Aubo_C5 实例已创建
|
||
aubo_c5.power_off()
|
||
# 延迟 5 秒后删除实例
|
||
time.sleep(5) # 阻塞主线程 5 秒
|
||
aubo_c5 = None # 删除实例
|
||
print("Aubo_C5 instance has been deleted after 5 seconds.")
|
||
# requests.post("http://127.0.0.1:5000/on_message", data={"message": "断电成功"})
|
||
return jsonify({"message": "AuboC5 powered off successfully."})
|
||
else:
|
||
return jsonify({"error": "Aubo_C5 is not powered on."}), 400
|
||
else:
|
||
return jsonify({"error": "command is error"}), 400
|
||
|
||
else:
|
||
return jsonify({"error": "Invalid power_state value. Use 'power_on' or 'power_off'."}), 400
|
||
|
||
return jsonify({"message": f"Aubo_C5 {power_state} successfully."})
|
||
|
||
except Exception as e:
|
||
print(f"Error in /power_toggle: {e}")
|
||
return jsonify({"error": f"An error occurred: {str(e)}"}), 500
|
||
|
||
@app.route('/pack_mode', methods=['POST'])
|
||
def pack_mode():
|
||
global aubo_c5
|
||
|
||
try:
|
||
if aubo_c5 is None: # 如果 Aubo_C5 实例还没有创建
|
||
aubo_c5 = AuboC5()
|
||
aubo_c5.pack()
|
||
requests.post("http://127.0.0.1:5000/on_message", data={"message": "请确认是否到达打包位置,如有异常,请上电后手动调整"})
|
||
return jsonify({"message": f"Aubo_C5 mover to pack pos successfully."})
|
||
|
||
except Exception as e:
|
||
print(f"Error in /pack_mode: {e}")
|
||
return jsonify({"error": f"An error occurred: {str(e)}"}), 500
|
||
|
||
|
||
@app.route("/reset-language", methods=["POST"])
|
||
def reset_language():
|
||
print("reset-language")
|
||
try:
|
||
# 获取当前脚本执行的绝对路径
|
||
current_path = os.path.abspath(__file__)
|
||
|
||
# 获取当前脚本所在目录的上一级目录
|
||
base_path = os.path.dirname(os.path.dirname(current_path))
|
||
|
||
print("动态目录:", base_path)
|
||
|
||
# 构造 restart_language.sh 脚本路径
|
||
script_path = os.path.join(base_path, "restart_language.sh")
|
||
|
||
# 确保路径是规范化的(处理相对路径)
|
||
script_path = os.path.normpath(script_path)
|
||
|
||
# 检查脚本路径是否存在
|
||
if not os.path.exists(script_path):
|
||
print(f"Script not found: {script_path}, 404")
|
||
return jsonify({"error": f"Script not found: {script_path}"}), 404
|
||
|
||
# 执行 restart_language.sh 脚本
|
||
subprocess.Popen(
|
||
["gnome-terminal", "--", "bash", "-c", f"cd {base_path} && bash restart_language.sh; exit"],
|
||
cwd=base_path
|
||
)
|
||
|
||
# 返回成功响应
|
||
return jsonify({
|
||
"message": "Restart language script executed successfully in new terminal"
|
||
})
|
||
except Exception as e:
|
||
print(f"Error running script: {e}, 500")
|
||
return jsonify({"error": str(e)}), 500
|
||
|
||
# 初始化 WifiManager 对象
|
||
wifi_manager = WifiManager()
|
||
|
||
@app.route('/scan_wifi', methods=['GET'])
|
||
def scan_wifi():
|
||
"""
|
||
扫描 Wi-Fi 网络并返回扫描结果
|
||
"""
|
||
try:
|
||
wifi_networks = wifi_manager.scan_wifi()
|
||
return jsonify({"status": "success", "wifi_networks": wifi_networks}), 200
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
|
||
@app.route('/connect_wifi', methods=['POST'])
|
||
def connect_wifi():
|
||
"""
|
||
连接到指定的 Wi-Fi 网络
|
||
请求体应包含 SSID 和 password
|
||
"""
|
||
try:
|
||
data = request.get_json()
|
||
ssid = data.get('ssid')
|
||
password = data.get('password')
|
||
|
||
if not ssid or not password:
|
||
return jsonify({"status": "error", "message": "SSID 和密码是必需的"}), 400
|
||
|
||
response = wifi_manager.connect_wifi(ssid, password)
|
||
return jsonify({"status": "success", "message": response}), 200
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
|
||
@app.route('/disconnect_wifi', methods=['POST'])
|
||
def disconnect_wifi():
|
||
"""
|
||
断开当前的 Wi-Fi 连接
|
||
"""
|
||
try:
|
||
response = wifi_manager.disconnect_wifi()
|
||
return jsonify({"status": "success", "message": response}), 200
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
|
||
@app.route('/get_current_connection', methods=['GET'])
|
||
def get_current_connection():
|
||
"""
|
||
获取当前连接的 Wi-Fi 信息
|
||
"""
|
||
try:
|
||
connection_info = wifi_manager.get_current_connection()
|
||
return jsonify({"status": "success", "connection_info": connection_info}), 200
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
|
||
@app.route('/is_connected', methods=['GET'])
|
||
def is_connected():
|
||
"""
|
||
判断是否已连接 Wi-Fi
|
||
"""
|
||
try:
|
||
connected = wifi_manager.is_connected()
|
||
return jsonify({"status": "success", "is_connected": connected}), 200
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
# 写入文件并上传到服务器
|
||
def write_and_upload_info(ngrok_url, reverse_ssh_port):
|
||
hostname = socket.gethostname()
|
||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
# 生成用于连接的反向 SSH 命令
|
||
reverse_ssh_command = f"ssh jsfb@{SERVER_IP} -p {reverse_ssh_port}"
|
||
|
||
# 文件内容,包括主机名、ngrok URL、在线时间和 SSH 命令
|
||
file_content = (
|
||
f"Hostname: {hostname}\n"
|
||
f"Ngrok URL: {ngrok_url}\n"
|
||
f"Online Time: {timestamp}\n"
|
||
f"SSH Command: {reverse_ssh_command}"
|
||
)
|
||
file_path = f"../tmp/{hostname}.txt"
|
||
|
||
# 写入文件
|
||
with open(file_path, "w") as file:
|
||
file.write(file_content)
|
||
|
||
# 使用 paramiko 通过 SFTP 上传文件
|
||
ssh = paramiko.SSHClient()
|
||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
try:
|
||
ssh.connect(SERVER_IP, username=SERVER_USER, password=SERVER_PASSWORD)
|
||
sftp = ssh.open_sftp()
|
||
sftp.put(file_path, f"{REMOTE_PATH}/{hostname}.txt")
|
||
print(f"File {file_path} uploaded to {REMOTE_PATH}/{hostname}.txt on server.")
|
||
finally:
|
||
sftp.close()
|
||
ssh.close()
|
||
|
||
|
||
# 启动 ngrok 隧道并写入/上传文件
|
||
def start_ngrok(port, max_retries=3, retry_interval=2):
|
||
global ngrok_process
|
||
for attempt in range(max_retries):
|
||
print(f"Attempting to start ngrok (Attempt {attempt + 1}/{max_retries})")
|
||
ngrok_process = subprocess.Popen(
|
||
["/usr/local/bin/ngrok", "http", str(port)],
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
)
|
||
time.sleep(retry_interval) # 等待 ngrok 启动
|
||
|
||
# 检查 ngrok 是否启动成功
|
||
try:
|
||
ngrok_url = get_ngrok_url()
|
||
print(f"Ngrok started successfully with URL: {ngrok_url}")
|
||
# write_and_upload_info(ngrok_url) # 写入和上传信息文件
|
||
return ngrok_url
|
||
except Exception as e:
|
||
print(f"Failed to get ngrok URL: {e}")
|
||
ngrok_process.terminate()
|
||
time.sleep(retry_interval)
|
||
|
||
print("Failed to start ngrok after multiple attempts")
|
||
return "Error"
|
||
|
||
|
||
# 获取 ngrok 公共 URL
|
||
def get_ngrok_url():
|
||
try:
|
||
response = requests.get(
|
||
"http://localhost:4040/api/tunnels", timeout=5
|
||
) # ngrok 本地 API
|
||
response.raise_for_status() # 检查是否请求成功
|
||
data = response.json()
|
||
return data["tunnels"][0]["public_url"]
|
||
except requests.RequestException as e:
|
||
raise RuntimeError(f"Error fetching ngrok URL: {e}")
|
||
|
||
|
||
# Socket.IO 事件处理函数,当客户端连接时启动定时任务
|
||
@socketio.on("connect", namespace="/")
|
||
def connect():
|
||
socketio.start_background_task(music_background_thread)
|
||
|
||
|
||
zeroconf = Zeroconf()
|
||
|
||
# 变量用于存储是否已经注册了服务
|
||
service_registered = False
|
||
|
||
|
||
def get_all_local_ips():
|
||
"""获取本机的所有 IPv4 地址,不进行任何筛选"""
|
||
interfaces = netifaces.interfaces() # 获取所有网络接口
|
||
print("可用网络接口:", interfaces)
|
||
|
||
local_ips = []
|
||
|
||
for interface in interfaces:
|
||
addrs = netifaces.ifaddresses(interface) # 获取接口的地址信息
|
||
print(f"接口 {interface} 的地址信息: {addrs}")
|
||
|
||
# 检查 IPv4 地址 (AF_INET)
|
||
if netifaces.AF_INET in addrs:
|
||
for addr_info in addrs[netifaces.AF_INET]:
|
||
print(f"接口 {interface} 的 IPv4 地址信息: {addr_info}")
|
||
ip_addr = addr_info.get("addr")
|
||
|
||
# 直接收集所有的 IPv4 地址
|
||
if ip_addr:
|
||
local_ips.append(ip_addr)
|
||
|
||
if DEBUG_MODE:
|
||
return ["127.0.0.1"] # 调试用
|
||
|
||
# 如果没有找到 IPv4 地址,返回一个回环地址列表
|
||
return local_ips if local_ips else ["127.0.0.1"]
|
||
|
||
|
||
def register_service(local_ips):
|
||
print(local_ips)
|
||
global service_registered
|
||
if service_registered:
|
||
return
|
||
|
||
# # 确保服务名称唯一,使用 uuid 生成一个唯一标识符
|
||
# unique_id = uuid.uuid4()
|
||
hostname = socket.gethostname()
|
||
# print(hostname)
|
||
|
||
for ip in local_ips:
|
||
if ip.startswith("127."):
|
||
continue
|
||
if ip.startswith("192.168.100"):
|
||
continue
|
||
# 将 IP 地址转换为字节格式
|
||
service_info = ServiceInfo(
|
||
"_http._tcp.local.", # 服务类型
|
||
f"LL-X1-{hostname}._http._tcp.local.", # 确保服务名称唯一
|
||
addresses=[socket.inet_aton(ip)], # 使用局域网 IP 地址
|
||
port=5000, # Flask 监听的端口
|
||
properties={}, # 可选的额外服务信息
|
||
server=f"{hostname}.local.", # 主机名
|
||
)
|
||
|
||
zeroconf.register_service(service_info, allow_name_change=True)
|
||
print(f"mDNS 服务已注册,IP 地址:{ip}")
|
||
|
||
service_registered = True
|
||
|
||
|
||
# 在应用退出时注销 zeroconf 服务并关闭 ngrok
|
||
def cleanup():
|
||
global ngrok_process
|
||
print("注销 mDNS 服务并关闭 ngrok...")
|
||
reverse_ssh.stop_tunnel()
|
||
if ngrok_process:
|
||
ngrok_process.terminate()
|
||
zeroconf.close() # 注销 zeroconf 服务
|
||
|
||
|
||
def fetch_music_list_with_retry():
|
||
"""后台定时任务:不断尝试获取音乐列表,直到成功"""
|
||
global all_playlists, playlist, current_song
|
||
|
||
while True:
|
||
try:
|
||
# 清空现有的播放列表
|
||
all_playlists.clear()
|
||
# 遍历分类获取音乐数据
|
||
for categoryID in categoryIDs:
|
||
diss_info, music_info_list = get_song_list(categoryID)
|
||
all_playlists.append(
|
||
{"diss_info": diss_info, "music_info_list": music_info_list}
|
||
)
|
||
print(diss_info)
|
||
# 如果数据获取成功,更新播放列表
|
||
if all_playlists:
|
||
playlist = generate_playlist(all_playlists[0]["music_info_list"])
|
||
current_song.update(
|
||
{
|
||
"album_img_url": playlist[0]["album_img_url"],
|
||
"music_name": playlist[0]["music_name"],
|
||
"singer_name": playlist[0]["singer_name"],
|
||
"is_playing": False,
|
||
}
|
||
)
|
||
print("音乐列表更新成功!")
|
||
break # 成功后退出循环
|
||
except Exception as e:
|
||
print(f"获取音乐列表失败,重试中... 错误信息: {e}")
|
||
print(f"Failed to fetch playlist for category {categoryID}: {e}")
|
||
time.sleep(5) # 等待5秒后重试
|
||
|
||
|
||
def initialize_app(port):
|
||
"""运行 Flask-SocketIO 之前的初始化逻辑。"""
|
||
global playlist, current_song, powerboard
|
||
|
||
# local_ips = get_all_local_ips()
|
||
# register_service(local_ips)
|
||
retry_count = 0
|
||
max_retries = 10
|
||
|
||
while retry_count < max_retries:
|
||
local_ips = get_all_local_ips()
|
||
|
||
# 如果local_ips列表的长度大于2,则直接调用register_service
|
||
if len(local_ips) > 1:
|
||
register_service(local_ips)
|
||
print("Service registered successfully with local IPs:", local_ips)
|
||
break
|
||
|
||
# 如果列表长度不足2,则进行重试
|
||
print(f"Retrying... Current IP list length: {len(local_ips)}")
|
||
retry_count += 1
|
||
time.sleep(3) # 等待3秒后重试
|
||
|
||
# 如果重试超过5次仍然只有1个IP,执行register_service并打印警告
|
||
if len(local_ips) == 1:
|
||
print(f"Warning: Only one local IP found after {max_retries} retries. Registering with this IP.")
|
||
register_service(local_ips)
|
||
|
||
# 初始化PowerBoard并设置回调
|
||
powerboard = PowerBoard()
|
||
powerboard.set_callback(system_shutdown)
|
||
powerboard.start_monitoring()
|
||
|
||
# # 获取最新音乐数据
|
||
# global latest_music
|
||
# try:
|
||
# latest_music = fetch_latest_music()
|
||
# except Exception as e:
|
||
# print(f"Failed to fetch latest music data: {e}")
|
||
|
||
# try:
|
||
# # 尝试初始化音乐列表
|
||
# fetch_music_list_with_retry()
|
||
# except Exception as e:
|
||
# print(f"初始化音乐列表失败:{e}")
|
||
# # 启动一个后台任务,不断尝试获取音乐数据
|
||
# socketio.start_background_task(fetch_music_list_with_retry)
|
||
|
||
# try:
|
||
# # 启动 ngrok 隧道并获取 URL
|
||
# ngrok_url = ''
|
||
# print("Ngrok URL:", ngrok_url)
|
||
# result = reverse_ssh.start_tunnel()
|
||
# print(result)
|
||
# reverse_ssh_port = result["port"]
|
||
# write_and_upload_info(ngrok_url,reverse_ssh_port) # 写入和上传信息文件
|
||
|
||
# except RuntimeError as e:
|
||
# print("Error:", e)
|
||
|
||
|
||
def system_shutdown():
|
||
"""执行系统关机流程,包括远程主机关机、电源板关机和本地系统关机"""
|
||
remote_host = "192.168.100.20" # 远程主机的 IP 地址
|
||
remote_username = "root" # 远程主机的用户名
|
||
remote_password = "bestcobot" # 远程主机的密码
|
||
remote_port = 8822 # 远程主机的 SSH 端口
|
||
sudo_password = "jsfb"
|
||
|
||
try:
|
||
# 1. 先执行远程主机关机命令
|
||
if not is_host_down(remote_host, 8822):
|
||
print("Remote host is alive, processing shutdown")
|
||
remote_command = "shutdown -h now"
|
||
stdout, stderr = execute_command_on_remote(
|
||
remote_host,
|
||
remote_username,
|
||
remote_password,
|
||
remote_command,
|
||
port=remote_port,
|
||
)
|
||
|
||
# 2. 等待远程主机关机并检测
|
||
print("Waiting for remote host to shutdown...")
|
||
if not is_host_down(remote_host, 8822):
|
||
print("Remote host did not shutdown in time")
|
||
return False
|
||
|
||
# 3. 处理电源板关机
|
||
global powerboard, power_board_ip, power_board_thread, power_board_thread_running
|
||
if power_board_thread_running:
|
||
power_board_thread_running = False
|
||
if power_board_thread:
|
||
try:
|
||
power_board_thread.join(timeout=2.0)
|
||
except Exception as e:
|
||
print(f"Error stopping power board thread during shutdown: {e}")
|
||
power_board_thread = None
|
||
|
||
# 4. 发送关机信号到电源板
|
||
if power_board_ip:
|
||
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
udp_socket.settimeout(1.0)
|
||
for _ in range(3):
|
||
try:
|
||
udp_socket.sendto(b"SHUTDOWN", (power_board_ip, 3660))
|
||
print(f"Sent shutdown signal to {power_board_ip}:3660")
|
||
time.sleep(0.1)
|
||
except Exception as e:
|
||
print(f"Error sending shutdown signal: {e}")
|
||
power_board_ip = None
|
||
|
||
powerboard.send_cmd("UI-SHUTDOWN")
|
||
|
||
# 5. 执行本地系统关机
|
||
command = f'echo {sudo_password} | /bin/sh -c "/usr/bin/sudo -S /sbin/shutdown now"'
|
||
result = subprocess.run(
|
||
command,
|
||
shell=True,
|
||
check=True,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
)
|
||
print("System is shutting down.")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"Error during shutdown process: {e}")
|
||
return False
|
||
|
||
|
||
initialize_app(5000)
|
||
|
||
|
||
def check_network():
|
||
try:
|
||
# 测试连接到公共 DNS 服务器
|
||
socket.create_connection(("8.8.8.8", 53), timeout=5)
|
||
return True
|
||
except OSError:
|
||
return False
|
||
|
||
# 全局变量用于存储设备位置信息
|
||
device_location = {
|
||
'latitude': None,
|
||
'longitude': None,
|
||
'accuracy': None,
|
||
'timestamp': None
|
||
}
|
||
|
||
@app.route('/update_device_location', methods=['POST'])
|
||
def update_device_location():
|
||
"""
|
||
更新设备位置信息并自动获取天气数据
|
||
请求体示例:
|
||
{
|
||
"latitude": 35.123456,
|
||
"longitude": 139.789012,
|
||
"accuracy": 10.5
|
||
}
|
||
"""
|
||
try:
|
||
location_data = request.json
|
||
print(f"Received location data: {location_data}")
|
||
|
||
# 检查必要的位置数据是否存在
|
||
required_fields = ['latitude', 'longitude']
|
||
if not all(field in location_data for field in required_fields):
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': 'Missing required location data fields'
|
||
}), 400
|
||
|
||
# 更新全局位置信息
|
||
global device_location
|
||
device_location.update({
|
||
'latitude': location_data.get('latitude'),
|
||
'longitude': location_data.get('longitude'),
|
||
'accuracy': location_data.get('accuracy'),
|
||
'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
})
|
||
|
||
# 打印位置信息
|
||
print(f"Device Location Updated: {device_location}")
|
||
|
||
# 获取主机名作为设备名称
|
||
hostname = socket.gethostname()
|
||
|
||
# 获取软件版本信息
|
||
version_response = get_local_versions()
|
||
version_data = version_response.json # 使用.json属性获取数据
|
||
versions = version_data.get('versions', [])
|
||
software_version = versions[0] if versions else "default" # 使用第一个版本,如果没有则默认为1.0
|
||
vtxdb_version = vtxdb.get_version()
|
||
iot_config = vtxdb.get("robot_config","IoT")
|
||
|
||
# 准备发送到物联网平台的数据
|
||
iot_data = {
|
||
"device_name": hostname,
|
||
"device_id": f"{iot_config['product_id']}_{hostname}",
|
||
"longitude": location_data.get('longitude'),
|
||
"latitude": location_data.get('latitude'),
|
||
"produce_name": "RS-LL-X1",
|
||
"product_id": iot_config['product_id'],
|
||
"software_version": software_version,
|
||
"vtxdb_version": vtxdb_version,
|
||
"timestamp": device_location['timestamp']
|
||
}
|
||
|
||
# 发送位置信息到物联网平台
|
||
try:
|
||
iot_response = requests.post(
|
||
"http://app.robotstorm.tech:8080/iot/location",
|
||
json=iot_data,
|
||
timeout=5 # 设置5秒超时
|
||
)
|
||
print(f"IoT platform response: {iot_response.status_code}")
|
||
except Exception as iot_error:
|
||
print(f"Error sending data to IoT platform: {iot_error}")
|
||
|
||
# 异步获取天气数据
|
||
weather_updated = False
|
||
if should_update_weather(device_location['latitude'], device_location['longitude']):
|
||
try:
|
||
weather_results = asyncio.run(get_all_weather_data(
|
||
device_location['latitude'],
|
||
device_location['longitude']
|
||
))
|
||
if all(weather_results.values()):
|
||
update_weather_cache(
|
||
weather_results,
|
||
device_location['latitude'],
|
||
device_location['longitude']
|
||
)
|
||
weather_updated = True
|
||
print("Weather data updated successfully")
|
||
except Exception as weather_error:
|
||
print(f"Error updating weather data: {weather_error}")
|
||
|
||
return jsonify({
|
||
'status': 'success',
|
||
'message': 'Location data updated successfully' + (' and weather data refreshed' if weather_updated else ''),
|
||
'data': {
|
||
'location': device_location,
|
||
'weather_updated': weather_updated
|
||
}
|
||
}), 200
|
||
|
||
except Exception as e:
|
||
print(f"Error processing location data: {e}")
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': f'Failed to process location data: {str(e)}'
|
||
}), 500
|
||
|
||
# 坐标系转换工具函数
|
||
def wgs84_to_gcj02(lat, lng):
|
||
"""
|
||
WGS84坐标系转GCJ02坐标系
|
||
"""
|
||
if not lat or not lng:
|
||
return None, None
|
||
|
||
a = 6378245.0 # 长半轴
|
||
ee = 0.00669342162296594323 # 偏心率平方
|
||
|
||
def transform_lat(x, y):
|
||
ret = -100.0 + 2.0 * x + 3.0 * y + 0.2 * y * y + 0.1 * x * y + 0.2 * math.sqrt(abs(x))
|
||
ret += (20.0 * math.sin(6.0 * x * math.pi) + 20.0 * math.sin(2.0 * x * math.pi)) * 2.0 / 3.0
|
||
ret += (20.0 * math.sin(y * math.pi) + 40.0 * math.sin(y / 3.0 * math.pi)) * 2.0 / 3.0
|
||
ret += (160.0 * math.sin(y / 12.0 * math.pi) + 320 * math.sin(y * math.pi / 30.0)) * 2.0 / 3.0
|
||
return ret
|
||
|
||
def transform_lng(x, y):
|
||
ret = 300.0 + x + 2.0 * y + 0.1 * x * x + 0.1 * x * y + 0.1 * math.sqrt(abs(x))
|
||
ret += (20.0 * math.sin(6.0 * x * math.pi) + 20.0 * math.sin(2.0 * x * math.pi)) * 2.0 / 3.0
|
||
ret += (20.0 * math.sin(x * math.pi) + 40.0 * math.sin(x / 3.0 * math.pi)) * 2.0 / 3.0
|
||
ret += (150.0 * math.sin(x / 12.0 * math.pi) + 300.0 * math.sin(x / 30.0 * math.pi)) * 2.0 / 3.0
|
||
return ret
|
||
|
||
dLat = transform_lat(lng - 105.0, lat - 35.0)
|
||
dLng = transform_lng(lng - 105.0, lat - 35.0)
|
||
radLat = lat / 180.0 * math.pi
|
||
magic = math.sin(radLat)
|
||
magic = 1 - ee * magic * magic
|
||
sqrtMagic = math.sqrt(magic)
|
||
dLat = (dLat * 180.0) / ((a * (1 - ee)) / (magic * sqrtMagic) * math.pi)
|
||
dLng = (dLng * 180.0) / (a / sqrtMagic * math.cos(radLat) * math.pi)
|
||
gcj_lat = lat + dLat
|
||
gcj_lng = lng + dLng
|
||
return gcj_lat, gcj_lng
|
||
|
||
def gcj02_to_bd09(lat, lng):
|
||
"""
|
||
GCJ02坐标系转BD09坐标系
|
||
"""
|
||
if not lat or not lng:
|
||
return None, None
|
||
|
||
x_pi = math.pi * 3000.0 / 180.0
|
||
z = math.sqrt(lng * lng + lat * lat) + 0.00002 * math.sin(lat * x_pi)
|
||
theta = math.atan2(lat, lng) + 0.000003 * math.cos(lng * x_pi)
|
||
bd_lng = z * math.cos(theta) + 0.0065
|
||
bd_lat = z * math.sin(theta) + 0.006
|
||
return bd_lat, bd_lng
|
||
|
||
@app.route('/get_device_location', methods=['GET'])
|
||
def get_device_location():
|
||
"""
|
||
获取设备位置信息,支持不同坐标系
|
||
可选参数 coordinate_system:
|
||
- wgs84 (默认): 原始GPS坐标
|
||
- gcj02: 国测局坐标系
|
||
- bd09: 百度坐标系
|
||
|
||
GET /get_device_location
|
||
# 或
|
||
GET /get_device_location?coordinate_system=wgs84
|
||
|
||
GET /get_device_location?coordinate_system=gcj02
|
||
|
||
GET /get_device_location?coordinate_system=bd09
|
||
"""
|
||
try:
|
||
global device_location
|
||
coordinate_system = request.args.get('coordinate_system', 'wgs84').lower()
|
||
|
||
# 检查是否有位置信息
|
||
if not device_location['latitude'] or not device_location['longitude']:
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': 'No location data available'
|
||
}), 404
|
||
|
||
# 获取原始WGS84坐标
|
||
wgs84_lat = device_location['latitude']
|
||
wgs84_lng = device_location['longitude']
|
||
|
||
# 根据请求的坐标系进行转换
|
||
if coordinate_system == 'gcj02':
|
||
lat, lng = wgs84_to_gcj02(wgs84_lat, wgs84_lng)
|
||
coordinate_type = 'GCJ02'
|
||
elif coordinate_system == 'bd09':
|
||
gcj02_lat, gcj02_lng = wgs84_to_gcj02(wgs84_lat, wgs84_lng)
|
||
lat, lng = gcj02_to_bd09(gcj02_lat, gcj02_lng)
|
||
coordinate_type = 'BD09'
|
||
else: # wgs84
|
||
lat, lng = wgs84_lat, wgs84_lng
|
||
coordinate_type = 'WGS84'
|
||
|
||
return jsonify({
|
||
'status': 'success',
|
||
'data': {
|
||
'latitude': lat,
|
||
'longitude': lng,
|
||
'accuracy': device_location['accuracy'],
|
||
'timestamp': device_location['timestamp'],
|
||
'coordinate_system': coordinate_type
|
||
}
|
||
}), 200
|
||
|
||
except Exception as e:
|
||
print(f"Error getting location data: {e}")
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': f'Failed to get location data: {str(e)}'
|
||
}), 500
|
||
|
||
# 和风天气API配置
|
||
QWEATHER_API_KEY = "79b35822391e4b999dbda0f90dfc1dcf" # 请替换为实际的API密钥
|
||
QWEATHER_API_BASE = "https://devapi.qweather.com/v7" # 免费api
|
||
QWEATHER_GEO_API_BASE = "https://geoapi.qweather.com/v2" # GeoAPI接口
|
||
|
||
# 全局变量存储天气数据
|
||
weather_data = {
|
||
'now': None, # 实时天气
|
||
'hourly': None, # 逐小时预报
|
||
'daily': None, # 每日预报
|
||
'last_update': None, # 最后更新时间
|
||
'location': None, # 最后请求的位置
|
||
'geo_info': None # 地理位置信息
|
||
}
|
||
|
||
async def fetch_weather_data(session, url):
|
||
"""异步获取天气数据"""
|
||
try:
|
||
async with session.get(url) as response:
|
||
return await response.json()
|
||
except Exception as e:
|
||
print(f"Error fetching weather data: {e}")
|
||
return None
|
||
|
||
async def get_all_weather_data(latitude, longitude):
|
||
"""并发获取所有天气数据和地理信息"""
|
||
base_params = f"location={longitude},{latitude}&key={QWEATHER_API_KEY}"
|
||
|
||
urls = [
|
||
f"{QWEATHER_API_BASE}/grid-weather/now?{base_params}",
|
||
f"{QWEATHER_API_BASE}/grid-weather/24h?{base_params}",
|
||
f"{QWEATHER_API_BASE}/grid-weather/7d?{base_params}"
|
||
]
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
# 创建所有任务,包括天气数据和地理信息
|
||
weather_tasks = [fetch_weather_data(session, url) for url in urls]
|
||
geo_task = fetch_geo_info(session, latitude, longitude)
|
||
|
||
# 并发执行所有任务
|
||
all_tasks = weather_tasks + [geo_task]
|
||
results = await asyncio.gather(*all_tasks)
|
||
|
||
# 分离结果
|
||
weather_results = results[:3]
|
||
geo_result = results[3]
|
||
|
||
return {
|
||
'now': weather_results[0],
|
||
'hourly': weather_results[1],
|
||
'daily': weather_results[2],
|
||
'geo': geo_result
|
||
}
|
||
|
||
def update_weather_cache(weather_results, latitude, longitude):
|
||
"""更新天气数据缓存"""
|
||
global weather_data
|
||
weather_data.update({
|
||
'now': weather_results['now'],
|
||
'hourly': weather_results['hourly'],
|
||
'daily': weather_results['daily'],
|
||
'geo_info': weather_results['geo'],
|
||
'last_update': datetime.now(),
|
||
'location': {'latitude': latitude, 'longitude': longitude}
|
||
})
|
||
|
||
def should_update_weather(latitude=None, longitude=None, force_update=False):
|
||
"""检查是否需要更新天气数据"""
|
||
if force_update:
|
||
return True
|
||
|
||
if not weather_data['last_update']:
|
||
return True
|
||
|
||
# 检查时间是否过期(1小时更新一次)
|
||
time_expired = (datetime.now() - weather_data['last_update']) > timedelta(hours=1)
|
||
|
||
# 检查位置是否改变(超过0.01度则认为位置改变)
|
||
location_changed = False
|
||
if latitude and longitude and weather_data['location']:
|
||
old_lat = weather_data['location']['latitude']
|
||
old_lng = weather_data['location']['longitude']
|
||
location_changed = (
|
||
abs(latitude - old_lat) > 0.01 or
|
||
abs(longitude - old_lng) > 0.01
|
||
)
|
||
|
||
return time_expired or location_changed
|
||
|
||
async def fetch_geo_info(session, latitude, longitude):
|
||
"""异步获取地理位置信息"""
|
||
try:
|
||
url = f"{QWEATHER_GEO_API_BASE}/city/lookup?location={longitude},{latitude}&key={QWEATHER_API_KEY}"
|
||
async with session.get(url) as response:
|
||
return await response.json()
|
||
except Exception as e:
|
||
print(f"Error fetching geo info: {e}")
|
||
return None
|
||
|
||
@app.route('/get_weather', methods=['GET'])
|
||
def get_weather():
|
||
"""
|
||
获取天气信息
|
||
可选参数:
|
||
- force_update: 是否强制更新天气数据
|
||
- coordinate_system: 坐标系统(wgs84/gcj02/bd09)
|
||
|
||
# 使用默认设置获取天气
|
||
GET /get_weather
|
||
|
||
# 强制更新天气数据
|
||
GET /get_weather?force_update=true
|
||
|
||
# 使用特定坐标系
|
||
GET /get_weather?coordinate_system=gcj02
|
||
"""
|
||
try:
|
||
force_update = request.args.get('force_update', '').lower() == 'true'
|
||
coordinate_system = request.args.get('coordinate_system', 'wgs84').lower()
|
||
|
||
# 获取当前位置
|
||
global device_location
|
||
if not device_location['latitude'] or not device_location['longitude']:
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': 'No location data available'
|
||
}), 404
|
||
|
||
# 根据坐标系统获取正确的经纬度
|
||
if coordinate_system == 'gcj02':
|
||
lat, lng = wgs84_to_gcj02(device_location['latitude'], device_location['longitude'])
|
||
elif coordinate_system == 'bd09':
|
||
gcj02_lat, gcj02_lng = wgs84_to_gcj02(device_location['latitude'], device_location['longitude'])
|
||
lat, lng = gcj02_to_bd09(gcj02_lat, gcj02_lng)
|
||
else: # wgs84
|
||
lat, lng = device_location['latitude'], device_location['longitude']
|
||
|
||
# 检查是否需要更新天气数据
|
||
if should_update_weather(lat, lng, force_update):
|
||
# 异步获取所有天气数据和地理信息
|
||
weather_results = asyncio.run(get_all_weather_data(lat, lng))
|
||
|
||
# 检查是否所有请求都成功
|
||
if not all(weather_results.values()):
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': 'Failed to fetch weather data'
|
||
}), 500
|
||
|
||
# 更新缓存
|
||
update_weather_cache(weather_results, lat, lng)
|
||
|
||
# 返回天气数据
|
||
return jsonify({
|
||
'status': 'success',
|
||
'data': {
|
||
'now': weather_data['now'],
|
||
'hourly': weather_data['hourly'],
|
||
'daily': weather_data['daily'],
|
||
'geo_info': weather_data['geo_info'],
|
||
'last_update': weather_data['last_update'].strftime("%Y-%m-%d %H:%M:%S") if weather_data['last_update'] else None,
|
||
'location': {
|
||
'latitude': lat,
|
||
'longitude': lng,
|
||
'coordinate_system': coordinate_system.upper()
|
||
}
|
||
}
|
||
}), 200
|
||
|
||
except Exception as e:
|
||
print(f"Error getting weather data: {e}")
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': f'Failed to get weather data: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route("/get_demo_video")
|
||
def get_demo_video():
|
||
path = "static/images/video/demo.m4v"
|
||
range_header = request.headers.get("Range", None)
|
||
|
||
if not range_header:
|
||
return Response(open(path, "rb"), mimetype="video/mp4")
|
||
|
||
# 解析 Range 请求头
|
||
size = os.path.getsize(path)
|
||
byte_range = range_header.split("=")[1]
|
||
start, end = byte_range.split("-")
|
||
start = int(start)
|
||
end = int(end) if end else size - 1
|
||
|
||
chunk_size = 1024 * 1024 # 1MB 片段
|
||
with open(path, "rb") as f:
|
||
f.seek(start)
|
||
data = f.read(end - start + 1)
|
||
|
||
headers = {
|
||
"Content-Range": f"bytes {start}-{end}/{size}",
|
||
"Accept-Ranges": "bytes",
|
||
"Content-Length": str(end - start + 1),
|
||
"Content-Type": "video/mp4",
|
||
}
|
||
|
||
return Response(data, status=206, headers=headers)
|
||
|
||
@app.route('/get_volume', methods=['GET'])
|
||
def get_volume():
|
||
"""
|
||
获取当前音量并返回
|
||
"""
|
||
print("访问了 /get_volume 路由") # 添加调试日志
|
||
current_volume = volume_control.get_current_volume()
|
||
|
||
if current_volume is not None:
|
||
return jsonify({"current_volume": current_volume}), 200
|
||
else:
|
||
return jsonify({"error": "无法获取当前音量"}), 500
|
||
|
||
@app.route('/adjust_volume', methods=['POST'])
|
||
def adjust_volume():
|
||
try:
|
||
# 获取请求体中的指令和第二个参数(是否发送到前端)
|
||
adjust_volumn_result = request.json.get('adjust_volumn_result')
|
||
notify_frontend = request.json.get('notify_frontend', True) # 默认值为 True,如果没有传递则会发送通知
|
||
|
||
if adjust_volumn_result is None:
|
||
return jsonify({"error": "没有提供调整指令"}), 400
|
||
|
||
# 调用 volume_control 模块进行音量调整
|
||
volume_control.adjust_volume(adjust_volumn_result)
|
||
|
||
# 如果 `notify_frontend` 为 True,则发送通知到前端
|
||
if notify_frontend:
|
||
socketio.emit('volume_adjusted', {'message': f"音量已调整为 {adjust_volumn_result}"})
|
||
|
||
return jsonify({"status": "success", "message": f"音量已调整为 {adjust_volumn_result}"}), 200
|
||
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
|
||
def send_audio_command(command, port=8766):
|
||
try:
|
||
# 输出调试信息
|
||
print(f"正在发送命令: {command} 到端口: {port}")
|
||
|
||
# 创建新的事件循环
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
|
||
# 确保任务已创建
|
||
task = asyncio.ensure_future(send_message(command, port))
|
||
|
||
# 等待任务完成
|
||
loop.run_until_complete(task)
|
||
|
||
# 输出发送命令后的成功信息
|
||
print(f"命令 {command} 成功发送到端口 {port}")
|
||
|
||
except Exception as e:
|
||
print(f"发送命令时发生错误: {e}")
|
||
|
||
@socketio.on("speech_audio_control")
|
||
def speech_audio_control(data):
|
||
if "start" in data:
|
||
command = "speech_audio_start"
|
||
send_audio_command(command)
|
||
|
||
elif "cancel" in data:
|
||
command = "speech_audio_cancel"
|
||
send_audio_command(command)
|
||
|
||
|
||
@app.route('/upload_speak', methods=['POST'])
|
||
def upload_speak():
|
||
try:
|
||
# 检查是否有文件上传
|
||
if 'audio' not in request.files:
|
||
return jsonify({'error': 'No audio file provided'}), 400
|
||
|
||
audio_file = request.files['audio']
|
||
if audio_file.filename == '':
|
||
return jsonify({'error': 'No selected file'}), 400
|
||
|
||
# 确保 ../tmp 目录存在
|
||
tmp_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'tmp'))
|
||
os.makedirs(tmp_dir, exist_ok=True)
|
||
|
||
# 删除旧的音频文件
|
||
for old_file in os.listdir(tmp_dir):
|
||
if old_file.startswith("speak_") and old_file.endswith(".mp3"):
|
||
old_file_path = os.path.join(tmp_dir, old_file)
|
||
try:
|
||
os.remove(old_file_path)
|
||
print(f"Deleted old audio file: {old_file_path}")
|
||
except Exception as e:
|
||
print(f"Error deleting old audio file {old_file_path}: {e}")
|
||
|
||
# 生成带时间戳的文件名
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
filename = f"speak_{timestamp}.mp3"
|
||
file_path = os.path.join(tmp_dir, filename)
|
||
|
||
# 保存文件
|
||
audio_file.save(file_path)
|
||
print(f"Audio file saved to: {file_path}")
|
||
|
||
# 发送指令到端口 8766,告知新的音频文件上传并发送文件绝对路径
|
||
command = f"speech_audio_stop:{file_path}"
|
||
send_audio_command(command) # 使用公共函数发送指令
|
||
|
||
return jsonify({'message': 'Audio file uploaded successfully', 'filename': filename}), 200
|
||
|
||
except Exception as e:
|
||
print(f"Error uploading audio file: {e}")
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
@app.route('/heartbeat')
|
||
def heartbeat():
|
||
return 'OK'
|
||
|
||
@app.route("/upload_playlist_image", methods=["POST"])
|
||
def upload_playlist_image():
|
||
try:
|
||
if 'image' not in request.files:
|
||
return jsonify({"status": "error", "message": "No image file uploaded"}), 400
|
||
|
||
image_file = request.files['image']
|
||
image_stream = io.BytesIO(image_file.read())
|
||
image = Image.open(image_stream)
|
||
cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
|
||
|
||
decoded_objects = decode(cv_image)
|
||
if not decoded_objects:
|
||
return jsonify({"status": "error", "message": "No QR code found in image"}), 400
|
||
|
||
qr_data = decoded_objects[0].data.decode('utf-8')
|
||
print("Original QR URL:", qr_data)
|
||
|
||
# 替换原始QR码URL中的域名
|
||
if qr_data.startswith('https://c6.y.qq.com/'):
|
||
qr_data = qr_data.replace('https://c6.y.qq.com/', 'https://robotstorm.tech/c6yqq/')
|
||
|
||
headers = {
|
||
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
|
||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
|
||
'Accept-Language': 'en-US,en;q=0.5',
|
||
'Connection': 'keep-alive',
|
||
}
|
||
|
||
response = requests.get(qr_data, headers=headers, allow_redirects=True)
|
||
final_url = response.url
|
||
|
||
# 替换重定向后URL中的域名
|
||
if final_url.startswith('https://y.qq.com/'):
|
||
final_url = final_url.replace('https://y.qq.com/', 'https://robotstorm.tech/yqq/')
|
||
|
||
print("Final URL:", final_url)
|
||
|
||
parsed_url = urlparse(final_url)
|
||
query_params = parse_qs(parsed_url.query)
|
||
|
||
playlist_id = query_params.get('id', [None])[0]
|
||
if not playlist_id:
|
||
path_parts = parsed_url.path.split('/')
|
||
for part in path_parts:
|
||
if part.isdigit():
|
||
playlist_id = part
|
||
break
|
||
|
||
if not playlist_id:
|
||
return jsonify({"status": "error", "message": "No playlist ID found in QR code"}), 400
|
||
|
||
print("Found playlist ID:", playlist_id)
|
||
|
||
global categoryIDs
|
||
if playlist_id not in categoryIDs:
|
||
categoryIDs.insert(0, playlist_id)
|
||
# 保存更新后的歌单ID到配置文件
|
||
if not save_playlist_ids(categoryIDs):
|
||
return jsonify({"status": "error", "message": "Failed to save playlist configuration"}), 500
|
||
# Clear the playlists cache to force refresh
|
||
global all_playlists
|
||
all_playlists = []
|
||
|
||
return jsonify({"status": "success", "message": "Playlist added successfully"})
|
||
|
||
except Exception as e:
|
||
print(f"Error processing playlist image: {e}")
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
@app.route("/delete_playlist", methods=["POST"])
|
||
def delete_playlist():
|
||
try:
|
||
data = request.get_json()
|
||
category_id = data.get('categoryId')
|
||
|
||
if not category_id:
|
||
return jsonify({"status": "error", "message": "未提供歌单ID"}), 400
|
||
|
||
global categoryIDs, all_playlists
|
||
|
||
if category_id in categoryIDs:
|
||
categoryIDs.remove(category_id)
|
||
# 保存更新后的歌单ID到配置文件
|
||
if not save_playlist_ids(categoryIDs):
|
||
return jsonify({"status": "error", "message": "Failed to save playlist configuration"}), 500
|
||
|
||
# 从 all_playlists 中删除对应的歌单
|
||
all_playlists = [p for p in all_playlists if p['diss_info'].get('categoryId') != category_id]
|
||
|
||
return jsonify({"status": "success", "message": "歌单删除成功"})
|
||
else:
|
||
return jsonify({"status": "error", "message": "歌单不存在"}), 404
|
||
|
||
except Exception as e:
|
||
print(f"Error deleting playlist: {e}")
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
@app.route('/api/license/use', methods=['POST'])
|
||
def use_license():
|
||
# 调用模块中的逻辑,获取返回结果
|
||
result = license_module.use_license()
|
||
|
||
# 如果返回的是错误,返回500状态码
|
||
if 'error' in result:
|
||
return jsonify(result), 500
|
||
else:
|
||
return jsonify(result), 200 # 返回成功的数据
|
||
|
||
@app.route('/api/license/check', methods=['GET'])
|
||
def check_license():
|
||
# 调用模块中的逻辑,获取返回结果
|
||
result = license_module.check_license()
|
||
|
||
# 如果返回的是错误,返回500状态码
|
||
if 'error' in result:
|
||
return jsonify(result), 500
|
||
else:
|
||
return jsonify(result), 200 # 返回成功的数据
|
||
|
||
@app.route('/api/license/info', methods=['GET'])
|
||
def get_license_info():
|
||
# 调用模块中的逻辑,获取返回结果
|
||
result = license_module.get_license_info()
|
||
|
||
# 如果返回的是错误,返回500状态码
|
||
if 'error' in result:
|
||
return jsonify(result), 500
|
||
else:
|
||
return jsonify(result), 200 # 返回成功的数据
|
||
|
||
# 设置深度思考状态的接口
|
||
@app.route("/set_deep_thought", methods=["POST"])
|
||
def set_deep_thought():
|
||
try:
|
||
# 获取请求数据,设定深度思考状态
|
||
data = request.json
|
||
status = data.get("status", False)
|
||
|
||
# 设置深度思考状态
|
||
set_deep_thought_status(status)
|
||
|
||
return jsonify({"status": "success", "message": f"Deep thought status set to {status}"}), 200
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
|
||
# 获取深度思考状态的接口
|
||
@app.route("/get_deep_thought", methods=["GET"])
|
||
def get_deep_thought():
|
||
try:
|
||
# 获取深度思考状态
|
||
status = get_deep_thought_status()
|
||
|
||
return jsonify({"status": "success", "deep_thought_active": status}), 200
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
|
||
# 设置联网搜索状态的接口
|
||
@app.route("/set_ai_search", methods=["POST"])
|
||
def set_ai_search():
|
||
try:
|
||
# 获取请求数据,设定联网搜索状态
|
||
data = request.json
|
||
status = data.get("status", False)
|
||
|
||
# 设置联网搜索状态
|
||
set_ai_search_status(status)
|
||
|
||
return jsonify({"status": "success", "message": f"Deep thought status set to {status}"}), 200
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
|
||
# 获取联网搜索状态的接口
|
||
@app.route("/get_ai_search", methods=["GET"])
|
||
def get_ai_search():
|
||
try:
|
||
# 获取联网搜索状态
|
||
status = get_ai_search_status()
|
||
|
||
return jsonify({"status": "success", "ai_search_active": status}), 200
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
@app.route("/massage_plans")
|
||
def massage_plans():
|
||
# 获取URL参数
|
||
choose_task = request.args.get('choose_task', '')
|
||
body_part = request.args.get('body_part', '')
|
||
|
||
# 将参数传递给模板
|
||
return render_template(
|
||
"massage_plan.html",
|
||
initial_choose_task=choose_task,
|
||
initial_body_part=body_part
|
||
)
|
||
|
||
@app.route("/get_massage_plan", methods=["GET"])
|
||
def get_massage_plan():
|
||
try:
|
||
# 获取筛选参数
|
||
plan_name = request.args.get('plan_name', '')
|
||
choose_task = request.args.get('choose_task', '')
|
||
body_part = request.args.get('body_part', '')
|
||
|
||
# 从vtxdb获取按摩计划数据
|
||
massage_plans = vtxdb.get("massage_plan", plan_name)
|
||
|
||
# 如果指定了plan_name,直接返回结果
|
||
if plan_name:
|
||
return jsonify({"status": "success", "data": massage_plans})
|
||
|
||
# 如果没有指定plan_name但指定了筛选条件,进行筛选
|
||
if choose_task or body_part:
|
||
filtered_plans = {}
|
||
for name, plan in massage_plans.items():
|
||
# 同时满足两个条件
|
||
if choose_task and body_part:
|
||
if plan.get('choose_task') == choose_task and plan.get('body_part') == body_part:
|
||
filtered_plans[name] = plan
|
||
# 只满足choose_task条件
|
||
elif choose_task:
|
||
if plan.get('choose_task') == choose_task:
|
||
filtered_plans[name] = plan
|
||
# 只满足body_part条件
|
||
elif body_part:
|
||
if plan.get('body_part') == body_part:
|
||
filtered_plans[name] = plan
|
||
|
||
return jsonify({"status": "success", "data": filtered_plans})
|
||
|
||
# 如果没有任何筛选条件,返回所有计划
|
||
return jsonify({"status": "success", "data": massage_plans})
|
||
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
@app.route("/set_massage_plan", methods=["POST", "DELETE"])
|
||
def set_massage_plan():
|
||
try:
|
||
data = request.json
|
||
plan_name = data.get('plan_name')
|
||
|
||
if request.method == "DELETE":
|
||
if not plan_name:
|
||
return jsonify({"status": "error", "message": "缺少疗程名称"}), 400
|
||
|
||
# 先获取疗程信息,检查是否可以删除
|
||
plan_data = vtxdb.get("massage_plan", plan_name)
|
||
if not plan_data or not isinstance(plan_data, dict):
|
||
return jsonify({"status": "error", "message": "疗程不存在"}), 404
|
||
|
||
if not plan_data.get('can_delete', False):
|
||
return jsonify({"status": "error", "message": "该疗程不允许删除"}), 403
|
||
|
||
# 执行删除操作
|
||
vtxdb.delete("massage_plan", plan_name)
|
||
return jsonify({"status": "success", "message": "删除成功"})
|
||
|
||
else: # POST method
|
||
plan_data = data.get('plan_data')
|
||
if not plan_name or not plan_data:
|
||
return jsonify({"status": "error", "message": "缺少必要参数"}), 400
|
||
|
||
# 确保复制的疗程可以删除
|
||
if isinstance(plan_data, dict):
|
||
plan_data['can_delete'] = True
|
||
|
||
# 保存到vtxdb
|
||
vtxdb.set("massage_plan", plan_name, plan_data)
|
||
return jsonify({"status": "success", "message": "保存成功"})
|
||
|
||
except Exception as e:
|
||
return jsonify({"status": "error", "message": str(e)}), 500
|
||
|
||
@app.route("/develop_login", methods=["POST"])
|
||
def develop_login():
|
||
data = request.json
|
||
password = data["password"]
|
||
|
||
# 简单验证示例
|
||
if password == "cd123456":
|
||
return jsonify(success=True)
|
||
else:
|
||
return jsonify(success=False)
|
||
|
||
@app.route("/plans_edit", methods=["POST"])
|
||
def plans_edit():
|
||
data = request.json
|
||
password = data["password"]
|
||
|
||
# 简单验证示例
|
||
if password == "robotstorm":
|
||
return jsonify(success=True)
|
||
else:
|
||
return jsonify(success=False)
|
||
|
||
@app.route("/get_log", methods=["GET"])
|
||
def get_log():
|
||
log_type = request.args.get("type") # 必填参数:ui/language/massage
|
||
keyword = request.args.get("keyword", "") # 可选参数:筛选关键字
|
||
page = int(request.args.get("page", 1))
|
||
page_size = int(request.args.get("pageSize", 400))
|
||
|
||
# 参数校验
|
||
if log_type not in ["ui", "language", "massage"]:
|
||
return jsonify({"error": "无效的日志类型"}), 400
|
||
|
||
# 调用工具函数查询日志
|
||
result = read_log_file(log_type, keyword, page, page_size)
|
||
return jsonify(result)
|
||
|
||
# 存储校准任务的状态
|
||
calibration_tasks = {}
|
||
|
||
def run_calibration():
|
||
"""在后台运行校准任务"""
|
||
try:
|
||
# 通知前端开始校准
|
||
socketio.emit('calibration_status', {
|
||
'status': 'running',
|
||
'message': '开始自动校准...'
|
||
})
|
||
|
||
# 设置标定板参数
|
||
calibration_board_size = [6, 3] # width=6, height=3
|
||
calibration_board_square_size = 0.030 # 单位:米
|
||
|
||
# 创建校准对象并执行校准
|
||
calib = Calibration(calibration_board_size, calibration_board_square_size,"/home/jsfb/Documents/")
|
||
|
||
# 收集数据
|
||
socketio.emit('calibration_status', {
|
||
'status': 'collecting',
|
||
'message': '正在收集标定数据...'
|
||
})
|
||
calib.collect_data()
|
||
|
||
# 执行校准
|
||
socketio.emit('calibration_status', {
|
||
'status': 'calibrating',
|
||
'message': '正在计算标定参数...'
|
||
})
|
||
r, t ,intrinsics= calib.calibrate()
|
||
print("旋转矩阵:")
|
||
print(r)
|
||
print("平移向量:")
|
||
print(t)
|
||
print("内参矩阵:")
|
||
print(intrinsics)
|
||
|
||
|
||
# 转换结果为可序列化的格式
|
||
rotation_matrix = r.tolist() if hasattr(r, 'tolist') else r
|
||
translation_vector = t.flatten().tolist() if hasattr(t, 'tolist') else t
|
||
|
||
# 发送完成信号和结果
|
||
socketio.emit('calibration_status', {
|
||
'status': 'completed',
|
||
'message': '校准完成',
|
||
'results': {
|
||
'rotation_matrix': rotation_matrix,
|
||
'translation_vector': translation_vector,
|
||
'intrinsics': intrinsics # 直接使用字典格式的内参
|
||
}
|
||
})
|
||
|
||
except Exception as e:
|
||
error_message = str(e)
|
||
print(f"校准过程出错: {error_message}")
|
||
|
||
# 发送错误信息
|
||
socketio.emit('calibration_status', {
|
||
'status': 'error',
|
||
'message': f'校准失败: {error_message}'
|
||
})
|
||
|
||
@app.route("/start_calibration", methods=["POST"])
|
||
def start_calibration():
|
||
"""启动自动校准过程"""
|
||
try:
|
||
# 启动后台线程进行校准
|
||
thread = threading.Thread(target=run_calibration)
|
||
thread.daemon = True
|
||
thread.start()
|
||
|
||
return jsonify({
|
||
'status': 'success',
|
||
'message': '校准任务已启动'
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': str(e)
|
||
}), 500
|
||
|
||
@app.route("/get_calibration_status/<task_id>", methods=["GET"])
|
||
def get_calibration_status(task_id):
|
||
"""获取校准任务的状态"""
|
||
if task_id not in calibration_tasks:
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': '任务不存在'
|
||
}), 404
|
||
|
||
task = calibration_tasks[task_id]
|
||
response = {
|
||
'status': task['status'],
|
||
'started_at': task['started_at']
|
||
}
|
||
|
||
if task['status'] == 'completed' and 'results' in task:
|
||
response['results'] = task['results']
|
||
elif task['status'] == 'error' and 'error' in task:
|
||
response['error'] = task['error']
|
||
|
||
return jsonify(response)
|
||
|
||
@app.route("/cancel_calibration/<task_id>", methods=["POST"])
|
||
def cancel_calibration(task_id):
|
||
"""取消正在进行的校准任务"""
|
||
if task_id not in calibration_tasks:
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': '任务不存在'
|
||
}), 404
|
||
|
||
if calibration_tasks[task_id]['status'] == 'completed':
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': '任务已完成,无法取消'
|
||
}), 400
|
||
|
||
# 从任务列表中移除任务
|
||
del calibration_tasks[task_id]
|
||
|
||
return jsonify({
|
||
'status': 'success',
|
||
'message': '校准任务已取消'
|
||
})
|
||
|
||
@app.route("/save_calibration", methods=["POST"])
|
||
def save_calibration():
|
||
"""保存标定结果到数据库"""
|
||
try:
|
||
data = request.get_json()
|
||
rotation_matrix = data.get('rotation_matrix')
|
||
translation_vector = data.get('translation_vector')
|
||
intrinsics = data.get('intrinsics')
|
||
|
||
if not rotation_matrix or not translation_vector or not intrinsics:
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': '缺少必要的标定数据'
|
||
}), 400
|
||
|
||
# 保存到数据库
|
||
vtxdb.set("robot_config", "camera.camera_matrix", rotation_matrix)
|
||
vtxdb.set("robot_config", "camera.camera_trans", translation_vector)
|
||
vtxdb.set("robot_config", "camera.intrinsics", intrinsics) # 保存字典格式的内参
|
||
|
||
return jsonify({
|
||
'status': 'success',
|
||
'message': '标定结果已保存'
|
||
})
|
||
|
||
except Exception as e:
|
||
print(f"保存标定结果时出错: {str(e)}")
|
||
return jsonify({
|
||
'status': 'error',
|
||
'message': str(e)
|
||
}), 500
|
||
|
||
@app.route("/massage_plan_visualization")
|
||
def massage_plan_visualization():
|
||
body_part = request.args.get('body_part', 'back') # 默认为背部
|
||
|
||
# 腰部(waist)、肩膀(shoulder)和背部(back)都使用相同的图片和点位
|
||
if body_part in ['waist', 'shoulder']:
|
||
body_part = 'back'
|
||
|
||
return render_template("massage_plan_visualization.html", body_part=body_part)
|
||
|
||
@app.route("/massage_plan_control")
|
||
def massage_plan_control():
|
||
body_part = request.args.get('body_part', 'back') # 默认为背部
|
||
return render_template("massage_plan_control.html", body_part=body_part)
|
||
|
||
# 默认选中的按摩头
|
||
@app.route("/get_massage_heads", methods=["GET"])
|
||
def get_massage_heads():
|
||
try:
|
||
# 从vtxdb获取按摩头配置
|
||
heads_config = vtxdb.get("system_config", "massage_heads")
|
||
|
||
# 如果不存在,初始化默认配置
|
||
if not heads_config:
|
||
default_heads = {
|
||
'thermotherapy': {'display': True, 'name': '深部热疗'},
|
||
'shockwave': {'display': True, 'name': '点阵按摩'},
|
||
'ball': {'display': False, 'name': '全能滚珠'},
|
||
'finger': {'display': True, 'name': '指疗通络'},
|
||
'roller': {'display': True, 'name': '滚滚刺疗'},
|
||
'stone': {'display': True, 'name': '温砭舒揉'},
|
||
'ion': {'display': False, 'name': '离子光灸'},
|
||
"heat": {'display': False, 'name': '能量热疗'},
|
||
"spheres": {'display': False, 'name': '天球滚捏'},
|
||
}
|
||
vtxdb.set("system_config", "massage_heads", default_heads)
|
||
heads_config = default_heads
|
||
|
||
return jsonify({
|
||
"status": "success",
|
||
"data": heads_config
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": f"获取按摩头配置失败: {str(e)}"
|
||
}), 500
|
||
|
||
@app.route("/set_massage_heads", methods=["POST"])
|
||
def set_massage_heads():
|
||
try:
|
||
data = request.json
|
||
heads_config = data.get('heads_config')
|
||
|
||
if not heads_config:
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": "缺少按摩头配置参数"
|
||
}), 400
|
||
|
||
# 验证配置格式
|
||
required_fields = ['thermotherapy', 'shockwave', 'ball',
|
||
'finger', 'roller', 'stone', 'ion']
|
||
|
||
if not all(head in heads_config for head in required_fields):
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": "按摩头配置不完整"
|
||
}), 400
|
||
|
||
# 保存到vtxdb
|
||
vtxdb.set("system_config", "massage_heads", heads_config)
|
||
|
||
return jsonify({
|
||
"status": "success",
|
||
"message": "按摩头配置保存成功"
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": f"保存按摩头配置失败: {str(e)}"
|
||
}), 500
|
||
|
||
@app.route("/get_jump_mode", methods=["GET"])
|
||
def get_jump_mode():
|
||
try:
|
||
# 从vtxdb获取跳跃模式配置
|
||
jump_config = vtxdb.get("system_config", "jump_mode")
|
||
# 如果不存在,初始化默认配置
|
||
if not jump_config:
|
||
default_config = {
|
||
"enabled": True, # 默认开启跳跃模式
|
||
}
|
||
vtxdb.set("system_config", "jump_mode", default_config)
|
||
jump_config = default_config
|
||
|
||
return jsonify({
|
||
"status": "success",
|
||
"data": jump_config
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": f"获取跳跃模式失败: {str(e)}"
|
||
}), 500
|
||
|
||
@app.route("/set_jump_mode", methods=["POST"])
|
||
def set_jump_mode():
|
||
try:
|
||
data = request.json
|
||
enabled = data.get("enabled")
|
||
|
||
if enabled is None:
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": "缺少enabled参数"
|
||
}), 400
|
||
|
||
# 获取当前配置
|
||
current_config = vtxdb.get("system_config", "jump_mode") or {
|
||
"enabled": True,
|
||
}
|
||
|
||
# 更新配置
|
||
current_config["enabled"] = bool(enabled)
|
||
vtxdb.set("system_config", "jump_mode", current_config)
|
||
|
||
return jsonify({
|
||
"status": "success",
|
||
"message": "跳跃模式设置成功",
|
||
"data": current_config
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": f"设置跳跃模式失败: {str(e)}"
|
||
}), 500
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 使用 argparse 解析命令行参数
|
||
parser = argparse.ArgumentParser(description="Run Flask-SocketIO server.")
|
||
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
|
||
args, unknown = parser.parse_known_args()
|
||
|
||
# 设置调试模式
|
||
DEBUG_MODE = args.debug
|
||
|
||
while True:
|
||
try:
|
||
# 尝试启动 Flask-SocketIO 应用
|
||
socketio.run(
|
||
app, debug=DEBUG_MODE, host="0.0.0.0", port=5000, allow_unsafe_werkzeug=True
|
||
)
|
||
except Exception as e:
|
||
print(f"Error: {e}", file=sys.stderr)
|
||
print("Restarting server in 5 seconds...")
|
||
time.sleep(5) # 等待5秒后重启
|
||
|
||
|
||
# MAX_WAIT_TIME = 300 # 最多等待 5 分钟
|
||
# elapsed_time = 0
|
||
# while not check_network() and elapsed_time < MAX_WAIT_TIME:
|
||
# print(f"Network unavailable. Retrying in 5 seconds... ({elapsed_time}/{MAX_WAIT_TIME}s)")
|
||
# time.sleep(5)
|
||
# elapsed_time += 5
|
||
# if elapsed_time >= MAX_WAIT_TIME:
|
||
# print("Network initialization failed. Proceeding without network...")
|
||
|
||
# # 启动 Flask-SocketIO 服务
|
||
# socketio.run(app, host="0.0.0.0", port=5000)
|
||
|
||
# # 运行 Flask-SocketIO 应用
|
||
# socketio.run(
|
||
# app, debug=DEBUG_MODE, host="0.0.0.0", port=5000, allow_unsafe_werkzeug=True
|
||
# )
|
||
# socketio.run(app, debug=DEBUG_MODE, host="0.0.0.0", port=5000) #如果使用gunicorn启动 |