2160 lines
71 KiB
Python
Executable File
2160 lines
71 KiB
Python
Executable File
from flask import (
|
||
Flask,
|
||
render_template,
|
||
jsonify,
|
||
redirect,
|
||
url_for,
|
||
request,
|
||
Response,
|
||
render_template_string,
|
||
session,
|
||
)
|
||
from flask_socketio import SocketIO
|
||
from flask_cors import CORS
|
||
from flask_sslify import SSLify
|
||
from zeroconf import Zeroconf, ServiceInfo
|
||
import atexit
|
||
import uuid
|
||
import socket
|
||
import netifaces
|
||
import time
|
||
from qq_music import (
|
||
fetch_latest_music,
|
||
get_song_list,
|
||
fetch_album_image,
|
||
get_music_info,
|
||
player,
|
||
)
|
||
import threading
|
||
from threading import Lock
|
||
import asyncio
|
||
import websockets
|
||
import json
|
||
import argparse
|
||
import librosa
|
||
import numpy as np
|
||
import subprocess
|
||
import signal
|
||
import os
|
||
import sys
|
||
import re
|
||
from datetime import datetime
|
||
import paramiko
|
||
import requests
|
||
from bs4 import BeautifulSoup, NavigableString
|
||
import shutil
|
||
from urllib.parse import urljoin, urlparse
|
||
from requests.auth import HTTPBasicAuth
|
||
from tools.ssh_tools import execute_command_on_remote, is_host_down, ReverseSSH
|
||
import logging
|
||
|
||
from force_sensor_aubo import XjcSensor
|
||
|
||
|
||
|
||
DEBUG_MODE = False

# Logging configuration.
# NOTE: this path is relative to the current working directory, while the
# backup folder below is resolved relative to this file.
log_file = "../log/UI-next-app.log"

# Folder that receives a timestamped copy of the previous run's log.
backup_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../UILog"))

# exist_ok=True avoids the check-then-create race of exists() + makedirs().
os.makedirs(backup_folder, exist_ok=True)

if os.path.exists(log_file):
    # Preserve the previous log before the FileHandler below truncates it.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = os.path.join(backup_folder, f"UI-next-app_{timestamp}.log")
    shutil.copy2(log_file, backup_file)
else:
    # FileHandler does not create missing parent directories; without this
    # the very first run fails with FileNotFoundError.
    os.makedirs(os.path.dirname(log_file), exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        # mode="w": the log file is overwritten on every run.
        logging.FileHandler(log_file, mode="w"),
        logging.StreamHandler(sys.stdout),
    ],
)
|
||
|
||
|
||
# 定义日志记录器
|
||
class LoggerWriter:
    """Minimal text-stream stand-in that forwards written text to a logger.

    Instances are meant to replace ``sys.stdout`` / ``sys.stderr`` so that
    ``print`` output ends up in the logging system.

    Args:
        logger: Object exposing ``log(level, message)``.
        level: Logging level used for every forwarded message.
    """

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        """Log *message* with surrounding whitespace removed.

        Whitespace-only writes (e.g. the newline ``print`` emits separately)
        are dropped. Returns the number of characters consumed, as the
        text-stream protocol expects from ``write``.
        """
        text = message.strip()
        if text:
            self.logger.log(self.level, text)
        return len(message)

    def flush(self):
        """Do nothing; nothing is buffered here."""

    def isatty(self):
        """Report that this stream is not a terminal.

        Code probing ``sys.stdout.isatty()`` would otherwise raise
        AttributeError once the stream has been replaced by this object.
        """
        return False
|
||
|
||
|
||
# 重定向标准输出和标准错误
|
||
# Route everything printed to stdout/stderr through the root logger.
_root_logger = logging.getLogger()
sys.stdout = LoggerWriter(_root_logger, logging.INFO)
sys.stderr = LoggerWriter(_root_logger, logging.ERROR)

# When ``sys.frozen`` is set, templates and static files are looked up under
# ``sys._MEIPASS`` instead of Flask's default locations.
_flask_kwargs = {}
if getattr(sys, "frozen", False):
    template_folder = os.path.join(sys._MEIPASS, "templates")
    static_folder = os.path.join(sys._MEIPASS, "static")
    _flask_kwargs = {
        "template_folder": template_folder,
        "static_folder": static_folder,
    }
app = Flask(__name__, **_flask_kwargs)

# sslify = SSLify(app)
CORS(app)

# Key used by Flask to sign the session cookie.
app.secret_key = "jsfb"
app.config["SECRET_KEY"] = "jsfb"
socketio = SocketIO(app, cors_allowed_origins="*")
|
||
|
||
|
||
def signal_handler(signal, frame):
    """Announce the shutdown and terminate the process with exit status 0.

    Both arguments are the standard signal-handler parameters and are unused.
    """
    print("Shutting Down")
    raise SystemExit(0)
|
||
|
||
|
||
# Install the same handler for SIGINT and SIGTERM.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
|
||
|
||
# Programs passed to the program-selection template and served by
# /program-details/<id>; currently a single placeholder entry.
programs = [
    {
        "id": 1,
        "name": "暂未开放",
        "details": {
            "项目名称": "无",
            "项目功效": "无",
            "时长": "无",
            "上次时间": "无",
        },
    },
]

# Japanese variant, kept for reference:
# programs = [
#     {'id': 1, 'name': '未開放', 'details': {'プロジェクト名': 'なし', 'プロジェクト効果': 'なし', '時間': 'なし', '最終日時': 'なし'}},
# ]

# Current UI mode: "manual_mode" or "smart_mode".
current_mode = "smart_mode"

# Hint messages emitted in rotation by send_status_messages().
messages = [
    "小悠师傅 开始按摩",
    "小悠小悠 今天天气怎么样",
    "小悠小悠 今天有什么新闻",
    "使用“小悠师傅” 或 “小悠小悠”唤醒我...",
]

# Japanese variant, kept for reference:
# messages = ["ユウマスター マッサージを開始", "ユウユウ 今日の天気はどう?", "ユウユウ 今日のニュースは?", "「ユウマスター」または「ユウユウ」で私を起動してください..."]

# Caches filled by /get_music_data and /get_playlists on first use.
latest_music = []
all_playlists = []
|
||
|
||
|
||
def send_status_messages():
    """Emit every entry of ``messages`` on the "message" event, forever.

    Waits 3 seconds after each message and a further 5 seconds after each
    full pass through the list.
    """
    per_message_pause = 3
    per_round_pause = 5
    while True:
        for hint in messages:
            socketio.emit("message", hint)
            socketio.sleep(per_message_pause)
        socketio.sleep(per_round_pause)
|
||
|
||
|
||
# SocketIO event handler for synthesizing audio
|
||
# @socketio.on('synthesize_audio')
|
||
# def handle_synthesize_audio(data):
|
||
# text = data['text']
|
||
# print(text)
|
||
# ai.tts_and_play_audio(socketio,text)
|
||
|
||
|
||
@app.route("/get_music_data", methods=["GET"])
def get_music_data():
    """Return the latest-music list as JSON, fetching it while the cache is empty."""
    global latest_music
    latest_music = latest_music or fetch_latest_music()
    return jsonify(latest_music)
|
||
|
||
|
||
# IDs passed to get_song_list(); the first one also seeds the start-up playlist.
categoryIDs = ["9126599100", "7299191148", "7132357466", "1137267511", "7284914844"]


@app.route("/get_playlists", methods=["GET"])
def get_playlists():
    """Return all configured playlists as JSON, fetching them on first use.

    The result is cached in ``all_playlists``. Playlists are collected in a
    local list first, so an exception while fetching one category cannot
    leave a partial list cached and served by every later request.
    """
    global all_playlists
    if not all_playlists:
        fetched = []
        for category_id in categoryIDs:
            diss_info, music_info_list = get_song_list(category_id)
            fetched.append(
                {"diss_info": diss_info, "music_info_list": music_info_list}
            )
            print(diss_info)
        # Publish only once every category was fetched successfully.
        all_playlists.extend(fetched)
    return jsonify(all_playlists)
|
||
|
||
|
||
@app.route("/search", methods=["POST"])
def search():
    """Search for music by the ``term`` field of the JSON request body.

    Returns the result of ``get_music_info`` as JSON, or HTTP 400 when the
    body is missing, not a JSON object, or carries no search term
    (mirroring the validation done in ``get_album_image``).
    """
    payload = request.get_json(silent=True)
    search_term = payload.get("term") if isinstance(payload, dict) else None
    if not search_term:
        return jsonify({"error": "Search term is required"}), 400

    print(search_term)
    return jsonify(get_music_info(search_term))
|
||
|
||
|
||
@app.route("/get_album_image", methods=["POST"])
def get_album_image():
    """Look up an album image URL for the ``singer_name`` in the JSON body.

    Responds 400 when no singer name is given, 404 (with the error reported
    by ``fetch_album_image``) when no image is found, otherwise 200.
    """
    payload = request.get_json()
    singer = payload.get("singer_name")
    if not singer:
        return jsonify({"error": "Singer name is required"}), 400

    image_url, failure = fetch_album_image(singer)
    if not image_url:
        return jsonify({"error": failure}), 404
    return jsonify({"album_img_url": image_url}), 200
|
||
|
||
|
||
@app.route("/song_clicked", methods=["POST"])
def song_clicked():
    """Play the song identified by ``songmid`` in the JSON request body.

    If the song is already in the playlist it becomes the current song;
    otherwise it is appended (using the metadata sent by the client) and
    then played. In both cases an error payload is returned, and nothing
    is handed to the player, when no stream URL can be resolved.
    """
    global playlist, current_song_index
    data = request.json
    songmid = data.get("songmid")
    print(f"Song clicked with songmid: {songmid}")

    # Stream URL: try the songmid lookup first, then the song page.
    music_url = player.fetch_music_url_by_mid(songmid)
    if not music_url:
        music_url = player.fetch_music_url(
            f"https://y.qq.com/n/yqq/song/{songmid}.html"
        )

    known_index = next(
        (i for i, song in enumerate(playlist) if song["songmid"] == songmid), None
    )

    if known_index is not None:
        # Song already queued: jump to it.
        current_song_index = known_index
        next_song = playlist[current_song_index]
        current_song.update(
            {
                "album_img_url": next_song["album_img_url"],
                "music_name": next_song["music_name"],
                "singer_name": next_song["singer_name"],
                "is_playing": True,
            }
        )
        print(current_song)
        if not music_url:
            # Previously None was passed straight to player.play_music().
            return jsonify({"status": "error", "message": "Failed to fetch music URL"})
        socketio.sleep(0.5)
        player.play_music(music_url)
        return jsonify({"status": "success", "message": "Song added to playlist"})

    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})

    # New song: append it to the playlist and start playing it.
    song_info = {
        "album_img_url": data.get("album_img_url"),
        "music_name": data.get("music_name"),
        "singer_name": data.get("singer_name"),
        "songmid": songmid,
    }
    playlist.append(song_info)
    current_song_index = len(playlist) - 1
    current_song.update(
        {
            "album_img_url": song_info["album_img_url"],
            "music_name": song_info["music_name"],
            "singer_name": song_info["singer_name"],
            "is_playing": True,
        }
    )
    player.play_music(music_url)
    print(playlist)
    return jsonify({"status": "success", "message": "Music started"})
|
||
|
||
|
||
@app.route("/playlist_update", methods=["POST"])
def playlist_update():
    """Replace the playlist with the one in the JSON body and play its first song.

    Returns an error payload when the submitted playlist is empty or when
    no stream URL can be resolved for the first song.
    """
    global playlist, current_song_index
    submitted = request.json.get("playlist", [])
    if not submitted:
        return jsonify({"status": "error", "message": "Playlist is empty"})

    # Keep only the four fields the player works with.
    fields = ("album_img_url", "music_name", "singer_name", "songmid")
    playlist = [{field: entry.get(field) for field in fields} for entry in submitted]

    # The first song of the new list becomes the current song.
    current_song_index = 0
    first_song = playlist[0]
    current_song.update(
        {
            "album_img_url": first_song["album_img_url"],
            "music_name": first_song["music_name"],
            "singer_name": first_song["singer_name"],
            "is_playing": True,
        }
    )

    # Stream URL: try the songmid lookup first, then the song page.
    music_url = player.fetch_music_url_by_mid(first_song["songmid"])
    if not music_url:
        music_url = player.fetch_music_url(
            f"https://y.qq.com/n/yqq/song/{first_song['songmid']}.html"
        )

    if not music_url:
        return jsonify(
            {"status": "error", "message": "Failed to fetch music URL for first song"}
        )

    socketio.sleep(0.5)
    player.play_music(music_url)
    print(playlist)
    return jsonify(
        {"status": "success", "message": "Playlist updated and playing first song"}
    )
|
||
|
||
|
||
@app.route("/pause_music", methods=["POST"])
def pause_music():
    """Pause the player and acknowledge with a success payload."""
    player.pause_music()
    reply = {"status": "success", "message": "Music paused"}
    return jsonify(reply)
|
||
|
||
|
||
@app.route("/resume_music", methods=["POST"])
def resume_music():
    """Resume playback, or start the current playlist entry if nothing is playing.

    When ``player.is_playing`` is false the song at ``current_song_index``
    is resolved and started. An error payload is returned, and nothing is
    handed to the player, when no stream URL can be resolved.
    """
    if player.is_playing:
        player.resume_music()
        return jsonify({"status": "success", "message": "Music resumed"})

    next_song = playlist[current_song_index]
    current_song.update(
        {
            "album_img_url": next_song["album_img_url"],
            "music_name": next_song["music_name"],
            "singer_name": next_song["singer_name"],
            "is_playing": True,
        }
    )
    print(current_song)

    # Stream URL: try the songmid lookup first, then the song page.
    music_url = player.fetch_music_url_by_mid(next_song["songmid"])
    if not music_url:
        music_url = player.fetch_music_url(
            f"https://y.qq.com/n/yqq/song/{next_song['songmid']}.html"
        )
    if not music_url:
        # Previously None was passed straight to player.play_music().
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})

    socketio.sleep(0.5)
    player.play_music(music_url)
    return jsonify({"status": "success", "message": "Music resumed"})
|
||
|
||
|
||
@app.route("/stop_music", methods=["POST"])
def stop_music():
    """Stop the player and acknowledge with a success payload."""
    player.stop_music()
    reply = {"status": "success", "message": "Music stopped"}
    return jsonify(reply)
|
||
|
||
|
||
@app.route("/play_last_song", methods=["POST"])
def play_last_song():
    """Step back one entry in the playlist (wrapping to the end) and play it."""
    global current_song_index
    # From the first entry, wrap around to the last one.
    current_song_index = (
        current_song_index - 1 if current_song_index > 0 else len(playlist) - 1
    )
    print(current_song_index)

    target = playlist[current_song_index]
    current_song.update(
        {
            "album_img_url": target["album_img_url"],
            "music_name": target["music_name"],
            "singer_name": target["singer_name"],
            "is_playing": True,
        }
    )

    # Stream URL: try the songmid lookup first, then the song page.
    music_url = player.fetch_music_url_by_mid(target["songmid"])
    if not music_url:
        music_url = player.fetch_music_url(
            f"https://y.qq.com/n/yqq/song/{target['songmid']}.html"
        )

    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})
    player.play_music(music_url)
    return jsonify({"status": "success", "message": "Playing last song"})
|
||
|
||
|
||
@app.route("/play_next_song", methods=["POST"])
def play_next_song():
    """Advance one entry in the playlist (wrapping to the start) and play it."""
    global current_song_index
    current_song_index = (current_song_index + 1) % len(playlist)
    print(current_song_index)

    target = playlist[current_song_index]
    current_song.update(
        {
            "album_img_url": target["album_img_url"],
            "music_name": target["music_name"],
            "singer_name": target["singer_name"],
            "is_playing": True,
        }
    )

    # Stream URL: try the songmid lookup first, then the song page.
    music_url = player.fetch_music_url_by_mid(target["songmid"])
    if not music_url:
        music_url = player.fetch_music_url(
            f"https://y.qq.com/n/yqq/song/{target['songmid']}.html"
        )

    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})
    player.play_music(music_url)
    return jsonify({"status": "success", "message": "Playing next song"})
|
||
|
||
|
||
@socketio.on("seek_music")
def handle_seek_music(position):
    """Socket.IO handler: log the requested position and pass it to the player."""
    print(position)
    player.seek_music(position)
    return None
|
||
|
||
|
||
def generate_playlist(music_info_list):
    """Convert song records into the playlist-entry dicts used by the player.

    Args:
        music_info_list: Iterable of indexable records laid out as
            ``(music_name, singer_name, songmid, album_mid)``. ``None`` is
            treated as an empty list, so the caller's "no songs" fallback
            can run instead of a TypeError being raised here.

    Returns:
        A list of dicts with the keys ``music_name``, ``singer_name``,
        ``songmid`` and ``album_img_url``. The image URL is an empty string
        when the record has no album id, or an empty one.
    """
    entries = []
    for song in music_info_list or []:
        # Tolerate records that stop before the album id.
        album_mid = song[3] if len(song) > 3 else None
        entries.append(
            {
                "music_name": song[0],
                "singer_name": song[1],
                "songmid": song[2],
                "album_img_url": (
                    f"http://y.gtimg.cn/music/photo_new/T002R180x180M000{album_mid}.jpg"
                    if album_mid
                    else ""
                ),
            }
        )
    return entries
|
||
|
||
|
||
# 全局变量来存储歌曲信息和播放状态
|
||
# Module-level state describing the song shown in the UI and whether it plays.
current_song = {
    "album_img_url": "https://via.placeholder.com/300x300",
    "music_name": None,
    "singer_name": None,
    "is_playing": False,
}

# Module-level playlist. Example of an entry:
# playlist = [{
#     'album_img_url': 'http://y.gtimg.cn/music/photo_new/T002R180x180M000001G7iIK00yQyr.jpg',
#     'music_name': '大鱼海棠主题曲——大鱼',
#     'singer_name': '凤凉柒',
#     'songmid': '000Qf8b01p0vIJ'}]
playlist = []
# Index of the current song within ``playlist``.
current_song_index = 0

# Seed the playlist from the first configured category. This runs at import
# time, so a failure here must not prevent the application from starting:
# it is logged and the placeholder entry below is used instead.
try:
    diss_info, music_info_list = get_song_list(categoryIDs[0])
    playlist = generate_playlist(music_info_list)
except Exception as e:
    print(f"Failed to load initial playlist: {e}")
    playlist = []
print("playlist:", playlist)

if not playlist:
    # Placeholder so that playlist[current_song_index] is always valid.
    playlist = [
        {
            "album_img_url": "https://via.placeholder.com/300x300",
            "music_name": "默认音乐",
            "singer_name": "未知艺术家",
            "songmid": "000000",
        }
    ]

current_song.update(
    {
        "album_img_url": playlist[0]["album_img_url"],
        "music_name": playlist[0]["music_name"],
        "singer_name": playlist[0]["singer_name"],
        "is_playing": False,
    }
)
|
||
|
||
|
||
# 创建线程锁,确保数据安全
|
||
# Lock held while the background task below reads and updates playback state.
thread_lock = Lock()


def music_background_thread():
    """Once per second, publish playback state and auto-advance the playlist.

    Every iteration emits ``current_song``. While the player reports that it
    is playing, it also emits ``music_status`` with position and length. A
    negative position is treated as "track finished": the next playlist
    entry (wrapping around) becomes current and is started. If no stream URL
    can be resolved for it, the failure is logged and nothing is handed to
    the player.
    """
    global playlist, current_song_index
    while True:
        socketio.sleep(1)
        with thread_lock:
            current_song["is_playing"] = not player.is_paused and player.is_playing
            socketio.emit("current_song", current_song, namespace="/")
            if not player.is_playing:
                continue

            current_position = player.get_current_position()
            music_length = player.get_music_length()

            if current_position < 0:
                # Switch to the next song.
                current_song_index = (current_song_index + 1) % len(playlist)
                next_song = playlist[current_song_index]
                current_song.update(
                    {
                        "album_img_url": next_song["album_img_url"],
                        "music_name": next_song["music_name"],
                        "singer_name": next_song["singer_name"],
                        "is_playing": True,
                    }
                )
                print(current_song)

                # Stream URL: try the songmid lookup first, then the song page.
                music_url = player.fetch_music_url_by_mid(next_song["songmid"])
                if not music_url:
                    music_url = player.fetch_music_url(
                        f"https://y.qq.com/n/yqq/song/{next_song['songmid']}.html"
                    )
                if music_url:
                    socketio.sleep(0.15)
                    player.play_music(music_url)
                else:
                    # Previously None was passed straight to play_music().
                    print(f"Failed to fetch music URL for {next_song['songmid']}")

            socketio.emit(
                "music_status",
                {"position": current_position, "length": music_length},
            )
|
||
|
||
|
||
@app.route("/set_current_song", methods=["POST"])
def set_current_song():
    """Overwrite the current-song fields with values from the JSON body.

    Missing text fields become ``None``; a missing ``is_playing`` becomes False.
    """
    payload = request.json
    refreshed = {
        field: payload.get(field)
        for field in ("album_img_url", "music_name", "singer_name")
    }
    refreshed["is_playing"] = payload.get("is_playing", False)
    current_song.update(refreshed)
    return jsonify({"status": "success"})
|
||
|
||
|
||
@app.route("/get_current_song", methods=["GET"])
def get_current_song():
    """Return the current song, or an error payload when no song name is set."""
    if not current_song["music_name"]:
        return jsonify({"status": "error", "message": "No song is currently set"})
    return jsonify({"status": "success", "data": current_song})
|
||
|
||
|
||
# Chat history shared by the chat routes; entries are
# {"sender": ..., "message": ...} dicts.
ai_chat_list = []


async def send_message(message, port=8766):
    """Send *message* to the WebSocket server on localhost:*port*.

    Opens a new connection, sends the message, waits for a single reply and
    returns it.
    """
    async with websockets.connect(f"ws://localhost:{port}") as connection:
        await connection.send(message)
        reply = await connection.recv()
    return reply
|
||
|
||
|
||
@socketio.on("send_message")
def handle_message(data):
    """Socket.IO handler: record a user chat message and forward it to port 8766.

    The message is broadcast as an ``add_message`` event, appended to the
    chat history and sent through ``send_message`` (default port).

    Returns:
        ``None`` on success, or ``{"error": ...}`` when no message was given.
        A plain dict is returned instead of a Flask response tuple: the
        handler's return value is delivered as the Socket.IO acknowledgement,
        so it has to be JSON-serialisable.
    """
    message = data.get("message") if isinstance(data, dict) else None
    if not message:
        return {"error": "Message not provided"}
    socketio.emit("add_message", {"sender": "user", "message": message})
    ai_chat_list.append({"sender": "user", "message": message})

    # Private event loop for this call; close it afterwards so that each
    # message does not leak a loop and its selector.
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        loop.run_until_complete(send_message(f"'user_input': \"{message}\""))
    finally:
        loop.close()
|
||
|
||
|
||
# time.time() of the most recent message received by /ai_respon.
last_ai_message_time = None


@app.route("/ai_respon", methods=["POST"])
def ai_response():
    """Record an AI chat message from the ``message`` form field.

    A message arriving within 5 seconds of the previous AI message is
    appended (space-separated) to the last history entry when that entry is
    from the AI; in every other case a new history entry is added. The
    message is always broadcast as an ``add_message`` event.
    """
    global last_ai_message_time

    message = request.form.get("message")
    if not message:
        return jsonify({"status": "error", "message": "No message provided"})

    current_time = time.time()
    within_window = (
        bool(last_ai_message_time) and (current_time - last_ai_message_time) <= 5
    )
    if within_window and ai_chat_list and ai_chat_list[-1]["sender"] == "ai":
        # Continue the most recent AI entry.
        ai_chat_list[-1]["message"] += " " + message
    else:
        ai_chat_list.append({"sender": "ai", "message": message})

    last_ai_message_time = current_time

    socketio.emit("add_message", {"sender": "ai", "message": message})
    return jsonify({"status": "success", "message": "Received ai message"})
|
||
|
||
|
||
@app.route("/user_input", methods=["POST"])
def user_input():
    """Record a user chat message from the ``message`` form field and broadcast it."""
    message = request.form.get("message")
    if not message:
        return jsonify({"status": "error", "message": "No message provided"})

    entry = {"sender": "user", "message": message}
    socketio.emit("add_message", {"sender": "user", "message": message})
    ai_chat_list.append(entry)
    return jsonify({"status": "success", "message": "Received user message"})
|
||
|
||
|
||
@app.route("/get_history", methods=["GET"])
def get_history():
    """Return the accumulated chat history as a JSON array."""
    history = ai_chat_list
    return jsonify(history)
|
||
|
||
|
||
def process_audio_and_emit(path):
    """Emit ``mouth_movement`` events that follow the amplitude of an audio file.

    The file is loaded at 8000 Hz. About every 0.1 s, the sample matching the
    wall-clock time elapsed since the start is scaled by 5, made absolute and
    emitted as a string. A final value of ``0.0`` is always emitted at the
    end.

    Args:
        path: Path of the audio file, passed to ``librosa.load``.
    """
    samples, _ = librosa.load(path, sr=8000)
    total = len(samples)
    start_time = time.time()

    try:
        for _ in range(total // 800):
            index = int((time.time() - start_time) * 8000) + 1
            if index >= total:
                # Elapsed time only grows, so once it is past the end of the
                # audio no later iteration could emit anything.
                break
            new_value = abs(samples[index] * 5)
            socketio.emit("mouth_movement", {"value": str(new_value)})
            time.sleep(0.1)
    except Exception as e:
        print(f"Error in mouth movement emission: {e}")

    # Reset the mouth movement value at the end.
    socketio.emit("mouth_movement", {"value": str(0.0)})
|
||
|
||
|
||
@app.route("/lip_sync", methods=["POST"])
def lip_sync():
    """Start lip-sync emission for the audio file named in the ``path`` form field.

    Paths starting with ``pre_mp3`` are prefixed with ``../``. The audio is
    processed on a separate thread so the response is returned immediately.
    Any exception is logged and reported in the JSON reply.
    """
    try:
        audio_path = request.form.get("path")
        print(audio_path)
        if audio_path.startswith("pre_mp3"):
            audio_path = "../" + audio_path

        worker = threading.Thread(target=process_audio_and_emit, args=(audio_path,))
        worker.start()
        return jsonify({"status": "success"})
    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"status": "error", "message": str(e)})
|
||
|
||
import traceback
|
||
|
||
# Status snapshot updated by /update_massage_status and served by /get_status.
massage_status = dict(
    progress=0,
    force="",
    temperature="",
    gear="",
    is_massaging=False,
    current_task="",
    task_time="",
    current_head="thermotherapy",
    body_part="back",
    massage_service_started=False,
    press="",
    frequency="",
)
|
||
|
||
|
||
# Plane index sent with "highlightPlane" for each task name; every organ is
# also accepted with a "_left" or "_right" suffix.
_ORGAN_PLANE_INDEX = {"lung": 0, "heart": 1, "hepatobiliary": 2, "spleen": 3, "kidney": 4}
_TASK_TO_PLANE_INDEX = {
    f"{organ}{suffix}": plane
    for organ, plane in _ORGAN_PLANE_INDEX.items()
    for suffix in ("", "_left", "_right")
}


def _parse_bool_field(value):
    """Map "True"/"true"/"False"/"false" to booleans; return other values unchanged."""
    if value in ("True", "true"):
        return True
    if value in ("False", "false"):
        return False
    return value


@app.route("/update_massage_status", methods=["POST"])
def update_status():
    """Merge the posted form fields into ``massage_status`` and broadcast it.

    Only non-empty fields are applied. ``force``, ``temperature`` and
    ``gear`` additionally ignore the value ``"0"``; ``task_time`` is applied
    whenever present, with ``"null"`` stored as an empty string. Side
    effects: ``is_massaging`` = false resets the displayed image and emits
    ``change_image``; a ``current_task`` emits ``highlightPlane`` with the
    matching plane index (``None`` for unknown tasks such as ``"-1"``). Any
    exception is logged and reported in the JSON reply.
    """
    global display_image_url
    try:
        print(f"update_massage_status{request.form}")
        form = request.form

        progress = form.get("progress")
        if progress:
            massage_status["progress"] = progress

        for field in ("force", "temperature", "gear"):
            value = form.get(field)
            if value and value != "0":
                print(value)
                massage_status[field] = value

        is_massaging = form.get("is_massaging")
        if is_massaging:
            is_massaging = _parse_bool_field(is_massaging)
            if is_massaging is False:
                # Massage stopped: switch the UI back to the default image.
                display_image_url = "static/images/smart_mode/back.png"
                socketio.emit("change_image", {"path": display_image_url})
            massage_status["is_massaging"] = is_massaging

        current_task = form.get("current_task")
        if current_task:
            massage_status["current_task"] = current_task
            index = _TASK_TO_PLANE_INDEX.get(current_task)
            print(current_task, index)
            # index is None for unknown tasks, which is what is emitted then.
            socketio.emit("highlightPlane", index)

        task_time = form.get("task_time")
        if task_time is not None:
            massage_status["task_time"] = "" if task_time == "null" else task_time

        for field in ("current_head", "body_part", "press", "frequency"):
            value = form.get(field)
            if value:
                massage_status[field] = value

        massage_service_started = form.get("massage_service_started")
        if massage_service_started:
            print(massage_service_started)
            massage_status["massage_service_started"] = _parse_bool_field(
                massage_service_started
            )

        print(massage_status)

        # Broadcast the updated status over Socket.IO.
        socketio.emit("update_massage_status", massage_status)
        return jsonify({"status": "success"})

    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"status": "error", "message": str(e)})
|
||
|
||
|
||
def is_service_running(service_name):
    """Return True when ``/bin/systemctl is-active`` prints ``active`` for the service.

    Args:
        service_name: Unit name handed to ``/bin/systemctl is-active``.

    Returns:
        ``True`` only if the command succeeds and prints ``active``. A
        non-zero exit status, or a failure to start the command at all
        (e.g. systemctl is missing), yields ``False`` instead of raising.
    """
    try:
        result = subprocess.run(
            ["/bin/systemctl", "is-active", service_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: non-zero exit. OSError: executable could not be run.
        return False
    return result.stdout.strip() == "active"
|
||
|
||
|
||
@app.route("/get_status", methods=["GET"])
def get_status():
    """Return ``massage_status`` as JSON after refreshing what can be refreshed.

    Sends ``get_status`` to the WebSocket server on port 8765 (failures are
    logged and ignored), re-emits the current display image via
    ``change_image`` and updates ``massage_service_started`` from the state
    of ``massage.service``.
    """
    global display_image_url

    # Private event loop for this request; close it afterwards so that each
    # call does not leak a loop and its selector.
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        loop.run_until_complete(send_message("get_status", 8765))
    except Exception as e:
        print(f"Error: {e}")
    finally:
        loop.close()

    # Push the current image URL to the front end over Socket.IO.
    socketio.emit("change_image", {"path": display_image_url})

    massage_status["massage_service_started"] = is_service_running("massage.service")

    print(massage_status)
    return jsonify(massage_status)
|
||
|
||
|
||
######
|
||
# Timestamp reserved for command throttling; the throttle is currently disabled.
last_command_time = 0


def _forward_ws_command(command, port):
    """Send *command* to the local WebSocket server on *port*; log failures.

    Each call uses its own event loop, which is always closed afterwards so
    that repeated commands do not leak loops.
    """
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        loop.run_until_complete(send_message(command, port))
    except Exception as e:
        print(f"Error: {e}")
    finally:
        loop.close()


@socketio.on("send_command")
def send_command(data):
    """Socket.IO handler: forward a UI command to the local WebSocket servers.

    When ``"begin" in data`` holds, ``UI_start`` is first sent to port 8766.
    The command itself is then always sent to port 8765. Failures are
    logged and otherwise ignored.
    """
    if "begin" in data:
        _forward_ws_command("UI_start", 8766)
    _forward_ws_command(data, 8765)
|
||
|
||
|
||
# @app.before_request
|
||
# def before_request():
|
||
# if request.url.startswith('https://'):
|
||
# url = request.url.replace('https://', 'http://', 1)
|
||
# code = 301 # redirect permanently
|
||
# return redirect(url, code=code)
|
||
@app.after_request
def after_request(response):
    """Add the three CORS headers below to every response and return it."""
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Headers", "Content-Type,Authorization"),
        ("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE"),
    )
    for header_name, header_value in cors_headers:
        response.headers.add(header_name, header_value)
    return response
|
||
|
||
|
||
@app.route("/")
def index():
    """Render login.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("login.html", time=now)
|
||
|
||
|
||
@app.route("/home")
def home():
    """Render home.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("home.html", time=now)
|
||
|
||
|
||
@app.route("/health")
def health():
    """Render the page matching ``current_mode``.

    ``manual_mode`` shows select_program.html with the program list; any
    other mode shows smart_mode.html.
    """
    now = int(time.time())
    if current_mode != "manual_mode":
        # Japanese variant of the template: 'smart_mode_JP.html'
        return render_template("smart_mode.html", time=now)
    return render_template("select_program.html", time=now, programs=programs)
|
||
|
||
|
||
@app.route("/setting")
def setting():
    """Render setting.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("setting.html", time=now)
|
||
|
||
|
||
@app.route("/learning")
def learning():
    """Render learning.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("learning.html", time=now)
|
||
|
||
|
||
@app.route("/help")
def help():
    """Render help.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("help.html", time=now)
|
||
|
||
|
||
@app.route("/ai_ball")
def ai_ball():
    """Render ai_ball_v2.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("ai_ball_v2.html", time=now)
|
||
|
||
|
||
@app.route("/model")
def model():
    """Render 3d_model_v6_local.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("3d_model_v6_local.html", time=now)
|
||
|
||
|
||
@app.route("/ai_chatbot")
def ai_chatbot():
    """Render ai_chatbot.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("ai_chatbot.html", time=now)
|
||
|
||
|
||
@app.route("/dynamic_function")
def dynamic_function():
    """Render dynamic_function_v2.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("dynamic_function_v2.html", time=now)
|
||
|
||
|
||
@app.route("/control_panel")
def control_panel():
    """Render control_panel_v2.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("control_panel_v2.html", time=now)
|
||
|
||
|
||
@app.route("/music")
def music():
    """Render full_music_player.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("full_music_player.html", time=now)
|
||
|
||
|
||
@app.route("/ir_list")
def ir_list():
    """Render ir_list.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("ir_list.html", time=now)
|
||
|
||
|
||
@app.route("/joystick")
def joystick():
    """Render joystick_v2.html, passing the current Unix time as ``time``."""
    now = int(time.time())
    return render_template("joystick_v2.html", time=now)
|
||
|
||
|
||
@app.route("/program-details/<int:program_id>")
def program_details(program_id):
    """Return the program with the given id as JSON, or a 404 error payload."""
    for candidate in programs:
        if candidate["id"] == program_id:
            # Only an empty match would fall through to the 404 below.
            if candidate:
                return jsonify(candidate)
            break
    return jsonify({"error": "program not found"}), 404
|
||
|
||
|
||
@app.route("/switch_mode/<mode>")
def switch_mode(mode):
    """Store *mode* as the module-level UI mode and redirect to the health page."""
    # session['current_mode'] = mode
    global current_mode
    current_mode = mode
    target = url_for("health")
    return redirect(target)
|
||
|
||
|
||
@app.route("/get-ip")
def get_ip():
    """Return the host part of the request's Host header as ``{"ip": ...}``."""
    # Everything before the first ":" (i.e. without the port).
    host, _, _ = request.host.partition(":")
    return jsonify({"ip": host})
|
||
|
||
|
||
@app.route("/login", methods=["POST"])
def login():
    """Check the ``username``/``password`` of the JSON body against fixed credentials.

    Returns ``{"success": true}`` on a match and ``{"success": false}``
    otherwise. That includes a missing or malformed body and missing
    fields, which previously raised and produced an HTTP 500.

    NOTE(review): the credentials are hard-coded in source.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify(success=False)

    username = data.get("username")
    password = data.get("password")
    return jsonify(success=(username == "jsfb" and password == "123456"))
|
||
|
||
|
||
def _notify_arm(command):
    """Send *command* to the local WebSocket server on port 8766; log failures.

    Each call uses its own event loop, which is always closed afterwards so
    that repeated requests do not leak loops.
    """
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        loop.run_until_complete(send_message(command, 8766))
    except Exception as e:
        print(f"Error: {e}")
    finally:
        loop.close()


@app.route("/massage_control", methods=["POST"])
def massage_control():
    """Control the ``massage`` systemd service, or shut the whole system down.

    The ``action`` form field selects the operation:

    * ``start``: sends ``connect_arm`` and restarts the service.
    * ``stop``: sends ``disconnect_arm`` and stops the service.
    * ``shutdown``: shuts down the remote host, waits until it is down,
      then shuts down the local machine.
    * any other systemctl verb is passed to ``systemctl <verb> massage``.

    ``action`` is untrusted input. It must look like a plain verb
    (lower-case letters and hyphens) and commands are run from an argument
    list without a shell, so it cannot inject shell syntax or options.

    Returns 400 for a missing or invalid action and 500 when a shutdown
    step fails. A failing systemctl call is only logged; the reply is still
    ``success``.

    NOTE(review): the sudo and SSH credentials are hard-coded in source.
    """
    action = request.form.get("action")
    print("massage_control action: ", action)
    sudo_password = "jsfb"

    remote_host = "192.168.100.20"  # IP address of the remote host
    remote_username = "root"  # user name on the remote host
    remote_password = "bestcobot"  # password on the remote host
    remote_port = 8822  # SSH port of the remote host

    if not action or not re.fullmatch(r"[a-z][a-z-]*", action):
        return jsonify({"status": "error", "message": "Invalid action"}), 400

    if action == "start":
        action = "restart"
        _notify_arm("connect_arm")
    if action == "stop":
        _notify_arm("disconnect_arm")

    # sudo -S reads the password from stdin.
    sudo_input = f"{sudo_password}\n".encode()

    if action == "shutdown":
        # 1. Ask the remote host to shut down.
        execute_command_on_remote(
            remote_host,
            remote_username,
            remote_password,
            "sudo /sbin/shutdown now",
            port=remote_port,
        )

        # 2. Wait until the remote host is reported down.
        print("Waiting for remote host to shutdown...")
        if not is_host_down(remote_host):
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "Remote host did not shutdown in time",
                    }
                ),
                500,
            )

        # 3. The remote host is down; shut down the local machine.
        try:
            subprocess.run(
                ["/usr/bin/sudo", "-S", "/sbin/shutdown", "now"],
                input=sudo_input,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            print("System is shutting down.")
        except subprocess.CalledProcessError as e:
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": f"Error when trying to shutdown: {e.stderr.decode()}",
                    }
                ),
                500,
            )

        return jsonify(
            {
                "status": "success",
                "message": "Remote and local systems shutdown successfully.",
            }
        )

    # Any action other than shutdown is forwarded to systemctl.
    try:
        result = subprocess.run(
            ["/usr/bin/sudo", "-S", "/bin/systemctl", action, "massage"],
            input=sudo_input,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        print(result.stdout.decode())
        print(f"Service {action} successfully.")
    except subprocess.CalledProcessError as e:
        print(f"Error when {action} service: {e.stderr.decode()}")

    return jsonify({"status": "success"})
|
||
|
||
|
||
@app.route("/get_versions", methods=["GET"])
def get_versions():
    """Return the versions of all locally installed release folders.

    Scans the workspace directory for entries whose name starts with
    ``MassageRobot_aubo-`` and reports what is left after removing that
    prefix, as JSON ``{"versions": [...]}``.
    """
    prefix = "MassageRobot_aubo-"
    workspace = "/home/jsfb/jsfb_ws"
    # Keep only the release folders and strip the prefix to get the version.
    installed = [
        entry.replace(prefix, "")
        for entry in os.listdir(workspace)
        if entry.startswith(prefix)
    ]
    return jsonify({"versions": installed})
|
||
|
||
|
||
# Basic information about the OTA file server.
FILE_SERVER_URL = "http://8.138.8.114/files/MassageRobot/aubo"  # file server URL
# Credentials for HTTP basic auth. They can be overridden through the
# environment so deployments do not have to rely on the values committed
# here; the defaults keep the previous behaviour.
FILE_SERVER_USERNAME = os.environ.get("MASSAGE_FILE_SERVER_USERNAME", "jsfb")
FILE_SERVER_PASSWORD = os.environ.get("MASSAGE_FILE_SERVER_PASSWORD", "jsfb")
DOWNLOAD_DIR = "/home/jsfb/Downloads"  # where downloaded packages are stored
# Bash script that downloads and installs a package.
DOWNLOAD_SCRIPT_PATH = "/home/jsfb/jsfb_ws/OTA_tools/download_and_install_package.sh"
INSTALL_DIR = "/home/jsfb/jsfb_ws/"  # directory packages are installed into
|
||
|
||
|
||
@app.route("/get_remote_versions", methods=["GET"])
def get_remote_versions():
    """List the package versions published on the file server.

    Fetches the directory listing at ``FILE_SERVER_URL`` with HTTP basic
    auth and extracts every ``MassageRobot_aubo-<version>.tar.gz`` name.

    Returns:
        ``{"versions": [...]}`` (sorted, de-duplicated) on success,
        ``{"error": ...}`` with 500 when the server cannot be reached or
        answers with a non-200 status, or with 404 when no package name is
        found in the listing.
    """
    try:
        response = requests.get(
            FILE_SERVER_URL,
            auth=HTTPBasicAuth(FILE_SERVER_USERNAME, FILE_SERVER_PASSWORD),
            # Without a timeout an unreachable server would block this
            # request handler indefinitely.
            timeout=10,
        )

        if response.status_code != 200:
            return jsonify({"error": "Failed to access file server"}), 500

        # The server answers with an HTML listing; pull the version part out
        # of every package file name.
        pattern = r"MassageRobot_aubo-([\w\.\-]+)\.tar\.gz"
        matches = re.findall(pattern, response.text)

        # A name can occur several times in the listing (link text and
        # href), so de-duplicate before sorting.
        unique_versions = sorted(set(matches))

        if not unique_versions:
            return jsonify({"error": "No versions found"}), 404

        return jsonify({"versions": unique_versions})

    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
||
|
||
|
||
# @app.route("/download_package", methods=["POST"])
|
||
# def download_package():
|
||
# try:
|
||
# # 获取用户提供的版本号
|
||
# data = request.get_json()
|
||
# version = data.get("version")
|
||
|
||
# if not version:
|
||
# return jsonify({"error": "Version not specified"}), 400
|
||
|
||
# # 构造文件名
|
||
# file_name = f"MassageRobot_aubo-{version}.tar.gz"
|
||
|
||
# # 构造 Bash 脚本的命令
|
||
# command = [
|
||
# "bash",
|
||
# DOWNLOAD_SCRIPT_PATH,
|
||
# "-f",
|
||
# file_name,
|
||
# "-d",
|
||
# DOWNLOAD_DIR,
|
||
# "-u",
|
||
# FILE_SERVER_USERNAME,
|
||
# "-p",
|
||
# FILE_SERVER_PASSWORD,
|
||
# "-l",
|
||
# FILE_SERVER_URL,
|
||
# "-i",
|
||
# INSTALL_DIR,
|
||
# ]
|
||
|
||
# # 在后台运行脚本,并将输出重定向到文件
|
||
# with open("../log/download_install_package.log", "w") as f:
|
||
# download_install_process = subprocess.Popen(
|
||
# command, stdout=f, stderr=subprocess.STDOUT
|
||
# )
|
||
|
||
# # 返回 "正在下载" 响应
|
||
# return jsonify({"message": f"Download of version {version} is in progress..."})
|
||
|
||
# except Exception as e:
|
||
# return jsonify({"error": str(e)}), 500
|
||
|
||
|
||
|
||
@app.route("/download_package", methods=["POST"])
def download_package():
    """Start downloading and installing a package, streaming its progress.

    Expects a JSON body ``{"version": "<version>"}``. The download script is
    launched in a new gnome-terminal window with its output redirected to
    ``../tmp/Download.log``; a background task tails that log, echoes it to
    stdout and emits ``download_progress`` Socket.IO events whenever the
    reported percentage increases.

    Returns:
        A JSON message once the download has been started, 400 when no
        version was supplied, or 500 with the error text on failure.
    """
    import shlex  # local import: only this handler builds a shell command line

    try:
        data = request.get_json(silent=True)
        version = data.get("version") if isinstance(data, dict) else None

        if not version:
            return jsonify({"error": "Version not specified"}), 400

        file_name = f"MassageRobot_aubo-{version}.tar.gz"

        # Arguments for the download/install bash script.
        command = [
            "bash",
            DOWNLOAD_SCRIPT_PATH,
            "-f",
            file_name,
            "-d",
            DOWNLOAD_DIR,
            "-u",
            FILE_SERVER_USERNAME,
            "-p",
            FILE_SERVER_PASSWORD,
            "-l",
            FILE_SERVER_URL,
            "-i",
            INSTALL_DIR,
        ]

        download_log_file = "../tmp/Download.log"

        # Truncate the log so progress from an earlier run is not replayed.
        with open(download_log_file, "w") as log_handle:
            log_handle.write("")

        # The command is handed to ``bash -c`` as one string, so every part
        # is quoted: the version comes from the client and must not be able
        # to inject shell syntax.
        shell_command = (
            " ".join(shlex.quote(part) for part in command)
            + f" > {shlex.quote(download_log_file)} 2>&1"
        )

        # Run the script in a new terminal window.
        subprocess.Popen(
            ["gnome-terminal", "--", "bash", "-c", shell_command],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        )

        def download_and_emit_progress():
            """Tail the download log and forward progress percentages."""
            # NOTE(review): this loop has no exit condition, so one tailing
            # task stays alive per request for the life of the process.
            last_progress = 0
            try:
                with open(download_log_file, "r") as log_reader:
                    while True:
                        line = log_reader.readline()
                        if not line:
                            time.sleep(0.1)  # wait for new content
                            continue

                        # Mirror the script output to the backend's stdout.
                        sys.stdout.write(line)
                        sys.stdout.flush()

                        if "%" not in line:
                            continue
                        match = re.search(r"(\d+\.?\d*)%", line)
                        if not match:
                            continue
                        progress = float(match.group(1))
                        if progress - last_progress > 0:  # only report increases
                            last_progress = progress
                            socketio.emit("download_progress", {"data": progress})
            except FileNotFoundError:
                socketio.emit("download_progress", {"data": "Log file not found"})
            except Exception as e:
                socketio.emit("download_progress", {"data": f"Error: {str(e)}"})

        # start_background_task() itself launches the task. Using it as a
        # decorator and then calling the decorated name (as before) called
        # the returned thread object, raised TypeError, and made this route
        # answer 500 even though the download had started.
        socketio.start_background_task(download_and_emit_progress)

        return jsonify({"message": f"Download of version {version} is in progress..."})

    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
||
|
||
|
||
# 获取当前版本号
|
||
@app.route("/get_current_version", methods=["GET"])
def get_current_version():
    """Report the version encoded in this installation's folder name.

    The version is whatever follows ``MassageRobot_aubo-`` in the path of
    the directory one level above this script's directory, up to the next
    ``/``; ``"default"`` is reported when the path carries no such marker.
    """
    marker = "MassageRobot_aubo-"
    # Parent of the directory that contains this script.
    install_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    version = (
        install_root.rpartition(marker)[2].partition("/")[0]
        if marker in install_root
        else "default"
    )

    payload = {"current_version": version}
    print(payload)
    return jsonify(payload)
|
||
|
||
|
||
# # 进行版本切换,并执行对应路径下的setup.sh
|
||
# @app.route("/switch_version", methods=["POST"])
|
||
# def switch_version():
|
||
# base_dir = "/home/jsfb/jsfb_ws"
|
||
# data = request.json
|
||
|
||
# # 获取客户端传递的版本号
|
||
# version = data.get("version")
|
||
# if not version:
|
||
# return jsonify({"error": "Version not specified"}), 400
|
||
|
||
# # 如果版本是 'default',则目标文件夹不带版本号后缀
|
||
# if version == "default":
|
||
# target_folder = os.path.join(base_dir, "MassageRobot_aubo")
|
||
# else:
|
||
# # 构建带版本号的目标文件夹路径
|
||
# target_folder = os.path.join(base_dir, f"MassageRobot_aubo-{version}")
|
||
|
||
# # 检查目标文件夹是否存在
|
||
# if not os.path.isdir(target_folder):
|
||
# return jsonify({"error": f"Version {version} not found"}), 404
|
||
|
||
# # 构建setup.sh脚本路径
|
||
# setup_script_path = os.path.join(target_folder, "setup.sh")
|
||
|
||
# # 检查setup.sh是否存在
|
||
# if not os.path.isfile(setup_script_path):
|
||
# return (
|
||
# jsonify({"error": "setup.sh not found in the target version folder"}),
|
||
# 404,
|
||
# )
|
||
|
||
# # 执行setup.sh脚本
|
||
# try:
|
||
# # 使用 subprocess.run 切换到目标目录并执行 setup.sh
|
||
# subprocess.run(["bash", "setup.sh"], check=True, cwd=target_folder)
|
||
|
||
# # 返回完成消息
|
||
# return jsonify({"message": f"Version {version} switched successfully"}), 200
|
||
# except subprocess.CalledProcessError as e:
|
||
# # 返回错误信息
|
||
# return jsonify({"error": f"Failed to switch version: {str(e)}"}), 500
|
||
@app.route("/switch_version", methods=["POST"])
def switch_version():
    """Switch to another installed version by running its ``setup.sh``.

    Expects a JSON body ``{"version": "<version>"}``. ``"default"`` selects
    the un-suffixed ``MassageRobot_aubo`` folder, anything else selects
    ``MassageRobot_aubo-<version>`` inside the workspace. The script is
    launched in a new gnome-terminal window; this handler does not wait for
    it to finish.

    Returns:
        200 with a message once the script has been launched, 400 for a
        missing or invalid version, 404 when the folder or its ``setup.sh``
        does not exist, 500 when launching the terminal fails.
    """
    base_dir = "/home/jsfb/jsfb_ws"
    data = request.get_json(silent=True)

    version = data.get("version") if isinstance(data, dict) else None
    if not version:
        return jsonify({"error": "Version not specified"}), 400

    version = str(version)
    # The version becomes part of a path that is later executed, so it must
    # be a plain name and not something like "../../elsewhere".
    if os.path.basename(version) != version:
        return jsonify({"error": "Invalid version"}), 400

    if version == "default":
        target_folder = os.path.join(base_dir, "MassageRobot_aubo")
    else:
        target_folder = os.path.join(base_dir, f"MassageRobot_aubo-{version}")

    if not os.path.isdir(target_folder):
        return jsonify({"error": f"Version {version} not found"}), 404

    setup_script_path = os.path.join(target_folder, "setup.sh")

    if not os.path.isfile(setup_script_path):
        return (
            jsonify({"error": "setup.sh not found in the target version folder"}),
            404,
        )

    try:
        # Pass the script path as an argument rather than splicing it into a
        # ``bash -c`` string, so no shell parsing is applied to it. The
        # terminal closes when the script exits, as before.
        subprocess.Popen(
            ["gnome-terminal", "--", "bash", setup_script_path],
            cwd=target_folder,
        )

        return jsonify({"message": f"Version {version} switched successfully"}), 200
    except Exception as e:
        return jsonify({"error": f"Failed to switch version: {str(e)}"}), 500
|
||
|
||
|
||
@app.route("/on_message", methods=["POST"])
def on_message():
    """Broadcast a popup message to the connected Socket.IO clients.

    Reads the ``message`` form field, strips surrounding whitespace and, if
    anything is left, emits it as an ``on_message`` event. An empty message
    is rejected with 400 and nothing is emitted; any unexpected error is
    reported with 500.
    """
    try:
        text = request.form.get("message", "").strip()

        if text:
            # Forward the message to the clients.
            socketio.emit("on_message", {"message": text})
            return jsonify(
                {"status": "success", "message": "Popup triggered with message: " + text}
            )

        # Empty or whitespace-only message: report the error, emit nothing.
        rejection = {
            "status": "error",
            "message": "Message cannot be empty, no popup triggered",
        }
        return jsonify(rejection), 400

    except Exception as e:
        failure = {"status": "error", "message": "An error occurred: " + str(e)}
        return jsonify(failure), 500
|
||
|
||
|
||
@app.route("/modify_web")
def modify_web():
    """Proxy a remote page, restyling HTML and rewriting its links.

    The target is taken from the ``url`` query parameter. HTML responses get
    U+FEFF characters removed, a block of custom CSS/JS appended to
    ``<head>`` and every ``a``/``link``/``script``/``img``/``iframe`` URL
    rewritten to go through this endpoint again. Every other content type is
    passed through unchanged with its original ``Content-Type``.

    Returns:
        The (possibly modified) upstream content, 400 when no URL is given,
        or 500 when the upstream request fails.
    """
    target_url = request.args.get("url")
    if not target_url:
        return "No URL provided", 400

    # Serve local replacements for two specific remote images.
    if target_url == "http://m.layeyese.com/images/icon.png":
        target_url = "http://127.0.0.1:5000/static/images/webapp/favicon.ico"

    if target_url == "http://m.layeyese.com/images/banner_ysdq.png":
        target_url = "http://127.0.0.1:5000/static/images/banner_ysdq.png"

    # scheme://host of the target, used to resolve relative links.
    parsed_target = urlparse(target_url)
    base_url = parsed_target.scheme + "://" + parsed_target.netloc
    try:
        # A timeout keeps a slow or dead upstream from blocking the handler.
        response = requests.get(target_url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        return f"Error fetching the page: {str(e)}", 500

    content_type = response.headers.get("Content-Type", "")

    # Images, CSS, JS, fonts and any other non-HTML payload are relayed as-is.
    if "text/html" not in content_type:
        return Response(response.content, content_type=content_type)

    soup = BeautifulSoup(response.content, "html.parser")

    # Remove every U+FEFF character from the text nodes.
    for element in soup.find_all(text=True):
        if "\uFEFF" in element:
            cleaned_text = element.replace("\uFEFF", "")
            element.replace_with(NavigableString(cleaned_text))

    # Custom CSS and JavaScript injected into the page.
    style_injection = """
    <style>
        ::-webkit-scrollbar {
            width: 0px; /* 隐藏水平滚动条 */
            height: 0px; /* 隐藏垂直滚动条 */
        }
        * {
            font-family: '' !important;
        }
        body {
            overflow: -moz-scrollbars-none; /* 针对旧版Firefox */
            -ms-overflow-style: none; /* 针对IE和Edge */
        }
        .top {
            background-color: rgb(212, 96, 241) !important;
        }
        .logo {
            display: none !important;
        }
        .nwtop {
            border-top-left-radius: 10px !important;
            border-top-right-radius: 10px !important;
            background: linear-gradient(to right bottom, rgb(212, 96, 241), rgba(145, 66, 197, 0.5)) !important;
        }
        .shouyedhl {
            background: linear-gradient(to bottom, rgb(212, 96, 241), rgba(145, 66, 197, 0.5)) !important;
            background-color: '' !important;
            border-bottom-left-radius: 10px !important;
            border-bottom-right-radius: 10px !important;
        }
        .shouyedhl-center li a {
            max-width: 5rem !important;
            font-size: 0.8rem
        }
        .shouyenr1-2 {
            background: rgba(255, 255, 255, 0.85);
            backdrop-filter: blur(5px); /* 背景模糊效果 */
            box-shadow: 0px 0px 24px rgba(255, 255, 255, 0.65); /* 与其他元素一致的阴影效果 */
        }
        .map2-1 em {
            background: linear-gradient(to right bottom, rgb(212, 96, 241), rgba(145, 66, 197, 0.5)) !important;
        }
        .map2-2 .new_add_ietm li {
            background: linear-gradient(to right bottom, rgb(212, 96, 241), rgba(145, 66, 197, 0.5)) !important;
        }
        .dibu {
            display: none !important;
        }
        .lbym {
            background-color: '' !important;
        }
    </style>
    <script>
        document.addEventListener('DOMContentLoaded', function () {
            // 取消特定链接的跳转行为
            const targetLink = document.querySelector('a[href="/modify_web?url=http://m.hongyunzyg.com/form1.html"]');
            if (targetLink) {
                targetLink.addEventListener('click', function(event) {
                    event.preventDefault(); // 阻止默认跳转
                    alert('暂未开放'); // 可选:提示用户跳转已被取消
                });
            }
        });
    </script>
    """
    # Append the injection to <head>, creating <head> when the page has none.
    if soup.head:
        soup.head.append(BeautifulSoup(style_injection, "html.parser"))
    else:
        head = soup.new_tag("head")
        head.append(BeautifulSoup(style_injection, "html.parser"))
        soup.insert(0, head)

    # Route the page's links and resources back through this proxy.
    for tag in soup.find_all(["a", "link", "script", "img", "iframe"]):
        attr = "href" if tag.name in ["a", "link"] else "src"
        if tag.has_attr(attr):
            original_url = tag[attr]
            absolute_url = urljoin(base_url, original_url)
            # The URL is appended verbatim: the injected script above selects
            # a link by this exact, unencoded href.
            tag[attr] = url_for("modify_web") + "?url=" + absolute_url

    # Send the markup as a plain response. It is third-party content, so it
    # must not be rendered as a Jinja template: any "{{ ... }}" in the page
    # would be evaluated on this server or break rendering.
    return Response(str(soup), content_type="text/html; charset=utf-8")
|
||
|
||
|
||
# Absolute path of the application's "static" folder.
STATIC_FOLDER = os.path.join(app.root_path, "static")
# Relative URL of the image currently on display (initial value).
display_image_url = "static/images/smart_mode/back.png"
|
||
|
||
|
||
# 定义接收 POST 请求的路由
|
||
@app.route("/change_image", methods=["POST"])
def change_image():
    """Copy a local image into ``static/images`` and publish its URL.

    Reads the source file path from the ``path`` form field, copies the file
    into the static images folder, stores the resulting relative URL in the
    module-level ``display_image_url`` and emits it to the clients as a
    ``change_image`` Socket.IO event.

    Returns:
        ``{"url": ...}`` with 200 on success, 400 when the path is missing
        or is not an existing file, 500 when the copy fails.
    """
    global display_image_url
    image_path = request.form.get("path")

    # Only an existing regular file can be copied; a directory is rejected
    # here instead of failing later inside the copy.
    if not image_path or not os.path.isfile(image_path):
        return jsonify({"error": "Invalid image path"}), 400

    filename = os.path.basename(image_path)

    # Destination inside the static images folder.
    target_path = os.path.join(STATIC_FOLDER, "images", filename)

    try:
        shutil.copy(image_path, target_path)
    except shutil.SameFileError:
        # The image already lives at the destination; nothing to copy.
        pass
    except Exception as e:
        return jsonify({"error": f"Failed to copy file: {str(e)}"}), 500

    # Relative URL under which the copied file is served.
    file_url = "static/images/" + filename
    display_image_url = file_url

    # Tell the front end which image to show.
    socketio.emit("change_image", {"path": file_url})

    return jsonify({"url": file_url}), 200
|
||
|
||
|
||
# Remote server used for the reverse SSH tunnel and the device-info upload.
SERVER_IP = "8.138.8.114"  # server IP address
# Login credentials. They can be overridden through the environment so
# deployments do not have to rely on the values committed here; the defaults
# keep the previous behaviour.
SERVER_USER = os.environ.get("MASSAGE_SERVER_USER", "root")
SERVER_PASSWORD = os.environ.get("MASSAGE_SERVER_PASSWORD", "JuShenFengBao209")
REMOTE_PATH = "/var/www/html/devices"  # remote directory for device info files

reverse_ssh = ReverseSSH(SERVER_IP, SERVER_USER, SERVER_PASSWORD)
|
||
|
||
|
||
# 开启反向隧道
|
||
@app.route("/start_tunnel", methods=["GET"])
def start_tunnel():
    """Open the reverse SSH tunnel and return its result as JSON."""
    return jsonify(reverse_ssh.start_tunnel())
|
||
|
||
|
||
# 关闭反向隧道
|
||
@app.route("/stop_tunnel", methods=["GET"])
def stop_tunnel():
    """Close the reverse SSH tunnel and return its result as JSON."""
    return jsonify(reverse_ssh.stop_tunnel())
|
||
|
||
# 定义一个路由,根据请求参数执行不同的 shell 脚本
|
||
@app.route("/run-script", methods=["POST"])
def run_script():
    """Run the setup script that matches the requested language.

    Expects a JSON body ``{"script_type": "zh" | "jp" | "en" | "ko"}``. The
    matching script from the directory one level above this script's
    directory is launched in a new gnome-terminal window; this handler does
    not wait for it to finish.

    Returns:
        A success message once the script has been launched, 400 for an
        unknown script type, 404 when the script file is missing, 500 on
        any other error.
    """
    import shlex  # local import: only needed to quote the path below

    # Script file for each supported script_type.
    script_names = {
        "zh": "setup.sh",
        "jp": "setup_JP.sh",
        "en": "setup_EN.sh",
        "ko": "setup_KO.sh",
    }

    print("run-script")
    try:
        data = request.json
        script_type = data.get("script_type")

        # Parent of the directory that contains this script.
        base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        print("动态目录:", base_path)

        script_name = (
            script_names.get(script_type) if isinstance(script_type, str) else None
        )
        if script_name is None:
            print("Invalid script type provided, 400")
            return jsonify({"error": "Invalid script type provided"}), 400

        script_path = os.path.normpath(os.path.join(base_path, script_name))
        if not os.path.exists(script_path):
            print(f"Script not found: {script_path}, 404")
            return jsonify({"error": f"Script not found: {script_path}"}), 404

        # base_path ends up inside a shell string, so quote it; an install
        # path containing spaces would otherwise break the ``cd``.
        subprocess.Popen(
            [
                "gnome-terminal",
                "--",
                "bash",
                "-c",
                f"cd {shlex.quote(base_path)} && bash {script_name}; exit",
            ],
            cwd=base_path,
        )

        return jsonify(
            {
                "message": "Script executed successfully",
            }
        )
    except Exception as e:
        print(f"Error running script: {e}, 500")
        return jsonify({"error": str(e)}), 500
|
||
|
||
@app.route('/set_zero', methods=['POST'])
def set_zero():
    """Zero the force sensor unless the massage service is running.

    When ``massage.service`` is not running, a sensor object is created and
    up to three attempts are made: disable active transmission, wait, then
    call ``set_zero``; a result of 0 counts as success. After the last
    failed attempt a failure message is posted to the local ``/on_message``
    endpoint and 500 is returned. Any exception is reported with 500.
    """
    try:
        # Nothing to do while the service is running.
        if is_service_running("massage.service"):
            return jsonify({"message": "Service is already running, no need to set zero"}), 200

        sensor = XjcSensor()
        max_try = 3  # maximum number of attempts
        delay = 0.5  # pause after a failed attempt, in seconds

        for attempt_number in range(1, max_try + 1):
            sensor.disable_active_transmission()
            time.sleep(0.5)

            if sensor.set_zero() == 0:
                return jsonify({"message": "Set zero success"}), 200

            print(f"Set zero attempt {attempt_number} failed, retrying...")
            time.sleep(delay)

        # Every attempt failed: notify the UI and report the error.
        print("Set zero failed after multiple attempts.")
        requests.post("http://127.0.0.1:5000/on_message", data={"message": "传感器初始化失败"})
        return jsonify({"message": "Set zero failed after multiple attempts"}), 500
    except Exception as e:
        print(f"Error in /set_zero: {e}")
        return jsonify({"message": "Set zero failed", "error": str(e)}), 500
|
||
|
||
|
||
@app.route("/reset-language", methods=["POST"])
def reset_language():
    """Launch ``restart_language.sh`` in a new terminal window.

    The script is looked up in the directory one level above this script's
    directory. Returns a success message once it has been launched, 404
    when the script is missing, 500 on any other error.
    """
    print("reset-language")
    try:
        # Parent of the directory that contains this script.
        base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        print("动态目录:", base_path)

        # Normalised path of the script to run.
        script_path = os.path.normpath(os.path.join(base_path, "restart_language.sh"))

        if not os.path.exists(script_path):
            print(f"Script not found: {script_path}, 404")
            return jsonify({"error": f"Script not found: {script_path}"}), 404

        terminal_command = [
            "gnome-terminal",
            "--",
            "bash",
            "-c",
            f"cd {base_path} && bash restart_language.sh; exit",
        ]
        subprocess.Popen(terminal_command, cwd=base_path)

        return jsonify({
            "message": "Restart language script executed successfully in new terminal"
        })
    except Exception as e:
        print(f"Error running script: {e}, 500")
        return jsonify({"error": str(e)}), 500
|
||
|
||
|
||
# 写入文件并上传到服务器
|
||
def write_and_upload_info(ngrok_url, reverse_ssh_port):
    """Write this device's connection info to a file and upload it via SFTP.

    The file ``../tmp/<hostname>.txt`` records the hostname, the ngrok URL,
    the current local time and the SSH command for reaching the device
    through the reverse tunnel. It is then uploaded to
    ``REMOTE_PATH/<hostname>.txt`` on ``SERVER_IP``.

    Args:
        ngrok_url: Public ngrok URL to record.
        reverse_ssh_port: Port used in the recorded SSH command.

    Raises:
        Whatever the file write, the SSH connection or the SFTP transfer
        raises; errors are not handled here.
    """
    hostname = socket.gethostname()
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # SSH command that reaches this device through the reverse tunnel.
    reverse_ssh_command = f"ssh jsfb@{SERVER_IP} -p {reverse_ssh_port}"

    file_content = (
        f"Hostname: {hostname}\n"
        f"Ngrok URL: {ngrok_url}\n"
        f"Online Time: {timestamp}\n"
        f"SSH Command: {reverse_ssh_command}"
    )
    file_path = f"../tmp/{hostname}.txt"

    with open(file_path, "w") as file:
        file.write(file_content)

    # Upload the file over SFTP.
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without checking.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    sftp = None
    try:
        ssh.connect(SERVER_IP, username=SERVER_USER, password=SERVER_PASSWORD)
        sftp = ssh.open_sftp()
        sftp.put(file_path, f"{REMOTE_PATH}/{hostname}.txt")
        print(f"File {file_path} uploaded to {REMOTE_PATH}/{hostname}.txt on server.")
    finally:
        # sftp only exists if the connection succeeded; closing it
        # unconditionally raised UnboundLocalError and hid the real error.
        if sftp is not None:
            sftp.close()
        ssh.close()
|
||
|
||
|
||
|
||
# 启动 ngrok 隧道并写入/上传文件
|
||
def start_ngrok(port, max_retries=3, retry_interval=2):
    """Start an ngrok HTTP tunnel for ``port`` and return its public URL.

    Each attempt launches ngrok, waits ``retry_interval`` seconds and asks
    ``get_ngrok_url()`` for the URL. On failure the process is terminated
    and, after another ``retry_interval`` seconds, the next attempt starts.
    The most recently started process is kept in the global
    ``ngrok_process``.

    Returns:
        The public URL, or the string ``"Error"`` when every attempt failed.
    """
    global ngrok_process

    launch_command = ["/usr/local/bin/ngrok", "http", str(port)]
    for attempt_number in range(1, max_retries + 1):
        print(f"Attempting to start ngrok (Attempt {attempt_number}/{max_retries})")
        ngrok_process = subprocess.Popen(
            launch_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        time.sleep(retry_interval)  # give ngrok time to come up

        # ngrok counts as started once its URL can be fetched.
        try:
            public_url = get_ngrok_url()
        except Exception as e:
            print(f"Failed to get ngrok URL: {e}")
            ngrok_process.terminate()
            time.sleep(retry_interval)
        else:
            print(f"Ngrok started successfully with URL: {public_url}")
            return public_url

    print("Failed to start ngrok after multiple attempts")
    return "Error"
|
||
|
||
|
||
# 获取 ngrok 公共 URL
|
||
def get_ngrok_url():
    """Return the public URL of the first tunnel reported by local ngrok.

    Queries ngrok's local API at ``http://localhost:4040/api/tunnels``.

    Raises:
        RuntimeError: If the API cannot be reached, answers with an error
            status or invalid JSON, or reports no tunnel.
    """
    try:
        response = requests.get("http://localhost:4040/api/tunnels", timeout=5)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        raise RuntimeError(f"Error fetching ngrok URL: {e}") from e

    try:
        return data["tunnels"][0]["public_url"]
    except (KeyError, IndexError, TypeError) as e:
        # An empty tunnel list or an unexpected payload used to leak out as
        # IndexError/KeyError; report it as the RuntimeError callers expect.
        raise RuntimeError(f"Error fetching ngrok URL: no tunnel reported ({e!r})") from e
|
||
|
||
|
||
# Socket.IO 事件处理函数,当客户端连接时启动定时任务
|
||
@socketio.on("connect", namespace="/")
def connect():
    """Start the music background task when a client connects on "/"."""
    socketio.start_background_task(music_background_thread)
|
||
|
||
|
||
# Shared Zeroconf instance used to register the mDNS service.
zeroconf = Zeroconf()

# Whether the mDNS service has already been registered.
service_registered = False
|
||
|
||
|
||
def get_all_local_ips():
    """Collect every IPv4 address of this machine, without filtering.

    Returns ``["127.0.0.1"]`` when ``DEBUG_MODE`` is set or when no IPv4
    address is found; otherwise the list of all addresses discovered.
    """
    interfaces = netifaces.interfaces()
    print("可用网络接口:", interfaces)

    discovered = []
    for interface in interfaces:
        addrs = netifaces.ifaddresses(interface)
        print(f"接口 {interface} 的地址信息: {addrs}")

        # Only the IPv4 (AF_INET) entries are of interest.
        for addr_info in addrs.get(netifaces.AF_INET, []):
            print(f"接口 {interface} 的 IPv4 地址信息: {addr_info}")
            ip_addr = addr_info.get("addr")
            if ip_addr:
                discovered.append(ip_addr)

    if DEBUG_MODE:
        return ["127.0.0.1"]  # debugging only

    # Fall back to the loopback address when nothing was found.
    return discovered or ["127.0.0.1"]
|
||
|
||
|
||
def register_service(local_ips):
    """Register the app as an mDNS HTTP service, once per process.

    One ``_http._tcp.local.`` service named ``LL-X1-<hostname>`` on port
    5000 is registered for every address in ``local_ips`` that does not
    start with ``127.`` or ``192.168.100``. Does nothing when the global
    ``service_registered`` flag is already set; sets it afterwards.
    """
    global service_registered

    print(local_ips)
    if service_registered:
        return

    hostname = socket.gethostname()
    skipped_prefixes = ("127.", "192.168.100")

    for ip in local_ips:
        if ip.startswith(skipped_prefixes):
            continue

        service_info = ServiceInfo(
            "_http._tcp.local.",  # service type
            f"LL-X1-{hostname}._http._tcp.local.",  # service name
            addresses=[socket.inet_aton(ip)],  # address in packed byte form
            port=5000,  # port Flask listens on
            properties={},  # optional extra service information
            server=f"{hostname}.local.",  # host name
        )
        zeroconf.register_service(service_info, allow_name_change=True)
        print(f"mDNS 服务已注册,IP 地址:{ip}")

    service_registered = True
|
||
|
||
|
||
# 在应用退出时注销 zeroconf 服务并关闭 ngrok
|
||
def cleanup():
    """Stop the reverse tunnel, terminate ngrok and close Zeroconf."""
    global ngrok_process

    print("注销 mDNS 服务并关闭 ngrok...")
    reverse_ssh.stop_tunnel()
    if ngrok_process:
        ngrok_process.terminate()
    # Closing Zeroconf unregisters the mDNS service.
    zeroconf.close()
|
||
|
||
|
||
def fetch_music_list_with_retry():
    """Fetch the music playlists, retrying every 5 seconds until it works.

    On each attempt the global ``all_playlists`` is cleared and refilled
    with one entry per id in ``categoryIDs``. Once at least one playlist was
    fetched, ``playlist`` is rebuilt from the first one, ``current_song`` is
    pointed at its first track (not playing) and the function returns.
    """
    global all_playlists, playlist, current_song

    while True:
        # Category being fetched, for the error message; stays None while no
        # category fetch is in progress.
        current_category = None
        try:
            # Start from an empty list of playlists.
            all_playlists.clear()
            for categoryID in categoryIDs:
                current_category = categoryID
                diss_info, music_info_list = get_song_list(categoryID)
                all_playlists.append(
                    {"diss_info": diss_info, "music_info_list": music_info_list}
                )
                print(diss_info)
            current_category = None

            if all_playlists:
                playlist = generate_playlist(all_playlists[0]["music_info_list"])
                current_song.update(
                    {
                        "album_img_url": playlist[0]["album_img_url"],
                        "music_name": playlist[0]["music_name"],
                        "singer_name": playlist[0]["singer_name"],
                        "is_playing": False,
                    }
                )
                print("音乐列表更新成功!")
                break  # done
        except Exception as e:
            print(f"获取音乐列表失败,重试中... 错误信息: {e}")
            # Name the category only when a category fetch failed; the loop
            # variable may be unbound or stale otherwise, and reading it
            # here used to risk a NameError inside the handler.
            if current_category is not None:
                print(f"Failed to fetch playlist for category {current_category}: {e}")

        # Pause before every retry, including the "nothing fetched" case,
        # so the loop can never spin without a delay.
        time.sleep(5)
|
||
|
||
|
||
def initialize_app(port):
    """Run the start-up steps needed before Flask-SocketIO is served.

    Registers the mDNS service, fetches the latest music data and the
    playlists, starts the ngrok tunnel and the reverse SSH tunnel, uploads
    the connection info file and finally starts the status-message
    background task.

    Args:
        port: Local port that ngrok should expose.
    """
    local_ips = get_all_local_ips()
    register_service(local_ips)

    # Latest music data; a failure is only logged.
    global latest_music
    try:
        latest_music = fetch_latest_music()
    except Exception as e:
        print(f"Failed to fetch latest music data: {e}")

    try:
        fetch_music_list_with_retry()
    except Exception as e:
        print(f"初始化音乐列表失败:{e}")
        # Keep trying in the background.
        socketio.start_background_task(fetch_music_list_with_retry)

    try:
        ngrok_url = start_ngrok(port)
        print("Ngrok URL:", ngrok_url)
        result = reverse_ssh.start_tunnel()
        print(result)
        reverse_ssh_port = result["port"]
        write_and_upload_info(ngrok_url, reverse_ssh_port)
    except Exception as e:
        # Remote access is optional. The tunnel result lookup and the SFTP
        # upload raise more than RuntimeError (for example when the network
        # is down), and this function runs at import time, so none of those
        # failures may abort start-up.
        print("Error:", e)

    # Start the periodic status messages.
    socketio.start_background_task(send_status_messages)
|
||
|
||
|
||
# Run the start-up steps at import time, exposing port 5000 through ngrok.
initialize_app(5000)
|
||
|
||
|
||
def check_network(host="8.8.8.8", port=53, timeout=5):
    """Return True when a TCP connection to ``host:port`` can be opened.

    The defaults probe a public DNS server, which serves as a simple
    "is the network up" test.

    Args:
        host: Host to connect to.
        port: TCP port to connect to.
        timeout: Connection timeout in seconds.

    Returns:
        True if the connection succeeded, False on any ``OSError``
        (including a timeout).
    """
    try:
        # The context manager closes the probe socket; it used to be left
        # open on every successful check.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
|
||
|
||
if __name__ == "__main__":
    # Parse the command line; unrecognised arguments are tolerated.
    parser = argparse.ArgumentParser(description="Run Flask-SocketIO server.")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    args, unknown = parser.parse_known_args()

    # Debug mode follows the --debug flag.
    DEBUG_MODE = args.debug

    _server_options = dict(
        debug=DEBUG_MODE, host="0.0.0.0", port=5000, allow_unsafe_werkzeug=True
    )
    # Keep the server running: restart it whenever it stops with an error.
    while True:
        try:
            socketio.run(app, **_server_options)
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            print("Restarting server in 5 seconds...")
            time.sleep(5)  # wait before restarting
|
||
|
||
|
||
# MAX_WAIT_TIME = 300 # 最多等待 5 分钟
|
||
# elapsed_time = 0
|
||
# while not check_network() and elapsed_time < MAX_WAIT_TIME:
|
||
# print(f"Network unavailable. Retrying in 5 seconds... ({elapsed_time}/{MAX_WAIT_TIME}s)")
|
||
# time.sleep(5)
|
||
# elapsed_time += 5
|
||
# if elapsed_time >= MAX_WAIT_TIME:
|
||
# print("Network initialization failed. Proceeding without network...")
|
||
|
||
# # 启动 Flask-SocketIO 服务
|
||
# socketio.run(app, host="0.0.0.0", port=5000)
|
||
|
||
# # 运行 Flask-SocketIO 应用
|
||
# socketio.run(
|
||
# app, debug=DEBUG_MODE, host="0.0.0.0", port=5000, allow_unsafe_werkzeug=True
|
||
# )
|
||
# socketio.run(app, debug=DEBUG_MODE, host="0.0.0.0", port=5000) #如果使用gunicorn启动
|