1083 lines
43 KiB
Python
1083 lines
43 KiB
Python
import sys
import os

# Make the repository root importable before loading project packages.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import hashlib
import json
import logging
import socket
import threading
import time
from pathlib import Path
from queue import Empty, Queue
from typing import Union
from urllib.parse import urlparse

import requests
import yaml
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from iot_device_sdk_python.client.client_conf import ClientConf
from iot_device_sdk_python.client.connect_auth_info import ConnectAuthInfo
from iot_device_sdk_python.client.request.command_response import CommandRsp
from iot_device_sdk_python.filemanager.file_manager_listener import FileManagerListener
from iot_device_sdk_python.filemanager.file_manager_service import FileManagerService
from iot_device_sdk_python.filemanager.url_info import UrlInfo
from iot_device_sdk_python.iot_device import IotDevice
from iot_device_sdk_python.ota.ota_listener import OTAListener
from iot_device_sdk_python.ota.ota_package_info import OTAPackageInfo
from iot_device_sdk_python.ota.ota_package_info_v2 import OTAPackageInfoV2
from iot_device_sdk_python.ota.ota_query_version import OTAQueryVersion
from iot_device_sdk_python.ota.ota_service import OTAService
from iot_device_sdk_python.service.abstract_service import AbstractService
from iot_device_sdk_python.service.property import Property
from iot_device_sdk_python.transport.connect_listener import ConnectListener

from VortXDB.client import VTXClient
||
# Log to standard output; systemd captures the stream automatically.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(threadName)s - %(filename)s[%(funcName)s] - %(levelname)s: %(message)s"
)
logger = logging.getLogger(__name__)
|
||
|
||
def get_absolute_path(relative_path):
    """Resolve *relative_path* against the directory containing this script.

    Paths that are already absolute are returned unchanged.
    """
    if os.path.isabs(relative_path):
        return relative_path
    here = os.path.dirname(os.path.abspath(__file__))
    return os.path.abspath(os.path.join(here, relative_path))
|
||
|
||
def load_config(config_path):
    """Load configuration from a YAML file.

    Relative entries in ``watch_dirs`` are converted to absolute paths.

    Args:
        config_path: path to the YAML file (relative paths are resolved
            against the script directory).

    Returns:
        The parsed configuration dict (empty dict for an empty file).

    Raises:
        Re-raises any load/parse error after logging it.
    """
    try:
        config_path = get_absolute_path(config_path)
        with open(config_path, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty file; normalize to {} so
            # the membership test below cannot raise TypeError.
            config = yaml.safe_load(f) or {}

        # Convert relative paths in watch_dirs to absolute paths
        if 'watch_dirs' in config:
            config['watch_dirs'] = [get_absolute_path(path) for path in config['watch_dirs']]

        return config
    except Exception as e:
        logger.error(f"Error loading config file: {str(e)}")
        raise
|
||
|
||
class UploadQueueManager:
    """Serializes file uploads through a single background worker thread.

    Queue items are 4-tuples: (iot_instance, file_name, file_path, file_tracker).
    """

    def __init__(self, upload_interval=2.0):
        """
        Args:
            upload_interval: seconds to pause between consecutive uploads.
        """
        self.upload_queue = Queue()
        self.upload_interval = upload_interval
        self._running = False
        self._upload_thread = None
        self._lock = threading.Lock()

    def start(self):
        """Start the background upload worker (idempotent)."""
        with self._lock:
            if not self._running:
                self._running = True
                self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True)
                self._upload_thread.start()

    def stop(self):
        """Stop the worker: drain pending items, send the stop signal, join the thread."""
        with self._lock:
            if self._running:
                self._running = False
                # Discard anything still pending.
                self._drain_queue()
                # Sentinel wakes the worker if it is blocked on get().
                self.upload_queue.put(None)
                if self._upload_thread and self._upload_thread.is_alive():
                    self._upload_thread.join(timeout=5)
                self._upload_thread = None

    def clear_queue(self):
        """Discard every queued upload without stopping the worker."""
        self._drain_queue()

    def _drain_queue(self):
        """Remove all pending items from the queue, marking each as done."""
        while True:
            try:
                self.upload_queue.get_nowait()
            except Empty:
                break
            self.upload_queue.task_done()

    def add_to_queue(self, file_info):
        """Queue an upload tuple; silently ignored when the manager is stopped."""
        if self._running:
            self.upload_queue.put(file_info)

    def _upload_worker(self):
        """Worker loop: pop queued items and upload them one at a time."""
        while self._running:
            file_info = self.upload_queue.get()
            try:
                if file_info is None:  # Stop signal
                    break
                self._upload_one(file_info)
            except Exception as e:
                logger.error(f"Error in upload worker: {str(e)}")
            finally:
                # task_done() is only reached after a successful get(), so the
                # unfinished-task count stays balanced.
                self.upload_queue.task_done()

    def _upload_one(self, file_info):
        """Upload a single queued file; re-queue it on timeout or error."""
        iot_instance, file_name, file_path, file_tracker = file_info
        try:
            # Ensure the file_name includes hostname for all uploads
            if not file_name.startswith(os.uname()[1] + '/'):
                file_name = f"{os.uname()[1]}/{file_name}"

            # Event used to learn the outcome from the listener callback.
            upload_success = threading.Event()

            def upload_callback(success):
                if success:
                    upload_success.set()

            # Register the callback on the file-manager listener.
            iot_instance.file_manager.get_listener().set_upload_callback(upload_callback)

            # Start the upload.
            iot_instance.upload_file(file_name, file_path)

            # Wait for the result, at most 60 seconds.
            if upload_success.wait(timeout=60):
                logger.info(f"Successfully uploaded file: {file_path} as {file_name}")
                # Only record the upload after it succeeded.
                file_tracker.update_record(file_path, check_cleanup=True)
            else:
                logger.error(f"Upload timeout for file: {file_path}")
                # Re-queue while the file still exists on disk.
                if os.path.exists(file_path):
                    logger.info(f"Re-queuing file for upload: {file_path}")
                    self.add_to_queue(file_info)

        except Exception as e:
            logger.error(f"Error uploading file {file_path}: {str(e)}")
            # Transient errors: re-queue while the file still exists.
            if os.path.exists(file_path):
                logger.info(f"Re-queuing file for upload: {file_path}")
                self.add_to_queue(file_info)

        time.sleep(self.upload_interval)  # Pace consecutive uploads.
|
||
|
||
class CustomConnectListener(ConnectListener):
    # Reconnect policy: exponential backoff starting at 5 s, capped at 5 min.
    # Services are stopped on disconnect and restarted after a reconnect.

    def __init__(self, iot_device: IotDevice, iot_lite=None):
        """ Takes the IotDevice instance and the owning IoTLite instance """
        self.device = iot_device
        self.iot_lite = iot_lite
        self._reconnect_thread = None
        self._should_reconnect = True
        self._reconnect_delay = 5  # initial reconnect delay: 5 seconds
        self._max_reconnect_delay = 300  # maximum reconnect delay: 5 minutes
        self._reconnect_lock = threading.Lock()

    def connection_lost(self, cause: str):
        """
        Connection-lost notification: stop services and start the reconnect worker.

        Args:
            cause: reason the connection was lost
        """
        logger.warning("Connection lost. Cause: " + cause)

        # Stop all services first so they can restart cleanly after reconnecting.
        if self.iot_lite:
            try:
                self.iot_lite.stop_monitoring()
            except Exception as e:
                logger.error(f"Error stopping services on connection lost: {str(e)}")

        with self._reconnect_lock:
            # Only spawn a new worker when none is currently running.
            if not self._reconnect_thread or not self._reconnect_thread.is_alive():
                self._should_reconnect = True
                self._reconnect_thread = threading.Thread(target=self._reconnect_worker)
                self._reconnect_thread.daemon = True
                self._reconnect_thread.start()

    def connect_complete(self, reconnect: bool, server_uri: str):
        """
        Connection-established notification.

        Args:
            reconnect: True when this completion is a reconnection
            server_uri: server address
        """
        logger.info(f"Connect {'reconnected' if reconnect else 'connected'} to {server_uri}")
        # Reset the backoff delay after any successful connection.
        self._reconnect_delay = 5

        # After a reconnection, file monitoring must be restarted.
        if reconnect and self.iot_lite:
            try:
                self.iot_lite.restart_services()
            except Exception as e:
                logger.error(f"Error restarting services after reconnection: {str(e)}")

    def _reconnect_worker(self):
        """Reconnect loop; exits on success or when reconnection is disabled."""
        while self._should_reconnect:
            logger.info(f"Attempting to reconnect in {self._reconnect_delay} seconds...")
            time.sleep(self._reconnect_delay)

            try:
                # connect() returns 0 on success (SDK convention used elsewhere in this file).
                if self.device.connect() == 0:
                    logger.info("Reconnection successful")
                    break
                else:
                    logger.warning("Reconnection failed, will retry...")
                    # Exponential backoff, capped at the maximum delay.
                    self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay)
            except Exception as e:
                logger.error(f"Error during reconnection: {str(e)}")
                self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay)
|
||
|
||
class IoTLite:
    """Connects a device to the IoT platform and uploads files from watched directories.

    Combines the device client, the value-added services (license, VortXDB,
    expected delivery), watchdog-based file monitoring and a serialized
    upload queue.
    """

    def __init__(self, server_uri, port, device_id, secret, watch_dirs=None, max_files=20):
        """
        Args:
            server_uri: platform endpoint host
            port: platform endpoint port
            device_id: platform device id
            secret: device secret
            watch_dirs: directories to monitor for files to upload
            max_files: per-directory retention cap passed to FileTracker
        """
        self.server_uri = server_uri
        self.port = port
        self.device_id = device_id
        logger.info(f"Device id: {device_id}")  # replaced stray debug print()
        self.secret = secret
        self._running = False
        self.max_files = max_files
        self.watch_dirs = watch_dirs or []

        # Normalize every watch directory to an absolute path.
        self.watch_dirs = [get_absolute_path(path) for path in self.watch_dirs]

        # Create directories if they don't exist
        for watch_dir in self.watch_dirs:
            try:
                os.makedirs(watch_dir, exist_ok=True)
                logger.info(f"Ensured directory exists: {watch_dir}")
            except Exception as e:
                logger.error(f"Error creating directory {watch_dir}: {str(e)}")
                raise

        # Create the device and register its services.
        connect_auth_info = ConnectAuthInfo()
        connect_auth_info.server_uri = self.server_uri
        connect_auth_info.port = self.port
        connect_auth_info.id = self.device_id
        connect_auth_info.secret = self.secret

        self.client_conf = ClientConf(connect_auth_info)
        self.device = IotDevice(self.client_conf)
        self.license_service = LicenseService()
        self.device.add_service("License", self.license_service)
        self.vtxdb_service = VortXDBService()
        self.device.add_service("VortXDB", self.vtxdb_service)
        self.expected_delivery_service = ExpectedDeliveryService()
        self.device.add_service("ExpectedDelivery", self.expected_delivery_service)

        # Connection listener drives reconnection and service restarts.
        self.connect_listener = CustomConnectListener(self.device, self)
        self.device.get_client().add_connect_listener(self.connect_listener)

        self.file_manager: FileManagerService = self.device.get_file_manager_service()
        file_manager_listener = FileManagerListener(self.file_manager)
        self.file_manager.set_listener(file_manager_listener)

        # Initialize upload queue manager
        self.upload_queue_manager = UploadQueueManager(upload_interval=0.05)

        # Initialize file monitoring
        self.observers = []
        self.event_handlers = []
        self.file_trackers = {}

        self._init_file_monitoring()

    def _init_file_monitoring(self):
        """Create a tracker, observer and event handler for every watch directory."""
        # Upload records live under a fixed directory so state survives restarts.
        # (Hoisted out of the loop: it is identical for every watch directory.)
        record_dir = "/home/jsfb/jsfb_ws/iot_records"
        os.makedirs(record_dir, exist_ok=True)

        for watch_dir in self.watch_dirs:
            record_file = os.path.join(record_dir, f"upload_records_{os.path.basename(watch_dir)}.json")

            file_tracker = FileTracker(watch_dir, record_file, max_files=self.max_files)
            self.file_trackers[watch_dir] = file_tracker

            observer = Observer()
            event_handler = FileChangeHandler(file_tracker, self, self.upload_queue_manager)
            self.observers.append(observer)
            self.event_handlers.append(event_handler)

    def restart_services(self):
        """Restart monitoring and the upload queue (used after reconnection)."""
        logger.info("Restarting services...")
        try:
            # Stop the existing monitoring services.
            self.stop_monitoring()

            # Recreate the upload queue manager.
            self.upload_queue_manager = UploadQueueManager(upload_interval=0.05)

            # Re-initialize and start monitoring.
            self._init_file_monitoring()
            self.start_monitoring()

            # Re-scan existing files.
            self._scan_existing_files()

            logger.info("Services successfully restarted")
        except Exception as e:
            logger.error(f"Error restarting services: {str(e)}")
            raise

    def stop_monitoring(self):
        """Stop the upload queue and all file-system observers."""
        # Stop upload queue manager
        try:
            if hasattr(self, 'upload_queue_manager'):
                self.upload_queue_manager.stop()
        except Exception as e:
            logger.error(f"Error stopping upload queue manager: {str(e)}")

        # Stop all observers
        for observer in self.observers:
            try:
                observer.stop()
                observer.join(timeout=5)
            except Exception as e:
                logger.error(f"Error stopping observer: {str(e)}")

        # Clean up resources
        for handler in self.event_handlers:
            try:
                handler._last_modified_time.clear()
                handler._file_sizes.clear()
            except Exception as e:
                logger.error(f"Error cleaning up resources: {str(e)}")

        # Clear lists
        self.observers.clear()
        self.event_handlers.clear()

    def start_monitoring(self):
        """Start the upload queue and schedule all observers."""
        # Start upload queue manager
        self.upload_queue_manager.start()

        # Start all observers (lists are built in parallel in _init_file_monitoring).
        for observer, watch_dir, event_handler in zip(self.observers, self.watch_dirs, self.event_handlers):
            observer.schedule(event_handler, watch_dir, recursive=True)
            observer.start()
            logger.info(f"Started monitoring directory: {watch_dir}")

    def start(self):
        """Connect (with retries), enable periodic reporting and start monitoring.

        Raises:
            RuntimeError: when the platform connection cannot be established.
        """
        if not self.connect():
            raise RuntimeError("Failed to connect to platform")

        self._running = True
        self.license_service.enable_auto_report(30)
        self.vtxdb_service.enable_auto_report(30)
        self.expected_delivery_service.enable_auto_report(30)
        self.start_monitoring()
        # Initial scan of existing files
        self._scan_existing_files()

    def stop(self):
        """Gracefully stop all components"""
        if not self._running:
            return

        self._running = False
        logger.info("Stopping file monitoring...")

        # Disable reconnection before tearing anything down.
        if hasattr(self, 'connect_listener'):
            self.connect_listener._should_reconnect = False
            if self.connect_listener._reconnect_thread and self.connect_listener._reconnect_thread.is_alive():
                self.connect_listener._reconnect_thread.join(timeout=5)

        self.stop_monitoring()
        self.license_service.disable_auto_report()
        self.vtxdb_service.disable_auto_report()
        self.expected_delivery_service.disable_auto_report()

        # Disconnect from platform
        try:
            self.device.destroy()
            logger.info("Disconnected from platform")
        except Exception as e:
            logger.error(f"Error disconnecting from platform: {str(e)}")

        logger.info("Cleanup completed")

    def connect(self):
        """Connect to the platform with a bounded retry loop.

        Returns:
            True on success, False after exhausting the retries.
        """
        max_retries = 5
        retry_count = 0
        retry_delay = 5  # seconds

        while retry_count < max_retries:
            try:
                if self.device.connect() == 0:
                    logger.info("Successfully connected to platform")
                    return True
                logger.warning(f"Connection failed, attempt {retry_count + 1}/{max_retries}")
                time.sleep(retry_delay)
                retry_count += 1
            except Exception as e:
                logger.error(f"Connection error: {str(e)}")
                time.sleep(retry_delay)
                retry_count += 1

        logger.error("Failed to connect after maximum retries")
        return False

    def _scan_existing_files(self):
        """Queue every non-hidden file that the tracker says still needs uploading."""
        for watch_dir in self.watch_dirs:
            file_tracker = self.file_trackers[watch_dir]
            for root, _, files in os.walk(watch_dir):
                # Skip hidden directories
                if any(part.startswith('.') for part in Path(root).parts):
                    continue

                for file in files:
                    # Skip hidden files
                    if file.startswith('.'):
                        continue

                    file_path = os.path.join(root, file)
                    if file_tracker.needs_upload(file_path):
                        relative_path = os.path.relpath(file_path, "/home/jsfb/jsfb_ws")
                        # Pass file_tracker to upload queue
                        self.upload_queue_manager.add_to_queue((self, relative_path, file_path, file_tracker))

    def upload_file(self, file_name, file_path):
        """Hand the file to the platform file-manager service for upload."""
        logger.info(f"Uploading file: {file_path} with name: {file_name}")
        self.file_manager.upload_file(file_name=file_name, file_path=file_path)

    def is_running(self):
        """Return the running state of the IoT service"""
        return self._running
|
||
|
||
|
||
class LicenseService(AbstractService):
    """Device service bridging the platform to the local license HTTP API."""

    def __init__(self):
        super().__init__()

        # Properties must match the device model: prop_name mirrors the model,
        # writeable marks remote writability, field_name is the attribute name,
        # val is the property value.
        self.license_info = Property(val='', field_name="license_info", prop_name="info", writeable=False)
        self.activation_code = Property(val='', field_name="activation_code", prop_name="activation_code", writeable=True)
        # Map platform command names to method names.
        self.command2method = {"activate": "activate", "use": "use"}

        self.__set_writeable_and_readable(self.license_info, self.activation_code)
        self.__set_command2method(self.command2method)

    def __set_writeable_and_readable(self, *args):
        """Register each property as readable, and as writeable when flagged."""
        for arg in args:
            self._readable_prop2field[arg.prop_name] = arg.field_name
            if arg.writeable:
                self._writeable_prop2field[arg.prop_name] = arg.field_name

    def __set_command2method(self, c2m):
        """Install the platform-command -> method-name mapping."""
        self._command2method = c2m

    def get_license_info(self):
        """Fetch the current license info from the local API and cache it."""
        try:
            response = requests.get('http://127.0.0.1:5288/api/license/info')
            if response.status_code == 200:
                self.license_info.val = response.json()
            else:
                logger.error(f"获取license信息失败,状态码: {response.status_code}")
        except Exception as e:
            logger.error(f"获取license信息时发生错误: {e}")
        return self.license_info.val

    def set_activation_code(self, activation_code):
        """Send an activation code to the local license API (property write path)."""
        try:
            response = requests.post(
                'http://127.0.0.1:5288/api/license/activate',
                json={"activation_code": activation_code}
            )
            # BUGFIX: the original used `!= 200 or != 202`, which is always
            # true, so every activation was logged as a failure.
            if response.status_code not in (200, 202):
                logger.error(f"激活失败,状态码: {response.status_code}")
        except Exception as e:
            logger.error(f"激活过程中发生错误: {e}")

    def activate(self, paras: dict):
        """Platform command handler: activate with paras['activation_code'].

        Returns:
            CommandRsp with success_code on HTTP 200/202, fail_code otherwise.
        """
        activation_code = paras.get("activation_code")

        try:
            response = requests.post(
                'http://127.0.0.1:5288/api/license/activate',
                json={"activation_code": activation_code}
            )
            # BUGFIX: the original `!= 200 or != 202` condition was always
            # true, so the command could never report success.
            if response.status_code not in (200, 202):
                logger.error(f"激活失败,状态码: {response.status_code}")
                command_rsp = CommandRsp()
                command_rsp.result_code = CommandRsp.fail_code()
                return command_rsp
        except Exception as e:
            logger.error(f"激活过程中发生错误: {e}")
            command_rsp = CommandRsp()
            command_rsp.result_code = CommandRsp.fail_code()
            return command_rsp

        command_rsp = CommandRsp()
        command_rsp.result_code = CommandRsp.success_code()
        return command_rsp

    def use(self, paras: dict):
        """Platform command handler: consume one license use.

        Returns:
            CommandRsp with success_code on HTTP 200, fail_code otherwise.
        """
        try:
            response = requests.post(
                'http://127.0.0.1:5288/api/license/use'
            )
            if response.status_code != 200:
                logger.error(f"消耗使用次数失败,状态码: {response.status_code}")
                command_rsp = CommandRsp()
                command_rsp.result_code = CommandRsp.fail_code()
                return command_rsp
        except Exception as e:
            logger.error(f"消耗使用次数过程中发生错误: {e}")
            command_rsp = CommandRsp()
            command_rsp.result_code = CommandRsp.fail_code()
            return command_rsp

        command_rsp = CommandRsp()
        command_rsp.result_code = CommandRsp.success_code()
        return command_rsp
|
||
|
||
class VortXDBService(AbstractService):
    """Device service mirroring robot_info / robot_config / massage_plan from VortXDB."""

    def __init__(self, vtxdb: VTXClient = None):
        super().__init__()

        self.vtxdb = vtxdb if vtxdb is not None else VTXClient()

        # Properties must match the device model: prop_name mirrors the model,
        # writeable marks remote writability, field_name is the attribute name.
        self.robot_info = Property(
            val=self.vtxdb.get("robot_info", ""),
            field_name="robot_info",
            prop_name="robot_info",
            writeable=True,
        )
        self.robot_config = Property(
            val=self.vtxdb.get("robot_config", ""),
            field_name="robot_config",
            prop_name="robot_config",
            writeable=True,
        )
        self.massage_plan = Property(
            val=self.vtxdb.get("massage_plan", ""),
            field_name="massage_plan",
            prop_name="massage_plan",
            writeable=True,
        )

        self.__set_writeable_and_readable(self.robot_config, self.robot_info, self.massage_plan)

    def __set_writeable_and_readable(self, *args):
        """Register each property as readable, and as writeable when flagged."""
        for prop in args:
            self._readable_prop2field[prop.prop_name] = prop.field_name
            if prop.writeable:
                self._writeable_prop2field[prop.prop_name] = prop.field_name

    def __set_command2method(self, c2m):
        """Install the platform-command -> method-name mapping."""
        self._command2method = c2m

    def get_robot_config(self):
        """Refresh the cached robot_config from VortXDB and return it."""
        self.robot_config.val = self.vtxdb.get("robot_config", "")
        return self.robot_config.val

    def set_robot_config(self, data: dict):
        """Persist the whole robot_config document."""
        self.vtxdb.set("robot_config", "", data)

    def get_robot_info(self):
        """Build and cache a dict with serial number, software version, DB version and location."""
        # Serial number is the device hostname.
        hostname = socket.gethostname()

        # Software version: name of the directory containing this file.
        this_file = os.path.abspath(__file__)
        version_dir = os.path.basename(os.path.dirname(this_file))

        # Device coordinates from the local location service.
        location_url = "http://127.0.0.1:5000/get_device_location"
        try:
            reply = requests.get(location_url, timeout=1)
            if reply.status_code == 200:
                location_data = reply.json().get('data', {})
            else:
                location_data = {'error': 'Failed to retrieve location'}
        except Exception as e:
            location_data = {'error': f'Location request failed: {str(e)}'}

        # Parameter-server (VortXDB) version.
        vortxdb_version = self.vtxdb.get_version()

        self.robot_info.val = {
            "serial_number": hostname,
            "software_version": version_dir,
            "vortxdb_version": vortxdb_version,
            "device_location": location_data,
        }
        return self.robot_info.val

    def set_robot_info(self, data: dict):
        """Persist the whole robot_info document."""
        self.vtxdb.set("robot_info", "", data)

    def get_massage_plan(self):
        """Refresh the cached massage_plan from VortXDB and return it."""
        self.massage_plan.val = self.vtxdb.get("massage_plan", "")
        return self.massage_plan.val

    def set_massage_plan(self, data: dict):
        """Persist the whole massage_plan document."""
        self.vtxdb.set("massage_plan", "", data)
|
||
|
||
|
||
class ExpectedDeliveryService(AbstractService):
    """Device service exposing the Huawei-cloud speech token stored in VortXDB."""

    def __init__(self, vtxdb: VTXClient = None):
        super().__init__()
        if vtxdb is not None:
            self.vtxdb = vtxdb
        else:
            self.vtxdb = VTXClient()
        # Properties must match the device model: prop_name mirrors the model,
        # writeable marks remote writability, field_name is the attribute name.
        self.token_HW = Property(val=self.vtxdb.get("robot_config", "Language.Speech_processor.huaweiyun_recognize_config.token_HW"), field_name="token_HW", prop_name="token_HW", writeable=True)

        self.__set_writeable_and_readable(self.token_HW)

    def __set_writeable_and_readable(self, *args):
        """Register each property as readable, and as writeable when flagged."""
        for arg in args:
            self._readable_prop2field[arg.prop_name] = arg.field_name
            if arg.writeable:
                self._writeable_prop2field[arg.prop_name] = arg.field_name

    def __set_command2method(self, c2m):
        """Install the platform-command -> method-name mapping."""
        self._command2method = c2m

    def set_token_HW(self, data):
        """Persist a new token only when it differs from the stored value."""
        old_data = self.vtxdb.get("robot_config", "Language.Speech_processor.huaweiyun_recognize_config.token_HW")
        # Was a bare print() to stdout; route through the module logger instead.
        logger.debug("set_token_HW received: %s", data)
        if data != old_data:
            self.vtxdb.set("robot_config", "Language.Speech_processor.huaweiyun_recognize_config.token_HW", data)

    def get_token_HW(self):
        """Refresh the cached token from VortXDB and return it."""
        self.token_HW.val = self.vtxdb.get("robot_config", "Language.Speech_processor.huaweiyun_recognize_config.token_HW")
        return self.token_HW.val
|
||
|
||
|
||
class FileManagerListener(FileManagerListener):
    """Handles upload/download URLs from the platform via direct HTTP transfer.

    NOTE(review): this class deliberately shadows the imported
    FileManagerListener base; renaming it would change the public name
    used elsewhere in this module.
    """

    def __init__(self, file_manager: FileManagerService):
        self._file_manager = file_manager
        # Optional hook invoked with the boolean upload outcome.
        self._upload_callback = None

    def set_upload_callback(self, callback):
        """Set the callback invoked when an upload finishes (receives success: bool)."""
        self._upload_callback = callback

    def on_upload_url(self, url_info: UrlInfo):
        """
        Receive a file-upload URL and PUT the file's bytes to it.

        :param url_info: upload parameters
        """
        if url_info.object_name not in self._file_manager.upload_file_dict.keys():
            raise RuntimeError("object_name: " + url_info.object_name + " has no related file_path")

        file_path = self._file_manager.upload_file_dict.get(url_info.object_name)
        if not os.path.isfile(file_path):
            raise RuntimeError("file_path: " + file_path + " is not file")

        # Read inside a context manager so the handle is always closed
        # (the original leaked the file object).
        with open(file_path, 'rb') as f:
            data = f.read()
        headers = {"Content-Type": "text/plain", "Host": urlparse(url_info.url).netloc}
        resp = requests.put(url=url_info.url, data=data, headers=headers)

        upload_success = resp.status_code == requests.codes.ok
        if upload_success:
            logger.info("upload file success. url is: %s" % url_info.url)
            self._file_manager.report_upload_result(object_name=url_info.object_name,
                                                    result_code=0,
                                                    status_code=resp.status_code)
        else:
            logger.error("upload file fail, status code: %s" % str(resp.status_code))
            logger.error("response content: %s" % resp.text)
            self._file_manager.report_upload_result(object_name=url_info.object_name,
                                                    result_code=1,
                                                    status_code=resp.status_code)

        # Notify the upload queue worker of the outcome.
        if self._upload_callback:
            self._upload_callback(upload_success)

    def on_download_url(self, url_info: UrlInfo):
        """
        Receive a file-download URL and GET the content into the target file.

        :param url_info: download parameters
        """
        if url_info.object_name not in self._file_manager.download_file_dict.keys():
            raise RuntimeError("object_name: " + url_info.object_name + " has no related file_path")

        file_path = self._file_manager.download_file_dict.get(url_info.object_name)

        headers = {"Content-Type": "text/plain", "Host": urlparse(url_info.url).netloc}
        resp = requests.get(url=url_info.url, headers=headers)
        # Write inside a context manager (the original leaked the file object).
        # NOTE(review): the body is written even for non-200 responses,
        # matching the original behavior.
        with open(file_path, 'wb') as f:
            f.write(resp.content)
        if resp.status_code == requests.codes.ok:
            logger.info("download file success.")
            self._file_manager.report_download_result(object_name=url_info.object_name,
                                                      result_code=0,
                                                      status_code=resp.status_code)
        else:
            logger.error("download file fail, status code: %s" % str(resp.status_code))
            logger.error("response content: %s" % resp.text)
            self._file_manager.report_download_result(object_name=url_info.object_name,
                                                      result_code=1,
                                                      status_code=resp.status_code)
|
||
|
||
class FileTracker:
    """Tracks uploaded files by content hash and enforces a retention cap.

    Records are persisted as JSON so upload state survives restarts.
    """

    def __init__(self, watch_dir, record_file, max_files=20):
        """
        Args:
            watch_dir: directory whose files are tracked
            record_file: JSON file persisting the upload records
            max_files: record count that triggers cleanup of the oldest file
        """
        self.watch_dir = Path(watch_dir)
        self.record_file = Path(record_file)
        self.max_files = max_files

        # Create directory for record file if it doesn't exist
        os.makedirs(os.path.dirname(self.record_file), exist_ok=True)

        # Maps absolute file path -> {'hash', 'last_upload', 'upload_time'}.
        self.file_records = {}
        self._load_records()

    def _load_records(self):
        """Load persisted records; fall back to an empty dict on any error."""
        if self.record_file.exists():
            try:
                with open(self.record_file, 'r') as f:
                    self.file_records = json.load(f)
            except json.JSONDecodeError:
                logger.warning(f"Invalid JSON in {self.record_file}, starting with empty records")
                self.file_records = {}
            except Exception as e:
                logger.error(f"Error loading records from {self.record_file}: {str(e)}")
                self.file_records = {}

    def _save_records(self):
        """Persist the current records to disk."""
        with open(self.record_file, 'w') as f:
            json.dump(self.file_records, f, indent=4)

    def _calculate_file_hash(self, file_path):
        """Return the SHA-256 hex digest of the file, read in 4 KiB chunks."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def needs_upload(self, file_path):
        """True when the file is unknown or its content hash changed since upload."""
        file_path = str(file_path)
        current_hash = self._calculate_file_hash(file_path)
        file_info = self.file_records.get(file_path)

        if file_info is None:
            return True

        return current_hash != file_info['hash']

    def update_record(self, file_path, check_cleanup=False):
        """
        Update the record for a file and optionally check for cleanup
        :param file_path: Path to the file
        :param check_cleanup: Whether to perform cleanup check after update
        """
        file_path = str(file_path)
        current_time = time.time()

        # Record the current content hash and upload timestamps.
        self.file_records[file_path] = {
            'hash': self._calculate_file_hash(file_path),
            'last_upload': current_time,
            'upload_time': current_time  # used for oldest-first cleanup ordering
        }
        self._save_records()

        # Only clean up once the record count reaches the cap.
        if check_cleanup and len(self.file_records) >= self.max_files:
            self._cleanup_old_files()

    def _cleanup_old_files(self):
        """Clean up old files when the number of records exceeds max_files"""
        if len(self.file_records) <= self.max_files:
            return

        # Sort by upload time, oldest first.
        sorted_files = sorted(
            self.file_records.items(),
            key=lambda x: x[1].get('upload_time', x[1]['last_upload'])  # tolerate old records
        )

        # Remove only the single oldest file per invocation.
        oldest_file_path, _ = sorted_files[0]
        try:
            # Delete the file if it still exists.
            if os.path.exists(oldest_file_path):
                os.remove(oldest_file_path)
                logger.info(f"Removed oldest file: {oldest_file_path}")
            else:
                logger.warning(f"Oldest file already removed: {oldest_file_path}")

            # Drop it from the records.
            del self.file_records[oldest_file_path]

            # Prune now-empty parent directories.
            try:
                self._cleanup_empty_dirs(os.path.dirname(oldest_file_path))
            except Exception as e:
                logger.error(f"Error cleaning up directories for {oldest_file_path}: {str(e)}")

        except Exception as e:
            logger.error(f"Error removing oldest file {oldest_file_path}: {str(e)}")
            # Drop the record even when the file removal failed.
            del self.file_records[oldest_file_path]

        # Persist the updated records.
        self._save_records()

    def _cleanup_empty_dirs(self, directory):
        """Recursively remove empty directories up to (but excluding) the watch dir."""
        try:
            directory = Path(directory)
            if not directory.exists():
                return

            # Never delete the watch directory itself or anything outside it.
            # BUGFIX: the original string-prefix test also matched sibling
            # directories (e.g. "/data/logs2" for watch dir "/data/logs") and
            # allowed deleting the watch directory itself.
            if directory == self.watch_dir or self.watch_dir not in directory.parents:
                return

            # Check if directory is empty (excluding hidden files)
            has_visible_files = False
            for item in directory.iterdir():
                if not item.name.startswith('.'):
                    has_visible_files = True
                    break

            if not has_visible_files:
                try:
                    directory.rmdir()  # only succeeds when the directory is empty
                    logger.info(f"Removed empty directory: {directory}")
                    # Walk upward; the guard above stops at the watch dir.
                    self._cleanup_empty_dirs(directory.parent)
                except OSError:
                    # Directory not empty (hidden files remain) or other FS error.
                    pass
        except Exception as e:
            logger.error(f"Error cleaning up directory {directory}: {str(e)}")
|
||
|
||
class FileChangeHandler(FileSystemEventHandler):
    """Watchdog event handler that queues stable, non-hidden files for upload."""

    def __init__(self, file_tracker, iot_instance, upload_queue_manager):
        self.file_tracker = file_tracker
        self.iot = iot_instance
        self.upload_queue_manager = upload_queue_manager
        self._last_modified_time = {}
        self._file_sizes = {}
        super().__init__()

    def _is_hidden(self, path):
        """True when any component of *path* starts with a dot."""
        for part in Path(path).parts:
            if part.startswith('.'):
                return True
        return False

    def _is_file_ready(self, file_path):
        """
        True when the file's size has stopped changing between two checks.

        Each call samples the current size; readiness requires a previous
        sample equal to the current one (i.e. writing has finished).
        """
        try:
            size_now = os.path.getsize(file_path)
            size_before = self._file_sizes.get(file_path)
            self._file_sizes[file_path] = size_now
        except (OSError, IOError):
            # Unreadable file: treat it as still being written.
            return False

        if size_before is None:
            # First sample: remember it and wait for the next check.
            return False
        return size_now == size_before

    def on_created(self, event):
        if event.is_directory:
            return
        if self._is_hidden(event.src_path):
            return
        # New file: restart size tracking from scratch.
        self._file_sizes[event.src_path] = None
        self._handle_file_event(event.src_path)

    def on_modified(self, event):
        if event.is_directory:
            return
        if self._is_hidden(event.src_path):
            return

        now = time.time()
        previous = self._last_modified_time.get(event.src_path, 0)

        # Debounce: at most one queue attempt per second per file.
        if now - previous > 1:
            self._handle_file_event(event.src_path)

    def _handle_file_event(self, file_path):
        """Queue *file_path* for upload once it looks fully written."""
        # Defensive re-check for hidden files/directories.
        if self._is_hidden(file_path):
            return

        # Give a slow writer a moment, then re-check stability once.
        if not self._is_file_ready(file_path):
            time.sleep(0.5)
            if not self._is_file_ready(file_path):
                return

        try:
            if self.file_tracker.needs_upload(file_path):
                rel = os.path.relpath(file_path, "/home/jsfb/jsfb_ws")
                # The tracker travels with the item so the worker can update
                # its record after a successful upload.
                self.upload_queue_manager.add_to_queue(
                    (self.iot, rel, file_path, self.file_tracker)
                )
                self._last_modified_time[file_path] = time.time()
                logger.info(f"Successfully queued file: {file_path}")
        except Exception as e:
            logger.error(f"Error processing file {file_path}: {str(e)}")

    def __del__(self):
        # Drop the tracking state explicitly.
        self._last_modified_time.clear()
        self._file_sizes.clear()
|
||
|
||
class OTAListener(OTAListener):
    """OTA listener: reports the running software version and drives the
    package download when an upgrade notification arrives.

    NOTE(review): this subclass shadows the imported ``OTAListener`` base
    class name — it works, but a distinct name would be clearer.
    """

    def __init__(self, ota_service: OTAService):
        self.ota_service = ota_service

    def on_query_version(self, queryInfo: OTAQueryVersion):
        """Handle a platform version query.

        The software version is derived from the install directory name,
        which is expected to look like ``.../MassageRobot_Dobot-<version>/...``.
        """
        # Grandparent directory of this script (install root).
        script_path = os.path.abspath(__file__)
        base_path = os.path.dirname(os.path.dirname(script_path))

        marker = "MassageRobot_Dobot-"
        if marker in base_path:
            # Take what follows the marker, truncated at the next path
            # separator, as the version string.
            version = base_path.split(marker)[-1].split("/")[0]
        else:
            # No versioned folder found: fall back to a default value.
            version = "default"

        print({"current_version": version})
        self.ota_service.report_version(sw_version=version)

    def on_receive_package_info(self, pkg: Union[OTAPackageInfo, OTAPackageInfoV2]):
        """Handle a new-version notification.

        :param pkg: new package information from the platform
        """
        print("OTASampleListener on_receive_package_info, pkg=", str(pkg.to_dict()))
        if self.pre_check(pkg) != 0:
            print("pre_check failed")
            return
        # Download the package and upgrade.
        # NOTE(review): download_package is not defined in this class;
        # presumably provided by the base class — verify.
        self.download_package(pkg)

    def pre_check(self, pkg: Union[OTAPackageInfo, OTAPackageInfoV2]):
        """Validate the new package before upgrading.

        :param pkg: new package information
        :return: 0 if the upgrade may proceed; non-zero to reject it
        """
        # TODO: check version, free space, battery level, signal quality;
        # report an error code defined in OTAService (or a custom one) and
        # return -1 to refuse the upgrade.
        return 0

    def check_package(self, pkg: Union[OTAPackageInfo, OTAPackageInfoV2], sign: str):
        """Verify the downloaded upgrade package.

        :param pkg: new package information
        :param sign: computed sha256 of the downloaded file
        :return: 0 on success; non-zero on failure
        """
        # Only V1 package info carries a sha256 signature to compare.
        if isinstance(pkg, OTAPackageInfo) and sign != pkg.sign:
            print("check package fail: current file sha256 ", sign)
            print("target file sha256 ", pkg.sign)
            return -1
        # TODO: add further validation.
        return 0
|
||
if __name__ == "__main__":
    # Defined before the try so the finally block (and signal handler)
    # never raise NameError when startup fails before construction.
    iot = None
    try:
        # Load configuration from VortXDB (replaces the old local YAML file).
        vtxdb = VTXClient()
        config = vtxdb.get("robot_config", "IoT")

        # os.uname() is POSIX-only; fall back to platform.node() on Windows.
        try:
            hostname = os.uname()[1]
        except AttributeError:
            import platform
            hostname = platform.node()

        # Create the IoTLite instance from configuration.
        iot = IoTLite(
            server_uri=config['server_uri'],
            port=config['port'],
            device_id=f"{config['product_id']}_{hostname}",
            secret=config['secret'],
            watch_dirs=config['watch_dirs'],
            max_files=config.get('max_files', 20)
        )

        def signal_handler(signum, frame):
            """Handle shutdown signals: stop the service and exit cleanly."""
            logger.info(f"Received signal {signum}, stopping services...")
            try:
                iot.stop()
            except Exception as e:
                logger.error(f"Error during shutdown: {str(e)}")
            sys.exit(0)

        # Register handlers for termination signals.
        import signal
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, signal_handler)

        iot.start()
        logger.info("IoT service started with file monitoring")

        # Keep the main thread alive while the service is running.
        while iot.is_running():
            time.sleep(1)

    except Exception as e:
        logger.error(f"Error in main loop: {str(e)}")
        # Non-zero exit so supervisors (e.g. systemd) see the failure.
        sys.exit(1)
    finally:
        # iot is None when startup failed before construction.
        if iot is not None:
            try:
                iot.stop()
            except Exception as e:
                logger.error(f"Error during final cleanup: {str(e)}")
        logger.info("IoT service stopped")
        # NOTE: no sys.exit(0) here — an unconditional exit in finally
        # would override the sys.exit(1) above and mask failures.