406 lines
16 KiB
Python
406 lines
16 KiB
Python
import numpy as np
|
||
import open3d as o3d
|
||
import cv2
|
||
import time
|
||
|
||
import time
|
||
from functools import wraps
|
||
|
||
def print_runtime(func):
|
||
@wraps(func)
|
||
def wrapper(*args, **kwargs):
|
||
start = time.time() # 记录开始时间
|
||
result = func(*args, **kwargs) # 执行函数
|
||
end = time.time() # 记录结束时间
|
||
print(f"{func.__name__} 运行耗时: {end - start:.3f}秒") # 打印耗时
|
||
return result
|
||
return wrapper
|
||
|
||
class SimpleNormalVector:
|
||
def __init__(self, depth_path, rgb_path, intrinsics):
|
||
"""
|
||
初始化类,设置深度图像文件路径和相机内参。
|
||
将耗时的点云生成和法向量计算提前到初始化阶段完成。
|
||
"""
|
||
self.depth_path = depth_path
|
||
self.rgb_path = rgb_path
|
||
self.intrinsics = intrinsics
|
||
|
||
# 预先读取深度图像
|
||
self.depth_image = cv2.imread(depth_path, cv2.IMREAD_UNCHANGED)
|
||
|
||
# 在初始化时就创建RGBD图像和点云
|
||
self.rgbd_image = self.create_rgbd_image()
|
||
self.pcd, self.points, self.normals = self.generate_point_cloud(self.rgbd_image)
|
||
|
||
# 创建KD树用于快速最近邻搜索
|
||
self.pcd_tree = o3d.geometry.KDTreeFlann(self.pcd)
|
||
|
||
def create_rgbd_image(self):
|
||
"""
|
||
创建RGBD图像,使用深度图像和RGB图像
|
||
"""
|
||
color_raw = o3d.io.read_image(self.rgb_path)
|
||
depth_raw = o3d.io.read_image(self.depth_path)
|
||
|
||
# 将深度图像转换为 NumPy 数组
|
||
depth_image = np.asarray(depth_raw)
|
||
|
||
# 创建掩膜:深度值为0的区域
|
||
mask = (depth_image == 0).astype(np.uint8)
|
||
|
||
# 使用 OpenCV 的 inpaint 函数进行深度修复
|
||
depth_image_inpainted = cv2.inpaint(depth_image, mask, inpaintRadius=3, flags=cv2.INPAINT_TELEA)
|
||
|
||
# 将修复后的深度图像转换回 Open3D 图像
|
||
depth_fixed = o3d.geometry.Image(depth_image_inpainted)
|
||
|
||
# 创建RGBD图像
|
||
rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth(
|
||
color_raw, depth_fixed, convert_rgb_to_intensity=False)
|
||
|
||
return rgbd_image
|
||
|
||
def generate_point_cloud(self, rgbd_image):
|
||
"""
|
||
从RGBD图像生成点云
|
||
返回点云对象、点坐标数组和法向量数组
|
||
"""
|
||
# 使用相机内参生成点云
|
||
fx = self.intrinsics['fx']
|
||
fy = self.intrinsics['fy']
|
||
cx = self.intrinsics['cx']
|
||
cy = self.intrinsics['cy']
|
||
intrinsic = o3d.camera.PinholeCameraIntrinsic()
|
||
intrinsic.set_intrinsics(640, 480, fx, fy, cx, cy)
|
||
|
||
pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_image, intrinsic)
|
||
pcd = pcd.voxel_down_sample(voxel_size=0.01) # 降采样
|
||
|
||
# 估算点云的法向量
|
||
pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30))
|
||
|
||
# 获取点云的点和法向量
|
||
points = np.asarray(pcd.points)
|
||
normals = np.asarray(pcd.normals)
|
||
|
||
return pcd, points, normals
|
||
|
||
def get_normal_at_point(self, target_point):
|
||
"""
|
||
使用KD树快速查找最近点,并获取其法向量
|
||
"""
|
||
# 使用KD树查找最近的点
|
||
_, idx, _ = self.pcd_tree.search_knn_vector_3d(target_point, 1)
|
||
|
||
# 获取最近点的法向量
|
||
normal = self.normals[idx[0]]
|
||
|
||
return {'coordinates': target_point, 'normal': normal}
|
||
|
||
def calculate_normals_batch(self, target_points):
|
||
"""
|
||
批量计算多个目标点的法向量,使用向量化操作减少循环次数
|
||
"""
|
||
n_points = len(target_points)
|
||
# 预分配结果数组
|
||
normals_out = np.zeros((n_points, 3))
|
||
indices = np.zeros(n_points, dtype=np.int32)
|
||
|
||
# 批量查找最近点的索引
|
||
for i, point in enumerate(target_points):
|
||
_, idx, _ = self.pcd_tree.search_knn_vector_3d(point, 1)
|
||
indices[i] = idx[0]
|
||
|
||
# 使用索引批量获取法向量
|
||
normals_out = self.normals[indices]
|
||
|
||
# 构建结果字典列表
|
||
results = []
|
||
for i, point in enumerate(target_points):
|
||
results.append({
|
||
'coordinates': point,
|
||
'normal': normals_out[i]
|
||
})
|
||
|
||
return results
|
||
|
||
def calculate_normals_vectorized(self, target_points):
|
||
"""
|
||
使用向量化操作计算法向量,适用于大量点的情况
|
||
"""
|
||
# 转换为numpy数组确保向量化操作
|
||
points_array = np.array(target_points)
|
||
|
||
# 为每个目标点找到点云中最近的点
|
||
# 计算所有目标点到所有点云点的距离矩阵
|
||
# 这会消耗大量内存,但速度很快
|
||
# target_points: (n_targets, 3), self.points: (n_cloud, 3)
|
||
# 使用广播计算距离矩阵: (n_targets, n_cloud)
|
||
n_targets = points_array.shape[0]
|
||
n_cloud = self.points.shape[0]
|
||
|
||
# 对于大型点云,分批处理以避免内存溢出
|
||
batch_size = 1000 # 每批处理的目标点数
|
||
closest_indices = np.zeros(n_targets, dtype=np.int32)
|
||
|
||
for batch_start in range(0, n_targets, batch_size):
|
||
batch_end = min(batch_start + batch_size, n_targets)
|
||
batch_points = points_array[batch_start:batch_end]
|
||
|
||
# 计算这批点到所有点云点的距离
|
||
# 展开 (a-b)^2 = a^2 - 2ab + b^2 以避免大矩阵
|
||
batch_sq_dist = np.sum(batch_points**2, axis=1, keepdims=True)
|
||
cloud_sq_dist = np.sum(self.points**2, axis=1)
|
||
dot_product = np.dot(batch_points, self.points.T)
|
||
|
||
# 计算欧氏距离的平方
|
||
distances = batch_sq_dist - 2 * dot_product + cloud_sq_dist
|
||
|
||
# 找到每行(每个目标点)的最小距离索引
|
||
min_indices = np.argmin(distances, axis=1)
|
||
closest_indices[batch_start:batch_end] = min_indices
|
||
|
||
# 使用最近点的索引获取对应的法向量
|
||
normals_out = self.normals[closest_indices]
|
||
|
||
# 构建结果
|
||
results = []
|
||
for i in range(n_targets):
|
||
results.append({
|
||
'coordinates': target_points[i],
|
||
'normal': normals_out[i]
|
||
})
|
||
|
||
return results
|
||
|
||
def calculate_normals(self, target_points):
|
||
"""
|
||
计算多个目标点的法向量,根据点数量自动选择最佳方法
|
||
"""
|
||
# if len(target_points) < 10:
|
||
# # 少量点时用循环,避免向量化的开销
|
||
# normals_data = []
|
||
# for point in target_points:
|
||
# normal_data = self.get_normal_at_point(point)
|
||
# normals_data.append(normal_data)
|
||
# return normals_data
|
||
# elif len(target_points) < 500:
|
||
# # 中等数量点时,使用批量查询但保留KD树
|
||
# return self.calculate_normals_batch(target_points)
|
||
# else:
|
||
# # 大量点时,使用完全向量化的方法
|
||
# return self.calculate_normals_vectorized(target_points)
|
||
return self.calculate_normals_batch(target_points)
|
||
|
||
def pixel_to_camera_point(self, pixel_x, pixel_y, depth_value=None):
|
||
"""
|
||
将像素坐标转换为相机坐标系中的3D点
|
||
pixel_x, pixel_y: 像素坐标
|
||
depth_value: 可选,指定的深度值。如果为None,则从深度图中获取
|
||
"""
|
||
if depth_value is None:
|
||
# 确保像素坐标在图像范围内
|
||
if pixel_x < 0 or pixel_x >= self.depth_image.shape[1] or pixel_y < 0 or pixel_y >= self.depth_image.shape[0]:
|
||
return None
|
||
# 获取该像素的深度值(单位:毫米)
|
||
depth_value = self.depth_image[pixel_y, pixel_x]
|
||
|
||
# 相机内参
|
||
fx = self.intrinsics['fx']
|
||
fy = self.intrinsics['fy']
|
||
cx = self.intrinsics['cx']
|
||
cy = self.intrinsics['cy']
|
||
|
||
# 计算相机坐标系中的3D点(单位:米)
|
||
x = (pixel_x - cx) * depth_value / fx / 1000.0
|
||
y = (pixel_y - cy) * depth_value / fy / 1000.0
|
||
z = depth_value / 1000.0
|
||
|
||
return np.array([x, y, z])
|
||
|
||
def get_normal_at_pixel(self, pixel_x, pixel_y, depth_value=None):
|
||
"""
|
||
获取特定像素位置的法向量
|
||
"""
|
||
# 将像素坐标转换为相机坐标系下的3D点
|
||
camera_point = self.pixel_to_camera_point(pixel_x, pixel_y, depth_value)
|
||
if camera_point is None:
|
||
return None
|
||
|
||
# 计算该点的法向量
|
||
return self.get_normal_at_point(camera_point)
|
||
|
||
def pixels_to_camera_points(self, pixel_coords):
|
||
"""
|
||
批量将像素坐标转换为相机坐标系中的3D点
|
||
pixel_coords: 像素坐标数组,形状为(n, 2),每行为(x, y)
|
||
返回:相机坐标系中的3D点数组,形状为(n, 3)
|
||
"""
|
||
# 预分配结果数组
|
||
n_pixels = len(pixel_coords)
|
||
camera_points = np.zeros((n_pixels, 3))
|
||
|
||
# 获取所有像素点的深度值
|
||
depth_values = np.zeros(n_pixels)
|
||
for i, (px, py) in enumerate(pixel_coords):
|
||
if 0 <= px < self.depth_image.shape[1] and 0 <= py < self.depth_image.shape[0]:
|
||
depth_values[i] = self.depth_image[py, px]
|
||
|
||
# 相机内参
|
||
fx = self.intrinsics['fx']
|
||
fy = self.intrinsics['fy']
|
||
cx = self.intrinsics['cx']
|
||
cy = self.intrinsics['cy']
|
||
|
||
# 批量计算相机坐标
|
||
camera_points[:, 0] = (pixel_coords[:, 0] - cx) * depth_values / fx / 1000.0
|
||
camera_points[:, 1] = (pixel_coords[:, 1] - cy) * depth_values / fy / 1000.0
|
||
camera_points[:, 2] = depth_values / 1000.0
|
||
|
||
return camera_points
|
||
|
||
def get_normals_at_pixels(self, pixel_coords):
|
||
"""
|
||
批量获取多个像素位置的法向量
|
||
pixel_coords: 像素坐标数组,形状为(n, 2),每行为(x, y)
|
||
"""
|
||
# 将像素坐标批量转换为相机坐标系下的3D点
|
||
camera_points = self.pixels_to_camera_points(pixel_coords)
|
||
|
||
# 计算法向量
|
||
return self.calculate_normals(camera_points)
|
||
|
||
@print_runtime
|
||
def run(self, points_3d_camera_with_labels, visualize=False):
|
||
"""
|
||
与原始vector.py中的run函数兼容的方法
|
||
points_3d_camera_with_labels: 包含label和coordinates的点列表
|
||
visualize: 是否可视化结果(此参数仅为保持接口兼容,在此实现中不起作用)
|
||
返回: 包含label、coordinates和normal的字典列表
|
||
"""
|
||
# 提取目标点坐标列表
|
||
target_points = [point_data['coordinates'] for point_data in points_3d_camera_with_labels]
|
||
|
||
# 使用优化的批量计算法向量
|
||
normals_data = self.calculate_normals(target_points)
|
||
|
||
# 构建与原始输出格式一致的结果
|
||
result = []
|
||
for i, point_data in enumerate(points_3d_camera_with_labels):
|
||
result.append({
|
||
'label': point_data.get('label', ''),
|
||
'coordinates': normals_data[i]['coordinates'],
|
||
'normal': normals_data[i]['normal']
|
||
})
|
||
|
||
return result
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 示例用法
|
||
# 设置相机内参
|
||
intrinsics = {'fx': 453.17746, 'fy': 453.17746, 'cx': 325.943024, 'cy': 243.559982}
|
||
|
||
# 深度图和RGB图路径
|
||
depth_image_path = 'aucpuncture2point/configs/using_img/depth.png'
|
||
rgb_image_path = 'aucpuncture2point/configs/using_img/color.png'
|
||
|
||
print("开始初始化...")
|
||
start_time = time.time()
|
||
normal_vector = SimpleNormalVector(depth_image_path, rgb_image_path, intrinsics)
|
||
init_time = time.time() - start_time
|
||
print(f"初始化耗时: {init_time:.4f} 秒")
|
||
|
||
# 方法1:直接传入图像像素坐标,从深度图获取深度值
|
||
pixel_x, pixel_y = 320, 240 # 图像中心点
|
||
|
||
# 测试多次调用性能
|
||
num_trials = 100
|
||
print(f"\n测试方法1(像素坐标法向量){num_trials}次调用性能...")
|
||
start_time = time.time()
|
||
for _ in range(num_trials):
|
||
result = normal_vector.get_normal_at_pixel(pixel_x, pixel_y)
|
||
method1_time = time.time() - start_time
|
||
print(f"方法1平均耗时: {method1_time/num_trials:.6f} 秒/次")
|
||
if result:
|
||
print(f"像素坐标 ({pixel_x}, {pixel_y}) 对应的3D点: {result['coordinates']}")
|
||
print(f"该点的法向量: {result['normal']}")
|
||
|
||
# 方法2:已知3D点坐标,直接计算法向量
|
||
# 给定一组3D点(假设已经转换到相机坐标系,单位:米)
|
||
camera_points = [
|
||
np.array([-0.08326775896, -0.15066889276, 0.771]),
|
||
np.array([0.15773010017, -0.15340477408, 0.785])
|
||
]
|
||
|
||
# 测试多次调用性能
|
||
print(f"\n测试方法2(已知3D点法向量){num_trials}次调用性能...")
|
||
start_time = time.time()
|
||
for _ in range(num_trials):
|
||
normals_data = normal_vector.calculate_normals(camera_points)
|
||
method2_time = time.time() - start_time
|
||
print(f"方法2平均耗时: {method2_time/num_trials:.6f} 秒/次")
|
||
|
||
# 输出结果
|
||
for i, data in enumerate(normals_data):
|
||
print(f"点 {i+1}: 坐标: {data['coordinates']}, 法向量: {data['normal']}")
|
||
|
||
# 测试单点性能
|
||
print(f"\n测试单点法向量查询{num_trials}次调用性能...")
|
||
single_point = camera_points[0]
|
||
start_time = time.time()
|
||
for _ in range(num_trials):
|
||
normal_data = normal_vector.get_normal_at_point(single_point)
|
||
single_point_time = time.time() - start_time
|
||
print(f"单点法向量查询平均耗时: {single_point_time/num_trials:.6f} 秒/次")
|
||
|
||
# 测试与原vector.py兼容的run方法
|
||
print("\n测试run方法(兼容原vector.py)...")
|
||
points_3d_camera_with_labels = [
|
||
{"label": "point1", "coordinates": camera_points[0]},
|
||
{"label": "point2", "coordinates": camera_points[1]}
|
||
]
|
||
start_time = time.time()
|
||
run_results = normal_vector.run(points_3d_camera_with_labels)
|
||
run_time = time.time() - start_time
|
||
print(f"run方法耗时: {run_time:.6f} 秒")
|
||
|
||
# 输出结果
|
||
for data in run_results:
|
||
print(f"Label: {data['label']}, Coordinates: {data['coordinates']}, Normal: {data['normal']}")
|
||
|
||
# 测试批量像素处理性能
|
||
print("\n测试批量像素处理性能...")
|
||
# 创建多个像素点
|
||
n_pixels = 1000
|
||
pixel_coords = np.random.randint(0, 480, size=(n_pixels, 2))
|
||
start_time = time.time()
|
||
pixel_results = normal_vector.get_normals_at_pixels(pixel_coords)
|
||
pixel_batch_time = time.time() - start_time
|
||
print(f"处理 {n_pixels} 个像素点耗时: {pixel_batch_time:.6f} 秒")
|
||
print(f"平均每点耗时: {pixel_batch_time/n_pixels:.6f} 秒/点")
|
||
|
||
# 测试不同规模下的向量化处理性能
|
||
print("\n测试不同规模下的向量化处理性能...")
|
||
|
||
# 小规模测试 (10个点)
|
||
small_points = [np.random.rand(3) for _ in range(10)]
|
||
start_time = time.time()
|
||
small_results = normal_vector.calculate_normals(small_points)
|
||
small_time = time.time() - start_time
|
||
print(f"处理 10 个点耗时: {small_time:.6f} 秒 (每点 {small_time/10:.6f} 秒)")
|
||
|
||
# 中规模测试 (100个点)
|
||
medium_points = [np.random.rand(3) for _ in range(100)]
|
||
start_time = time.time()
|
||
medium_results = normal_vector.calculate_normals(medium_points)
|
||
medium_time = time.time() - start_time
|
||
print(f"处理 100 个点耗时: {medium_time:.6f} 秒 (每点 {medium_time/100:.6f} 秒)")
|
||
|
||
# 大规模测试 (1000个点)
|
||
large_points = [np.random.rand(3) for _ in range(1000)]
|
||
start_time = time.time()
|
||
large_results = normal_vector.calculate_normals(large_points)
|
||
large_time = time.time() - start_time
|
||
print(f"处理 1000 个点耗时: {large_time:.6f} 秒 (每点 {large_time/1000:.6f} 秒)") |