import serial import serial.tools.list_ports import yaml import atexit import threading import time import signal import sys class PowerBoard: def __init__(self, baudrate=115200, timeout=1, reconnect_interval=5): self.baudrate = baudrate self.timeout = timeout self.reconnect_interval = reconnect_interval self.port = None self.ser = None self.callback = None self.running = False self.monitor_thread = None self.reconnect_thread = None self.last_connection_attempt = 0 # 初始化连接 self.connect() # 注册退出时的清理函数 atexit.register(self.__del__) # 注册信号处理 signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) def connect(self): """尝试建立串口连接""" try: if time.time() - self.last_connection_attempt < self.reconnect_interval: return False self.last_connection_attempt = time.time() self.port = self.find_esp32_port() if self.port: if self.ser and self.ser.is_open: self.ser.close() self.ser = serial.Serial(self.port, self.baudrate, timeout=self.timeout) print(f"连接到 {self.port},波特率: {self.baudrate}") return True else: print("无法找到 ESP32 串口") return False except serial.SerialException as e: print(f"无法连接到串口 {self.port},错误: {e}") return False except Exception as e: print(f"连接时发生未知错误: {e}") return False def find_esp32_port(self): """寻找对应通用控制板的串口""" try: # 目标 YAML 文件路径 yaml_file_path = '/home/jsfb/jsfb_ws/global_config/power_board.yaml' # 读取现有的 YAML 文件内容 with open(yaml_file_path, 'r') as file: data = yaml.safe_load(file) # 读取并解析 YAML 内容 # 获取 target_usb_device_path 的值 target_usb_device_path = data.get('target_usb_device_path') print(f"已成功将 target_usb_device_path 添加到 {yaml_file_path}") ports = serial.tools.list_ports.comports() for port in ports: print(port.usb_device_path) if target_usb_device_path == port.usb_device_path: print(f"找到 ESP32 在端口: {port.usb_device_path}") return port.device print("未找到具有指定描述的 ESP32。") except Exception as e: print(f"查找串口时发生错误: {e}") return None def check_connection(self): """检查串口连接状态""" return self.ser is not None and self.ser.is_open def reconnect_if_needed(self): """如果需要则重新连接""" if not self.check_connection(): return self.connect() return True def recv_cmd(self): """接收串口指令""" if not self.check_connection(): return False try: command = self.ser.readline().decode().strip() # 读取并解码指令 if command: # 只有当收到非空指令时才打印 print(f"接收到指令: {command}") if command == "SHUTDOWN": return True except (serial.SerialException, UnicodeDecodeError) as e: print(f"接收指令时发生错误: {e}") self.ser = None # 清除当前连接,触发重连 except Exception as e: print(f"接收指令时发生未知错误: {e}") return False def send_cmd(self, cmd, wait_for_response=False, response_timeout=1.0): """主动向串口发送指令 Args: cmd: 要发送的指令字符串 wait_for_response: 是否等待响应 response_timeout: 等待响应的超时时间(秒) Returns: wait_for_response为False时: 发送成功返回True,失败返回False wait_for_response为True时: 发送成功且收到响应则返回响应字符串,否则返回None """ # 检查连接状态,如有需要则尝试重连 if not self.check_connection(): if not self.reconnect_if_needed(): print("发送指令失败: 无法连接到串口设备") return False if not wait_for_response else None try: # 确保命令以换行符结束 if not cmd.endswith('\n'): cmd += '\n' # 发送指令 self.ser.write(cmd.encode()) self.ser.flush() print(f"成功发送指令: {cmd.strip()}") # 如果不需要等待响应,直接返回成功 if not wait_for_response: return True # 等待并获取响应 start_time = time.time() response = "" while (time.time() - start_time) < response_timeout: if self.ser.in_waiting > 0: try: line = self.ser.readline().decode().strip() response += line + "\n" # 如果收到完整响应,可以提前返回 if line.endswith('>') or line.endswith('OK') or line.endswith('ERROR'): break except UnicodeDecodeError: print("解码响应时出错,可能收到了非文本数据") continue time.sleep(0.01) # 小延时避免过度占用CPU return response.strip() if response else None except serial.SerialException as e: print(f"发送指令时发生串口错误: {e}") self.ser = None # 清除当前连接,触发重连 return False if not wait_for_response else None except Exception as e: print(f"发送指令时发生未知错误: {e}") return False if not wait_for_response else None def set_callback(self, callback_func): """设置接收到SHUTDOWN指令时的回调函数 Args: callback_func: 回调函数,当接收到SHUTDOWN指令时会被调用 """ self.callback = callback_func def start_monitoring(self): """开始在后台监控串口指令""" if self.monitor_thread is not None and self.monitor_thread.is_alive(): print("监控线程已经在运行中") return self.running = True self.monitor_thread = threading.Thread(target=self._monitor_loop) self.monitor_thread.daemon = True # 设置为守护线程,这样主程序退出时线程会自动结束 self.monitor_thread.start() print("开始后台监控串口指令") def stop_monitoring(self): """停止监控串口指令""" self.running = False if self.monitor_thread: self.monitor_thread.join() print("停止监控串口指令") def _monitor_loop(self): """监控循环的内部实现""" while self.running: try: if not self.check_connection(): if not self.connect(): time.sleep(self.reconnect_interval) # 固定重连间隔 continue if self.recv_cmd(): print("接收到SHUTDOWN指令") if self.callback: try: self.callback() except Exception as e: print(f"执行回调函数时发生错误: {e}") time.sleep(0.1) # 添加小延时避免CPU占用过高 except Exception as e: print(f"监控循环中发生错误: {e}") time.sleep(self.reconnect_interval) # 发生错误时等待固定时间后重试 def _signal_handler(self, signum, frame): """处理系统信号""" print(f"\n接收到信号 {signum},正在清理...") self.stop_monitoring() sys.exit(0) def __del__(self): """析构函数,关闭串口""" try: self.stop_monitoring() if self.ser and self.ser.is_open: print("关闭串口连接...") self.ser.close() except Exception as e: print(f"清理资源时发生错误: {e}") # 使用示例 if __name__ == "__main__": def shutdown_callback(): print("执行关机回调函数...") # 在这里添加关机操作的代码 board = PowerBoard() board.set_callback(shutdown_callback) board.start_monitoring() # 保持主程序运行 try: while True: # 示例: 每10秒发送一次状态查询指令 command = input("请输入要发送的指令(输入'exit'退出): ") if command.lower() == 'exit': break # 发送指令并等待响应 if command: response = board.send_cmd(command, wait_for_response=True, response_timeout=2.0) if response: print(f"收到响应:\n{response}") else: print("未收到响应或发送失败") time.sleep(1) except KeyboardInterrupt: print("\n程序被用户中断") except Exception as e: print(f"\n程序发生异常: {e}") finally: board.stop_monitoring()