import numpy as np import open3d as o3d import cv2 import time import time from functools import wraps def print_runtime(func): @wraps(func) def wrapper(*args, **kwargs): start = time.time() # 记录开始时间 result = func(*args, **kwargs) # 执行函数 end = time.time() # 记录结束时间 print(f"{func.__name__} 运行耗时: {end - start:.3f}秒") # 打印耗时 return result return wrapper class SimpleNormalVector: def __init__(self, depth_path, rgb_path, intrinsics): """ 初始化类,设置深度图像文件路径和相机内参。 将耗时的点云生成和法向量计算提前到初始化阶段完成。 """ self.depth_path = depth_path self.rgb_path = rgb_path self.intrinsics = intrinsics # 预先读取深度图像 self.depth_image = cv2.imread(depth_path, cv2.IMREAD_UNCHANGED) # 在初始化时就创建RGBD图像和点云 self.rgbd_image = self.create_rgbd_image() self.pcd, self.points, self.normals = self.generate_point_cloud(self.rgbd_image) # 创建KD树用于快速最近邻搜索 self.pcd_tree = o3d.geometry.KDTreeFlann(self.pcd) def create_rgbd_image(self): """ 创建RGBD图像,使用深度图像和RGB图像 """ color_raw = o3d.io.read_image(self.rgb_path) depth_raw = o3d.io.read_image(self.depth_path) # 将深度图像转换为 NumPy 数组 depth_image = np.asarray(depth_raw) # 创建掩膜:深度值为0的区域 mask = (depth_image == 0).astype(np.uint8) # 使用 OpenCV 的 inpaint 函数进行深度修复 depth_image_inpainted = cv2.inpaint(depth_image, mask, inpaintRadius=3, flags=cv2.INPAINT_TELEA) # 将修复后的深度图像转换回 Open3D 图像 depth_fixed = o3d.geometry.Image(depth_image_inpainted) # 创建RGBD图像 rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth( color_raw, depth_fixed, convert_rgb_to_intensity=False) return rgbd_image def generate_point_cloud(self, rgbd_image): """ 从RGBD图像生成点云 返回点云对象、点坐标数组和法向量数组 """ # 使用相机内参生成点云 fx = self.intrinsics['fx'] fy = self.intrinsics['fy'] cx = self.intrinsics['cx'] cy = self.intrinsics['cy'] intrinsic = o3d.camera.PinholeCameraIntrinsic() intrinsic.set_intrinsics(640, 480, fx, fy, cx, cy) pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_image, intrinsic) pcd = pcd.voxel_down_sample(voxel_size=0.01) # 降采样 # 估算点云的法向量 pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30)) # 获取点云的点和法向量 points = np.asarray(pcd.points) normals = np.asarray(pcd.normals) return pcd, points, normals def get_normal_at_point(self, target_point): """ 使用KD树快速查找最近点,并获取其法向量 """ # 使用KD树查找最近的点 _, idx, _ = self.pcd_tree.search_knn_vector_3d(target_point, 1) # 获取最近点的法向量 normal = self.normals[idx[0]] return {'coordinates': target_point, 'normal': normal} def calculate_normals_batch(self, target_points): """ 批量计算多个目标点的法向量,使用向量化操作减少循环次数 """ n_points = len(target_points) # 预分配结果数组 normals_out = np.zeros((n_points, 3)) indices = np.zeros(n_points, dtype=np.int32) # 批量查找最近点的索引 for i, point in enumerate(target_points): _, idx, _ = self.pcd_tree.search_knn_vector_3d(point, 1) indices[i] = idx[0] # 使用索引批量获取法向量 normals_out = self.normals[indices] # 构建结果字典列表 results = [] for i, point in enumerate(target_points): results.append({ 'coordinates': point, 'normal': normals_out[i] }) return results def calculate_normals_vectorized(self, target_points): """ 使用向量化操作计算法向量,适用于大量点的情况 """ # 转换为numpy数组确保向量化操作 points_array = np.array(target_points) # 为每个目标点找到点云中最近的点 # 计算所有目标点到所有点云点的距离矩阵 # 这会消耗大量内存,但速度很快 # target_points: (n_targets, 3), self.points: (n_cloud, 3) # 使用广播计算距离矩阵: (n_targets, n_cloud) n_targets = points_array.shape[0] n_cloud = self.points.shape[0] # 对于大型点云,分批处理以避免内存溢出 batch_size = 1000 # 每批处理的目标点数 closest_indices = np.zeros(n_targets, dtype=np.int32) for batch_start in range(0, n_targets, batch_size): batch_end = min(batch_start + batch_size, n_targets) batch_points = points_array[batch_start:batch_end] # 计算这批点到所有点云点的距离 # 展开 (a-b)^2 = a^2 - 2ab + b^2 以避免大矩阵 batch_sq_dist = np.sum(batch_points**2, axis=1, keepdims=True) cloud_sq_dist = np.sum(self.points**2, axis=1) dot_product = np.dot(batch_points, self.points.T) # 计算欧氏距离的平方 distances = batch_sq_dist - 2 * dot_product + cloud_sq_dist # 找到每行(每个目标点)的最小距离索引 min_indices = np.argmin(distances, axis=1) closest_indices[batch_start:batch_end] = min_indices # 使用最近点的索引获取对应的法向量 normals_out = self.normals[closest_indices] # 构建结果 results = [] for i in range(n_targets): results.append({ 'coordinates': target_points[i], 'normal': normals_out[i] }) return results def calculate_normals(self, target_points): """ 计算多个目标点的法向量,根据点数量自动选择最佳方法 """ # if len(target_points) < 10: # # 少量点时用循环,避免向量化的开销 # normals_data = [] # for point in target_points: # normal_data = self.get_normal_at_point(point) # normals_data.append(normal_data) # return normals_data # elif len(target_points) < 500: # # 中等数量点时,使用批量查询但保留KD树 # return self.calculate_normals_batch(target_points) # else: # # 大量点时,使用完全向量化的方法 # return self.calculate_normals_vectorized(target_points) return self.calculate_normals_batch(target_points) def pixel_to_camera_point(self, pixel_x, pixel_y, depth_value=None): """ 将像素坐标转换为相机坐标系中的3D点 pixel_x, pixel_y: 像素坐标 depth_value: 可选,指定的深度值。如果为None,则从深度图中获取 """ if depth_value is None: # 确保像素坐标在图像范围内 if pixel_x < 0 or pixel_x >= self.depth_image.shape[1] or pixel_y < 0 or pixel_y >= self.depth_image.shape[0]: return None # 获取该像素的深度值(单位:毫米) depth_value = self.depth_image[pixel_y, pixel_x] # 相机内参 fx = self.intrinsics['fx'] fy = self.intrinsics['fy'] cx = self.intrinsics['cx'] cy = self.intrinsics['cy'] # 计算相机坐标系中的3D点(单位:米) x = (pixel_x - cx) * depth_value / fx / 1000.0 y = (pixel_y - cy) * depth_value / fy / 1000.0 z = depth_value / 1000.0 return np.array([x, y, z]) def get_normal_at_pixel(self, pixel_x, pixel_y, depth_value=None): """ 获取特定像素位置的法向量 """ # 将像素坐标转换为相机坐标系下的3D点 camera_point = self.pixel_to_camera_point(pixel_x, pixel_y, depth_value) if camera_point is None: return None # 计算该点的法向量 return self.get_normal_at_point(camera_point) def pixels_to_camera_points(self, pixel_coords): """ 批量将像素坐标转换为相机坐标系中的3D点 pixel_coords: 像素坐标数组,形状为(n, 2),每行为(x, y) 返回:相机坐标系中的3D点数组,形状为(n, 3) """ # 预分配结果数组 n_pixels = len(pixel_coords) camera_points = np.zeros((n_pixels, 3)) # 获取所有像素点的深度值 depth_values = np.zeros(n_pixels) for i, (px, py) in enumerate(pixel_coords): if 0 <= px < self.depth_image.shape[1] and 0 <= py < self.depth_image.shape[0]: depth_values[i] = self.depth_image[py, px] # 相机内参 fx = self.intrinsics['fx'] fy = self.intrinsics['fy'] cx = self.intrinsics['cx'] cy = self.intrinsics['cy'] # 批量计算相机坐标 camera_points[:, 0] = (pixel_coords[:, 0] - cx) * depth_values / fx / 1000.0 camera_points[:, 1] = (pixel_coords[:, 1] - cy) * depth_values / fy / 1000.0 camera_points[:, 2] = depth_values / 1000.0 return camera_points def get_normals_at_pixels(self, pixel_coords): """ 批量获取多个像素位置的法向量 pixel_coords: 像素坐标数组,形状为(n, 2),每行为(x, y) """ # 将像素坐标批量转换为相机坐标系下的3D点 camera_points = self.pixels_to_camera_points(pixel_coords) # 计算法向量 return self.calculate_normals(camera_points) @print_runtime def run(self, points_3d_camera_with_labels, visualize=False): """ 与原始vector.py中的run函数兼容的方法 points_3d_camera_with_labels: 包含label和coordinates的点列表 visualize: 是否可视化结果(此参数仅为保持接口兼容,在此实现中不起作用) 返回: 包含label、coordinates和normal的字典列表 """ # 提取目标点坐标列表 target_points = [point_data['coordinates'] for point_data in points_3d_camera_with_labels] # 使用优化的批量计算法向量 normals_data = self.calculate_normals(target_points) # 构建与原始输出格式一致的结果 result = [] for i, point_data in enumerate(points_3d_camera_with_labels): result.append({ 'label': point_data.get('label', ''), 'coordinates': normals_data[i]['coordinates'], 'normal': normals_data[i]['normal'] }) return result if __name__ == "__main__": # 示例用法 # 设置相机内参 intrinsics = {'fx': 453.17746, 'fy': 453.17746, 'cx': 325.943024, 'cy': 243.559982} # 深度图和RGB图路径 depth_image_path = 'aucpuncture2point/configs/using_img/depth.png' rgb_image_path = 'aucpuncture2point/configs/using_img/color.png' print("开始初始化...") start_time = time.time() normal_vector = SimpleNormalVector(depth_image_path, rgb_image_path, intrinsics) init_time = time.time() - start_time print(f"初始化耗时: {init_time:.4f} 秒") # 方法1:直接传入图像像素坐标,从深度图获取深度值 pixel_x, pixel_y = 320, 240 # 图像中心点 # 测试多次调用性能 num_trials = 100 print(f"\n测试方法1(像素坐标法向量){num_trials}次调用性能...") start_time = time.time() for _ in range(num_trials): result = normal_vector.get_normal_at_pixel(pixel_x, pixel_y) method1_time = time.time() - start_time print(f"方法1平均耗时: {method1_time/num_trials:.6f} 秒/次") if result: print(f"像素坐标 ({pixel_x}, {pixel_y}) 对应的3D点: {result['coordinates']}") print(f"该点的法向量: {result['normal']}") # 方法2:已知3D点坐标,直接计算法向量 # 给定一组3D点(假设已经转换到相机坐标系,单位:米) camera_points = [ np.array([-0.08326775896, -0.15066889276, 0.771]), np.array([0.15773010017, -0.15340477408, 0.785]) ] # 测试多次调用性能 print(f"\n测试方法2(已知3D点法向量){num_trials}次调用性能...") start_time = time.time() for _ in range(num_trials): normals_data = normal_vector.calculate_normals(camera_points) method2_time = time.time() - start_time print(f"方法2平均耗时: {method2_time/num_trials:.6f} 秒/次") # 输出结果 for i, data in enumerate(normals_data): print(f"点 {i+1}: 坐标: {data['coordinates']}, 法向量: {data['normal']}") # 测试单点性能 print(f"\n测试单点法向量查询{num_trials}次调用性能...") single_point = camera_points[0] start_time = time.time() for _ in range(num_trials): normal_data = normal_vector.get_normal_at_point(single_point) single_point_time = time.time() - start_time print(f"单点法向量查询平均耗时: {single_point_time/num_trials:.6f} 秒/次") # 测试与原vector.py兼容的run方法 print("\n测试run方法(兼容原vector.py)...") points_3d_camera_with_labels = [ {"label": "point1", "coordinates": camera_points[0]}, {"label": "point2", "coordinates": camera_points[1]} ] start_time = time.time() run_results = normal_vector.run(points_3d_camera_with_labels) run_time = time.time() - start_time print(f"run方法耗时: {run_time:.6f} 秒") # 输出结果 for data in run_results: print(f"Label: {data['label']}, Coordinates: {data['coordinates']}, Normal: {data['normal']}") # 测试批量像素处理性能 print("\n测试批量像素处理性能...") # 创建多个像素点 n_pixels = 1000 pixel_coords = np.random.randint(0, 480, size=(n_pixels, 2)) start_time = time.time() pixel_results = normal_vector.get_normals_at_pixels(pixel_coords) pixel_batch_time = time.time() - start_time print(f"处理 {n_pixels} 个像素点耗时: {pixel_batch_time:.6f} 秒") print(f"平均每点耗时: {pixel_batch_time/n_pixels:.6f} 秒/点") # 测试不同规模下的向量化处理性能 print("\n测试不同规模下的向量化处理性能...") # 小规模测试 (10个点) small_points = [np.random.rand(3) for _ in range(10)] start_time = time.time() small_results = normal_vector.calculate_normals(small_points) small_time = time.time() - start_time print(f"处理 10 个点耗时: {small_time:.6f} 秒 (每点 {small_time/10:.6f} 秒)") # 中规模测试 (100个点) medium_points = [np.random.rand(3) for _ in range(100)] start_time = time.time() medium_results = normal_vector.calculate_normals(medium_points) medium_time = time.time() - start_time print(f"处理 100 个点耗时: {medium_time:.6f} 秒 (每点 {medium_time/100:.6f} 秒)") # 大规模测试 (1000个点) large_points = [np.random.rand(3) for _ in range(1000)] start_time = time.time() large_results = normal_vector.calculate_normals(large_points) large_time = time.time() - start_time print(f"处理 1000 个点耗时: {large_time:.6f} 秒 (每点 {large_time/1000:.6f} 秒)")