2025-05-27 15:46:31 +08:00

406 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import open3d as o3d
import cv2
import time
import time
from functools import wraps
def print_runtime(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time() # 记录开始时间
result = func(*args, **kwargs) # 执行函数
end = time.time() # 记录结束时间
print(f"{func.__name__} 运行耗时: {end - start:.3f}") # 打印耗时
return result
return wrapper
class SimpleNormalVector:
def __init__(self, depth_path, rgb_path, intrinsics):
"""
初始化类,设置深度图像文件路径和相机内参。
将耗时的点云生成和法向量计算提前到初始化阶段完成。
"""
self.depth_path = depth_path
self.rgb_path = rgb_path
self.intrinsics = intrinsics
# 预先读取深度图像
self.depth_image = cv2.imread(depth_path, cv2.IMREAD_UNCHANGED)
# 在初始化时就创建RGBD图像和点云
self.rgbd_image = self.create_rgbd_image()
self.pcd, self.points, self.normals = self.generate_point_cloud(self.rgbd_image)
# 创建KD树用于快速最近邻搜索
self.pcd_tree = o3d.geometry.KDTreeFlann(self.pcd)
def create_rgbd_image(self):
"""
创建RGBD图像使用深度图像和RGB图像
"""
color_raw = o3d.io.read_image(self.rgb_path)
depth_raw = o3d.io.read_image(self.depth_path)
# 将深度图像转换为 NumPy 数组
depth_image = np.asarray(depth_raw)
# 创建掩膜深度值为0的区域
mask = (depth_image == 0).astype(np.uint8)
# 使用 OpenCV 的 inpaint 函数进行深度修复
depth_image_inpainted = cv2.inpaint(depth_image, mask, inpaintRadius=3, flags=cv2.INPAINT_TELEA)
# 将修复后的深度图像转换回 Open3D 图像
depth_fixed = o3d.geometry.Image(depth_image_inpainted)
# 创建RGBD图像
rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth(
color_raw, depth_fixed, convert_rgb_to_intensity=False)
return rgbd_image
def generate_point_cloud(self, rgbd_image):
"""
从RGBD图像生成点云
返回点云对象、点坐标数组和法向量数组
"""
# 使用相机内参生成点云
fx = self.intrinsics['fx']
fy = self.intrinsics['fy']
cx = self.intrinsics['cx']
cy = self.intrinsics['cy']
intrinsic = o3d.camera.PinholeCameraIntrinsic()
intrinsic.set_intrinsics(640, 480, fx, fy, cx, cy)
pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_image, intrinsic)
pcd = pcd.voxel_down_sample(voxel_size=0.01) # 降采样
# 估算点云的法向量
pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30))
# 获取点云的点和法向量
points = np.asarray(pcd.points)
normals = np.asarray(pcd.normals)
return pcd, points, normals
def get_normal_at_point(self, target_point):
"""
使用KD树快速查找最近点并获取其法向量
"""
# 使用KD树查找最近的点
_, idx, _ = self.pcd_tree.search_knn_vector_3d(target_point, 1)
# 获取最近点的法向量
normal = self.normals[idx[0]]
return {'coordinates': target_point, 'normal': normal}
def calculate_normals_batch(self, target_points):
"""
批量计算多个目标点的法向量,使用向量化操作减少循环次数
"""
n_points = len(target_points)
# 预分配结果数组
normals_out = np.zeros((n_points, 3))
indices = np.zeros(n_points, dtype=np.int32)
# 批量查找最近点的索引
for i, point in enumerate(target_points):
_, idx, _ = self.pcd_tree.search_knn_vector_3d(point, 1)
indices[i] = idx[0]
# 使用索引批量获取法向量
normals_out = self.normals[indices]
# 构建结果字典列表
results = []
for i, point in enumerate(target_points):
results.append({
'coordinates': point,
'normal': normals_out[i]
})
return results
def calculate_normals_vectorized(self, target_points):
"""
使用向量化操作计算法向量,适用于大量点的情况
"""
# 转换为numpy数组确保向量化操作
points_array = np.array(target_points)
# 为每个目标点找到点云中最近的点
# 计算所有目标点到所有点云点的距离矩阵
# 这会消耗大量内存,但速度很快
# target_points: (n_targets, 3), self.points: (n_cloud, 3)
# 使用广播计算距离矩阵: (n_targets, n_cloud)
n_targets = points_array.shape[0]
n_cloud = self.points.shape[0]
# 对于大型点云,分批处理以避免内存溢出
batch_size = 1000 # 每批处理的目标点数
closest_indices = np.zeros(n_targets, dtype=np.int32)
for batch_start in range(0, n_targets, batch_size):
batch_end = min(batch_start + batch_size, n_targets)
batch_points = points_array[batch_start:batch_end]
# 计算这批点到所有点云点的距离
# 展开 (a-b)^2 = a^2 - 2ab + b^2 以避免大矩阵
batch_sq_dist = np.sum(batch_points**2, axis=1, keepdims=True)
cloud_sq_dist = np.sum(self.points**2, axis=1)
dot_product = np.dot(batch_points, self.points.T)
# 计算欧氏距离的平方
distances = batch_sq_dist - 2 * dot_product + cloud_sq_dist
# 找到每行(每个目标点)的最小距离索引
min_indices = np.argmin(distances, axis=1)
closest_indices[batch_start:batch_end] = min_indices
# 使用最近点的索引获取对应的法向量
normals_out = self.normals[closest_indices]
# 构建结果
results = []
for i in range(n_targets):
results.append({
'coordinates': target_points[i],
'normal': normals_out[i]
})
return results
def calculate_normals(self, target_points):
"""
计算多个目标点的法向量,根据点数量自动选择最佳方法
"""
# if len(target_points) < 10:
# # 少量点时用循环,避免向量化的开销
# normals_data = []
# for point in target_points:
# normal_data = self.get_normal_at_point(point)
# normals_data.append(normal_data)
# return normals_data
# elif len(target_points) < 500:
# # 中等数量点时使用批量查询但保留KD树
# return self.calculate_normals_batch(target_points)
# else:
# # 大量点时,使用完全向量化的方法
# return self.calculate_normals_vectorized(target_points)
return self.calculate_normals_batch(target_points)
def pixel_to_camera_point(self, pixel_x, pixel_y, depth_value=None):
"""
将像素坐标转换为相机坐标系中的3D点
pixel_x, pixel_y: 像素坐标
depth_value: 可选指定的深度值。如果为None则从深度图中获取
"""
if depth_value is None:
# 确保像素坐标在图像范围内
if pixel_x < 0 or pixel_x >= self.depth_image.shape[1] or pixel_y < 0 or pixel_y >= self.depth_image.shape[0]:
return None
# 获取该像素的深度值(单位:毫米)
depth_value = self.depth_image[pixel_y, pixel_x]
# 相机内参
fx = self.intrinsics['fx']
fy = self.intrinsics['fy']
cx = self.intrinsics['cx']
cy = self.intrinsics['cy']
# 计算相机坐标系中的3D点单位
x = (pixel_x - cx) * depth_value / fx / 1000.0
y = (pixel_y - cy) * depth_value / fy / 1000.0
z = depth_value / 1000.0
return np.array([x, y, z])
def get_normal_at_pixel(self, pixel_x, pixel_y, depth_value=None):
"""
获取特定像素位置的法向量
"""
# 将像素坐标转换为相机坐标系下的3D点
camera_point = self.pixel_to_camera_point(pixel_x, pixel_y, depth_value)
if camera_point is None:
return None
# 计算该点的法向量
return self.get_normal_at_point(camera_point)
def pixels_to_camera_points(self, pixel_coords):
"""
批量将像素坐标转换为相机坐标系中的3D点
pixel_coords: 像素坐标数组,形状为(n, 2),每行为(x, y)
返回相机坐标系中的3D点数组形状为(n, 3)
"""
# 预分配结果数组
n_pixels = len(pixel_coords)
camera_points = np.zeros((n_pixels, 3))
# 获取所有像素点的深度值
depth_values = np.zeros(n_pixels)
for i, (px, py) in enumerate(pixel_coords):
if 0 <= px < self.depth_image.shape[1] and 0 <= py < self.depth_image.shape[0]:
depth_values[i] = self.depth_image[py, px]
# 相机内参
fx = self.intrinsics['fx']
fy = self.intrinsics['fy']
cx = self.intrinsics['cx']
cy = self.intrinsics['cy']
# 批量计算相机坐标
camera_points[:, 0] = (pixel_coords[:, 0] - cx) * depth_values / fx / 1000.0
camera_points[:, 1] = (pixel_coords[:, 1] - cy) * depth_values / fy / 1000.0
camera_points[:, 2] = depth_values / 1000.0
return camera_points
def get_normals_at_pixels(self, pixel_coords):
"""
批量获取多个像素位置的法向量
pixel_coords: 像素坐标数组,形状为(n, 2),每行为(x, y)
"""
# 将像素坐标批量转换为相机坐标系下的3D点
camera_points = self.pixels_to_camera_points(pixel_coords)
# 计算法向量
return self.calculate_normals(camera_points)
@print_runtime
def run(self, points_3d_camera_with_labels, visualize=False):
"""
与原始vector.py中的run函数兼容的方法
points_3d_camera_with_labels: 包含label和coordinates的点列表
visualize: 是否可视化结果(此参数仅为保持接口兼容,在此实现中不起作用)
返回: 包含label、coordinates和normal的字典列表
"""
# 提取目标点坐标列表
target_points = [point_data['coordinates'] for point_data in points_3d_camera_with_labels]
# 使用优化的批量计算法向量
normals_data = self.calculate_normals(target_points)
# 构建与原始输出格式一致的结果
result = []
for i, point_data in enumerate(points_3d_camera_with_labels):
result.append({
'label': point_data.get('label', ''),
'coordinates': normals_data[i]['coordinates'],
'normal': normals_data[i]['normal']
})
return result
if __name__ == "__main__":
# 示例用法
# 设置相机内参
intrinsics = {'fx': 453.17746, 'fy': 453.17746, 'cx': 325.943024, 'cy': 243.559982}
# 深度图和RGB图路径
depth_image_path = 'aucpuncture2point/configs/using_img/depth.png'
rgb_image_path = 'aucpuncture2point/configs/using_img/color.png'
print("开始初始化...")
start_time = time.time()
normal_vector = SimpleNormalVector(depth_image_path, rgb_image_path, intrinsics)
init_time = time.time() - start_time
print(f"初始化耗时: {init_time:.4f}")
# 方法1直接传入图像像素坐标从深度图获取深度值
pixel_x, pixel_y = 320, 240 # 图像中心点
# 测试多次调用性能
num_trials = 100
print(f"\n测试方法1像素坐标法向量{num_trials}次调用性能...")
start_time = time.time()
for _ in range(num_trials):
result = normal_vector.get_normal_at_pixel(pixel_x, pixel_y)
method1_time = time.time() - start_time
print(f"方法1平均耗时: {method1_time/num_trials:.6f} 秒/次")
if result:
print(f"像素坐标 ({pixel_x}, {pixel_y}) 对应的3D点: {result['coordinates']}")
print(f"该点的法向量: {result['normal']}")
# 方法2已知3D点坐标直接计算法向量
# 给定一组3D点假设已经转换到相机坐标系单位
camera_points = [
np.array([-0.08326775896, -0.15066889276, 0.771]),
np.array([0.15773010017, -0.15340477408, 0.785])
]
# 测试多次调用性能
print(f"\n测试方法2已知3D点法向量{num_trials}次调用性能...")
start_time = time.time()
for _ in range(num_trials):
normals_data = normal_vector.calculate_normals(camera_points)
method2_time = time.time() - start_time
print(f"方法2平均耗时: {method2_time/num_trials:.6f} 秒/次")
# 输出结果
for i, data in enumerate(normals_data):
print(f"{i+1}: 坐标: {data['coordinates']}, 法向量: {data['normal']}")
# 测试单点性能
print(f"\n测试单点法向量查询{num_trials}次调用性能...")
single_point = camera_points[0]
start_time = time.time()
for _ in range(num_trials):
normal_data = normal_vector.get_normal_at_point(single_point)
single_point_time = time.time() - start_time
print(f"单点法向量查询平均耗时: {single_point_time/num_trials:.6f} 秒/次")
# 测试与原vector.py兼容的run方法
print("\n测试run方法兼容原vector.py...")
points_3d_camera_with_labels = [
{"label": "point1", "coordinates": camera_points[0]},
{"label": "point2", "coordinates": camera_points[1]}
]
start_time = time.time()
run_results = normal_vector.run(points_3d_camera_with_labels)
run_time = time.time() - start_time
print(f"run方法耗时: {run_time:.6f}")
# 输出结果
for data in run_results:
print(f"Label: {data['label']}, Coordinates: {data['coordinates']}, Normal: {data['normal']}")
# 测试批量像素处理性能
print("\n测试批量像素处理性能...")
# 创建多个像素点
n_pixels = 1000
pixel_coords = np.random.randint(0, 480, size=(n_pixels, 2))
start_time = time.time()
pixel_results = normal_vector.get_normals_at_pixels(pixel_coords)
pixel_batch_time = time.time() - start_time
print(f"处理 {n_pixels} 个像素点耗时: {pixel_batch_time:.6f}")
print(f"平均每点耗时: {pixel_batch_time/n_pixels:.6f} 秒/点")
# 测试不同规模下的向量化处理性能
print("\n测试不同规模下的向量化处理性能...")
# 小规模测试 (10个点)
small_points = [np.random.rand(3) for _ in range(10)]
start_time = time.time()
small_results = normal_vector.calculate_normals(small_points)
small_time = time.time() - start_time
print(f"处理 10 个点耗时: {small_time:.6f} 秒 (每点 {small_time/10:.6f} 秒)")
# 中规模测试 (100个点)
medium_points = [np.random.rand(3) for _ in range(100)]
start_time = time.time()
medium_results = normal_vector.calculate_normals(medium_points)
medium_time = time.time() - start_time
print(f"处理 100 个点耗时: {medium_time:.6f} 秒 (每点 {medium_time/100:.6f} 秒)")
# 大规模测试 (1000个点)
large_points = [np.random.rand(3) for _ in range(1000)]
start_time = time.time()
large_results = normal_vector.calculate_normals(large_points)
large_time = time.time() - start_time
print(f"处理 1000 个点耗时: {large_time:.6f} 秒 (每点 {large_time/1000:.6f} 秒)")