MassageRobot_Dobot/UI_next/tools/version_control.py
2025-05-27 15:46:31 +08:00

319 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import subprocess
import threading
import time
from flask import jsonify
from flask_socketio import SocketIO
# 全局变量
BUCKET_NAME = "robotstorm-files" # OBS桶的名称
DOWNLOAD_DIR = "/home/jsfb/jsfb_ws/downloads/" # 下载文件的目录
INSTALL_DIR = "/home/jsfb/jsfb_ws/"
INSTALL_SCRIPT_PATH = "/home/jsfb/jsfb_ws/OTA_tools/install_package.sh"
def get_local_versions(base_dir="/home/jsfb/jsfb_ws"):
"""获取本地安装的版本列表"""
folders = os.listdir(base_dir)
versions = []
for folder in folders:
if folder.startswith("MassageRobot_aubo-"):
version = folder.replace("MassageRobot_aubo-", "")
versions.append(version)
return jsonify({"versions": versions})
def get_remote_versions(obs):
"""获取远程可用的版本列表"""
try:
prefix = "MassageRobot_aubo/MassageRobot_aubo-"
packages = obs.get_packages(BUCKET_NAME, prefix)
if not packages:
return jsonify({"error": "List Objects Failed"}), 404
pattern = r"MassageRobot_aubo-([\w\.\-]+)\.tar\.gz"
matches = [
re.search(pattern, package['key']).group(1)
for package in packages
if re.search(pattern, package['key'])
]
unique_versions = sorted(set(matches))
if not unique_versions:
return jsonify({"error": "No versions found"}), 404
return jsonify({"versions": unique_versions})
except Exception as e:
return jsonify({"error": str(e)}), 500
def download_package_callback(socketio, transferredAmount, totalAmount, totalSeconds):
"""下载进度回调"""
try:
avg_speed_kb = transferredAmount / totalSeconds / 1024
progress_percent = transferredAmount * 100.0 / totalAmount
socketio.emit("download_progress", {
"progress": f"{progress_percent:.2f}",
"speed": f"{avg_speed_kb:.2f}"
})
except ZeroDivisionError:
print("Calculating speed... insufficient data (totalSeconds is 0).")
def background_download(version, obs, socketio):
"""后台运行下载任务"""
try:
object_key = f"MassageRobot_aubo/MassageRobot_aubo-{version}.tar.gz"
obs.download(
BUCKET_NAME,
object_key,
DOWNLOAD_DIR,
callback=lambda x, y, z: download_package_callback(socketio, x, y, z)
)
print(f"Download of version {version} completed.")
package_path = os.path.join(DOWNLOAD_DIR, f"MassageRobot_aubo-{version}.tar.gz")
bash_command = f"{INSTALL_SCRIPT_PATH} -p {package_path} -d {INSTALL_DIR}"
print(f"Running installation script: {bash_command}")
subprocess.Popen([
"/usr/bin/gnome-terminal", "--", "bash", "-c", f"bash {bash_command}; exit"
], cwd=os.path.dirname(INSTALL_SCRIPT_PATH))
print("Installation script started in a new terminal.")
time.sleep(5)
except Exception as e:
print(f"Error during download of version {version}: {str(e)}")
def start_download_package(version, obs, socketio):
"""启动下载包的处理"""
try:
if not version:
return jsonify({"error": "Version not specified"}), 400
download_thread = threading.Thread(
target=background_download,
args=(version, obs, socketio)
)
download_thread.start()
return jsonify({"message": f"Download of version {version} is in progress..."})
except Exception as e:
return jsonify({"error": str(e)}), 500
def get_current_version(script_path):
"""获取当前版本号"""
base_path = os.path.dirname(os.path.dirname(script_path))
if "MassageRobot_aubo-" in base_path:
version = base_path.split("MassageRobot_aubo-")[-1].split("/")[0]
else:
version = "default"
return jsonify({"current_version": version})
def switch_version(version, base_dir="/home/jsfb/jsfb_ws"):
"""切换到指定版本"""
if not version:
return jsonify({"error": "Version not specified"}), 400
if version == "default":
target_folder = os.path.join(base_dir, "MassageRobot_aubo")
else:
target_folder = os.path.join(base_dir, f"MassageRobot_aubo-{version}")
if not os.path.isdir(target_folder):
return jsonify({"error": f"Version {version} not found"}), 404
setup_script_path = os.path.join(target_folder, "setup.sh")
if not os.path.isfile(setup_script_path):
return jsonify({"error": "setup.sh not found in the target version folder"}), 404
try:
subprocess.Popen(
["gnome-terminal", "--", "bash", "-c", f"bash {setup_script_path}; exit"],
cwd=target_folder
)
return jsonify({"message": f"Version {version} switched successfully"}), 200
except Exception as e:
return jsonify({"error": f"Failed to switch version: {str(e)}"}), 500
####VortXDB
def get_vtx_local_versions(base_dir="/home/jsfb/jsfb_ws"):
"""获取本地安装的版本列表"""
folders = os.listdir(base_dir)
versions = []
for folder in folders:
if folder.startswith("VortXDB-"):
version = folder.replace("VortXDB-", "")
versions.append(version)
return jsonify({"versions": versions})
def get_vtx_remote_versions(obs):
"""获取远程可用的版本列表"""
try:
prefix = "VortXDB/VortXDB-"
packages = obs.get_packages(BUCKET_NAME, prefix)
if not packages:
return jsonify({"error": "List Objects Failed"}), 404
pattern = r"VortXDB-([\w\.\-]+)\.tar\.gz"
matches = [
re.search(pattern, package['key']).group(1)
for package in packages
if re.search(pattern, package['key'])
]
unique_versions = sorted(set(matches))
if not unique_versions:
return jsonify({"error": "No versions found"}), 404
return jsonify({"versions": unique_versions})
except Exception as e:
return jsonify({"error": str(e)}), 500
def download_vtx_package_callback(socketio, transferredAmount, totalAmount, totalSeconds):
"""下载进度回调"""
try:
avg_speed_kb = transferredAmount / totalSeconds / 1024
progress_percent = transferredAmount * 100.0 / totalAmount
socketio.emit("download_vtx_progress", {
"progress": f"{progress_percent:.2f}",
"speed": f"{avg_speed_kb:.2f}"
})
except ZeroDivisionError:
print("Calculating speed... insufficient data (totalSeconds is 0).")
def vtx_background_download(version, obs, socketio):
"""后台运行下载任务"""
try:
object_key = f"VortXDB/VortXDB-{version}.tar.gz"
obs.download(
BUCKET_NAME,
object_key,
DOWNLOAD_DIR,
callback=lambda x, y, z: download_package_callback(socketio, x, y, z)
)
print(f"Download of version {version} completed.")
package_path = os.path.join(DOWNLOAD_DIR, f"VortXDB-{version}.tar.gz")
bash_command = f"{INSTALL_SCRIPT_PATH} -p {package_path} -d {INSTALL_DIR}"
print(f"Running installation script: {bash_command}")
subprocess.Popen([
"/usr/bin/gnome-terminal", "--", "bash", "-c", f"bash {bash_command}; exit"
], cwd=os.path.dirname(INSTALL_SCRIPT_PATH))
print("Installation script started in a new terminal.")
time.sleep(5)
except Exception as e:
print(f"Error during download of version {version}: {str(e)}")
def start_vtx_download_package(version, obs, socketio):
"""启动下载包的处理"""
try:
if not version:
return jsonify({"error": "Version not specified"}), 400
download_thread = threading.Thread(
target=vtx_background_download,
args=(version, obs, socketio)
)
download_thread.start()
return jsonify({"message": f"Download of version {version} is in progress..."})
except Exception as e:
return jsonify({"error": str(e)}), 500
def get_vtx_current_version(vtxdb):
"""获取当前版本号"""
version = vtxdb.get_version()
return jsonify({"current_version": version})
def switch_vtx_version(version, base_dir="/home/jsfb/jsfb_ws"):
"""切换到指定版本"""
if not version:
return jsonify({"error": "Version not specified"}), 400
if version == "default":
target_folder = os.path.join(base_dir, "VortXDB")
else:
target_folder = os.path.join(base_dir, f"VortXDB-{version}")
if not os.path.isdir(target_folder):
return jsonify({"error": f"Version {version} not found"}), 404
setup_script_path = os.path.join(target_folder, "setup.sh")
if not os.path.isfile(setup_script_path):
return jsonify({"error": "setup.sh not found in the target version folder"}), 404
try:
subprocess.Popen(
["gnome-terminal", "--", "bash", "-c", f"bash {setup_script_path}; exit"],
cwd=target_folder
)
return jsonify({"message": f"Version {version} switched successfully"}), 200
except Exception as e:
return jsonify({"error": f"Failed to switch version: {str(e)}"}), 500
def sync_user_data(source_version, current_version=None, base_dir="/home/jsfb/jsfb_ws"):
"""从指定版本同步用户数据到当前版本"""
if not source_version:
return jsonify({"error": "源版本未指定"}), 400
# 获取当前版本路径
if current_version:
if current_version == "default":
current_folder = os.path.join(base_dir, "VortXDB")
else:
current_folder = os.path.join(base_dir, f"VortXDB-{current_version}")
else:
# 如果未指定当前版本,则使用应用程序所在目录
current_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 构建源版本路径
if source_version == "default":
source_folder = os.path.join(base_dir, "VortXDB")
else:
source_folder = os.path.join(base_dir, f"VortXDB-{source_version}")
# 检查源版本是否存在
if not os.path.isdir(source_folder):
return jsonify({"error": f"源版本 {source_version} 未找到"}), 404
# 检查当前版本是否存在
if not os.path.isdir(current_folder):
return jsonify({"error": f"当前版本路径 {current_folder} 未找到"}), 404
# 构建当前版本的setup.sh路径
setup_script_path = os.path.join(current_folder, "setup.sh")
# 检查当前版本的setup.sh是否存在
if not os.path.isfile(setup_script_path):
return jsonify({"error": "当前版本中未找到setup.sh"}), 404
try:
# 运行当前版本的setup.sh并传入源版本路径作为参数
sync_command = f"bash {setup_script_path} -c {source_folder}"
subprocess.Popen(
["gnome-terminal", "--", "bash", "-c", f"{sync_command}; echo '同步完成,正在退出'; exit"],
cwd=current_folder
)
return jsonify({"message": f"已开始从版本 {source_version} 同步用户数据到版本 {current_version or '当前'}"}), 200
except Exception as e:
return jsonify({"error": f"同步用户数据失败: {str(e)}"}), 500