2025-05-27 15:46:31 +08:00

404 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
import time
class WifiManager:
def __init__(self, interface=None):
"""
初始化 WifiManager 对象
:param interface: Wi-Fi 网络接口名, 如果为None则自动检测
"""
self.interface = interface or self._detect_wifi_interface()
# 确保Wi-Fi已打开
self._ensure_wifi_enabled()
def _detect_wifi_interface(self):
"""
自动检测Wi-Fi接口名称
:return: 检测到的Wi-Fi接口名称
"""
result = self._run_command(['nmcli', 'device', 'status'])
for line in result.splitlines():
if 'wifi' in line.lower():
return line.split()[0]
return 'wlan0' # 默认返回值
def _run_command(self, command):
"""
执行 shell 命令并返回输出
:param command: 要执行的命令列表
:return: 命令的标准输出内容
"""
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
return result.stdout.decode('utf-8')
else:
return result.stderr.decode('utf-8')
def scan_wifi(self):
"""
扫描附近的 Wi-Fi 网络
:return: 可用的 Wi-Fi 网络列表格式为嵌套的字典包含SSID、Signal、Security和InUse信息
"""
print("\n开始扫描Wi-Fi网络...")
# 先执行扫描
self._run_command(['nmcli', 'device', 'wifi', 'rescan'])
time.sleep(1) # 等待扫描完成
# 执行 nmcli 命令获取 Wi-Fi 列表
print("获取可用网络列表...")
output = self._run_command(['nmcli', 'device', 'wifi', 'list'])
# 解析 Wi-Fi 网络信息
wifi_list = []
seen_ssids = set() # 仅用于检查SSID是否重复
lines = output.splitlines()
# 跳过第一行标题
for line in lines[1:]:
if not line.strip():
continue
# 检查是否在使用中
in_use = '*' in line
# 分割行,保留空格
parts = line.split()
if len(parts) < 8: # 确保有足够的字段
continue
# 解析SSID处理包含空格的情况
ssid_start_idx = 1 if not in_use else 2 # 如果是当前连接的网络,跳过一个额外的
# 找到MODE字段的位置通常是"Infra"
mode_idx = -1
for i, part in enumerate(parts[ssid_start_idx:], ssid_start_idx):
if part == "Infra":
mode_idx = i
break
if mode_idx == -1:
continue
# 提取SSID不包括MODE
ssid = ' '.join(parts[ssid_start_idx:mode_idx])
# 跳过隐藏的SSID和重复的SSID
if ssid == '--' or ssid in seen_ssids:
continue
# 从末开始解析固定字段
try:
signal = int(parts[-3]) # SIGNAL 通常是倒数第三个字段
security = parts[-1] # SECURITY 是最后一个字段
except (ValueError, IndexError):
continue
# 添加到列表并记录SSID
wifi_info = {
'SSID': ssid,
'Signal': signal,
'Security': security,
'InUse': in_use
}
wifi_list.append(wifi_info)
seen_ssids.add(ssid)
print(f"扫描完成,找到 {len(wifi_list)} 个不重复的网络")
return sorted(wifi_list, key=lambda x: x['Signal'], reverse=True)
def _run_command_with_timeout(self, command, timeout=10):
"""
执行带超时的shell命令
:param command: 要执行的命令列表
:param timeout: 超时时(秒)
:return: (output, error_code) 输出内容和错误码
"""
try:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
stdout, stderr = process.communicate(timeout=timeout)
return stdout.decode('utf-8'), process.returncode
except subprocess.TimeoutExpired:
process.kill()
return "", -1 # 超时返回空输出和-1错误码
except Exception as e:
return str(e), -2 # 其他错误返回错误信息和-2错误码
def _run_command_in_terminal(self, command):
"""
在终端中执行命令并获取输出
:param command: 要执行的命令列表
:return: (output, error_code)
"""
# 创建临时脚本文件来存储命令和捕获输出
import tempfile
import os
with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as script:
# 写入命令到脚本
script.write("#!/bin/bash\n")
# 添加超时控制5秒后强制关闭终端
script.write("(sleep 5; kill -9 $$) & \n")
script.write(f"{' '.join(command)} > ../tmp/wifi_output 2>&1\n")
script.write(f"echo $? > ../tmp/wifi_status\n")
script_path = script.name
# 设置脚本权限
os.chmod(script_path, 0o755)
# 在gnome-terminal中执行脚本添加--wait参数等待终端关闭
terminal_cmd = ['gnome-terminal', '--wait', '--', '/bin/bash', script_path]
try:
subprocess.run(terminal_cmd, timeout=10)
# 等待输出文件生成
time.sleep(3)
# 读取输出和状态
try:
with open('../tmp/wifi_output', 'r') as f:
output = f.read()
with open('../tmp/wifi_status', 'r') as f:
status = int(f.read().strip())
# 清理临时文件
os.unlink(script_path)
os.unlink('../tmp/wifi_output')
os.unlink('../tmp/wifi_status')
return output, status
except (FileNotFoundError, ValueError):
return "Failed to get command output", -1
except subprocess.TimeoutExpired:
return "Command timeout", -1
finally:
# 确保清理临时脚本
if os.path.exists(script_path):
os.unlink(script_path)
def connect_wifi(self, ssid, password):
"""
连接到指定的 Wi-Fi 网络
:param ssid: Wi-Fi 网络名称 (SSID)
:param password: Wi-Fi 密码
:return: 连接状态信息字典
"""
print(f"\n准备连接到Wi-Fi网络 '{ssid}'...")
# # 检查当前连接
# current_connections = self.get_current_connection()
# for conn in current_connections:
# if conn['type'].lower() == 'wifi' and conn['name'] != ssid:
# print(f"检测到已有其他Wi-Fi连接正在断开...")
# self.disconnect_wifi()
# time.sleep(2) # 等待断开完成
# break
# 先删除可能存在的同名连接,避免使用缓存的密码
print("清理可能存在的旧连接...")
delete_cmd = ['nmcli', 'connection', 'delete', ssid]
self._run_command_in_terminal(delete_cmd)
time.sleep(2) # 等待删除完成
# 使用终端执行连接命令
print("正在连接...")
connect_cmd = ['nmcli', '--wait', '20', 'device', 'wifi', 'connect', ssid, 'password', password]
result, error_code = self._run_command_in_terminal(connect_cmd)
# 处理各种情况
if error_code == -1:
print("连接超时或密码错误")
# 超时或密码错误
self._run_command_in_terminal(['nmcli', 'connection', 'delete', ssid])
return {
'status': 'error',
'message': f'连接超时或密码错误'
}
elif error_code == 0 and ("成功" in result or "successfully" in result.lower()):
print("连接成功")
# 连接成功
time.sleep(1) # 等待连接状态更新
connections = self.get_current_connection()
wifi_conn = next((conn for conn in connections if conn['type'].lower() == 'wifi'), None)
return {
'status': 'success',
'message': f'已成功连接到 {ssid}',
'connection_info': wifi_conn
}
else:
# 其他错误情况
error_msg = result.strip()
if "密码" in error_msg or "password" in error_msg.lower():
error_msg = "连接失败,请重试"
print(f"连接失败: 密码错误")
elif not error_msg:
error_msg = "连接失败,请重试"
print(f"连接失败: 未知错误")
else:
print(f"连接失败: {error_msg}")
return {
'status': 'error',
'message': f'{error_msg}'
}
def disconnect_wifi(self):
"""
断开当前的 Wi-Fi 连接
:return: 断开连接状态信息
"""
print("\n正在断<EFBFBD><EFBFBD>Wi-Fi连接...")
# 获取当前Wi-Fi连接
current_connections = self.get_current_connection()
if not current_connections:
print("当前没有Wi-Fi连接")
return {
'status': 'success',
'message': '当前没有Wi-Fi连接'
}
# 对每个Wi-Fi连接执行断开操作
for conn in current_connections:
print(f"正在断开 {conn['name']} 的连接...")
command = ['nmcli', 'device', 'disconnect', self.interface]
result, error_code = self._run_command_in_terminal(command)
if error_code == 0 and ("成功" in result or "successfully" in result.lower()):
print(f"已成功断开 {conn['name']}")
# 删除连接配置
delete_cmd = ['nmcli', 'connection', 'delete', conn['name']]
self._run_command_in_terminal(delete_cmd)
time.sleep(1) # 等待删除完成
else:
print(f"断开 {conn['name']} 失败: {result}")
return {
'status': 'error',
'message': f'断开连接失败: {result}'
}
# 最后检查是否真的断开了
time.sleep(1) # 等<><E7AD89><EFBFBD>状态更新
if not self.is_connected():
print("已成功断开所有Wi-Fi连接")
return {
'status': 'success',
'message': '已断开Wi-Fi连接'
}
else:
print("断开连接失败仍有Wi-Fi连接")
return {
'status': 'error',
'message': '断开连接失败仍有Wi-Fi连接'
}
def get_current_connection(self):
"""
获取当前连接的 Wi-Fi 网络信息
:return: 当前连接的Wi-Fi网络信息列表如果没有Wi-Fi连接则返回空列表
"""
print("\n获取当前连接信息...")
output = self._run_command(['nmcli', 'connection', 'show', '--active'])
connections = []
wifi_connections = [] # 专门存储Wi-Fi连接
for line in output.splitlines()[1:]: # 跳过标题行
parts = [part for part in line.split(' ') if part.strip()]
if len(parts) >= 4:
connection = {
'name': parts[0].strip(),
'uuid': parts[1].strip(),
'type': parts[2].strip(),
'device': parts[3].strip()
}
connections.append(connection)
# 只收集Wi-Fi连接
if connection['type'].lower() == 'wifi':
wifi_connections.append(connection)
if connections:
print("找到以下活动连接:")
for conn in connections:
print(f" - {conn['name']} ({conn['type']}) on {conn['device']}")
if wifi_connections:
print("当前Wi-Fi连接:")
for wifi in wifi_connections:
print(f" - {wifi['name']} on {wifi['device']}")
else:
print("没有Wi-Fi连接")
else:
print("没有找到任何活动连接")
return wifi_connections # 只返回Wi-Fi连接信息
def is_connected(self):
"""
判断是否当前已连接到 Wi-Fi
:return: 如果已连接返回 True否则返回 False
"""
connections = self.get_current_connection()
return any(conn['type'].lower() == 'wifi' for conn in connections)
def _ensure_wifi_enabled(self):
"""
确保Wi-Fi已启用
"""
print("\n检查Wi-Fi状态...")
# 检查Wi-Fi状态
status = self._run_command(['nmcli', 'radio', 'wifi'])
if 'enabled' not in status.lower():
print("Wi-Fi未启用正在开启...")
# 如果Wi-Fi未启用则启用它
self._run_command(['nmcli', 'radio', 'wifi', 'on'])
time.sleep(1) # 等待Wi-Fi启动
print("Wi-Fi已开启")
else:
print("Wi-Fi已启用")
# 确保网络接口已启用
print(f"正在启用网络接口 {self.interface}...")
self._run_command(['nmcli', 'device', 'connect', self.interface])
time.sleep(1) # 等待接口启动
print(f"网络接口 {self.interface} 已启用")
# 使用示例
if __name__ == '__main__':
wifi_manager = WifiManager()
# 扫描 Wi-Fi 网络并打印
print("扫描 Wi-Fi 网络:")
wifi_networks = wifi_manager.scan_wifi()
for network in wifi_networks:
print(network)
# 连接到指定的 Wi-Fi 网络
ssid = "RobotStorm"
password = "123456789"
print(f"\n尝试连接到 Wi-Fi 网络 '{ssid}'...")
result = wifi_manager.connect_wifi(ssid, password)
# 打印连接结果
print(f"连接状态: {result['status']}")
print(f"详细信息: {result['message']}")
if result['status'] == 'success' and 'connection_info' in result:
conn_info = result['connection_info']
print(f"连接详情:")
print(f" 网络名称: {conn_info['name']}")
print(f" 设备: {conn_info['device']}")
# 获取当前连接的 Wi-Fi 信息
print("\n当前连接的 Wi-Fi 信息:")
connections = wifi_manager.get_current_connection()
for conn in connections:
if conn['type'].lower() == 'wifi':
print(f" 网络名称: {conn['name']}")
print(f" 设备: {conn['device']}")
print(f" 类型: {conn['type']}")
# 判断是否已连接 Wi-Fi
print("\nWi-Fi连接状态", "连接" if wifi_manager.is_connected() else "未连接")
result = wifi_manager.disconnect_wifi() # 断开连接
print(f"\n断开连接状态: {result['status']}")