2025-05-27 15:46:31 +08:00

131 lines
3.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding: utf-8 -*-
# Copyright (c) 2023-2024 Huawei Cloud Computing Technology Co., Ltd. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import logging
import time
import hmac
import hashlib
import datetime
def get_gmt_timestamp():
"""
返回当前时间戳即从格林威治时间1970年01月01日00时00分00秒起至现在的毫秒数
Returns:
int: 当前时间戳
"""
return int(time.time() * 1000)
def get_timestamp():
return time.strftime("%Y%m%d%H", time.gmtime(time.time()))
def get_event_time():
"""
获取当前时间format为 '%Y-%m-%dT%H:%M:%SZ'
Returns:
str: 当前时间
"""
return datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
def get_client_id(device_id=None, psw_sig_type="0", timestamp=""):
"""
一机一密的设备clientId由4个部分组成设备ID、设备身份标识类型固定值为0、密码签名类型、时间戳通过下划线分割
psw_sig_type为密码签名类型
'1' 表示检验时间戳,会先校验消息时间戳与平台时间是否一致,在判断密码是否正确。
'0' 表示不校验时间戳,但也必须带时间戳,但不校验时间是否准确,仅判断密码是否正确。
Args:
device_id: 设备id
psw_sig_type: 密码签名类型
timestamp: 时间戳
Returns:
str: clientId
"""
if not isinstance(device_id, str):
raise ValueError("device_id should be a string type")
return device_id + "_0_" + psw_sig_type + "_" + timestamp
def sha256_hash_from_file(file_path):
with open(file_path, 'rb') as file:
sha256obj = hashlib.sha256()
sha256obj.update(file.read())
hash_value = sha256obj.hexdigest()
return hash_value
def sha256_mac(secret, timestamp):
secret_key = timestamp.encode("utf-8")
secret = secret.encode("utf-8")
password = hmac.new(secret_key, secret, digestmod=hashlib.sha256).hexdigest()
return password
def sha256_mac_salt(secret, salt):
secret_key = salt.encode("utf-8")
return hmac.new(secret_key, secret, digestmod=hashlib.sha256).hexdigest()
def get_request_id_from_msg(msg):
"""
从topic里解析出requestId
:param msg: 一个RawMessage实例
:return: requestId
"""
topic_list = msg.topic.strip().split("request_id=")
if len(topic_list) > 1:
return topic_list[-1]
else:
raise ValueError("request_id was not found at message topic")
def get_device_id_from_msg(msg):
topic_list = msg.topic.strip().split("/")
device_id_index = topic_list.index("devices") + 1
if 0 < device_id_index < len(topic_list):
return topic_list[device_id_index]
else:
return None
def str_is_empty(value):
if value is None:
return True
if not isinstance(value, str):
raise ValueError("Input parameter value is not string")
return value.strip() == ""
def get_node_id_from_device_id(device_id: str):
"""
从deviceId解析出nodeId
:param device_id: 设备id
:return: 设备物理标识
"""
try:
tmp_index = device_id.index("_") + 1
node_id = device_id[tmp_index:]
except Exception as e:
logging.error("get node_id from device_id failed, Exception: %s", str(e))
return None
return node_id