2025-05-27 15:46:31 +08:00

502 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding: utf-8 -*-
# Copyright (c) 2023-2024 Huawei Cloud Computing Technology Co., Ltd. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from typing import List, Optional
import logging
import json
from iot_device_sdk_python.client.client_conf import ClientConf
from iot_device_sdk_python.client.request.props_get import PropsGet
from iot_device_sdk_python.client.request.props_set import PropSet
from iot_device_sdk_python.iot_device import IotDevice
from iot_device_sdk_python.client.request.device_event import DeviceEvent
from iot_device_sdk_python.gateway.sub_dev_discovery_listener import SubDevDiscoveryListener
from iot_device_sdk_python.gateway.gtw_operate_sub_device_listener import GtwOperateSubDeviceListener
from iot_device_sdk_python.gateway.sub_devices_persistence import SubDevicesPersistence
from iot_device_sdk_python.transport.action_listener import ActionListener
from iot_device_sdk_python.utils.iot_util import get_node_id_from_device_id, get_event_time
from iot_device_sdk_python.gateway.requests.device_info import DeviceInfo
from iot_device_sdk_python.client.request.device_message import DeviceMessage
from iot_device_sdk_python.client.request.service_property import ServiceProperty
from iot_device_sdk_python.gateway.requests.device_property import DeviceProperty
from iot_device_sdk_python.transport.raw_message import RawMessage
from iot_device_sdk_python.gateway.requests.device_status import DeviceStatus
from iot_device_sdk_python.gateway.requests.added_sub_device_info import AddedSubDeviceInfo
from iot_device_sdk_python.client.request.device_events import DeviceEvents
from iot_device_sdk_python.gateway.requests.sub_devices_info import SubDevicesInfo
from iot_device_sdk_python.gateway.requests.gtw_add_sub_device_rsp import GtwAddSubDeviceRsp
from iot_device_sdk_python.gateway.requests.gtw_del_sub_device_rsp import GtwDelSubDeviceRsp
from iot_device_sdk_python.gateway.requests.added_sub_device_info_rsp import AddedSubDeviceInfoRsp
from iot_device_sdk_python.gateway.requests.add_sub_device_failed_reason import AddSubDeviceFailedReason
from iot_device_sdk_python.gateway.requests.del_sub_device_failed_reason import DelSubDeviceFailedReason
from iot_device_sdk_python.client.request.command import Command
class AbstractGateway(IotDevice):
"""
抽象网关,实现了子设备管理,子设备消息转发功能
"""
_logger = logging.getLogger(__name__)
def __init__(self, sub_devices_persistence: SubDevicesPersistence, client_conf: ClientConf):
"""
初始化方法
Args:
sub_devices_persistence: 子设备信息持久化,提供子设备信息保存能力
"""
super().__init__(client_conf)
self._sub_dev_discovery_listener: Optional[SubDevDiscoveryListener] = None # TODO 子设备发现监听器属于scan事件类型暂未使用
self._gtw_operate_sub_device_listener: Optional[GtwOperateSubDeviceListener] = None # 网关操作子设备监听器
self._sub_device_persistence = sub_devices_persistence # 子设备持久化,提供子设备信息保存能力
def set_sub_dev_discovery_listener(self, sub_dev_discovery_listener: SubDevDiscoveryListener):
"""
设置子设备发现监听器(暂未使用)
TODO 子设备发现监听器属于scan事件类型暂未使用
Args:
sub_dev_discovery_listener: 子设备发现监听器
"""
self._sub_dev_discovery_listener = sub_dev_discovery_listener
def set_gtw_operate_sub_device_listener(self, gtw_operate_sub_device_listener: GtwOperateSubDeviceListener):
"""
设置网关添加/删除子设备监听器
Args:
gtw_operate_sub_device_listener: 网关操作子设备监听器
"""
self._gtw_operate_sub_device_listener = gtw_operate_sub_device_listener
def get_sub_device_by_node_id(self, node_id: str) -> DeviceInfo:
"""
根据设备标识码查询子设备
Args:
node_id: 设备标识码
Returns:
DeviceInfo: 子设备信息
"""
return self._sub_device_persistence.get_sub_device(node_id)
def get_sub_device_by_device_id(self, device_id: str) -> DeviceInfo:
"""
根据设备id查询子设备
Args:
device_id: 设备id
Returns:
DeviceInfo: 子设备信息,
"""
node_id: str = get_node_id_from_device_id(device_id)
return self._sub_device_persistence.get_sub_device(node_id)
def report_sub_dev_list(self, device_infos: List[DeviceInfo], listener: Optional[ActionListener] = None):
"""
上报子设备发现结果
TODO 属于scan事件类型暂未使用
Args:
device_infos: 子设备信息列表
listener: 发布监听器
"""
device_event = DeviceEvent()
device_event.service_id = "sub_device_discovery"
device_event.event_type = "scan_result"
device_event.event_time = get_event_time()
device_info_list = list()
for device_info in device_infos:
device_info_list.append(device_info.to_dict())
paras: dict = {"devices": device_info_list}
device_event.paras = paras
self.get_client().report_event(device_event, listener)
def report_sub_device_message(self, device_message: DeviceMessage, listener: Optional[ActionListener] = None):
"""
上报子设备消息
Args:
device_message: 设备消息
listener: 发布监听器
"""
self.get_client().report_device_message(device_message, listener)
def report_sub_device_properties(self, device_id: str, services: List[ServiceProperty],
listener: Optional[ActionListener] = None):
"""
上报子设备属性
Args:
device_id: 子设备id
services: 服务属性列表
listener: 发布监听器
"""
device_property = DeviceProperty()
device_property.device_id = device_id
device_property.services = services
self.report_batch_properties([device_property], listener)
def report_batch_properties(self, device_properties: List[DeviceProperty],
listener: Optional[ActionListener] = None):
"""
批量上报子设备属性
Args:
device_properties: 子设备属性列表
listener: 发布监听器
"""
device_property_list = list()
for device_property in device_properties:
device_property_list.append(device_property.to_dict())
devices: dict = {"devices": device_property_list}
topic = "$oc/devices/" + self.get_device_id() + "/sys/gateway/sub_devices/properties/report"
try:
payload = json.dumps(devices)
except Exception as e:
self._logger.error("json.dumps failed, Exception: %s", str(e))
raise e
raw_message = RawMessage(topic, payload)
self.get_client().publish_raw_message(raw_message, listener)
def report_sub_device_status(self, device_id: str, status: str, listener: Optional[ActionListener] = None):
"""
上报子设备状态
Args:
device_id: 子设备id
status: 设备状态。OFFLINE:设备离线ONLINE:设备在线
listener: 发布监听器
"""
device_status = DeviceStatus()
device_status.device_id = device_id
device_status.status = status
self.report_batch_status([device_status], listener)
def report_batch_status(self, statuses: List[DeviceStatus], listener: Optional[ActionListener] = None):
"""
批量上报子设备状态
Args:
statuses: 子设备状态列表
listener: 发布监听器
"""
device_event = DeviceEvent()
device_event.service_id = "$sub_device_manager"
device_event.event_type = "sub_device_update_status"
device_event.event_time = get_event_time()
status_list = list()
for status in statuses:
status_list.append(status.to_dict())
device_event.paras = {"device_statuses": status_list}
self.get_client().report_event(device_event, listener)
def gtw_add_sub_device(self, added_sub_device_infos: List[AddedSubDeviceInfo], event_id: str,
listener: Optional[ActionListener] = None):
"""
网关发起新增子设备请求
Args:
added_sub_device_infos: 子设备信息列表
event_id: 此次请求的事件id不携带则由平台自动生成
listener: 发布监听器
"""
device_event = DeviceEvent()
device_event.service_id = "$sub_device_manager"
device_event.event_type = "add_sub_device_request"
device_event.event_time = get_event_time()
device_event.event_id = event_id
added_sub_device_info_list = list()
for added_sub_device_info in added_sub_device_infos:
added_sub_device_info_list.append(added_sub_device_info.to_dict())
paras: dict = {"devices": added_sub_device_info_list}
device_event.paras = paras
self.get_client().report_event(device_event, listener)
def gtw_del_sub_device(self, del_sub_devices: List[str], event_id: str, listener: Optional[ActionListener] = None):
"""
网关发起删除子设备请求
Args:
del_sub_devices: 要删除的子设备列表
event_id: 此次请求的事件id不携带则有平台自动生成
listener: 发布监听器
"""
device_event = DeviceEvent()
device_event.service_id = "$sub_device_manager"
device_event.event_type = "delete_sub_device_request"
device_event.event_time = get_event_time()
device_event.event_id = event_id
paras: dict = {"devices": del_sub_devices}
device_event.paras = paras
self.get_client().report_event(device_event, listener)
def on_event(self, device_events: DeviceEvents):
"""
事件处理回调由SDK自动调用
Args:
device_events: 设备事件
"""
# 子设备的事件
if device_events.device_id and device_events.device_id != self.get_device_id():
self.on_sub_dev_event(device_events)
return
# 网关的事件
super().on_event(device_events)
# 网关管理子设备的事件
for event in device_events.services:
event: DeviceEvent
if event.service_id != "$sub_device_manager":
continue
if event.event_type == "start_scan":
# TODO scan的事件类型暂未启用
pass
elif event.event_type == "add_sub_device_notify":
# 平台通知网关子设备新增
sub_devices_info = SubDevicesInfo()
version = event.paras.get("version")
sub_devices_info.version = version
tmp = list()
devices: list = event.paras.get("devices")
for device in devices:
device: dict
device_info = DeviceInfo()
device_info.convert_from_dict(device)
tmp.append(device_info)
sub_devices_info.devices = tmp
self.on_add_sub_devices(sub_devices_info)
elif event.event_type == "delete_sub_device_notify":
# 平台通知网关子设备删除
sub_devices_info = SubDevicesInfo()
version = event.paras.get("version")
sub_devices_info.version = version
tmp = list()
devices: list = event.paras.get("devices")
for device in devices:
device: dict
device_info = DeviceInfo()
device_info.convert_from_dict(device)
tmp.append(device_info)
sub_devices_info.devices = tmp
self.on_delete_sub_devices(sub_devices_info)
elif event.event_type == "add_sub_device_response":
# 网关新增子设备请求响应
gtw_add_sub_device_rsp = GtwAddSubDeviceRsp()
# successful_devices
success_tmp = list()
successful_devices: list = event.paras.get("successful_devices")
for device in successful_devices:
device: dict
added_sub_device_info_rsp = AddedSubDeviceInfoRsp()
added_sub_device_info_rsp.convert_from_dict(device)
success_tmp.append(added_sub_device_info_rsp)
gtw_add_sub_device_rsp.successful_devices = success_tmp
# failed_devices
fail_tmp = list()
failed_devices: list = event.paras.get("failed_devices")
for device in failed_devices:
device: dict
add_sub_device_failed_reason = AddSubDeviceFailedReason()
add_sub_device_failed_reason.convert_from_dict(device)
fail_tmp.append(add_sub_device_failed_reason)
gtw_add_sub_device_rsp.add_sub_device_failed_reasons = fail_tmp
if self._gtw_operate_sub_device_listener is not None:
self._gtw_operate_sub_device_listener.on_add_sub_device_rsp(gtw_add_sub_device_rsp,
event.event_id)
elif event.event_type == "delete_sub_device_response":
# 网关删除子设备请求响应
gtw_del_sub_device_rsp = GtwDelSubDeviceRsp()
# successful_devices
gtw_del_sub_device_rsp.successful_devices = event.paras.get("successful_devices")
# failed_devices
fail_tmp = list()
failed_devices: list = event.paras.get("failed_devices")
for device in failed_devices:
device: dict
del_sub_device_failed_reason = DelSubDeviceFailedReason()
del_sub_device_failed_reason.convert_from_dict(device)
fail_tmp.append(del_sub_device_failed_reason)
gtw_del_sub_device_rsp.failed_devices = fail_tmp
if self._gtw_operate_sub_device_listener is not None:
self._gtw_operate_sub_device_listener.on_del_sub_device_rsp(gtw_del_sub_device_rsp,
event.event_id)
else:
self._logger.info("gateway receive unknown event_type: %s", event.event_type)
def on_device_message(self, message: DeviceMessage):
"""
设备消息处理回调
Args:
message: 消息
"""
# 子设备的
if message.device_id and message.device_id != self.get_device_id():
self.on_sub_dev_message(message)
return
# 网关的
super().on_device_message(message)
def on_command(self, request_id: str, command: Command):
"""
命令处理回调
Args:
request_id: 请求id
command: 命令
"""
# 子设备的
if command.device_id and command.device_id != self.get_device_id():
self.on_sub_dev_command(request_id, command)
return
# 网关的
super().on_command(request_id, command)
def on_properties_set(self, request_id: str, props_set: PropSet):
"""
属性设置处理回调
Args:
request_id: 请求id
props_set: 属性设置请求
"""
# 子设备的
if props_set.device_id and props_set.device_id != self.get_device_id():
self.on_sub_dev_properties_set(request_id, props_set)
return
# 网关的
super().on_properties_set(request_id, props_set)
def on_properties_get(self, request_id: str, props_get: PropsGet):
"""
属性查询处理回调
Args:
request_id: 请求id
props_get: 属性查询请求
"""
# 子设备的
if props_get.device_id and props_get.device_id != self.get_device_id():
self.on_sub_dev_properties_get(request_id, props_get)
return
# 网关的
super().on_properties_get(request_id, props_get)
def on_add_sub_devices(self, sub_devices_info: SubDevicesInfo):
"""
添加子设备处理回调
Args:
sub_devices_info: 子设备信息
Returns:
int: 处理结果0表示成功
"""
if self._sub_device_persistence is not None:
return self._sub_device_persistence.add_sub_devices(sub_devices_info)
return -1
def on_delete_sub_devices(self, sub_devices_info: SubDevicesInfo):
"""
删除子设备处理回调
Args:
sub_devices_info: 子设备信息
Returns:
int: 处理结果0表示成功
"""
if self._sub_device_persistence is not None:
return self._sub_device_persistence.delete_sub_devices(sub_devices_info)
return -1
def sync_sub_devices(self):
"""
向平台请求同步子设备信息
"""
self._logger.debug("start to syncSubDevices, local version is %s",
str(self._sub_device_persistence.get_version()))
device_event = DeviceEvent()
device_event.service_id = "$sub_device_manager"
device_event.event_type = "sub_device_sync_request"
device_event.event_time = get_event_time()
paras: dict = {"version": self._sub_device_persistence.get_version()}
device_event.paras = paras
self.get_client().report_event(device_event)
def on_sub_dev_command(self, request_id: str, command: Command):
"""
子设备命令下发处理,网关需要转发给子设备,需要子类实现
Args:
request_id: 请求id
command: 命令
"""
def on_sub_dev_properties_set(self, request_id: str, props_set: PropSet):
"""
子设备属性设置,网关需要转发给子设备,需要子类实现
Args:
request_id: 请求id
props_set: 属性设置
"""
def on_sub_dev_properties_get(self, request_id: str, props_get: PropsGet):
"""
子设备属性查询,网关需要转发给子设备,需要子类实现
Args:
request_id: 请求id
props_get: 属性查询
"""
def on_sub_dev_message(self, message: DeviceMessage):
"""
子设备消息下发,网关需要转发给子设备,需要子类实现
Args:
message: 设备消息
"""
def on_sub_dev_event(self, device_events: DeviceEvents):
"""
子设备事件下发,网关需要转发给子设备,需要子类实现
Args:
device_events: 设备事件
"""