2025-05-27 15:46:31 +08:00

269 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import serial
import serial.tools.list_ports
import yaml
import atexit
import threading
import time
import signal
import sys
class PowerBoard:
def __init__(self, baudrate=115200, timeout=1, reconnect_interval=5):
self.baudrate = baudrate
self.timeout = timeout
self.reconnect_interval = reconnect_interval
self.port = None
self.ser = None
self.callback = None
self.running = False
self.monitor_thread = None
self.reconnect_thread = None
self.last_connection_attempt = 0
# 初始化连接
self.connect()
# 注册退出时的清理函数
atexit.register(self.__del__)
# 注册信号处理
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def connect(self):
"""尝试建立串口连接"""
try:
if time.time() - self.last_connection_attempt < self.reconnect_interval:
return False
self.last_connection_attempt = time.time()
self.port = self.find_esp32_port()
if self.port:
if self.ser and self.ser.is_open:
self.ser.close()
self.ser = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
print(f"连接到 {self.port},波特率: {self.baudrate}")
return True
else:
print("无法找到 ESP32 串口")
return False
except serial.SerialException as e:
print(f"无法连接到串口 {self.port},错误: {e}")
return False
except Exception as e:
print(f"连接时发生未知错误: {e}")
return False
def find_esp32_port(self):
"""寻找对应通用控制板的串口"""
try:
# 目标 YAML 文件路径
yaml_file_path = '/home/jsfb/jsfb_ws/global_config/power_board.yaml'
# 读取现有的 YAML 文件内容
with open(yaml_file_path, 'r') as file:
data = yaml.safe_load(file) # 读取并解析 YAML 内容
# 获取 target_usb_device_path 的值
target_usb_device_path = data.get('target_usb_device_path')
print(f"已成功将 target_usb_device_path 添加到 {yaml_file_path}")
ports = serial.tools.list_ports.comports()
for port in ports:
print(port.usb_device_path)
if target_usb_device_path == port.usb_device_path:
print(f"找到 ESP32 在端口: {port.usb_device_path}")
return port.device
print("未找到具有指定描述的 ESP32。")
except Exception as e:
print(f"查找串口时发生错误: {e}")
return None
def check_connection(self):
"""检查串口连接状态"""
return self.ser is not None and self.ser.is_open
def reconnect_if_needed(self):
"""如果需要则重新连接"""
if not self.check_connection():
return self.connect()
return True
def recv_cmd(self):
"""接收串口指令"""
if not self.check_connection():
return False
try:
command = self.ser.readline().decode().strip() # 读取并解码指令
if command: # 只有当收到非空指令时才打印
print(f"接收到指令: {command}")
if command == "SHUTDOWN":
return True
except (serial.SerialException, UnicodeDecodeError) as e:
print(f"接收指令时发生错误: {e}")
self.ser = None # 清除当前连接,触发重连
except Exception as e:
print(f"接收指令时发生未知错误: {e}")
return False
def send_cmd(self, cmd, wait_for_response=False, response_timeout=1.0):
"""主动向串口发送指令
Args:
cmd: 要发送的指令字符串
wait_for_response: 是否等待响应
response_timeout: 等待响应的超时时间(秒)
Returns:
wait_for_response为False时: 发送成功返回True失败返回False
wait_for_response为True时: 发送成功且收到响应则返回响应字符串否则返回None
"""
# 检查连接状态,如有需要则尝试重连
if not self.check_connection():
if not self.reconnect_if_needed():
print("发送指令失败: 无法连接到串口设备")
return False if not wait_for_response else None
try:
# 确保命令以换行符结束
if not cmd.endswith('\n'):
cmd += '\n'
# 发送指令
self.ser.write(cmd.encode())
self.ser.flush()
print(f"成功发送指令: {cmd.strip()}")
# 如果不需要等待响应,直接返回成功
if not wait_for_response:
return True
# 等待并获取响应
start_time = time.time()
response = ""
while (time.time() - start_time) < response_timeout:
if self.ser.in_waiting > 0:
try:
line = self.ser.readline().decode().strip()
response += line + "\n"
# 如果收到完整响应,可以提前返回
if line.endswith('>') or line.endswith('OK') or line.endswith('ERROR'):
break
except UnicodeDecodeError:
print("解码响应时出错,可能收到了非文本数据")
continue
time.sleep(0.01) # 小延时避免过度占用CPU
return response.strip() if response else None
except serial.SerialException as e:
print(f"发送指令时发生串口错误: {e}")
self.ser = None # 清除当前连接,触发重连
return False if not wait_for_response else None
except Exception as e:
print(f"发送指令时发生未知错误: {e}")
return False if not wait_for_response else None
def set_callback(self, callback_func):
"""设置接收到SHUTDOWN指令时的回调函数
Args:
callback_func: 回调函数当接收到SHUTDOWN指令时会被调用
"""
self.callback = callback_func
def start_monitoring(self):
"""开始在后台监控串口指令"""
if self.monitor_thread is not None and self.monitor_thread.is_alive():
print("监控线程已经在运行中")
return
self.running = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True # 设置为守护线程,这样主程序退出时线程会自动结束
self.monitor_thread.start()
print("开始后台监控串口指令")
def stop_monitoring(self):
"""停止监控串口指令"""
self.running = False
if self.monitor_thread:
self.monitor_thread.join()
print("停止监控串口指令")
def _monitor_loop(self):
"""监控循环的内部实现"""
while self.running:
try:
if not self.check_connection():
if not self.connect():
time.sleep(self.reconnect_interval) # 固定重连间隔
continue
if self.recv_cmd():
print("接收到SHUTDOWN指令")
if self.callback:
try:
self.callback()
except Exception as e:
print(f"执行回调函数时发生错误: {e}")
time.sleep(0.1) # 添加小延时避免CPU占用过高
except Exception as e:
print(f"监控循环中发生错误: {e}")
time.sleep(self.reconnect_interval) # 发生错误时等待固定时间后重试
def _signal_handler(self, signum, frame):
"""处理系统信号"""
print(f"\n接收到信号 {signum},正在清理...")
self.stop_monitoring()
sys.exit(0)
def __del__(self):
"""析构函数,关闭串口"""
try:
self.stop_monitoring()
if self.ser and self.ser.is_open:
print("关闭串口连接...")
self.ser.close()
except Exception as e:
print(f"清理资源时发生错误: {e}")
# 使用示例
if __name__ == "__main__":
def shutdown_callback():
print("执行关机回调函数...")
# 在这里添加关机操作的代码
board = PowerBoard()
board.set_callback(shutdown_callback)
board.start_monitoring()
# 保持主程序运行
try:
while True:
# 示例: 每10秒发送一次状态查询指令
command = input("请输入要发送的指令(输入'exit'退出): ")
if command.lower() == 'exit':
break
# 发送指令并等待响应
if command:
response = board.send_cmd(command, wait_for_response=True, response_timeout=2.0)
if response:
print(f"收到响应:\n{response}")
else:
print("未收到响应或发送失败")
time.sleep(1)
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:
print(f"\n程序发生异常: {e}")
finally:
board.stop_monitoring()