2025-05-27 15:46:31 +08:00

135 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from obs import ObsClient
import traceback
import atexit
import os
class HW_OBS:
def __init__(self, ak='', sk='', server="https://obs.cn-south-1.myhuaweicloud.com"):
"""初始化OBS客户端"""
self.obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server)
atexit.register(self.close) # 注册 close 方法到 atexit
def get_packages(self, bucket_name, prefix, max_keys=100):
"""
列举OBS存储桶内的对象。
:param bucket_name: 存储桶名称
:param prefix: 对象的前缀
:param max_keys: 单次列举对象的最大数量
:return: 对象列表
"""
try:
# 列举对象
resp = self.obsClient.listObjects(bucket_name, prefix, max_keys=max_keys, encoding_type='url')
if resp.status < 300:
print('List Objects Succeeded')
# print('requestId:', resp.requestId)
# print('bucket_name:', resp.body.name)
# print('prefix:', resp.body.prefix)
# print('max_keys:', resp.body.max_keys)
# print('is_truncated:', resp.body.is_truncated)
object_list = []
for content in resp.body.contents:
obj_info = {
'key': content.key,
'lastModified': content.lastModified,
'etag': content.etag,
'size': content.size,
'storageClass': content.storageClass,
'owner_id': content.owner.owner_id,
'owner_name': content.owner.owner_name,
}
object_list.append(obj_info)
# print(obj_info)
return object_list
else:
print('List Objects Failed')
# print('requestId:', resp.requestId)
print('errorCode:', resp.errorCode)
print('errorMessage:', resp.errorMessage)
return None
except Exception:
print('List Objects Failed')
print(traceback.format_exc())
return None
def download(self, bucket_name, object_key, download_dir, part_size=10 * 1024 * 1024, task_num=5, enable_checkpoint=True, callback = None):
"""
下载OBS存储桶内的对象。
:param bucket_name: 存储桶名称
:param object_key: 对象的Key
:param download_dir: 下载到本地的文件夹路径
:param part_size: 分段大小默认10MB
:param task_num: 分段下载的并发数
:param enable_checkpoint: 是否启用断点续传
"""
if callback is None:
def callback(transferredAmount, totalAmount, totalSeconds):
"""下载进度回调"""
try:
avg_speed_kb = transferredAmount / totalSeconds / 1024
progress_percent = transferredAmount * 100.0 / totalAmount
# print(f"Downloaded: {transferredAmount / (1024 * 1024):.2f} MB / {totalAmount / (1024 * 1024):.2f} MB")
print(f"Progress: {progress_percent:.2f}% | Speed: {avg_speed_kb:.2f} KB/s")
except ZeroDivisionError:
print("Calculating speed... insufficient data (totalSeconds is 0).")
try:
# 检查目标文件夹是否存在,不存在则创建
if not os.path.exists(download_dir):
os.makedirs(download_dir)
print(f"Directory created: {download_dir}")
# 提取文件名并拼接完整路径
file_name = object_key.split("/")[-1]
download_file = os.path.join(download_dir, file_name)
resp = self.obsClient.downloadFile(
bucket_name, object_key, download_file, part_size, task_num, enable_checkpoint, progressCallback=callback
)
if resp.status < 300:
print('Download File Succeeded:', download_file)
else:
print('Download File Failed')
print('errorCode:', resp.errorCode)
print('errorMessage:', resp.errorMessage)
except Exception:
print('Download File Failed')
print(traceback.format_exc())
def close(self):
"""关闭OBS客户端"""
try:
if self.obsClient:
self.obsClient.close()
print("OBS Client closed successfully.")
except Exception as e:
print("Failed to close OBS Client:", e)
# 使用示例
if __name__ == '__main__':
# 替换为实际的ak、sk和服务器地址
ak = ''
sk = ''
server = "https://obs.cn-south-1.myhuaweicloud.com"
obs = HW_OBS(ak, sk, server)
# 列举对象
bucket_name = "robotstorm-files"
prefix = "MassageRobot_aubo/MassageRobot_aubo-"
packages = obs.get_packages(bucket_name, prefix)
print(packages)
# 下载对象
object_key = packages[0]['key']
download_dir = '/home/jsfb/jsfb_ws/downloads/'
obs.download(bucket_name, object_key, download_dir)
# 关闭客户端
# obs.close()