import sys import os from iot_device_sdk_python.ota.ota_query_version import OTAQueryVersion sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from urllib.parse import urlparse import requests import logging import time import socket import threading from queue import Queue from typing import Union from iot_device_sdk_python.client.client_conf import ClientConf from iot_device_sdk_python.client.connect_auth_info import ConnectAuthInfo from iot_device_sdk_python.service.abstract_service import AbstractService from iot_device_sdk_python.client.request.command_response import CommandRsp from iot_device_sdk_python.service.property import Property from iot_device_sdk_python.transport.connect_listener import ConnectListener from iot_device_sdk_python.filemanager.file_manager_service import FileManagerService from iot_device_sdk_python.filemanager.file_manager_listener import FileManagerListener from iot_device_sdk_python.filemanager.url_info import UrlInfo from iot_device_sdk_python.ota.ota_service import OTAService from iot_device_sdk_python.ota.ota_listener import OTAListener from iot_device_sdk_python.ota.ota_package_info import OTAPackageInfo from iot_device_sdk_python.ota.ota_package_info_v2 import OTAPackageInfoV2 from iot_device_sdk_python.iot_device import IotDevice import yaml import json import hashlib from pathlib import Path from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from VortXDB.client import VTXClient # 配置日志输出到标准输出,systemd会自动捕获 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(threadName)s - %(filename)s[%(funcName)s] - %(levelname)s: %(message)s" ) logger = logging.getLogger(__name__) def get_absolute_path(relative_path): """Convert relative path to absolute path based on script location""" if os.path.isabs(relative_path): return relative_path script_dir = os.path.dirname(os.path.abspath(__file__)) return os.path.abspath(os.path.join(script_dir, relative_path)) def load_config(config_path): """Load configuration from YAML file""" try: config_path = get_absolute_path(config_path) with open(config_path, 'r') as f: config = yaml.safe_load(f) # Convert relative paths in watch_dirs to absolute paths if 'watch_dirs' in config: config['watch_dirs'] = [get_absolute_path(path) for path in config['watch_dirs']] return config except Exception as e: logger.error(f"Error loading config file: {str(e)}") raise class UploadQueueManager: def __init__(self, upload_interval=2.0): self.upload_queue = Queue() self.upload_interval = upload_interval self._running = False self._upload_thread = None self._lock = threading.Lock() def start(self): with self._lock: if not self._running: self._running = True self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True) self._upload_thread.start() def stop(self): with self._lock: if self._running: self._running = False # 清空队列 while not self.upload_queue.empty(): try: self.upload_queue.get_nowait() self.upload_queue.task_done() except: pass # 发送停止信号 self.upload_queue.put(None) if self._upload_thread and self._upload_thread.is_alive(): self._upload_thread.join(timeout=5) self._upload_thread = None def clear_queue(self): """清空上传队列""" while not self.upload_queue.empty(): try: self.upload_queue.get_nowait() self.upload_queue.task_done() except: pass def add_to_queue(self, file_info): if self._running: self.upload_queue.put(file_info) def _upload_worker(self): while self._running: try: file_info = self.upload_queue.get() if file_info is None: # Stop signal break iot_instance, file_name, file_path, file_tracker = file_info try: # Ensure the file_name includes hostname for all uploads if not file_name.startswith(os.uname()[1] + '/'): file_name = f"{os.uname()[1]}/{file_name}" # 创建一个事件来跟踪上传状态 upload_success = threading.Event() def upload_callback(success): if success: upload_success.set() # 注册回调到文件管理器监听器 iot_instance.file_manager.get_listener().set_upload_callback(upload_callback) # 开始上传 iot_instance.upload_file(file_name, file_path) # 等待上传结果,最多等待60秒 if upload_success.wait(timeout=60): logger.info(f"Successfully uploaded file: {file_path} as {file_name}") # 只在上传成功后更新记录 file_tracker.update_record(file_path, check_cleanup=True) else: logger.error(f"Upload timeout for file: {file_path}") # 可以选择重新加入队列或其他错误处理 if os.path.exists(file_path): # 如果文件还存在,重新加入队列 logger.info(f"Re-queuing file for upload: {file_path}") self.add_to_queue(file_info) except Exception as e: logger.error(f"Error uploading file {file_path}: {str(e)}") # 如果是临时错误,可以重新加入队列 if os.path.exists(file_path): logger.info(f"Re-queuing file for upload: {file_path}") self.add_to_queue(file_info) time.sleep(self.upload_interval) # Wait between uploads except Exception as e: logger.error(f"Error in upload worker: {str(e)}") continue finally: self.upload_queue.task_done() class CustomConnectListener(ConnectListener): def __init__(self, iot_device: IotDevice, iot_lite=None): """ 传入IotDevice实例和IoTLite实例 """ self.device = iot_device self.iot_lite = iot_lite self._reconnect_thread = None self._should_reconnect = True self._reconnect_delay = 5 # 初始重连延迟5秒 self._max_reconnect_delay = 300 # 最大重连延迟5分钟 self._reconnect_lock = threading.Lock() def connection_lost(self, cause: str): """ 连接丢失通知,启动重连机制 Args: cause: 连接丢失原因 """ logger.warning("Connection lost. Cause: " + cause) # 先停止所有服务 if self.iot_lite: try: self.iot_lite.stop_monitoring() except Exception as e: logger.error(f"Error stopping services on connection lost: {str(e)}") with self._reconnect_lock: if not self._reconnect_thread or not self._reconnect_thread.is_alive(): self._should_reconnect = True self._reconnect_thread = threading.Thread(target=self._reconnect_worker) self._reconnect_thread.daemon = True self._reconnect_thread.start() def connect_complete(self, reconnect: bool, server_uri: str): """ 连接成功通知 Args: reconnect: 是否为重连 server_uri: 服务端地址 """ logger.info(f"Connect {'reconnected' if reconnect else 'connected'} to {server_uri}") # 重置重连延迟 self._reconnect_delay = 5 # 如果是重连成功,需要重新启动文件监控 if reconnect and self.iot_lite: try: self.iot_lite.restart_services() except Exception as e: logger.error(f"Error restarting services after reconnection: {str(e)}") def _reconnect_worker(self): """重连工作线程""" while self._should_reconnect: logger.info(f"Attempting to reconnect in {self._reconnect_delay} seconds...") time.sleep(self._reconnect_delay) try: if self.device.connect() == 0: logger.info("Reconnection successful") break else: logger.warning("Reconnection failed, will retry...") # 使用指数退避策略增加重连延迟,但不超过最大值 self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay) except Exception as e: logger.error(f"Error during reconnection: {str(e)}") self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay) class IoTLite: def __init__(self, server_uri, port, device_id, secret, watch_dirs=None, max_files=20): self.server_uri = server_uri self.port = port self.device_id = device_id print(device_id) self.secret = secret self._running = False self.max_files = max_files self.watch_dirs = watch_dirs or [] # 确保所有监控目录都是绝对路径 self.watch_dirs = [get_absolute_path(path) for path in self.watch_dirs] # Create directories if they don't exist for watch_dir in self.watch_dirs: try: os.makedirs(watch_dir, exist_ok=True) logger.info(f"Ensured directory exists: {watch_dir}") except Exception as e: logger.error(f"Error creating directory {watch_dir}: {str(e)}") raise """ 创建设备 """ connect_auth_info = ConnectAuthInfo() connect_auth_info.server_uri = self.server_uri connect_auth_info.port = self.port connect_auth_info.id = self.device_id connect_auth_info.secret = self.secret self.client_conf = ClientConf(connect_auth_info) self.device = IotDevice(self.client_conf) self.license_service = LicenseService() self.device.add_service("License", self.license_service) self.vtxdb_service = VortXDBService() self.device.add_service("VortXDB", self.vtxdb_service) self.expected_delivery_service = ExpectedDeliveryService() self.device.add_service("ExpectedDelivery", self.expected_delivery_service) # 添加连接监听器 self.connect_listener = CustomConnectListener(self.device, self) self.device.get_client().add_connect_listener(self.connect_listener) self.file_manager: FileManagerService = self.device.get_file_manager_service() file_manager_listener = FileManagerListener(self.file_manager) self.file_manager.set_listener(file_manager_listener) # Initialize upload queue manager self.upload_queue_manager = UploadQueueManager(upload_interval=0.05) # Initialize file monitoring self.observers = [] self.event_handlers = [] self.file_trackers = {} self._init_file_monitoring() def _init_file_monitoring(self): """初始化文件监控""" for watch_dir in self.watch_dirs: # 使用/home/jsfb/jsfb_ws/iot_records目录存储记录文件 record_dir = "/home/jsfb/jsfb_ws/iot_records" os.makedirs(record_dir, exist_ok=True) record_file = os.path.join(record_dir, f"upload_records_{os.path.basename(watch_dir)}.json") file_tracker = FileTracker(watch_dir, record_file, max_files=self.max_files) self.file_trackers[watch_dir] = file_tracker observer = Observer() event_handler = FileChangeHandler(file_tracker, self, self.upload_queue_manager) self.observers.append(observer) self.event_handlers.append(event_handler) def restart_services(self): """重新启动服务(用于重连后)""" logger.info("Restarting services...") try: # 停止现有的监控服务 self.stop_monitoring() # 重新创建上传队列管理器 self.upload_queue_manager = UploadQueueManager(upload_interval=0.05) # 重新初始化并启动监控服务 self._init_file_monitoring() self.start_monitoring() # 重新扫描现有文件 self._scan_existing_files() logger.info("Services successfully restarted") except Exception as e: logger.error(f"Error restarting services: {str(e)}") raise def stop_monitoring(self): """停止文件监控服务""" # Stop upload queue manager try: if hasattr(self, 'upload_queue_manager'): self.upload_queue_manager.stop() except Exception as e: logger.error(f"Error stopping upload queue manager: {str(e)}") # Stop all observers for observer in self.observers: try: observer.stop() observer.join(timeout=5) except Exception as e: logger.error(f"Error stopping observer: {str(e)}") # Clean up resources for handler in self.event_handlers: try: handler._last_modified_time.clear() handler._file_sizes.clear() except Exception as e: logger.error(f"Error cleaning up resources: {str(e)}") # Clear lists self.observers.clear() self.event_handlers.clear() def start_monitoring(self): """启动文件监控服务""" # Start upload queue manager self.upload_queue_manager.start() # Start all observers for i, observer in enumerate(self.observers): watch_dir = self.watch_dirs[i] event_handler = self.event_handlers[i] observer.schedule(event_handler, watch_dir, recursive=True) observer.start() logger.info(f"Started monitoring directory: {watch_dir}") def start(self): """启动服务,包含重试机制""" if not self.connect(): raise RuntimeError("Failed to connect to platform") self._running = True self.license_service.enable_auto_report(30) self.vtxdb_service.enable_auto_report(30) self.expected_delivery_service.enable_auto_report(30) self.start_monitoring() # Initial scan of existing files self._scan_existing_files() def stop(self): """Gracefully stop all components""" if not self._running: return self._running = False logger.info("Stopping file monitoring...") # 停止重连 if hasattr(self, 'connect_listener'): self.connect_listener._should_reconnect = False if self.connect_listener._reconnect_thread and self.connect_listener._reconnect_thread.is_alive(): self.connect_listener._reconnect_thread.join(timeout=5) self.stop_monitoring() self.license_service.disable_auto_report() self.vtxdb_service.disable_auto_report() self.expected_delivery_service.disable_auto_report() # Disconnect from platform try: self.device.destroy() logger.info("Disconnected from platform") except Exception as e: logger.error(f"Error disconnecting from platform: {str(e)}") logger.info("Cleanup completed") def connect(self): """连接到平台,带有重试机制""" max_retries = 5 retry_count = 0 retry_delay = 5 # seconds while retry_count < max_retries: try: if self.device.connect() == 0: logger.info("Successfully connected to platform") return True logger.warning(f"Connection failed, attempt {retry_count + 1}/{max_retries}") time.sleep(retry_delay) retry_count += 1 except Exception as e: logger.error(f"Connection error: {str(e)}") time.sleep(retry_delay) retry_count += 1 logger.error("Failed to connect after maximum retries") return False def _scan_existing_files(self): for watch_dir in self.watch_dirs: file_tracker = self.file_trackers[watch_dir] for root, _, files in os.walk(watch_dir): # Skip hidden directories if any(part.startswith('.') for part in Path(root).parts): continue for file in files: # Skip hidden files if file.startswith('.'): continue file_path = os.path.join(root, file) if file_tracker.needs_upload(file_path): relative_path = os.path.relpath(file_path, "/home/jsfb/jsfb_ws") # Pass file_tracker to upload queue self.upload_queue_manager.add_to_queue((self, relative_path, file_path, file_tracker)) def upload_file(self, file_name, file_path): logger.info(f"Uploading file: {file_path} with name: {file_name}") self.file_manager.upload_file(file_name=file_name, file_path=file_path) def is_running(self): """Return the running state of the IoT service""" return self._running class LicenseService(AbstractService): def __init__(self): super().__init__() # 按照设备模型定义属性,注意属性的prop_name需要和设备模型一致,writeable表示属性是否可写;field_name为变量的名字,val为属性的值 self.license_info = Property(val='', field_name="license_info", prop_name="info", writeable=False) self.activation_code = Property(val='', field_name="activation_code", prop_name="activation_code", writeable=True) # 定义命令名称与方法名称的映射关系 self.command2method = {"activate": "activate", "use": "use"} self.__set_writeable_and_readable(self.license_info, self.activation_code) self.__set_command2method(self.command2method) def __set_writeable_and_readable(self, *args): for arg in args: self._readable_prop2field[arg.prop_name] = arg.field_name if arg.writeable: self._writeable_prop2field[arg.prop_name] = arg.field_name def __set_command2method(self, c2m): self._command2method = c2m # def _auto_report_thread_func(self, report_interval: int): # """ # 周期上报属性方法 # Args: # report_interval: 上报周期,单位s # """ # schedule.every(report_interval).seconds.do(self.fire_properties_changed, []) # while self._auto_report: # schedule.run_pending() # time.sleep(1) def get_license_info(self): try: response = requests.get('http://127.0.0.1:5288/api/license/info') if response.status_code == 200: self.license_info.val = response.json() else: logger.error(f"获取license信息失败,状态码: {response.status_code}") except Exception as e: logger.error(f"获取license信息时发生错误: {e}") return self.license_info.val def set_activation_code(self, activation_code): try: response = requests.post( 'http://127.0.0.1:5288/api/license/activate', json={"activation_code": activation_code} ) if response.status_code != 200 or response.status_code != 202: logger.error(f"激活失败,状态码: {response.status_code}") except Exception as e: logger.error(f"激活过程中发生错误: {e}") def activate(self, paras: dict): activation_code = paras.get("activation_code") try: response = requests.post( 'http://127.0.0.1:5288/api/license/activate', json={"activation_code": activation_code} ) if response.status_code != 200 or response.status_code != 202: logger.error(f"激活失败,状态码: {response.status_code}") command_rsp = CommandRsp() command_rsp.result_code = CommandRsp.fail_code() return command_rsp except Exception as e: logger.error(f"激活过程中发生错误: {e}") command_rsp = CommandRsp() command_rsp.result_code = CommandRsp.fail_code() return command_rsp command_rsp = CommandRsp() command_rsp.result_code = CommandRsp.success_code() return command_rsp def use(self, paras: dict): try: response = requests.post( 'http://127.0.0.1:5288/api/license/use' ) if response.status_code != 200: logger.error(f"消耗使用次数失败,状态码: {response.status_code}") command_rsp = CommandRsp() command_rsp.result_code = CommandRsp.fail_code() return command_rsp except Exception as e: logger.error(f"消耗使用次数过程中发生错误: {e}") command_rsp = CommandRsp() command_rsp.result_code = CommandRsp.fail_code() return command_rsp command_rsp = CommandRsp() command_rsp.result_code = CommandRsp.success_code() return command_rsp class VortXDBService(AbstractService): def __init__(self, vtxdb: VTXClient = None): super().__init__() if vtxdb is not None: self.vtxdb = vtxdb else: self.vtxdb = VTXClient() # 按照设备模型定义属性,注意属性的prop_name需要和设备模型一致,writeable表示属性是否可写;field_name为变量的名字,val为属性的值 self.robot_info = Property(val=self.vtxdb.get("robot_info", ""), field_name="robot_info", prop_name="robot_info", writeable=True) self.robot_config = Property(val=self.vtxdb.get("robot_config", ""), field_name="robot_config", prop_name="robot_config", writeable=True) self.massage_plan = Property(val=self.vtxdb.get("massage_plan", ""), field_name="massage_plan", prop_name="massage_plan", writeable=True) # 定义命令名称与方法名称的映射关系 # self.command2method = {"set": "vtx_set", "delete": "vtx_delete"} self.__set_writeable_and_readable(self.robot_config, self.robot_info, self.massage_plan) # self.__set_command2method(self.command2method) def __set_writeable_and_readable(self, *args): for arg in args: self._readable_prop2field[arg.prop_name] = arg.field_name if arg.writeable: self._writeable_prop2field[arg.prop_name] = arg.field_name def __set_command2method(self, c2m): self._command2method = c2m # def vtx_set(self, paras: dict): # pass # def vtx_delete(self, paras: dict): # pass def get_robot_config(self): self.robot_config.val = self.vtxdb.get("robot_config", "") return self.robot_config.val def set_robot_config(self, data: dict): self.vtxdb.set("robot_config", "", data) def get_robot_info(self): """获取机器人信息,返回一个包含序列号、软件版本、设备位置信息的字典""" # 获取设备序列号(hostname) serial_number = socket.gethostname() # 获取软件版本(当前.py文件所在的上一级目录) current_file_path = os.path.abspath(__file__) software_version = os.path.basename(os.path.dirname(current_file_path)) # 获取设备经纬度 location_url = "http://127.0.0.1:5000/get_device_location" try: response = requests.get(location_url, timeout=1) if response.status_code == 200: location_data = response.json().get('data', {}) else: location_data = {'error': 'Failed to retrieve location'} except Exception as e: location_data = {'error': f'Location request failed: {str(e)}'} # 获取参数服务器版本 vortxdb_version = self.vtxdb.get_version() # 组织返回数据 self.robot_info.val = { "serial_number": serial_number, "software_version": software_version, "vortxdb_version": vortxdb_version, "device_location": location_data } return self.robot_info.val def set_robot_info(self, data: dict): self.vtxdb.set("robot_info", "", data) def get_massage_plan(self): self.massage_plan.val = self.vtxdb.get("massage_plan", "") return self.massage_plan.val def set_massage_plan(self, data: dict): self.vtxdb.set("massage_plan", "", data) class ExpectedDeliveryService(AbstractService): def __init__(self, vtxdb: VTXClient = None): super().__init__() if vtxdb is not None: self.vtxdb = vtxdb else: self.vtxdb = VTXClient() # 按照设备模型定义属性,注意属性的prop_name需要和设备模型一致,writeable表示属性是否可写;field_name为变量的名字,val为属性的值 self.token_HW = Property(val=self.vtxdb.get("robot_config", "Language.Speech_processor.huaweiyun_recognize_config.token_HW"), field_name="token_HW", prop_name="token_HW", writeable=True) # 定义命令名称与方法名称的映射关系 # self.command2method = {"activate": "activate", "use": "use"} self.__set_writeable_and_readable(self.token_HW) def __set_writeable_and_readable(self, *args): for arg in args: self._readable_prop2field[arg.prop_name] = arg.field_name if arg.writeable: self._writeable_prop2field[arg.prop_name] = arg.field_name def __set_command2method(self, c2m): self._command2method = c2m def set_token_HW(self,data): old_data = self.vtxdb.get("robot_config", "Language.Speech_processor.huaweiyun_recognize_config.token_HW") print("data:",data) if data != old_data: self.vtxdb.set("robot_config", "Language.Speech_processor.huaweiyun_recognize_config.token_HW",data) # print("self.token_HW.val:",self.token_HW.val) def get_token_HW(self): self.token_HW.val = self.vtxdb.get("robot_config", "Language.Speech_processor.huaweiyun_recognize_config.token_HW") # print("self.token_HW.val:",self.token_HW.val) return self.token_HW.val class FileManagerListener(FileManagerListener): def __init__(self, file_manager: FileManagerService): self._file_manager = file_manager self._upload_callback = None def set_upload_callback(self, callback): """设置上传完成回调函数""" self._upload_callback = callback def on_upload_url(self, url_info: UrlInfo): """ 接收文件上传url :param url_info: 上传参数 """ if url_info.object_name not in self._file_manager.upload_file_dict.keys(): raise RuntimeError("object_name: " + url_info.object_name + " has no related file_path") file_path = self._file_manager.upload_file_dict.get(url_info.object_name) if not os.path.isfile(file_path): raise RuntimeError("file_path: " + file_path + " is not file") data = open(file_path, 'rb').read() headers = {"Content-Type": "text/plain", "Host": urlparse(url_info.url).netloc} resp = requests.put(url=url_info.url, data=data, headers=headers) upload_success = resp.status_code == requests.codes.ok if upload_success: logger.info("upload file success. url is: %s" % url_info.url) self._file_manager.report_upload_result(object_name=url_info.object_name, result_code=0, status_code=resp.status_code) else: logger.error("upload file fail, status code: %s" % str(resp.status_code)) logger.error("response content: %s" % resp.text) self._file_manager.report_upload_result(object_name=url_info.object_name, result_code=1, status_code=resp.status_code) # 调用回调函数通知上传结果 if self._upload_callback: self._upload_callback(upload_success) def on_download_url(self, url_info: UrlInfo): """ 接收文件下载url :param url_info: 下载参数 """ if url_info.object_name not in self._file_manager.download_file_dict.keys(): raise RuntimeError("object_name: " + url_info.object_name + " has no related file_path") file_path = self._file_manager.download_file_dict.get(url_info.object_name) headers = {"Content-Type": "text/plain", "Host": urlparse(url_info.url).netloc} resp = requests.get(url=url_info.url, headers=headers) open(file_path, 'wb').write(resp.content) if resp.status_code == requests.codes.ok: logger.info("download file success.") self._file_manager.report_download_result(object_name=url_info.object_name, result_code=0, status_code=resp.status_code) else: logger.error("download file fail, status code: %s" % str(resp.status_code)) logger.error("response content: %s" % resp.text) self._file_manager.report_download_result(object_name=url_info.object_name, result_code=1, status_code=resp.status_code) class FileTracker: def __init__(self, watch_dir, record_file, max_files=20): self.watch_dir = Path(watch_dir) self.record_file = Path(record_file) self.max_files = max_files # Create directory for record file if it doesn't exist os.makedirs(os.path.dirname(self.record_file), exist_ok=True) self.file_records = {} self._load_records() def _load_records(self): if self.record_file.exists(): try: with open(self.record_file, 'r') as f: self.file_records = json.load(f) except json.JSONDecodeError: logger.warning(f"Invalid JSON in {self.record_file}, starting with empty records") self.file_records = {} except Exception as e: logger.error(f"Error loading records from {self.record_file}: {str(e)}") self.file_records = {} def _save_records(self): with open(self.record_file, 'w') as f: json.dump(self.file_records, f, indent=4) def _calculate_file_hash(self, file_path): sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() def needs_upload(self, file_path): file_path = str(file_path) current_hash = self._calculate_file_hash(file_path) file_info = self.file_records.get(file_path) if file_info is None: return True return current_hash != file_info['hash'] def update_record(self, file_path, check_cleanup=False): """ Update the record for a file and optionally check for cleanup :param file_path: Path to the file :param check_cleanup: Whether to perform cleanup check after update """ file_path = str(file_path) current_time = time.time() # 更新记录 self.file_records[file_path] = { 'hash': self._calculate_file_hash(file_path), 'last_upload': current_time, 'upload_time': current_time # 添加上传时间字段用于排序 } self._save_records() # 只有当记录数达到或超过max_files时才进行清理 if check_cleanup and len(self.file_records) >= self.max_files: self._cleanup_old_files() def _cleanup_old_files(self): """Clean up old files when the number of records exceeds max_files""" if len(self.file_records) <= self.max_files: return # 按上传时间排序 sorted_files = sorted( self.file_records.items(), key=lambda x: x[1].get('upload_time', x[1]['last_upload']) # 兼容旧记录 ) # 只删除最早的一个文件 oldest_file_path, _ = sorted_files[0] try: # 删除文件(如果存在) if os.path.exists(oldest_file_path): os.remove(oldest_file_path) logger.info(f"Removed oldest file: {oldest_file_path}") else: logger.warning(f"Oldest file already removed: {oldest_file_path}") # 从记录中删除 del self.file_records[oldest_file_path] # 清理空目录 try: self._cleanup_empty_dirs(os.path.dirname(oldest_file_path)) except Exception as e: logger.error(f"Error cleaning up directories for {oldest_file_path}: {str(e)}") except Exception as e: logger.error(f"Error removing oldest file {oldest_file_path}: {str(e)}") # 即使删除失败也从记录中移除 del self.file_records[oldest_file_path] # 保存更新后的记录 self._save_records() def _cleanup_empty_dirs(self, directory): """Recursively remove empty directories""" try: directory = Path(directory) if not directory.exists(): return # Don't delete the watch directory itself or anything outside it if not str(directory).startswith(str(self.watch_dir)): return # Check if directory is empty (excluding hidden files) has_visible_files = False for item in directory.iterdir(): if not item.name.startswith('.'): has_visible_files = True break if not has_visible_files: try: directory.rmdir() # This will only succeed if the directory is empty logger.info(f"Removed empty directory: {directory}") # Recursively check parent directory self._cleanup_empty_dirs(directory.parent) except OSError: # Directory not empty or other error pass except Exception as e: logger.error(f"Error cleaning up directory {directory}: {str(e)}") class FileChangeHandler(FileSystemEventHandler): def __init__(self, file_tracker, iot_instance, upload_queue_manager): self.file_tracker = file_tracker self.iot = iot_instance self.upload_queue_manager = upload_queue_manager self._last_modified_time = {} self._file_sizes = {} super().__init__() def _is_hidden(self, path): """Check if a file or directory is hidden (starts with .)""" return any(part.startswith('.') for part in Path(path).parts) def _is_file_ready(self, file_path): """ Check if file is ready for upload by comparing sizes over time Returns True if file size is stable (writing completed) """ try: current_size = os.path.getsize(file_path) last_size = self._file_sizes.get(file_path) self._file_sizes[file_path] = current_size # If we have no previous size, store and wait if last_size is None: return False # If size hasn't changed, file is ready return current_size == last_size except (OSError, IOError): # If we can't read the file, assume it's not ready return False def on_created(self, event): if event.is_directory or self._is_hidden(event.src_path): return # When file is created, initialize its tracking self._file_sizes[event.src_path] = None self._handle_file_event(event.src_path) def on_modified(self, event): if event.is_directory or self._is_hidden(event.src_path): return current_time = time.time() last_time = self._last_modified_time.get(event.src_path, 0) # Debounce modifications (wait at least 1 second between uploads) if current_time - last_time > 1: self._handle_file_event(event.src_path) def _handle_file_event(self, file_path): # Double check for hidden files/directories if self._is_hidden(file_path): return # Wait until file is completely written if not self._is_file_ready(file_path): # Schedule a retry after a short delay time.sleep(0.5) if not self._is_file_ready(file_path): return try: if self.file_tracker.needs_upload(file_path): relative_path = os.path.relpath(file_path, "/home/jsfb/jsfb_ws") # Pass file_tracker to upload queue for cleanup after upload self.upload_queue_manager.add_to_queue((self.iot, relative_path, file_path, self.file_tracker)) self._last_modified_time[file_path] = time.time() logger.info(f"Successfully queued file: {file_path}") except Exception as e: logger.error(f"Error processing file {file_path}: {str(e)}") def __del__(self): # Clean up tracking dictionaries self._last_modified_time.clear() self._file_sizes.clear() class OTAListener(OTAListener): def __init__(self, ota_service: OTAService): self.ota_service = ota_service def on_query_version(self, queryInfo: OTAQueryVersion): """ 接收查询版本请求 """ # 获取当前脚本执行的绝对路径 current_path = os.path.abspath(__file__) # 获取当前脚本所在目录的上一级目录 base_path = os.path.dirname(os.path.dirname(current_path)) # 检查当前路径是否包含 MassageRobot_aubo- 开头的文件夹名 if "MassageRobot_aubo-" in base_path: # 提取版本号部分并去除后续目录部分 version = base_path.split("MassageRobot_aubo-")[-1].split("/")[0] else: # 如果没有匹配的版本号,则返回默认值 version = "default" print({"current_version": version}) self.ota_service.report_version(sw_version=version) def on_receive_package_info(self, pkg: Union[OTAPackageInfo, OTAPackageInfoV2]): """ 接收新版本通知 :param pkg: 新版本包信息 """ print("OTASampleListener on_receive_package_info, pkg=", str(pkg.to_dict())) if self.pre_check(pkg) != 0: print("pre_check failed") return # 下载包并升级 self.download_package(pkg) def pre_check(self, pkg: Union[OTAPackageInfo, OTAPackageInfoV2]): """ 对新版本包信息进行检查 :param pkg: 新版本包信息 :return: 如果允许升级,返回0;返回非0表示不允许升级 """ # TODO 对版本号、剩余空间、剩余电量、信号质量等进行检查、如果不允许升级,上报OTAService中定义的错误码或者自定义错误码,返回-1 return 0 def check_package(self, pkg: Union[OTAPackageInfo, OTAPackageInfoV2], sign: str): """ 校验升级包 :param pkg: 新版本包信息 :param sign: str :return: 0表示校验成功;非0表示校验失败 """ if isinstance(pkg, OTAPackageInfo) and sign != pkg.sign: print("check package fail: current file sha256 ",sign) print("target file sha256 ",pkg.sign) return -1 # TODO 增加其他校验 return 0 if __name__ == "__main__": try: # Load configuration from YAML file current_dir = os.path.dirname(os.path.abspath(__file__)) # config_path = os.path.join(current_dir, 'config.yaml') vtxdb = VTXClient() config = vtxdb.get("robot_config","IoT") # print(config) try: hostname = os.uname()[1] except AttributeError: # Windows环境下使用 platform.node() import platform hostname = platform.node() # Create IoTLite instance with configuration iot = IoTLite( server_uri=config['server_uri'], port=config['port'], device_id=f"{config['product_id']}_{hostname}", secret=config['secret'], watch_dirs=config['watch_dirs'], max_files=config.get('max_files', 20) ) def signal_handler(signum, frame): """Handle shutdown signals""" logger.info(f"Received signal {signum}, stopping services...") try: iot.stop() except Exception as e: logger.error(f"Error during shutdown: {str(e)}") sys.exit(0) # Register signal handlers import signal for sig in [signal.SIGTERM, signal.SIGINT]: signal.signal(sig, signal_handler) iot.start() logger.info("IoT service started with file monitoring") # Keep the main thread running while True: if not iot.is_running(): break time.sleep(1) except Exception as e: logger.error(f"Error in main loop: {str(e)}") sys.exit(1) finally: try: iot.stop() except Exception as e: logger.error(f"Error during final cleanup: {str(e)}") logger.info("IoT service stopped") sys.exit(0)