2025-05-27 15:46:31 +08:00

2160 lines
71 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import (
Flask,
render_template,
jsonify,
redirect,
url_for,
request,
Response,
render_template_string,
session,
)
from flask_socketio import SocketIO
from flask_cors import CORS
from flask_sslify import SSLify
from zeroconf import Zeroconf, ServiceInfo
import atexit
import uuid
import socket
import netifaces
import time
from qq_music import (
fetch_latest_music,
get_song_list,
fetch_album_image,
get_music_info,
player,
)
import threading
from threading import Lock
import asyncio
import websockets
import json
import argparse
import librosa
import numpy as np
import subprocess
import signal
import os
import sys
import re
from datetime import datetime
import paramiko
import requests
from bs4 import BeautifulSoup, NavigableString
import shutil
from urllib.parse import urljoin, urlparse
from requests.auth import HTTPBasicAuth
from tools.ssh_tools import execute_command_on_remote, is_host_down, ReverseSSH
import logging
from force_sensor_aubo import XjcSensor
DEBUG_MODE = False

# Logging configuration.
log_file = "../log/UI-next-app.log"
# Folder that receives timestamped copies of earlier log files.
backup_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../UILog"))
# Create the UILog folder when it does not exist yet.
if not os.path.exists(backup_folder):
    os.makedirs(backup_folder)
# Copy the existing log aside before the mode="w" handler below truncates it.
if os.path.exists(log_file):
    # Backup name carries a YYYYmmdd_HHMMSS timestamp.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = os.path.join(backup_folder, f"UI-next-app_{timestamp}.log")
    shutil.copy2(log_file, backup_file)

_log_handlers = [
    # Overwrite mode: the log file starts empty on every run.
    logging.FileHandler(log_file, mode="w"),
    logging.StreamHandler(sys.stdout),
]
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=_log_handlers,
)
class LoggerWriter:
    """Minimal text-stream stand-in that forwards ``write`` calls to a logger.

    Instances are assigned to ``sys.stdout`` / ``sys.stderr`` so that plain
    ``print`` output ends up in the logging handlers.

    Args:
        logger: Object exposing ``log(level, message)``.
        level: Logging level used for every forwarded message.
    """

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        """Log ``message`` with surrounding whitespace removed.

        Whitespace-only writes (such as the lone newline ``print`` emits) are
        dropped. Returns the number of characters received, as text streams
        are expected to do.
        """
        text = message.strip()
        if text:
            self.logger.log(self.level, text)
        return len(message)

    def flush(self):
        """Do nothing; messages are handed to the logger immediately."""

    def isatty(self):
        """Report that this stream is not a terminal.

        Code that probes ``sys.stdout.isatty()`` would otherwise fail with
        AttributeError once the standard streams are replaced.
        """
        return False
# Route everything written to stdout / stderr through the root logger.
_root_logger = logging.getLogger()
sys.stdout = LoggerWriter(_root_logger, logging.INFO)
sys.stderr = LoggerWriter(_root_logger, logging.ERROR)

if not getattr(sys, "frozen", False):
    app = Flask(__name__)
else:
    # Frozen build (presumably PyInstaller): templates and static assets are
    # looked up under sys._MEIPASS.
    template_folder = os.path.join(sys._MEIPASS, "templates")
    static_folder = os.path.join(sys._MEIPASS, "static")
    app = Flask(__name__, template_folder=template_folder, static_folder=static_folder)
# sslify = SSLify(app)
CORS(app)
# Secret key used to protect the session.
app.secret_key = "jsfb"
app.config["SECRET_KEY"] = "jsfb"
socketio = SocketIO(app, cors_allowed_origins="*")
def signal_handler(signal, frame):
    """Announce the shutdown and end the process with exit status 0.

    Installed for SIGINT and SIGTERM. Both arguments are supplied by the
    signal machinery and are not used.
    """
    print("Shutting Down")
    raise SystemExit(0)
# Exit through signal_handler on Ctrl+C (SIGINT) and on SIGTERM.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)

# Program catalogue handed to the manual-mode template and /program-details.
programs = [
    {
        "id": 1,
        "name": "暂未开放",
        "details": {"项目名称": "", "项目功效": "", "时长": "", "上次时间": ""},
    },
]
# Japanese variant of the catalogue, kept for reference:
# programs = [
# {'id': 1, 'name': '未開放', 'details': {'プロジェクト名': 'なし', 'プロジェクト効果': 'なし', '時間': 'なし', '最終日時': 'なし'}},
# ]

# Page served by /health: "manual_mode" or "smart_mode".
current_mode = "smart_mode"

# Hint phrases broadcast in rotation by send_status_messages().
messages = [
    "小悠师傅 开始按摩",
    "小悠小悠 今天天气怎么样",
    "小悠小悠 今天有什么新闻",
    "使用“小悠师傅” 或 “小悠小悠”唤醒我...",
]
# Japanese variant of the hint phrases, kept for reference:
# messages = ["ユウマスター マッサージを開始", "ユウユウ 今日の天気はどう?", "ユウユウ 今日のニュースは?", "「ユウマスター」または「ユウユウ」で私を起動してください..."]

# Lazily filled caches for /get_music_data and /get_playlists.
latest_music = []
all_playlists = []
def send_status_messages():
    """Broadcast every phrase in ``messages`` over Socket.IO, forever.

    Each phrase is sent as a ``message`` event followed by a 3-second pause;
    after a full pass over the list there is an additional 5-second pause.
    """
    while True:
        for hint in messages:
            socketio.emit("message", hint)  # sent under the 'message' event name
            socketio.sleep(3)  # 3 s pause after each phrase
        socketio.sleep(5)  # extra 5 s once the whole list has been sent
# SocketIO event handler for synthesizing audio
# @socketio.on('synthesize_audio')
# def handle_synthesize_audio(data):
# text = data['text']
# print(text)
# ai.tts_and_play_audio(socketio,text)
@app.route("/get_music_data", methods=["GET"])
def get_music_data():
    """Return the cached latest-music data as JSON.

    The cache is filled from ``fetch_latest_music()`` whenever it is empty.
    """
    global latest_music
    latest_music = latest_music or fetch_latest_music()
    return jsonify(latest_music)
# Playlist ids passed to get_song_list(); the first one also seeds the
# start-up playlist.
categoryIDs = ["9126599100", "7299191148", "7132357466", "1137267511", "7284914844"]


@app.route("/get_playlists", methods=["GET"])
def get_playlists():
    """Return every configured playlist as JSON, loading them on first use.

    Each entry has the keys ``diss_info`` and ``music_info_list`` as returned
    by ``get_song_list``. Results are cached in ``all_playlists``.
    """
    global all_playlists
    if all_playlists:
        return jsonify(all_playlists)
    for category_id in categoryIDs:
        info, songs = get_song_list(category_id)
        all_playlists.append({"diss_info": info, "music_info_list": songs})
        print(info)
    return jsonify(all_playlists)
@app.route("/search", methods=["POST"])
def search():
    """Look up the ``term`` from the JSON body and return the matches as JSON."""
    # Data sent by the front end.
    payload = request.json
    search_term = payload.get("term")
    print(search_term)
    # Hand the lookup result straight back to the front end.
    return jsonify(get_music_info(search_term))
@app.route("/get_album_image", methods=["POST"])
def get_album_image():
    """Return an album image URL for the ``singer_name`` in the JSON body.

    Responds 400 when no singer name is given and 404 (with the error
    reported by ``fetch_album_image``) when no image URL is found.
    """
    singer_name = request.get_json().get("singer_name")
    if not singer_name:
        return jsonify({"error": "Singer name is required"}), 400
    album_img_url, error = fetch_album_image(singer_name)
    if not album_img_url:
        return jsonify({"error": error}), 404
    return jsonify({"album_img_url": album_img_url}), 200
@app.route("/song_clicked", methods=["POST"])
def song_clicked():
    """Play the song identified by ``songmid`` in the JSON body.

    A song already present in ``playlist`` becomes the current song and is
    played. An unknown song is appended to the playlist (using the metadata
    from the request) and played, provided a stream URL can be resolved.
    """
    global playlist, current_song_index
    data = request.json
    songmid = data.get("songmid")
    print(f"Song clicked with songmid: {songmid}")

    # Position of the first playlist entry with this songmid, if any.
    known_index = next(
        (pos for pos, entry in enumerate(playlist) if entry["songmid"] == songmid),
        None,
    )

    if known_index is None:
        # Not in the playlist yet: resolve the URL first, then append and play.
        music_url = player.fetch_music_url_by_mid(songmid)
        if not music_url:
            music_url = player.fetch_music_url(
                f"https://y.qq.com/n/yqq/song/{songmid}.html"
            )
        if not music_url:
            return jsonify({"status": "error", "message": "Failed to fetch music URL"})
        song_info = {
            "album_img_url": data.get("album_img_url"),
            "music_name": data.get("music_name"),
            "singer_name": data.get("singer_name"),
            "songmid": songmid,
        }
        playlist.append(song_info)
        current_song_index = len(playlist) - 1
        now_playing = {
            key: song_info[key]
            for key in ("album_img_url", "music_name", "singer_name")
        }
        now_playing["is_playing"] = True
        current_song.update(now_playing)
        player.play_music(music_url)
        print(playlist)
        return jsonify({"status": "success", "message": "Music started"})

    # Already in the playlist: jump to it.
    current_song_index = known_index
    chosen = playlist[current_song_index]
    now_playing = {
        key: chosen[key] for key in ("album_img_url", "music_name", "singer_name")
    }
    now_playing["is_playing"] = True
    current_song.update(now_playing)
    print(current_song)
    # Resolve by songmid first; fall back to the song page.
    music_url = player.fetch_music_url_by_mid(chosen["songmid"])
    if not music_url:
        music_url = player.fetch_music_url(
            f"https://y.qq.com/n/yqq/song/{chosen['songmid']}.html"
        )
    socketio.sleep(0.5)
    player.play_music(music_url)
    return jsonify({"status": "success", "message": "Song added to playlist"})
@app.route("/playlist_update", methods=["POST"])
def playlist_update():
    """Replace the global playlist with the one in the JSON body and play it.

    The first song of the new list becomes the current song. Returns an error
    payload when the submitted playlist is empty or when no stream URL can be
    resolved for the first song.
    """
    global playlist, current_song_index
    incoming = request.json.get("playlist", [])
    if not incoming:
        return jsonify({"status": "error", "message": "Playlist is empty"})

    # Start from an empty playlist and refill it entry by entry.
    playlist = []
    playlist.extend(
        {
            "album_img_url": item.get("album_img_url"),
            "music_name": item.get("music_name"),
            "singer_name": item.get("singer_name"),
            "songmid": item.get("songmid"),
        }
        for item in incoming
    )

    # The first song of the new list becomes the current one.
    current_song_index = 0
    first_song = playlist[current_song_index]
    now_playing = {
        key: first_song[key] for key in ("album_img_url", "music_name", "singer_name")
    }
    now_playing["is_playing"] = True
    current_song.update(now_playing)

    # Resolve by songmid first; fall back to the song page.
    music_url = player.fetch_music_url_by_mid(first_song["songmid"])
    if not music_url:
        music_url = player.fetch_music_url(
            f"https://y.qq.com/n/yqq/song/{first_song['songmid']}.html"
        )
    if not music_url:
        return jsonify(
            {"status": "error", "message": "Failed to fetch music URL for first song"}
        )
    socketio.sleep(0.5)
    player.play_music(music_url)
    print(playlist)
    return jsonify(
        {"status": "success", "message": "Playlist updated and playing first song"}
    )
@app.route("/pause_music", methods=["POST"])
def pause_music():
    """Pause the player and acknowledge with a success payload."""
    player.pause_music()
    return jsonify(status="success", message="Music paused")
@app.route("/resume_music", methods=["POST"])
def resume_music():
    """Resume playback, or start the current playlist entry if nothing plays.

    When ``player.is_playing`` is set the player is simply resumed. Otherwise
    the song at ``current_song_index`` is resolved and played from scratch.
    """
    reply = {"status": "success", "message": "Music resumed"}
    if player.is_playing:
        player.resume_music()
        return jsonify(reply)

    track = playlist[current_song_index]
    now_playing = {
        key: track[key] for key in ("album_img_url", "music_name", "singer_name")
    }
    now_playing["is_playing"] = True
    current_song.update(now_playing)
    print(current_song)
    # Resolve by songmid first; fall back to the song page.
    music_url = player.fetch_music_url_by_mid(track["songmid"])
    if not music_url:
        music_url = player.fetch_music_url(
            f"https://y.qq.com/n/yqq/song/{track['songmid']}.html"
        )
    socketio.sleep(0.5)
    player.play_music(music_url)
    return jsonify(reply)
@app.route("/stop_music", methods=["POST"])
def stop_music():
    """Stop the player and acknowledge with a success payload."""
    player.stop_music()
    return jsonify(status="success", message="Music stopped")
@app.route("/play_last_song", methods=["POST"])
def play_last_song():
    """Step back one entry in the playlist (wrapping to the end) and play it."""
    global current_song_index
    # From index 0 wrap around to the final entry.
    current_song_index = (
        current_song_index - 1 if current_song_index > 0 else len(playlist) - 1
    )
    print(current_song_index)

    track = playlist[current_song_index]
    now_playing = {
        key: track[key] for key in ("album_img_url", "music_name", "singer_name")
    }
    now_playing["is_playing"] = True
    current_song.update(now_playing)

    # Resolve by songmid first; fall back to the song page.
    music_url = player.fetch_music_url_by_mid(track["songmid"])
    if not music_url:
        music_url = player.fetch_music_url(
            f"https://y.qq.com/n/yqq/song/{track['songmid']}.html"
        )
    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})
    player.play_music(music_url)
    return jsonify({"status": "success", "message": "Playing last song"})
@app.route("/play_next_song", methods=["POST"])
def play_next_song():
    """Advance one entry in the playlist (wrapping to the start) and play it."""
    global current_song_index
    current_song_index = (current_song_index + 1) % len(playlist)
    print(current_song_index)

    track = playlist[current_song_index]
    now_playing = {
        key: track[key] for key in ("album_img_url", "music_name", "singer_name")
    }
    now_playing["is_playing"] = True
    current_song.update(now_playing)

    # Resolve by songmid first; fall back to the song page.
    music_url = player.fetch_music_url_by_mid(track["songmid"])
    if not music_url:
        music_url = player.fetch_music_url(
            f"https://y.qq.com/n/yqq/song/{track['songmid']}.html"
        )
    if not music_url:
        return jsonify({"status": "error", "message": "Failed to fetch music URL"})
    player.play_music(music_url)
    return jsonify({"status": "success", "message": "Playing next song"})
@socketio.on("seek_music")
def handle_seek_music(position):
    """Log the requested position and forward it to ``player.seek_music``."""
    print(position)
    player.seek_music(position)
def generate_playlist(music_info_list):
    """Convert indexable song records into playlist dictionaries.

    Each record is read positionally: ``[0]`` music name, ``[1]`` singer name,
    ``[2]`` songmid, ``[3]`` value used to build the album cover URL. A falsy
    ``[3]`` yields an empty ``album_img_url``.

    Returns:
        A list of dicts with the keys ``music_name``, ``singer_name``,
        ``songmid`` and ``album_img_url``.
    """
    entries = []
    for song in music_info_list:
        entry = {
            "music_name": song[0],
            "singer_name": song[1],
            "songmid": song[2],
        }
        if song[3]:
            entry["album_img_url"] = (
                f"http://y.gtimg.cn/music/photo_new/T002R180x180M000{song[3]}.jpg"
            )
        else:
            entry["album_img_url"] = ""
        entries.append(entry)
    return entries
# Global holding the current song's metadata and playback state.
current_song = {
    "album_img_url": "https://via.placeholder.com/300x300",
    "music_name": None,
    "singer_name": None,
    "is_playing": False,  # playback state
}
# Global playlist. Example of a pre-seeded entry:
# playlist = [{
# 'album_img_url': 'http://y.gtimg.cn/music/photo_new/T002R180x180M000001G7iIK00yQyr.jpg',
# 'music_name': '大鱼海棠主题曲——大鱼',
# 'singer_name': '凤凉柒',
# 'songmid': '000Qf8b01p0vIJ'}]
playlist = []
# Index of the current song within the playlist.
current_song_index = 0

if playlist:
    print("playlist: ", playlist)
else:
    # Nothing pre-seeded: load the first configured playlist.
    diss_info, music_info_list = get_song_list(categoryIDs[0])
    playlist = generate_playlist(music_info_list)
    print("playlist:", playlist)
    if not playlist:
        # Still empty: fall back to a single placeholder entry.
        playlist = [
            {
                "album_img_url": "https://via.placeholder.com/300x300",
                "music_name": "默认音乐",
                "singer_name": "未知艺术家",
                "songmid": "000000",
            }
        ]

# Show the first playlist entry as the current (not yet playing) song.
current_song.update(
    album_img_url=playlist[0]["album_img_url"],
    music_name=playlist[0]["music_name"],
    singer_name=playlist[0]["singer_name"],
    is_playing=False,
)

# Lock held by music_background_thread() while it reads and updates song state.
thread_lock = Lock()
# Periodic task: pushes the current song to the front end once per second.
def music_background_thread():
    """Emit playback state every second and auto-advance the playlist.

    On each tick, while holding ``thread_lock``:
      * ``current_song`` is emitted as a ``current_song`` event;
      * if the player reports it is playing, position and length are read;
        a negative position triggers a switch to the next playlist entry;
      * a ``music_status`` event carries the position and length that were
        read at the start of the tick.
    """
    global playlist, current_song_index
    while True:
        socketio.sleep(1)  # one-second interval
        with thread_lock:
            current_song["is_playing"] = not player.is_paused and player.is_playing
            socketio.emit("current_song", current_song, namespace="/")
            if not player.is_playing:
                continue

            position = player.get_current_position()
            length = player.get_music_length()
            if position < 0:
                # Negative position: move on to the next song.
                current_song_index = (current_song_index + 1) % len(playlist)
                upcoming = playlist[current_song_index]
                now_playing = {
                    key: upcoming[key]
                    for key in ("album_img_url", "music_name", "singer_name")
                }
                now_playing["is_playing"] = True
                current_song.update(now_playing)
                print(current_song)
                # Resolve by songmid first; fall back to the song page.
                music_url = player.fetch_music_url_by_mid(upcoming["songmid"])
                if not music_url:
                    music_url = player.fetch_music_url(
                        f"https://y.qq.com/n/yqq/song/{upcoming['songmid']}.html"
                    )
                socketio.sleep(0.15)
                player.play_music(music_url)
            socketio.emit("music_status", {"position": position, "length": length})
@app.route("/set_current_song", methods=["POST"])
def set_current_song():
    """Overwrite the current-song metadata with values from the JSON body.

    ``is_playing`` defaults to False when the request omits it.
    """
    data = request.json
    current_song.update(
        album_img_url=data.get("album_img_url"),
        music_name=data.get("music_name"),
        singer_name=data.get("singer_name"),
        is_playing=data.get("is_playing", False),  # playback state
    )
    return jsonify(status="success")
@app.route("/get_current_song", methods=["GET"])
def get_current_song():
    """Return the current song, or an error payload when no name is set."""
    if not current_song["music_name"]:
        return jsonify({"status": "error", "message": "No song is currently set"})
    return jsonify({"status": "success", "data": current_song})
# In-memory chat transcript: a list of {"sender": ..., "message": ...} dicts
# appended to by the chat handlers and returned verbatim by /get_history.
ai_chat_list = []
async def send_message(message, port=8766):
    """Send ``message`` to a local WebSocket server and return its first reply.

    Args:
        message: Payload passed to ``websocket.send``.
        port: Port on ``localhost`` to connect to.

    Returns:
        The value received from the server after sending.
    """
    uri = f"ws://localhost:{port}"
    async with websockets.connect(uri) as ws:
        await ws.send(message)
        return await ws.recv()
@socketio.on("send_message")
def handle_message(data):
    """Record a user chat message and forward it over the local WebSocket.

    Args:
        data: Event payload; its ``message`` entry is the text to send.

    Returns:
        ``{"error": ...}`` when the message is missing or empty (a plain dict,
        since this is a Socket.IO handler rather than an HTTP view), otherwise
        ``None``.

    The text is broadcast as an ``add_message`` event, appended to
    ``ai_chat_list`` and sent through ``send_message`` on its default port.
    Failures from ``send_message`` propagate to the caller.
    """
    message = data.get("message")
    if not message:
        return {"error": "Message not provided"}
    socketio.emit("add_message", {"sender": "user", "message": message})
    ai_chat_list.append({"sender": "user", "message": message})
    # asyncio.run creates a fresh event loop and closes it afterwards, so no
    # loop is left open per message.
    asyncio.run(send_message(f"'user_input': \"{message}\""))
# time.time() of the most recent AI message; None until the first one arrives.
last_ai_message_time = None


@app.route("/ai_respon", methods=["POST"])
def ai_response():
    """Store an AI chat fragment from the form field ``message`` and broadcast it.

    A fragment arriving within 5 seconds of the previous one is appended
    (space-separated) to the last transcript entry when that entry is from the
    AI; otherwise a new transcript entry is created. Every fragment is emitted
    as an ``add_message`` event.
    """
    global last_ai_message_time
    # Data from the POST request.
    message = request.form.get("message")
    if not message:
        return jsonify({"status": "error", "message": "No message provided"})

    now = time.time()
    is_follow_up = bool(last_ai_message_time) and (now - last_ai_message_time) <= 5
    if is_follow_up and ai_chat_list and ai_chat_list[-1]["sender"] == "ai":
        # Extend the most recent AI entry.
        ai_chat_list[-1]["message"] += " " + message
    else:
        # First message, more than 5 seconds since the last one, or the
        # latest entry is not from the AI.
        ai_chat_list.append({"sender": "ai", "message": message})
    # Remember when this fragment arrived.
    last_ai_message_time = now
    socketio.emit("add_message", {"sender": "ai", "message": message})
    return jsonify({"status": "success", "message": "Received ai message"})
@app.route("/user_input", methods=["POST"])
def user_input():
    """Record a user message from the form field ``message`` and broadcast it."""
    # Data from the POST request.
    message = request.form.get("message")
    if not message:
        return jsonify({"status": "error", "message": "No message provided"})
    entry = {"sender": "user", "message": message}
    socketio.emit("add_message", entry)
    # A separate dict is stored so the transcript entry is its own object.
    ai_chat_list.append(dict(entry))
    # JSON acknowledgement.
    return jsonify({"status": "success", "message": "Received user message"})
@app.route("/get_history", methods=["GET"])
def get_history():
    """Return the whole in-memory chat transcript as JSON."""
    transcript = ai_chat_list
    return jsonify(transcript)
def process_audio_and_emit(path):
    """Stream mouth-movement values for an audio file over Socket.IO.

    The file is loaded at 8000 Hz. Roughly every 0.1 s the sample at the
    position matching the elapsed wall-clock time is scaled by 5, made
    non-negative and emitted as a ``mouth_movement`` event. A final value of
    0.0 is always emitted, even after an error inside the loop.
    """
    samples, _rate = librosa.load(path, sr=8000)
    started = time.time()
    total = len(samples)
    # Emit mouth movement data through socketio.
    try:
        for _ in range(int(total / 800)):
            cursor = int((time.time() - started) * 8000) + 1
            if cursor < total:
                level = abs(samples[cursor] * 5)
                socketio.emit("mouth_movement", {"value": str(level)})
            time.sleep(0.1)
    except Exception as e:
        print(f"Error in mouth movement emission: {e}")
    # Reset mouth movement value at the end.
    socketio.emit("mouth_movement", {"value": str(0.0)})
@app.route("/lip_sync", methods=["POST"])
def lip_sync():
    """Start lip-sync streaming for the audio file named in the form field ``path``.

    Paths beginning with ``pre_mp3`` are resolved one directory up. The audio
    is processed on a separate thread so the response returns immediately.
    Any exception is reported as an error payload.
    """
    try:
        # Data from the POST request.
        path = request.form.get("path")
        print(path)
        if path.startswith("pre_mp3"):
            path = "../" + path
        # Process the audio and mouth data on its own thread.
        threading.Thread(target=process_audio_and_emit, args=(path,)).start()
        # Answer right away.
        return jsonify({"status": "success"})
    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"status": "error", "message": str(e)})
import traceback
# Latest massage state; pushed to clients as "update_massage_status" events
# and returned by /get_status.
massage_status = dict(
    progress=0,
    force="",
    temperature="",
    gear="",
    is_massaging=False,
    current_task="",
    task_time="",
    current_head="thermotherapy",
    body_part="back",
    massage_service_started=False,
    press="",
    frequency="",
)
@app.route("/update_massage_status", methods=["POST"])
def update_status():
    """Merge posted form fields into ``massage_status`` and broadcast the result.

    Only fields that are present and non-empty are applied; ``force``,
    ``temperature`` and ``gear`` additionally ignore the value ``"0"``.
    ``is_massaging`` and ``massage_service_started`` are converted from
    ``"True"/"true"/"False"/"false"`` to booleans (other values are stored
    unchanged). A false ``is_massaging`` resets the displayed image, and a
    ``current_task`` triggers a ``highlightPlane`` event. Any exception is
    reported as an error payload.
    """
    global display_image_url
    bool_words = {"False": False, "false": False, "True": True, "true": True}
    try:
        form = request.form
        print(f"update_massage_status{form}")

        # Only non-empty values overwrite the stored state.
        progress = form.get("progress")
        if progress:
            massage_status["progress"] = progress

        for field in ("force", "temperature", "gear"):
            reading = form.get(field)
            if reading and reading != "0":
                print(reading)
                massage_status[field] = reading

        is_massaging = form.get("is_massaging")
        if is_massaging:
            is_massaging = bool_words.get(is_massaging, is_massaging)
            if is_massaging is False:
                # Massage ended: switch the displayed image back.
                display_image_url = "static/images/smart_mode/back.png"
                socketio.emit("change_image", {"path": display_image_url})
            massage_status["is_massaging"] = is_massaging

        current_task = form.get("current_task")
        if current_task:
            # Each organ name, with or without a _left/_right suffix, maps to
            # the same index.
            organs = ("lung", "heart", "hepatobiliary", "spleen", "kidney")
            task_to_index = {
                organ + side: position
                for side in ("", "_left", "_right")
                for position, organ in enumerate(organs)
            }
            massage_status["current_task"] = current_task
            index = task_to_index.get(current_task)
            print(current_task, index)
            # Send the index to the front end (None for unknown tasks).
            socketio.emit("highlightPlane", index)
        elif current_task == "-1":
            # NOTE(review): unreachable, because "-1" is truthy and is handled
            # by the branch above. Confirm the intended behaviour.
            socketio.emit("highlightPlane", None)

        task_time = form.get("task_time")
        if task_time is not None:
            massage_status["task_time"] = "" if task_time == "null" else task_time

        for field in ("current_head", "body_part", "press", "frequency"):
            value = form.get(field)
            if value:
                massage_status[field] = value

        service_flag = form.get("massage_service_started")
        if service_flag:
            print(service_flag)
            massage_status["massage_service_started"] = bool_words.get(
                service_flag, service_flag
            )

        print(massage_status)
        # Push the updated state to every client.
        socketio.emit("update_massage_status", massage_status)
        return jsonify({"status": "success"})
    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"status": "error", "message": str(e)})
def is_service_running(service_name):
    """Return True when ``systemctl is-active`` reports the unit as active.

    Args:
        service_name: systemd unit name, e.g. ``"massage.service"``.

    Returns:
        True only if ``/bin/systemctl is-active`` exits with status 0 and
        prints ``active``. False otherwise, including when the command cannot
        be started at all (for example, the binary is missing).
    """
    try:
        result = subprocess.run(
            ["/bin/systemctl", "is-active", service_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except OSError:
        # The command could not be executed; treat the service as not running.
        return False
    # A non-zero exit status means the unit is not active.
    return result.returncode == 0 and result.stdout.strip() == "active"
@app.route("/get_status", methods=["GET"])
def get_status():
    """Return the current ``massage_status`` as JSON and refresh the client image.

    Steps:
      1. Send ``get_status`` to the local WebSocket server on port 8765
         (failures are printed and otherwise ignored).
      2. Re-emit the current display image as a ``change_image`` event.
      3. Refresh ``massage_service_started`` from the state of
         ``massage.service``.
    """
    global display_image_url
    try:
        # asyncio.run creates a fresh event loop and closes it afterwards, so
        # no loop is left open per request.
        asyncio.run(send_message("get_status", 8765))
    except Exception as e:
        print(f"Error: {e}")
    # NOTE(review): display_image_url is not assigned at module level in the
    # part of the file reviewed here; confirm it is initialised before the
    # first request reaches this line.
    file_url = display_image_url
    # Send the URL to the front end through Socket.IO.
    socketio.emit("change_image", {"path": file_url})
    massage_status["massage_service_started"] = is_service_running("massage.service")
    print(massage_status)
    return jsonify(massage_status)
######
# Timestamp for the command throttle, which is currently disabled.
last_command_time = 0


@socketio.on("send_command")
def send_command(data):
    """Forward a command from the UI to the local WebSocket servers.

    Args:
        data: Command payload; it is sent unchanged to port 8765.

    When ``"begin"`` occurs in ``data``, ``UI_start`` is first sent to port
    8766. Errors from either send are printed and otherwise ignored.
    """
    # A throttle that dropped commands arriving less than 1 second after the
    # previous one (tracked in last_command_time) used to live here and is
    # currently disabled.
    if "begin" in data:
        try:
            # asyncio.run closes the event loop it creates, so no loop is left
            # open per command.
            asyncio.run(send_message("UI_start", 8766))
        except Exception as e:
            print(f"Error: {e}")
    try:
        asyncio.run(send_message(data, 8765))
    except Exception as e:
        print(f"Error: {e}")
# @app.before_request
# def before_request():
# if request.url.startswith('https://'):
# url = request.url.replace('https://', 'http://', 1)
# code = 301 # redirect permanently
# return redirect(url, code=code)
@app.after_request
def after_request(response):
    """Add permissive CORS headers to the outgoing response and return it."""
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Headers", "Content-Type,Authorization"),
        ("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE"),
    )
    for header_name, header_value in cors_headers:
        response.headers.add(header_name, header_value)
    return response
@app.route("/")
def index():
    """Render ``login.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("login.html", time=now)
@app.route("/home")
def home():
    """Render ``home.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("home.html", time=now)
@app.route("/health")
def health():
    """Render the page for the active mode.

    ``manual_mode`` shows ``select_program.html`` with the program catalogue;
    any other mode shows ``smart_mode.html``.
    """
    now = int(time.time())
    if current_mode != "manual_mode":
        # Japanese template alternative: 'smart_mode_JP.html'
        return render_template("smart_mode.html", time=now)
    return render_template("select_program.html", time=now, programs=programs)
@app.route("/setting")
def setting():
    """Render ``setting.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("setting.html", time=now)
@app.route("/learning")
def learning():
    """Render ``learning.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("learning.html", time=now)
@app.route("/help")
def help():
    """Render ``help.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("help.html", time=now)
@app.route("/ai_ball")
def ai_ball():
    """Render ``ai_ball_v2.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("ai_ball_v2.html", time=now)
@app.route("/model")
def model():
    """Render ``3d_model_v6_local.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("3d_model_v6_local.html", time=now)
@app.route("/ai_chatbot")
def ai_chatbot():
    """Render ``ai_chatbot.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("ai_chatbot.html", time=now)
@app.route("/dynamic_function")
def dynamic_function():
    """Render ``dynamic_function_v2.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("dynamic_function_v2.html", time=now)
@app.route("/control_panel")
def control_panel():
    """Render ``control_panel_v2.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("control_panel_v2.html", time=now)
@app.route("/music")
def music():
    """Render ``full_music_player.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("full_music_player.html", time=now)
@app.route("/ir_list")
def ir_list():
    """Render ``ir_list.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("ir_list.html", time=now)
@app.route("/joystick")
def joystick():
    """Render ``joystick_v2.html``; ``time`` is the current Unix time in whole seconds."""
    now = int(time.time())
    return render_template("joystick_v2.html", time=now)
@app.route("/program-details/<int:program_id>")
def program_details(program_id):
    """Return the program whose ``id`` equals ``program_id``, or a 404 error."""
    for candidate in programs:
        if candidate["id"] == program_id:
            return jsonify(candidate)
    return jsonify({"error": "program not found"}), 404
@app.route("/switch_mode/<mode>")
def switch_mode(mode):
    """Store ``mode`` as the active mode and redirect to the ``health`` page.

    The mode is kept in the module-level ``current_mode`` (not in the
    session), so it is shared by all clients.
    """
    global current_mode
    current_mode = mode
    target = url_for("health")
    return redirect(target)
@app.route("/get-ip")
def get_ip():
    """Return the host part of the request's ``Host`` value as ``{"ip": ...}``."""
    # Everything before the first ":" (i.e. without the port).
    host, _, _ = request.host.partition(":")
    return jsonify({"ip": host})
@app.route("/login", methods=["POST"])
def login():
    """Check the JSON ``username`` / ``password`` against the built-in account.

    Responds with ``{"success": true}`` or ``{"success": false}``.
    """
    credentials = request.json
    username = credentials["username"]
    password = credentials["password"]
    # Simple validation example with fixed credentials.
    accepted = username == "jsfb" and password == "123456"
    return jsonify(success=accepted)
@app.route("/massage_control", methods=["POST"])
def massage_control():
    """Start, stop or otherwise control the massage service, or shut down.

    The form field ``action`` selects the operation:
      * ``start``: notifies port 8766 with ``connect_arm``, then restarts
        the ``massage`` systemd unit.
      * ``stop``: notifies port 8766 with ``disconnect_arm``, then stops
        the unit.
      * ``shutdown``: powers off the remote host over SSH, waits until it is
        down, then powers off this machine.
      * anything else is passed to ``systemctl <action> massage`` as a single
        argument.

    Returns 400 when no action is given and 500 when the shutdown sequence
    fails. Commands are run without a shell, so the request-supplied action
    can never be interpreted as shell syntax.
    """
    action = request.form.get("action")
    print("massage_control action: ", action)
    # NOTE(review): credentials are hard-coded in source; consider moving them
    # to configuration.
    sudo_password = "jsfb"
    remote_host = "192.168.100.20"  # IP address of the remote host
    remote_username = "root"  # user name on the remote host
    remote_password = "bestcobot"  # password on the remote host
    remote_port = 8822  # SSH port of the remote host

    if not action:
        return jsonify({"status": "error", "message": "Invalid action"}), 400

    if action == "start":
        action = "restart"
        try:
            # asyncio.run closes the event loop it creates.
            asyncio.run(send_message("connect_arm", 8766))
        except Exception as e:
            print(f"Error: {e}")
    if action == "stop":
        try:
            asyncio.run(send_message("disconnect_arm", 8766))
        except Exception as e:
            print(f"Error: {e}")

    # `sudo -S` reads the password from standard input.
    sudo_stdin = (sudo_password + "\n").encode()

    if action == "shutdown":
        # 1. Ask the remote host to shut down first.
        execute_command_on_remote(
            remote_host,
            remote_username,
            remote_password,
            "sudo /sbin/shutdown now",
            port=remote_port,
        )
        # 2. Wait for the remote host to go down.
        print("Waiting for remote host to shutdown...")
        if not is_host_down(remote_host):
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "Remote host did not shutdown in time",
                    }
                ),
                500,
            )
        # 3. The remote host is down; shut down this machine.
        try:
            subprocess.run(
                ["/usr/bin/sudo", "-S", "/sbin/shutdown", "now"],
                input=sudo_stdin,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            print("System is shutting down.")
        except subprocess.CalledProcessError as e:
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": f"Error when trying to shutdown: {e.stderr.decode()}",
                    }
                ),
                500,
            )
        except OSError as e:
            # The command itself could not be started.
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": f"Error when trying to shutdown: {e}",
                    }
                ),
                500,
            )
        return jsonify(
            {
                "status": "success",
                "message": "Remote and local systems shutdown successfully.",
            }
        )

    # Every other action goes to systemctl for the "massage" unit.
    try:
        result = subprocess.run(
            ["/usr/bin/sudo", "-S", "/bin/systemctl", action, "massage"],
            input=sudo_stdin,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        print(result.stdout.decode())
        print(f"Service {action} successfully.")
    except subprocess.CalledProcessError as e:
        print(f"Error when {action} service: {e.stderr.decode()}")
    except OSError as e:
        print(f"Error when {action} service: {e}")
    # NOTE(review): a failed systemctl call is only logged; the response still
    # reports success, as before.
    return jsonify({"status": "success"})
@app.route("/get_versions", methods=["GET"])
def get_versions():
    """List locally installed versions as ``{"versions": [...]}``.

    A version is the name of any entry in ``/home/jsfb/jsfb_ws`` that starts
    with ``MassageRobot_aubo-``, with that prefix removed.
    """
    base_dir = "/home/jsfb/jsfb_ws"
    prefix = "MassageRobot_aubo-"
    # Keep only the prefixed entries and strip the prefix to get the version.
    versions = [
        entry.replace(prefix, "")
        for entry in os.listdir(base_dir)
        if entry.startswith(prefix)
    ]
    return jsonify({"versions": versions})
# File server settings.
# URL of the file server.
FILE_SERVER_URL = "http://8.138.8.114/files/MassageRobot/aubo"
# User name for HTTP basic authentication.
FILE_SERVER_USERNAME = "jsfb"
# Password for HTTP basic authentication.
FILE_SERVER_PASSWORD = "jsfb"
# Directory that receives downloaded files; adjust to the deployment.
DOWNLOAD_DIR = "/home/jsfb/Downloads"
# Path of the Bash script that downloads and installs a package.
DOWNLOAD_SCRIPT_PATH = "/home/jsfb/jsfb_ws/OTA_tools/download_and_install_package.sh"
INSTALL_DIR = "/home/jsfb/jsfb_ws/"
@app.route("/get_remote_versions", methods=["GET"])
def get_remote_versions():
    """List the versions offered by the file server as ``{"versions": [...]}``.

    The server's listing is fetched with HTTP basic authentication and scanned
    for ``MassageRobot_aubo-<version>.tar.gz`` names. Versions are
    de-duplicated and sorted.

    Returns 500 when the server cannot be reached (including a timeout) or
    answers with a non-200 status, and 404 when no version is found.
    """
    try:
        # A timeout keeps an unresponsive server from blocking this request
        # forever.
        response = requests.get(
            FILE_SERVER_URL,
            auth=HTTPBasicAuth(FILE_SERVER_USERNAME, FILE_SERVER_PASSWORD),
            timeout=10,
        )
        if response.status_code != 200:
            return jsonify({"error": "Failed to access file server"}), 500
        # The listing comes back as HTML; pull the version out of each file name.
        html_content = response.text
        pattern = r"MassageRobot_aubo-([\w\.\-]+)\.tar\.gz"
        matches = re.findall(pattern, html_content)
        # A set removes the duplicates.
        unique_versions = sorted(set(matches))
        if not unique_versions:
            return jsonify({"error": "No versions found"}), 404
        return jsonify({"versions": unique_versions})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# @app.route("/download_package", methods=["POST"])
# def download_package():
# try:
# # 获取用户提供的版本号
# data = request.get_json()
# version = data.get("version")
# if not version:
# return jsonify({"error": "Version not specified"}), 400
# # 构造文件名
# file_name = f"MassageRobot_aubo-{version}.tar.gz"
# # 构造 Bash 脚本的命令
# command = [
# "bash",
# DOWNLOAD_SCRIPT_PATH,
# "-f",
# file_name,
# "-d",
# DOWNLOAD_DIR,
# "-u",
# FILE_SERVER_USERNAME,
# "-p",
# FILE_SERVER_PASSWORD,
# "-l",
# FILE_SERVER_URL,
# "-i",
# INSTALL_DIR,
# ]
# # 在后台运行脚本,并将输出重定向到文件
# with open("../log/download_install_package.log", "w") as f:
# download_install_process = subprocess.Popen(
# command, stdout=f, stderr=subprocess.STDOUT
# )
# # 返回 "正在下载" 响应
# return jsonify({"message": f"Download of version {version} is in progress..."})
# except Exception as e:
# return jsonify({"error": str(e)}), 500
@app.route("/download_package", methods=["POST"])
def download_package():
    """Start downloading and installing a package version in a new terminal.

    The JSON body must carry ``version``. The download script is launched in a
    ``gnome-terminal`` window with its output redirected to
    ``../tmp/Download.log``; a background task follows that log and emits each
    increase of the reported percentage as a ``download_progress`` event.

    Returns 400 when no version is given, 500 on any other failure, and an
    "in progress" message otherwise.
    """
    import shlex  # only needed here, to quote the shell command line

    try:
        # Version requested by the user.
        data = request.get_json()
        version = data.get("version")
        if not version:
            return jsonify({"error": "Version not specified"}), 400
        # File name of the package.
        file_name = f"MassageRobot_aubo-{version}.tar.gz"
        # Arguments for the Bash script.
        command = [
            "bash",
            DOWNLOAD_SCRIPT_PATH,
            "-f",
            file_name,
            "-d",
            DOWNLOAD_DIR,
            "-u",
            FILE_SERVER_USERNAME,
            "-p",
            FILE_SERVER_PASSWORD,
            "-l",
            FILE_SERVER_URL,
            "-i",
            INSTALL_DIR,
        ]
        download_log_file = "../tmp/Download.log"
        # Reset the log file so progress from an earlier run is not replayed.
        with open(download_log_file, "w") as stale_log:
            stale_log.write("")
        # Every argument is quoted because the line is interpreted by
        # `bash -c` and the version comes from the request.
        shell_line = (
            " ".join(shlex.quote(part) for part in command)
            + f" > {shlex.quote(download_log_file)} 2>&1"
        )
        # Run the command in a new terminal window; the terminal's own output
        # is not needed.
        subprocess.Popen(
            ["gnome-terminal", "--", "bash", "-c", shell_line],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        def download_and_emit_progress():
            """Follow the download log and emit progress increases."""
            # NOTE(review): this loop has no exit condition, so one follower
            # keeps running per request; consider stopping it once the
            # installation has finished.
            last_progress = 0
            try:
                with open(download_log_file, "r") as log_stream:
                    while True:
                        line = log_stream.readline()
                        if not line:
                            socketio.sleep(0.1)  # wait for new content
                            continue
                        # Mirror the line into the backend log.
                        sys.stdout.write(line)
                        sys.stdout.flush()
                        # Look for a percentage such as "42%" or "42.5%".
                        match = re.search(r"(\d+\.?\d*)%", line)
                        if match:
                            progress = float(match.group(1))
                            if progress > last_progress:
                                last_progress = progress
                                socketio.emit("download_progress", {"data": progress})
            except FileNotFoundError:
                socketio.emit("download_progress", {"data": "Log file not found"})
            except Exception as e:
                socketio.emit("download_progress", {"data": f"Error: {str(e)}"})

        # start_background_task starts the task itself and returns the running
        # task object, so it is called once here and not used as a decorator.
        socketio.start_background_task(download_and_emit_progress)
        return jsonify({"message": f"Download of version {version} is in progress..."})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# Report the version this script is running from.
@app.route("/get_current_version", methods=["GET"])
def get_current_version():
    """Return ``{"current_version": ...}`` derived from this script's location.

    The parent of the script's directory is inspected: when its path contains
    ``MassageRobot_aubo-``, the text after the last occurrence (up to the next
    ``/``) is the version; otherwise the version is ``"default"``.
    """
    marker = "MassageRobot_aubo-"
    # Absolute path of this script, then the parent of its directory.
    script_path = os.path.abspath(__file__)
    base_path = os.path.dirname(os.path.dirname(script_path))
    if marker in base_path:
        # Text after the last marker, cut at the first path separator.
        version = base_path.rpartition(marker)[2].partition("/")[0]
    else:
        # No versioned folder in the path.
        version = "default"
    payload = {"current_version": version}
    print(payload)
    return jsonify(payload)
# # 进行版本切换并执行对应路径下的setup.sh
# @app.route("/switch_version", methods=["POST"])
# def switch_version():
# base_dir = "/home/jsfb/jsfb_ws"
# data = request.json
# # 获取客户端传递的版本号
# version = data.get("version")
# if not version:
# return jsonify({"error": "Version not specified"}), 400
# # 如果版本是 'default',则目标文件夹不带版本号后缀
# if version == "default":
# target_folder = os.path.join(base_dir, "MassageRobot_aubo")
# else:
# # 构建带版本号的目标文件夹路径
# target_folder = os.path.join(base_dir, f"MassageRobot_aubo-{version}")
# # 检查目标文件夹是否存在
# if not os.path.isdir(target_folder):
# return jsonify({"error": f"Version {version} not found"}), 404
# # 构建setup.sh脚本路径
# setup_script_path = os.path.join(target_folder, "setup.sh")
# # 检查setup.sh是否存在
# if not os.path.isfile(setup_script_path):
# return (
# jsonify({"error": "setup.sh not found in the target version folder"}),
# 404,
# )
# # 执行setup.sh脚本
# try:
# # 使用 subprocess.run 切换到目标目录并执行 setup.sh
# subprocess.run(["bash", "setup.sh"], check=True, cwd=target_folder)
# # 返回完成消息
# return jsonify({"message": f"Version {version} switched successfully"}), 200
# except subprocess.CalledProcessError as e:
# # 返回错误信息
# return jsonify({"error": f"Failed to switch version: {str(e)}"}), 500
@app.route("/switch_version", methods=["POST"])
def switch_version():
    """Switch to another installed version by running its ``setup.sh``.

    Expects a JSON body with a ``version`` field. ``"default"`` selects the
    unsuffixed ``MassageRobot_aubo`` folder; any other value selects
    ``MassageRobot_aubo-<version>`` under the workspace directory. The
    script is launched in a new ``gnome-terminal`` window and is not waited
    for, so a 200 response only means the launch succeeded.

    Returns:
        200 with a message when the script was launched, 400 when the version
        is missing or not a plain folder-name component, 404 when the version
        folder or its ``setup.sh`` does not exist, 500 when launching fails.
    """
    base_dir = "/home/jsfb/jsfb_ws"

    # A missing, malformed or non-object JSON body counts as "no version".
    data = request.get_json(silent=True)
    version = data.get("version") if isinstance(data, dict) else None
    if not version:
        return jsonify({"error": "Version not specified"}), 400

    # The version becomes part of a filesystem path, so it must be a single
    # path component; this rejects "../" style traversal out of base_dir.
    version = str(version)
    if version in (".", "..") or os.path.basename(version) != version:
        return jsonify({"error": "Invalid version"}), 400

    if version == "default":
        target_folder = os.path.join(base_dir, "MassageRobot_aubo")
    else:
        target_folder = os.path.join(base_dir, f"MassageRobot_aubo-{version}")

    if not os.path.isdir(target_folder):
        return jsonify({"error": f"Version {version} not found"}), 404

    setup_script_path = os.path.join(target_folder, "setup.sh")
    if not os.path.isfile(setup_script_path):
        return (
            jsonify({"error": "setup.sh not found in the target version folder"}), 404
        )

    try:
        # Pass the script path as its own argv entry instead of embedding it
        # in a ``bash -c`` string, so it is never parsed as shell syntax.
        subprocess.Popen(
            ["gnome-terminal", "--", "bash", setup_script_path],
            cwd=target_folder,
        )
        return jsonify({"message": f"Version {version} switched successfully"}), 200
    except Exception as e:
        return jsonify({"error": f"Failed to switch version: {str(e)}"}), 500
@app.route("/on_message", methods=["POST"])
def on_message():
    """Broadcast a popup message to all Socket.IO clients.

    Reads the ``message`` form field, strips surrounding whitespace and emits
    it as an ``on_message`` event.

    Returns:
        A success JSON object when the message was emitted, a 400 error JSON
        when the message is empty, or a 500 error JSON on any exception.
    """
    try:
        text = request.form.get("message", "").strip()

        if text:
            socketio.emit("on_message", {"message": text})
            return jsonify(
                {"status": "success", "message": f"Popup triggered with message: {text}"}
            )

        # Empty or whitespace-only input: report it without emitting anything.
        empty_reply = {
            "status": "error",
            "message": "Message cannot be empty, no popup triggered",
        }
        return jsonify(empty_reply), 400
    except Exception as exc:
        failure_reply = {"status": "error", "message": "An error occurred: " + str(exc)}
        return jsonify(failure_reply), 500
# Remote assets that are served from this application's own static files.
_MODIFY_WEB_LOCAL_OVERRIDES = {
    "http://m.layeyese.com/images/icon.png": "http://127.0.0.1:5000/static/images/webapp/favicon.ico",
    "http://m.layeyese.com/images/banner_ysdq.png": "http://127.0.0.1:5000/static/images/banner_ysdq.png",
}

# CSS and JavaScript appended to the <head> of every proxied HTML page.
_MODIFY_WEB_INJECTION = """
<style>
::-webkit-scrollbar {
width: 0px; /* 隐藏水平滚动条 */
height: 0px; /* 隐藏垂直滚动条 */
}
* {
font-family: '' !important;
}
body {
overflow: -moz-scrollbars-none; /* 针对旧版Firefox */
-ms-overflow-style: none; /* 针对IE和Edge */
}
.top {
background-color: rgb(212, 96, 241) !important;
}
.logo {
display: none !important;
}
.nwtop {
border-top-left-radius: 10px !important;
border-top-right-radius: 10px !important;
background: linear-gradient(to right bottom, rgb(212, 96, 241), rgba(145, 66, 197, 0.5)) !important;
}
.shouyedhl {
background: linear-gradient(to bottom, rgb(212, 96, 241), rgba(145, 66, 197, 0.5)) !important;
background-color: '' !important;
border-bottom-left-radius: 10px !important;
border-bottom-right-radius: 10px !important;
}
.shouyedhl-center li a {
max-width: 5rem !important;
font-size: 0.8rem
}
.shouyenr1-2 {
background: rgba(255, 255, 255, 0.85);
backdrop-filter: blur(5px); /* 背景模糊效果 */
box-shadow: 0px 0px 24px rgba(255, 255, 255, 0.65); /* 与其他元素一致的阴影效果 */
}
.map2-1 em {
background: linear-gradient(to right bottom, rgb(212, 96, 241), rgba(145, 66, 197, 0.5)) !important;
}
.map2-2 .new_add_ietm li {
background: linear-gradient(to right bottom, rgb(212, 96, 241), rgba(145, 66, 197, 0.5)) !important;
}
.dibu {
display: none !important;
}
.lbym {
background-color: '' !important;
}
</style>
<script>
document.addEventListener('DOMContentLoaded', function () {
// 取消特定链接的跳转行为
const targetLink = document.querySelector('a[href="/modify_web?url=http://m.hongyunzyg.com/form1.html"]');
if (targetLink) {
targetLink.addEventListener('click', function(event) {
event.preventDefault(); // 阻止默认跳转
alert('暂未开放'); // 可选:提示用户跳转已被取消
});
}
});
</script>
"""


def _modify_web_rewrite_links(soup, page_url):
    """Point every http(s) link and resource of *soup* back at the proxy route."""
    for tag in soup.find_all(["a", "link", "script", "img", "iframe"]):
        attr = "href" if tag.name in ("a", "link") else "src"
        if not tag.has_attr(attr):
            continue
        # Resolve against the page's own URL so document-relative paths land
        # in the right directory, not only root-relative ones.
        absolute_url = urljoin(page_url, tag[attr])
        # javascript:, data:, mailto: and similar links cannot be fetched by
        # the proxy; leave them exactly as the page wrote them.
        if urlparse(absolute_url).scheme not in ("http", "https"):
            continue
        # The URL is deliberately appended unencoded: the injected script
        # matches one proxied href literally.
        tag[attr] = url_for("modify_web") + "?url=" + absolute_url


@app.route("/modify_web")
def modify_web():
    """Fetch the page given by ``?url=`` and serve a restyled, proxied copy.

    HTML responses get U+FEFF characters removed, the custom CSS/JS injected
    into ``<head>`` and their links rewritten to pass through this route.
    Every other content type is relayed unchanged.

    Returns:
        The rewritten HTML or the relayed resource; a 400 text response when
        no URL is given; a 500 text response when the fetch fails.
    """
    target_url = request.args.get("url")
    if not target_url:
        return "No URL provided", 400

    target_url = _MODIFY_WEB_LOCAL_OVERRIDES.get(target_url, target_url)

    try:
        # Bounded wait: an unresponsive upstream must not hang this request.
        response = requests.get(target_url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        return f"Error fetching the page: {str(e)}", 500

    content_type = response.headers.get("Content-Type", "")

    # Images, CSS, JS, fonts and anything else that is not HTML pass through.
    if "text/html" not in content_type:
        return Response(response.content, content_type=content_type)

    soup = BeautifulSoup(response.content, "html.parser")

    # Strip U+FEFF characters from all text nodes.
    for element in soup.find_all(text=True):
        if "\uFEFF" in element:
            element.replace_with(NavigableString(element.replace("\uFEFF", "")))

    injection = BeautifulSoup(_MODIFY_WEB_INJECTION, "html.parser")
    if soup.head:
        soup.head.append(injection)
    else:
        # No <head> in the page: create one to hold the injected CSS/JS.
        head = soup.new_tag("head")
        head.append(injection)
        soup.insert(0, head)

    _modify_web_rewrite_links(soup, target_url)

    # Return the markup verbatim. Rendering remote HTML as a Jinja template
    # would execute any template syntax it contains on this server.
    return Response(str(soup), mimetype="text/html")
# Path of the application's "static" folder.
STATIC_FOLDER = os.path.join(app.root_path, "static")
# Relative URL of the image currently displayed; initial value is the default background.
display_image_url = "static/images/smart_mode/back.png"
# Route that receives the POST request for changing the displayed image.
@app.route("/change_image", methods=["POST"])
def change_image():
    """Copy a local image into ``static/images`` and announce its URL.

    Reads the ``path`` form field, copies that file into the static image
    folder, stores the resulting relative URL in ``display_image_url`` and
    emits it to clients as a ``change_image`` Socket.IO event.

    Returns:
        200 with ``{"url": ...}`` on success, 400 when the path is missing or
        is not an existing regular file, 500 when the copy fails.
    """
    global display_image_url

    image_path = request.form.get("path")
    # Only regular files can be copied; a directory is rejected here rather
    # than failing later inside the copy.
    if not image_path or not os.path.isfile(image_path):
        return jsonify({"error": "Invalid image path"}), 400

    filename = os.path.basename(image_path)
    target_path = os.path.join(STATIC_FOLDER, "images", filename)

    try:
        shutil.copy(image_path, target_path)
    except shutil.SameFileError:
        # The image already is the file inside static/images: nothing to copy.
        pass
    except Exception as e:
        return jsonify({"error": f"Failed to copy file: {str(e)}"}), 500

    file_url = "static/images/" + filename
    display_image_url = file_url

    socketio.emit("change_image", {"path": file_url})
    return jsonify({"url": file_url}), 200
# Remote server used for the reverse SSH tunnel and the device-info upload.
# NOTE(review): root credentials are hard-coded in source control. Each value
# can now be overridden through the environment; the literals remain only as
# backward-compatible defaults and should be rotated and removed.
SERVER_IP = os.environ.get("MASSAGE_SERVER_IP", "8.138.8.114")
SERVER_USER = os.environ.get("MASSAGE_SERVER_USER", "root")
SERVER_PASSWORD = os.environ.get("MASSAGE_SERVER_PASSWORD", "JuShenFengBao209")
# Remote directory that receives the per-device info files.
REMOTE_PATH = "/var/www/html/devices"
reverse_ssh = ReverseSSH(SERVER_IP, SERVER_USER, SERVER_PASSWORD)
# Open the reverse tunnel.
@app.route("/start_tunnel", methods=["GET"])
def start_tunnel():
    """Start the reverse SSH tunnel and return the helper's result as JSON."""
    return jsonify(reverse_ssh.start_tunnel())
# Close the reverse tunnel.
@app.route("/stop_tunnel", methods=["GET"])
def stop_tunnel():
    """Stop the reverse SSH tunnel and return the helper's result as JSON."""
    return jsonify(reverse_ssh.stop_tunnel())
# Route that runs the setup script matching the requested language.
@app.route("/run-script", methods=["POST"])
def run_script():
    """Run the language-specific setup script in a new terminal window.

    Expects a JSON body with ``script_type`` set to ``"zh"``, ``"jp"``,
    ``"en"`` or ``"ko"``. The matching script in the directory two levels
    above this file is launched through ``gnome-terminal`` and is not waited
    for.

    Returns:
        A success JSON message once the script was launched, 400 for an
        unknown or missing script type, 404 when the script file does not
        exist, 500 on any other exception.
    """
    import shlex  # local import: only needed to quote the directory below

    # Language code -> setup script file name inside the installation root.
    scripts_by_language = {
        "zh": "setup.sh",
        "jp": "setup_JP.sh",
        "en": "setup_EN.sh",
        "ko": "setup_KO.sh",
    }

    print("run-script")
    try:
        # A missing, malformed or non-object JSON body counts as "no type".
        data = request.get_json(silent=True)
        script_type = data.get("script_type") if isinstance(data, dict) else None

        # Installation root: the parent of the directory holding this file.
        base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        print("动态目录:", base_path)

        script_name = (
            scripts_by_language.get(script_type)
            if isinstance(script_type, str)
            else None
        )
        if script_name is None:
            print("Invalid script type provided, 400")
            return jsonify({"error": "Invalid script type provided"}), 400

        script_path = os.path.normpath(os.path.join(base_path, script_name))
        if not os.path.exists(script_path):
            print(f"Script not found: {script_path}, 404")
            return jsonify({"error": f"Script not found: {script_path}"}), 404

        # base_path is quoted because the string is parsed by ``bash -c``; a
        # directory containing spaces would otherwise break the ``cd``.
        subprocess.Popen(
            [
                "gnome-terminal",
                "--",
                "bash",
                "-c",
                f"cd {shlex.quote(base_path)} && bash {script_name}; exit",
            ],
            cwd=base_path,
        )

        return jsonify(
            {
                "message": "Script executed successfully",
            }
        )
    except Exception as e:
        print(f"Error running script: {e}, 500")
        return jsonify({"error": str(e)}), 500
@app.route('/set_zero', methods=['POST'])
def set_zero():
    """Zero the force sensor unless the massage service is running.

    When ``massage.service`` is not running, active transmission is disabled
    and the sensor's ``set_zero`` is tried up to three times. After the last
    failed attempt the clients are notified through an ``on_message``
    Socket.IO event.

    Returns:
        200 when zeroing succeeded or the service is already running, 500
        when all attempts failed or an exception occurred.
    """
    try:
        if is_service_running("massage.service"):
            return jsonify({"message": "Service is already running, no need to set zero"}), 200

        sensor = XjcSensor()
        max_try = 3  # maximum number of attempts
        delay = 0.5  # pause in seconds before the next attempt

        for attempt in range(max_try):
            sensor.disable_active_transmission()
            time.sleep(0.5)
            if sensor.set_zero() == 0:
                return jsonify({"message": "Set zero success"}), 200
            print(f"Set zero attempt {attempt + 1} failed, retrying...")
            time.sleep(delay)

        print("Set zero failed after multiple attempts.")
        # Emit the popup event directly. Posting to this server's own
        # /on_message route over HTTP blocks this request on a second request
        # to the same process and depends on a hard-coded port.
        socketio.emit("on_message", {"message": "传感器初始化失败"})
        return jsonify({"message": "Set zero failed after multiple attempts"}), 500
    except Exception as e:
        print(f"Error in /set_zero: {e}")
        return jsonify({"message": "Set zero failed", "error": str(e)}), 500
@app.route("/reset-language", methods=["POST"])
def reset_language():
    """Run ``restart_language.sh`` from the installation root in a new terminal.

    The script is looked up in the directory two levels above this file and
    launched through ``gnome-terminal``; it is not waited for.

    Returns:
        A success JSON message once the script was launched, 404 when the
        script file does not exist, 500 on any other exception.
    """
    import shlex  # local import: only needed to quote the directory below

    print("reset-language")
    try:
        # Installation root: the parent of the directory holding this file.
        base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        print("动态目录:", base_path)

        script_path = os.path.normpath(os.path.join(base_path, "restart_language.sh"))
        if not os.path.exists(script_path):
            print(f"Script not found: {script_path}, 404")
            return jsonify({"error": f"Script not found: {script_path}"}), 404

        # base_path is quoted because the string is parsed by ``bash -c``; a
        # directory containing spaces would otherwise break the ``cd``.
        subprocess.Popen(
            [
                "gnome-terminal",
                "--",
                "bash",
                "-c",
                f"cd {shlex.quote(base_path)} && bash restart_language.sh; exit",
            ],
            cwd=base_path,
        )

        return jsonify({
            "message": "Restart language script executed successfully in new terminal"
        })
    except Exception as e:
        print(f"Error running script: {e}, 500")
        return jsonify({"error": str(e)}), 500
# Write the device info file and upload it to the server.
def write_and_upload_info(ngrok_url, reverse_ssh_port):
    """Write this device's connection details to a file and upload it by SFTP.

    The file ``../tmp/<hostname>.txt`` records the host name, the ngrok URL,
    the current local time and the SSH command for the reverse tunnel. It is
    then uploaded to ``REMOTE_PATH`` on the configured server.

    Args:
        ngrok_url: Public ngrok URL written into the file.
        reverse_ssh_port: Port used in the generated SSH command.

    Raises:
        Whatever opening the file, connecting or transferring raises; the
        SSH and SFTP sessions are closed in every case.
    """
    hostname = socket.gethostname()
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    reverse_ssh_command = f"ssh jsfb@{SERVER_IP} -p {reverse_ssh_port}"

    file_content = (
        f"Hostname: {hostname}\n"
        f"Ngrok URL: {ngrok_url}\n"
        f"Online Time: {timestamp}\n"
        f"SSH Command: {reverse_ssh_command}"
    )
    file_path = f"../tmp/{hostname}.txt"

    with open(file_path, "w") as file:
        file.write(file_content)

    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts any host key without verification.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(SERVER_IP, username=SERVER_USER, password=SERVER_PASSWORD)
        sftp = ssh.open_sftp()
        # The SFTP session is closed in its own inner block so a failed
        # connect() never reaches a close() on a session that was not opened.
        try:
            sftp.put(file_path, f"{REMOTE_PATH}/{hostname}.txt")
            print(f"File {file_path} uploaded to {REMOTE_PATH}/{hostname}.txt on server.")
        finally:
            sftp.close()
    finally:
        ssh.close()
# Start the ngrok tunnel and return its public URL.
def start_ngrok(port, max_retries=3, retry_interval=2):
    """Start ngrok for *port*, retrying until its public URL can be read.

    Args:
        port: Local HTTP port to expose.
        max_retries: Number of launch attempts.
        retry_interval: Seconds to wait after a launch and after a failure.

    Returns:
        The public URL on success, or the string ``"Error"`` when every
        attempt failed or ngrok could not be launched. The last launched
        process is kept in the module-level ``ngrok_process``.
    """
    global ngrok_process

    for attempt in range(max_retries):
        print(f"Attempting to start ngrok (Attempt {attempt + 1}/{max_retries})")
        try:
            ngrok_process = subprocess.Popen(
                ["/usr/local/bin/ngrok", "http", str(port)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError as e:
            # Missing or non-executable binary: retrying cannot help.
            print(f"Failed to launch ngrok: {e}")
            return "Error"

        time.sleep(retry_interval)  # give ngrok time to come up

        try:
            ngrok_url = get_ngrok_url()
        except Exception as e:
            print(f"Failed to get ngrok URL: {e}")
            ngrok_process.terminate()
            # Reap the process and close its pipes so failed attempts leave
            # neither zombies nor open file descriptors behind.
            try:
                ngrok_process.communicate(timeout=retry_interval)
            except subprocess.TimeoutExpired:
                ngrok_process.kill()
                ngrok_process.communicate()
            time.sleep(retry_interval)
        else:
            print(f"Ngrok started successfully with URL: {ngrok_url}")
            return ngrok_url

    print("Failed to start ngrok after multiple attempts")
    return "Error"
# Read the public URL from the local ngrok API.
def get_ngrok_url():
    """Return the public URL of the first tunnel reported by ngrok.

    Queries the ngrok agent API on ``localhost:4040``.

    Raises:
        RuntimeError: When the request fails, the answer is not valid JSON,
            or it lists no tunnel with a public URL.
    """
    try:
        response = requests.get("http://localhost:4040/api/tunnels", timeout=5)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        raise RuntimeError(f"Error fetching ngrok URL: {e}") from e

    try:
        return data["tunnels"][0]["public_url"]
    except (KeyError, IndexError, TypeError) as e:
        # The API answers before any tunnel is up; report that through the
        # same exception type as the other failures.
        raise RuntimeError(f"Error fetching ngrok URL: no tunnel reported ({e!r})") from e
# Socket.IO handler: starts the periodic music task when a client connects.
@socketio.on("connect", namespace="/")
def connect():
    """Start ``music_background_thread`` as a Socket.IO background task.

    NOTE(review): this runs for every connection on the default namespace;
    confirm the task tolerates being started more than once.
    """
    socketio.start_background_task(music_background_thread)
zeroconf = Zeroconf()
# Records whether the service has already been registered.
service_registered = False
def get_all_local_ips():
    """Collect every IPv4 address of this machine without filtering.

    Each interface and its address records are printed while scanning.

    Returns:
        ``["127.0.0.1"]`` when ``DEBUG_MODE`` is set or no IPv4 address was
        found, otherwise the list of all addresses found.
    """
    interface_names = netifaces.interfaces()
    print("可用网络接口:", interface_names)

    found = []
    for interface in interface_names:
        addrs = netifaces.ifaddresses(interface)
        print(f"接口 {interface} 的地址信息: {addrs}")

        # Interfaces without an AF_INET entry contribute nothing.
        for addr_info in addrs.get(netifaces.AF_INET, []):
            print(f"接口 {interface} 的 IPv4 地址信息: {addr_info}")
            ip_addr = addr_info.get("addr")
            if ip_addr:
                found.append(ip_addr)

    if DEBUG_MODE:
        return ["127.0.0.1"]  # for debugging

    # Fall back to the loopback address when nothing was found.
    return found or ["127.0.0.1"]
def register_service(local_ips):
    """Advertise the HTTP service over mDNS for every eligible local address.

    Addresses starting with ``127.`` or ``192.168.100`` are skipped. The
    module-level ``service_registered`` flag is set once at least one address
    has been registered, and later calls then return immediately. A call
    that found no eligible address leaves the flag unset so it can be
    repeated.

    Args:
        local_ips: IPv4 addresses of this machine, as dotted strings.
    """
    global service_registered

    print(local_ips)
    if service_registered:
        return

    hostname = socket.gethostname()
    skipped_prefixes = ("127.", "192.168.100")

    for ip in local_ips:
        if ip.startswith(skipped_prefixes):
            continue

        service_info = ServiceInfo(
            "_http._tcp.local.",  # service type
            f"LL-X1-{hostname}._http._tcp.local.",  # service name built from the host name
            addresses=[socket.inet_aton(ip)],  # address in packed binary form
            port=5000,  # port the Flask server listens on
            properties={},  # optional extra service information
            server=f"{hostname}.local.",  # host name
        )
        zeroconf.register_service(service_info, allow_name_change=True)
        print(f"mDNS 服务已注册IP 地址:{ip}")
        service_registered = True
# On application exit: stop the tunnel, stop ngrok and unregister mDNS.
def cleanup():
    """Release the external resources held by the application.

    Stops the reverse SSH tunnel, terminates the ngrok process if there is
    one, and closes zeroconf. Each step runs even when an earlier one
    raises; the exception still propagates afterwards.
    """
    global ngrok_process

    print("注销 mDNS 服务并关闭 ngrok...")
    try:
        reverse_ssh.stop_tunnel()
    finally:
        try:
            if ngrok_process:
                ngrok_process.terminate()
        finally:
            zeroconf.close()
def fetch_music_list_with_retry():
    """Fetch the playlists of every category, retrying until it succeeds.

    On success ``all_playlists`` holds one entry per category, ``playlist``
    is generated from the first category and ``current_song`` is updated
    from the first song with playback stopped. After a failed or empty round
    the function waits five seconds and starts over; it only returns on
    success.
    """
    global all_playlists, playlist, current_song

    # Category being fetched, for the error message. Kept separately so the
    # handler never reads an unbound loop variable when a round fails before
    # its first iteration.
    category_in_progress = None

    while True:
        try:
            # Start every round from an empty list.
            all_playlists.clear()

            for categoryID in categoryIDs:
                category_in_progress = categoryID
                diss_info, music_info_list = get_song_list(categoryID)
                all_playlists.append(
                    {"diss_info": diss_info, "music_info_list": music_info_list}
                )
                print(diss_info)

            if all_playlists:
                playlist = generate_playlist(all_playlists[0]["music_info_list"])
                first_song = playlist[0]
                current_song.update(
                    {
                        "album_img_url": first_song["album_img_url"],
                        "music_name": first_song["music_name"],
                        "singer_name": first_song["singer_name"],
                        "is_playing": False,
                    }
                )
                print("音乐列表更新成功!")
                break  # done
        except Exception as e:
            print(f"获取音乐列表失败,重试中... 错误信息: {e}")
            print(f"Failed to fetch playlist for category {category_in_progress}: {e}")

        time.sleep(5)  # wait before the next round
def initialize_app(port):
    """Run the start-up steps that precede serving Flask-SocketIO.

    Registers the mDNS service, loads the latest-music data and the
    playlists, starts ngrok and the reverse SSH tunnel, uploads the device
    info file and finally starts the status-message background task.
    Failures of the remote-access steps are printed and do not stop
    start-up.

    Args:
        port: Local HTTP port handed to the ngrok tunnel.
    """
    global latest_music

    register_service(get_all_local_ips())

    try:
        latest_music = fetch_latest_music()
    except Exception as e:
        print(f"Failed to fetch latest music data: {e}")

    # NOTE(review): fetch_music_list_with_retry handles its own errors and
    # loops until it succeeds, so this call looks like it blocks start-up
    # while the music service is unreachable. Confirm that is intended.
    try:
        fetch_music_list_with_retry()
    except Exception as e:
        print(f"初始化音乐列表失败:{e}")
        # Keep trying in the background.
        socketio.start_background_task(fetch_music_list_with_retry)

    try:
        ngrok_url = start_ngrok(port)
        print("Ngrok URL:", ngrok_url)
        result = reverse_ssh.start_tunnel()
        print(result)
        reverse_ssh_port = result["port"]
        write_and_upload_info(ngrok_url, reverse_ssh_port)
    except Exception as e:
        # Remote access is optional. A missing "port" key, an SSH/SFTP
        # failure or an unreachable network must not abort start-up, so all
        # errors are reported here and not only RuntimeError.
        print("Error:", e)

    socketio.start_background_task(send_status_messages)
# Run the start-up sequence when the module is loaded; 5000 is the port handed to the ngrok tunnel.
initialize_app(5000)
def check_network(host="8.8.8.8", port=53, timeout=5):
    """Report whether a TCP connection to *host*:*port* can be opened.

    The defaults probe a public DNS server.

    Args:
        host: Address to connect to.
        port: TCP port to connect to.
        timeout: Seconds to wait for the connection.

    Returns:
        True when the connection was established, False on any ``OSError``.
    """
    try:
        # The context manager closes the probe socket instead of leaking it.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
if __name__ == "__main__":
    # Read the command line; arguments this script does not know are ignored.
    parser = argparse.ArgumentParser(description="Run Flask-SocketIO server.")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    args, unknown = parser.parse_known_args()

    # Debug mode follows the --debug switch.
    DEBUG_MODE = args.debug

    # Serve forever: when the server stops with an error, report it, wait
    # five seconds and start it again.
    while True:
        try:
            socketio.run(
                app,
                host="0.0.0.0",
                port=5000,
                debug=DEBUG_MODE,
                allow_unsafe_werkzeug=True,
            )
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            print("Restarting server in 5 seconds...")
            time.sleep(5)
# MAX_WAIT_TIME = 300 # 最多等待 5 分钟
# elapsed_time = 0
# while not check_network() and elapsed_time < MAX_WAIT_TIME:
# print(f"Network unavailable. Retrying in 5 seconds... ({elapsed_time}/{MAX_WAIT_TIME}s)")
# time.sleep(5)
# elapsed_time += 5
# if elapsed_time >= MAX_WAIT_TIME:
# print("Network initialization failed. Proceeding without network...")
# # 启动 Flask-SocketIO 服务
# socketio.run(app, host="0.0.0.0", port=5000)
# # 运行 Flask-SocketIO 应用
# socketio.run(
# app, debug=DEBUG_MODE, host="0.0.0.0", port=5000, allow_unsafe_werkzeug=True
# )
# socketio.run(app, debug=DEBUG_MODE, host="0.0.0.0", port=5000) #如果使用gunicorn启动