# -*- encoding: utf-8 -*- # Copyright (c) 2023-2024 Huawei Cloud Computing Technology Co., Ltd. All rights reserved. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import absolute_import import logging import time import hmac import hashlib import datetime def get_gmt_timestamp(): """ 返回当前时间戳,即从格林威治时间1970年01月01日00时00分00秒起至现在的毫秒数 Returns: int: 当前时间戳 """ return int(time.time() * 1000) def get_timestamp(): return time.strftime("%Y%m%d%H", time.gmtime(time.time())) def get_event_time(): """ 获取当前时间,format为 '%Y-%m-%dT%H:%M:%SZ' Returns: str: 当前时间 """ return datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") def get_client_id(device_id=None, psw_sig_type="0", timestamp=""): """ 一机一密的设备clientId由4个部分组成:设备ID、设备身份标识类型(固定值为0)、密码签名类型、时间戳,通过下划线分割 psw_sig_type为密码签名类型 '1' 表示检验时间戳,会先校验消息时间戳与平台时间是否一致,在判断密码是否正确。 '0' 表示不校验时间戳,但也必须带时间戳,但不校验时间是否准确,仅判断密码是否正确。 Args: device_id: 设备id psw_sig_type: 密码签名类型 timestamp: 时间戳 Returns: str: clientId """ if not isinstance(device_id, str): raise ValueError("device_id should be a string type") return device_id + "_0_" + psw_sig_type + "_" + timestamp def sha256_hash_from_file(file_path): with open(file_path, 'rb') as file: sha256obj = hashlib.sha256() sha256obj.update(file.read()) hash_value = sha256obj.hexdigest() return hash_value def sha256_mac(secret, timestamp): secret_key = timestamp.encode("utf-8") secret = secret.encode("utf-8") password = hmac.new(secret_key, secret, digestmod=hashlib.sha256).hexdigest() return password def sha256_mac_salt(secret, salt): secret_key = salt.encode("utf-8") return hmac.new(secret_key, secret, digestmod=hashlib.sha256).hexdigest() def get_request_id_from_msg(msg): """ 从topic里解析出requestId :param msg: 一个RawMessage实例 :return: requestId """ topic_list = msg.topic.strip().split("request_id=") if len(topic_list) > 1: return topic_list[-1] else: raise ValueError("request_id was not found at message topic") def get_device_id_from_msg(msg): topic_list = msg.topic.strip().split("/") device_id_index = topic_list.index("devices") + 1 if 0 < device_id_index < len(topic_list): return topic_list[device_id_index] else: return None def str_is_empty(value): if value is None: return True if not isinstance(value, str): raise ValueError("Input parameter value is not string") return value.strip() == "" def get_node_id_from_device_id(device_id: str): """ 从deviceId解析出nodeId :param device_id: 设备id :return: 设备物理标识 """ try: tmp_index = device_id.index("_") + 1 node_id = device_id[tmp_index:] except Exception as e: logging.error("get node_id from device_id failed, Exception: %s", str(e)) return None return node_id