269 lines
9.8 KiB
Python
269 lines
9.8 KiB
Python
import serial
|
||
import serial.tools.list_ports
|
||
import yaml
|
||
import atexit
|
||
import threading
|
||
import time
|
||
import signal
|
||
import sys
|
||
|
||
class PowerBoard:
|
||
def __init__(self, baudrate=115200, timeout=1, reconnect_interval=5):
|
||
self.baudrate = baudrate
|
||
self.timeout = timeout
|
||
self.reconnect_interval = reconnect_interval
|
||
self.port = None
|
||
self.ser = None
|
||
self.callback = None
|
||
self.running = False
|
||
self.monitor_thread = None
|
||
self.reconnect_thread = None
|
||
self.last_connection_attempt = 0
|
||
|
||
# 初始化连接
|
||
self.connect()
|
||
|
||
# 注册退出时的清理函数
|
||
atexit.register(self.__del__)
|
||
# 注册信号处理
|
||
signal.signal(signal.SIGINT, self._signal_handler)
|
||
signal.signal(signal.SIGTERM, self._signal_handler)
|
||
|
||
def connect(self):
|
||
"""尝试建立串口连接"""
|
||
try:
|
||
if time.time() - self.last_connection_attempt < self.reconnect_interval:
|
||
return False
|
||
|
||
self.last_connection_attempt = time.time()
|
||
self.port = self.find_esp32_port()
|
||
|
||
if self.port:
|
||
if self.ser and self.ser.is_open:
|
||
self.ser.close()
|
||
|
||
self.ser = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
|
||
print(f"连接到 {self.port},波特率: {self.baudrate}")
|
||
return True
|
||
else:
|
||
print("无法找到 ESP32 串口")
|
||
return False
|
||
|
||
except serial.SerialException as e:
|
||
print(f"无法连接到串口 {self.port},错误: {e}")
|
||
return False
|
||
except Exception as e:
|
||
print(f"连接时发生未知错误: {e}")
|
||
return False
|
||
|
||
def find_esp32_port(self):
|
||
"""寻找对应通用控制板的串口"""
|
||
try:
|
||
# 目标 YAML 文件路径
|
||
yaml_file_path = '/home/jsfb/jsfb_ws/global_config/power_board.yaml'
|
||
|
||
# 读取现有的 YAML 文件内容
|
||
with open(yaml_file_path, 'r') as file:
|
||
data = yaml.safe_load(file) # 读取并解析 YAML 内容
|
||
|
||
# 获取 target_usb_device_path 的值
|
||
target_usb_device_path = data.get('target_usb_device_path')
|
||
|
||
print(f"已成功将 target_usb_device_path 添加到 {yaml_file_path}")
|
||
|
||
ports = serial.tools.list_ports.comports()
|
||
for port in ports:
|
||
print(port.usb_device_path)
|
||
if target_usb_device_path == port.usb_device_path:
|
||
print(f"找到 ESP32 在端口: {port.usb_device_path}")
|
||
return port.device
|
||
print("未找到具有指定描述的 ESP32。")
|
||
except Exception as e:
|
||
print(f"查找串口时发生错误: {e}")
|
||
return None
|
||
|
||
def check_connection(self):
|
||
"""检查串口连接状态"""
|
||
return self.ser is not None and self.ser.is_open
|
||
|
||
def reconnect_if_needed(self):
|
||
"""如果需要则重新连接"""
|
||
if not self.check_connection():
|
||
return self.connect()
|
||
return True
|
||
|
||
def recv_cmd(self):
|
||
"""接收串口指令"""
|
||
if not self.check_connection():
|
||
return False
|
||
|
||
try:
|
||
command = self.ser.readline().decode().strip() # 读取并解码指令
|
||
if command: # 只有当收到非空指令时才打印
|
||
print(f"接收到指令: {command}")
|
||
if command == "SHUTDOWN":
|
||
return True
|
||
except (serial.SerialException, UnicodeDecodeError) as e:
|
||
print(f"接收指令时发生错误: {e}")
|
||
self.ser = None # 清除当前连接,触发重连
|
||
except Exception as e:
|
||
print(f"接收指令时发生未知错误: {e}")
|
||
return False
|
||
|
||
def send_cmd(self, cmd, wait_for_response=False, response_timeout=1.0):
|
||
"""主动向串口发送指令
|
||
|
||
Args:
|
||
cmd: 要发送的指令字符串
|
||
wait_for_response: 是否等待响应
|
||
response_timeout: 等待响应的超时时间(秒)
|
||
|
||
Returns:
|
||
wait_for_response为False时: 发送成功返回True,失败返回False
|
||
wait_for_response为True时: 发送成功且收到响应则返回响应字符串,否则返回None
|
||
"""
|
||
# 检查连接状态,如有需要则尝试重连
|
||
if not self.check_connection():
|
||
if not self.reconnect_if_needed():
|
||
print("发送指令失败: 无法连接到串口设备")
|
||
return False if not wait_for_response else None
|
||
|
||
try:
|
||
# 确保命令以换行符结束
|
||
if not cmd.endswith('\n'):
|
||
cmd += '\n'
|
||
|
||
# 发送指令
|
||
self.ser.write(cmd.encode())
|
||
self.ser.flush()
|
||
print(f"成功发送指令: {cmd.strip()}")
|
||
|
||
# 如果不需要等待响应,直接返回成功
|
||
if not wait_for_response:
|
||
return True
|
||
|
||
# 等待并获取响应
|
||
start_time = time.time()
|
||
response = ""
|
||
|
||
while (time.time() - start_time) < response_timeout:
|
||
if self.ser.in_waiting > 0:
|
||
try:
|
||
line = self.ser.readline().decode().strip()
|
||
response += line + "\n"
|
||
# 如果收到完整响应,可以提前返回
|
||
if line.endswith('>') or line.endswith('OK') or line.endswith('ERROR'):
|
||
break
|
||
except UnicodeDecodeError:
|
||
print("解码响应时出错,可能收到了非文本数据")
|
||
continue
|
||
time.sleep(0.01) # 小延时避免过度占用CPU
|
||
|
||
return response.strip() if response else None
|
||
|
||
except serial.SerialException as e:
|
||
print(f"发送指令时发生串口错误: {e}")
|
||
self.ser = None # 清除当前连接,触发重连
|
||
return False if not wait_for_response else None
|
||
except Exception as e:
|
||
print(f"发送指令时发生未知错误: {e}")
|
||
return False if not wait_for_response else None
|
||
|
||
def set_callback(self, callback_func):
|
||
"""设置接收到SHUTDOWN指令时的回调函数
|
||
Args:
|
||
callback_func: 回调函数,当接收到SHUTDOWN指令时会被调用
|
||
"""
|
||
self.callback = callback_func
|
||
|
||
def start_monitoring(self):
|
||
"""开始在后台监控串口指令"""
|
||
if self.monitor_thread is not None and self.monitor_thread.is_alive():
|
||
print("监控线程已经在运行中")
|
||
return
|
||
|
||
self.running = True
|
||
self.monitor_thread = threading.Thread(target=self._monitor_loop)
|
||
self.monitor_thread.daemon = True # 设置为守护线程,这样主程序退出时线程会自动结束
|
||
self.monitor_thread.start()
|
||
print("开始后台监控串口指令")
|
||
|
||
def stop_monitoring(self):
|
||
"""停止监控串口指令"""
|
||
self.running = False
|
||
if self.monitor_thread:
|
||
self.monitor_thread.join()
|
||
print("停止监控串口指令")
|
||
|
||
def _monitor_loop(self):
|
||
"""监控循环的内部实现"""
|
||
while self.running:
|
||
try:
|
||
if not self.check_connection():
|
||
if not self.connect():
|
||
time.sleep(self.reconnect_interval) # 固定重连间隔
|
||
continue
|
||
|
||
if self.recv_cmd():
|
||
print("接收到SHUTDOWN指令")
|
||
if self.callback:
|
||
try:
|
||
self.callback()
|
||
except Exception as e:
|
||
print(f"执行回调函数时发生错误: {e}")
|
||
|
||
time.sleep(0.1) # 添加小延时避免CPU占用过高
|
||
|
||
except Exception as e:
|
||
print(f"监控循环中发生错误: {e}")
|
||
time.sleep(self.reconnect_interval) # 发生错误时等待固定时间后重试
|
||
|
||
def _signal_handler(self, signum, frame):
|
||
"""处理系统信号"""
|
||
print(f"\n接收到信号 {signum},正在清理...")
|
||
self.stop_monitoring()
|
||
sys.exit(0)
|
||
|
||
def __del__(self):
|
||
"""析构函数,关闭串口"""
|
||
try:
|
||
self.stop_monitoring()
|
||
if self.ser and self.ser.is_open:
|
||
print("关闭串口连接...")
|
||
self.ser.close()
|
||
except Exception as e:
|
||
print(f"清理资源时发生错误: {e}")
|
||
|
||
# 使用示例
|
||
if __name__ == "__main__":
|
||
def shutdown_callback():
|
||
print("执行关机回调函数...")
|
||
# 在这里添加关机操作的代码
|
||
|
||
board = PowerBoard()
|
||
board.set_callback(shutdown_callback)
|
||
board.start_monitoring()
|
||
|
||
# 保持主程序运行
|
||
try:
|
||
while True:
|
||
# 示例: 每10秒发送一次状态查询指令
|
||
command = input("请输入要发送的指令(输入'exit'退出): ")
|
||
if command.lower() == 'exit':
|
||
break
|
||
|
||
# 发送指令并等待响应
|
||
if command:
|
||
response = board.send_cmd(command, wait_for_response=True, response_timeout=2.0)
|
||
if response:
|
||
print(f"收到响应:\n{response}")
|
||
else:
|
||
print("未收到响应或发送失败")
|
||
|
||
time.sleep(1)
|
||
except KeyboardInterrupt:
|
||
print("\n程序被用户中断")
|
||
except Exception as e:
|
||
print(f"\n程序发生异常: {e}")
|
||
finally:
|
||
board.stop_monitoring() |