365 lines
13 KiB
Python
365 lines
13 KiB
Python
import cv2
|
||
import numpy as np
|
||
import pandas as pd
|
||
from scipy import ndimage
|
||
|
||
class ThermalProcessor:
|
||
"""
|
||
热成像数据处理类
|
||
将CSV格式的热成像数据转换为热图
|
||
"""
|
||
|
||
def __init__(self):
|
||
"""初始化热成像处理器"""
|
||
# 定义默认的颜色映射
|
||
self.default_colors = [
|
||
(0, 0, 0), # 黑色
|
||
(0, 0, 0.5), # 深蓝
|
||
(0, 0.7, 1), # 亮蓝
|
||
(0.2, 1, 1), # 青蓝
|
||
(0.4, 1, 0.6), # 青绿
|
||
(0.6, 1, 0.2), # 黄绿
|
||
(1, 1, 0), # 黄色
|
||
(1, 0.7, 0), # 橙色
|
||
(0.8, 0.4, 0.2), # 棕色
|
||
(0.7, 0.2, 0.5), # 紫红
|
||
(1, 0, 0) # 红色
|
||
]
|
||
|
||
def read_csv(self, file_path):
|
||
"""
|
||
读取CSV文件并解析数据
|
||
|
||
参数:
|
||
file_path: CSV文件路径
|
||
|
||
返回:
|
||
metadata: 元数据字典
|
||
data: 温度数据矩阵
|
||
"""
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
lines = f.readlines()
|
||
|
||
metadata = {}
|
||
start_index = 0
|
||
|
||
for i, line in enumerate(lines):
|
||
line = line.strip()
|
||
if line.startswith("#"):
|
||
# 解析元数据
|
||
if "宽度" in line:
|
||
metadata["width"] = int(line.split(":")[1].strip())
|
||
elif "高度" in line:
|
||
metadata["height"] = int(line.split(":")[1].strip())
|
||
elif "最小温度" in line:
|
||
metadata["min_temp"] = float(line.split(":")[1].strip().replace("°C", ""))
|
||
elif "最大温度" in line:
|
||
metadata["max_temp"] = float(line.split(":")[1].strip().replace("°C", ""))
|
||
elif "中心温度" in line:
|
||
metadata["center_temp"] = float(line.split(":")[1].strip().replace("°C", ""))
|
||
else:
|
||
start_index = i
|
||
break # 找到数据部分
|
||
|
||
# 读取表格数据
|
||
df = pd.read_csv(file_path, skiprows=start_index)
|
||
|
||
# 转换为numpy数组
|
||
data = df.to_numpy(dtype=np.float32)
|
||
|
||
return metadata, data
|
||
|
||
def enhance_continuity(self, data):
|
||
"""
|
||
通过形态学操作增强区域连续性
|
||
|
||
参数:
|
||
data: 热数据矩阵
|
||
|
||
返回:
|
||
result: 增强后的热数据矩阵
|
||
"""
|
||
# 先对数据进行中值滤波减少噪点
|
||
data_filtered = ndimage.median_filter(data, size=3)
|
||
|
||
# 归一化处理
|
||
norm_data = cv2.normalize(data_filtered, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
|
||
|
||
# 使用自适应阈值以获得更好的边缘细节
|
||
mask = cv2.adaptiveThreshold(norm_data, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
|
||
cv2.THRESH_BINARY, 11, 2)
|
||
|
||
# 使用核进行闭操作,更好地连接邻近区域
|
||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7,7))
|
||
closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=3)
|
||
|
||
# 增加开操作,消除小噪点
|
||
opened = cv2.morphologyEx(closed, cv2.MORPH_OPEN, kernel, iterations=1)
|
||
|
||
# 保留原数据的连续性,使用形态学梯度增强边缘
|
||
result = cv2.bitwise_and(data, data, mask=opened)
|
||
|
||
# 应用高斯滤波以平滑过渡
|
||
return cv2.GaussianBlur(result, (5,5), 0)
|
||
|
||
def generate_temp_ranges(self, min_temp, max_temp, focus_temp):
|
||
"""
|
||
生成温度区间
|
||
|
||
参数:
|
||
min_temp: 最小温度值
|
||
max_temp: 最大温度值
|
||
focus_temp: 集中温度值,在该温度附近的区间将更精细
|
||
|
||
返回:
|
||
temp_ranges: 温度区间列表
|
||
"""
|
||
# 创建一个函数来计算到焦点的距离与区间大小的关系
|
||
def interval_size(distance):
|
||
"""根据到焦点的距离计算区间大小"""
|
||
# 基准区间大小(最小值,在焦点处)
|
||
base_size = 0.5
|
||
# 增长率
|
||
growth_rate = 3.0
|
||
# 计算区间大小
|
||
size = base_size * (1 + growth_rate * (abs(distance) ** 1.5))
|
||
# 限制最大区间大小
|
||
return min(size, (max_temp - min_temp) / 5)
|
||
|
||
# 从焦点开始,向两边生成温度点
|
||
ranges = [focus_temp]
|
||
|
||
# 向上生成温度点,直到超过最大温度
|
||
current = focus_temp
|
||
while current < max_temp:
|
||
distance = (current - focus_temp) / (max_temp - min_temp)
|
||
step = interval_size(distance)
|
||
current += step
|
||
if current < max_temp:
|
||
ranges.append(current)
|
||
|
||
# 向下生成温度点,直到低于最小温度
|
||
current = focus_temp
|
||
while current > min_temp:
|
||
distance = (current - focus_temp) / (max_temp - min_temp)
|
||
step = interval_size(distance)
|
||
current -= step
|
||
if current > min_temp:
|
||
ranges.append(current)
|
||
|
||
# 确保包含最小值和最大值
|
||
if min_temp not in ranges:
|
||
ranges.append(min_temp)
|
||
if max_temp not in ranges:
|
||
ranges.append(max_temp)
|
||
|
||
# 对温度点排序并去重
|
||
ranges = sorted(list(set(ranges)))
|
||
|
||
# 需要的温度点数量(颜色数量-1,因为颜色数比区间数少1)
|
||
required_points = 10 # 11个颜色需要10个温度点(不包括无穷值)
|
||
|
||
# 确保生成的区间数量正确
|
||
while len(ranges) > required_points:
|
||
# 如果温度点太多,移除距离焦点最远的点,但保留最小值和最大值
|
||
distances = []
|
||
for i, r in enumerate(ranges):
|
||
if r == min_temp or r == max_temp:
|
||
distances.append(float('-inf')) # 确保不会删除最小和最大值
|
||
else:
|
||
distances.append(abs(r - focus_temp))
|
||
|
||
furthest_idx = distances.index(max(distances))
|
||
ranges.pop(furthest_idx)
|
||
|
||
# 如果温度点太少,在最大间隔处插入新的点
|
||
while len(ranges) < required_points:
|
||
intervals = [ranges[i+1] - ranges[i] for i in range(len(ranges)-1)]
|
||
largest_interval_idx = intervals.index(max(intervals))
|
||
mid_point = (ranges[largest_interval_idx] + ranges[largest_interval_idx+1]) / 2
|
||
ranges.insert(largest_interval_idx + 1, mid_point)
|
||
|
||
# 添加无穷区间
|
||
ranges = [-np.inf] + ranges + [np.inf]
|
||
|
||
return ranges
|
||
|
||
def create_heatmap(self, data, min_temp=None, max_temp=None, focus_temp=None,
|
||
colors=None, scale_factor=6):
|
||
"""
|
||
根据温度数据创建热图
|
||
|
||
参数:
|
||
data: 温度数据矩阵
|
||
min_temp: 最小温度值,若为None则使用数据中的最小值
|
||
max_temp: 最大温度值,若为None则使用数据中的最大值
|
||
focus_temp: 集中温度值,若为None则使用max_temp的值
|
||
colors: 自定义颜色映射,若为None则使用默认颜色
|
||
scale_factor: 图像缩放倍数,默认为6
|
||
|
||
返回:
|
||
heatmap: 生成的热图图像
|
||
"""
|
||
# 确保数据为numpy数组
|
||
data = np.array(data, dtype=np.float32)
|
||
|
||
# 对原始数据应用高斯平滑,减少离散温度值的跳变
|
||
data = cv2.GaussianBlur(data, (3,3), 0.8)
|
||
|
||
# 增强数据连续性
|
||
data = self.enhance_continuity(data)
|
||
|
||
# 如果未指定温度范围,则使用数据的实际范围
|
||
if min_temp is None:
|
||
min_temp = np.min(data)
|
||
if max_temp is None:
|
||
max_temp = np.max(data)
|
||
if focus_temp is None:
|
||
focus_temp = max_temp
|
||
|
||
# 使用自定义颜色或默认颜色
|
||
if colors is None:
|
||
colors = self.default_colors
|
||
|
||
# 根据参数动态生成温度区间
|
||
temp_ranges = self.generate_temp_ranges(min_temp, max_temp, focus_temp)
|
||
|
||
# 实现平滑过渡
|
||
interpolated_colors = np.zeros((data.shape[0], data.shape[1], 3), dtype=np.float32)
|
||
|
||
# 对每个像素计算相对距离,用于颜色插值
|
||
for i in range(len(temp_ranges) - 1):
|
||
if i == 0 or i == len(temp_ranges) - 2:
|
||
# 边界区域直接使用对应颜色
|
||
mask = (data > temp_ranges[i]) & (data <= temp_ranges[i+1])
|
||
if np.any(mask):
|
||
interpolated_colors[mask] = colors[i]
|
||
continue
|
||
|
||
lower_bound = temp_ranges[i]
|
||
upper_bound = temp_ranges[i+1]
|
||
|
||
# 找出在当前区间内的所有像素
|
||
mask = (data > lower_bound) & (data <= upper_bound)
|
||
if not np.any(mask):
|
||
continue
|
||
|
||
# 计算相对位置并插值
|
||
pixels_in_range = data[mask]
|
||
relative_pos = (pixels_in_range - lower_bound) / (upper_bound - lower_bound)
|
||
|
||
# 在两个颜色之间进行线性插值
|
||
for c in range(3): # RGB通道
|
||
color_values = colors[i][c] * (1 - relative_pos) + colors[i+1][c] * relative_pos
|
||
interpolated_colors[mask, c] = color_values
|
||
|
||
# 将插值后的颜色转换为字节类型
|
||
heatmap = (interpolated_colors * 255).astype(np.uint8)
|
||
|
||
# 将RGB转换为BGR(OpenCV使用BGR格式)
|
||
heatmap = cv2.cvtColor(heatmap, cv2.COLOR_RGB2BGR)
|
||
|
||
# 图像放大和增强
|
||
original_height, original_width = heatmap.shape[:2]
|
||
heatmap = cv2.resize(heatmap,
|
||
(original_width * scale_factor,
|
||
original_height * scale_factor),
|
||
interpolation=cv2.INTER_LANCZOS4)
|
||
|
||
# 应用滤波
|
||
heatmap = cv2.GaussianBlur(heatmap, (9, 9), 2.0)
|
||
heatmap = cv2.bilateralFilter(heatmap, 11, 85, 85)
|
||
heatmap = cv2.bilateralFilter(heatmap, 9, 60, 60)
|
||
|
||
# 对比度增强
|
||
heatmap = cv2.convertScaleAbs(heatmap, alpha=1.08, beta=6)
|
||
|
||
# 提高饱和度
|
||
hsv = cv2.cvtColor(heatmap, cv2.COLOR_BGR2HSV)
|
||
hsv[:,:,1] = np.clip(hsv[:,:,1] * 1.1, 0, 255).astype(np.uint8)
|
||
heatmap = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
|
||
|
||
# 最终旋转
|
||
return cv2.rotate(heatmap, cv2.ROTATE_180)
|
||
|
||
def process_csv(self, file_path, min_temp=None, max_temp=None, focus_temp=None,
|
||
colors=None, scale_factor=6):
|
||
"""
|
||
处理CSV文件并生成热图
|
||
|
||
参数:
|
||
file_path: CSV文件路径
|
||
min_temp: 最小温度值,若为None则使用数据中的最小值
|
||
max_temp: 最大温度值,若为None则使用数据中的最大值
|
||
focus_temp: 集中温度值,若为None则使用max_temp的值
|
||
colors: 自定义颜色映射,若为None则使用默认颜色
|
||
scale_factor: 图像缩放倍数,默认为6
|
||
|
||
返回:
|
||
heatmap: 生成的热图图像
|
||
metadata: 元数据字典
|
||
"""
|
||
# 读取CSV数据
|
||
metadata, data = self.read_csv(file_path)
|
||
|
||
# 使用元数据中的温度范围(如果未指定)
|
||
if min_temp is None and "min_temp" in metadata:
|
||
min_temp = metadata["min_temp"]
|
||
if max_temp is None and "max_temp" in metadata:
|
||
max_temp = metadata["max_temp"]
|
||
if focus_temp is None and "center_temp" in metadata:
|
||
focus_temp = metadata["center_temp"]
|
||
|
||
# 如果仍然没有温度范围,使用数据中的实际范围
|
||
if min_temp is None:
|
||
min_temp = np.min(data)
|
||
if max_temp is None:
|
||
max_temp = np.max(data)
|
||
if focus_temp is None:
|
||
focus_temp = max_temp
|
||
|
||
# 生成热图
|
||
heatmap = self.create_heatmap(
|
||
data,
|
||
min_temp=min_temp,
|
||
max_temp=max_temp,
|
||
focus_temp=focus_temp,
|
||
colors=colors,
|
||
scale_factor=scale_factor
|
||
)
|
||
|
||
return heatmap, metadata
|
||
|
||
def save_image(self, image, file_path):
|
||
"""
|
||
保存图像到文件
|
||
|
||
参数:
|
||
image: 图像
|
||
file_path: 输出文件路径
|
||
|
||
返回:
|
||
success: 是否保存成功
|
||
"""
|
||
return cv2.imwrite(file_path, image)
|
||
|
||
|
||
# 基础使用示例
|
||
if __name__ == "__main__":
|
||
|
||
# 创建处理器实例
|
||
processor = ThermalProcessor()
|
||
|
||
# 处理CSV文件
|
||
csv_path = "thermal_data_20250318_161047.csv" # 替换为你的CSV文件
|
||
heatmap, metadata = processor.process_csv(csv_path)
|
||
|
||
# 显示元数据
|
||
print("文件元数据:")
|
||
for key, value in metadata.items():
|
||
print(f" {key}: {value}")
|
||
|
||
# 保存热图
|
||
output_path = "output_heatmap.png"
|
||
processor.save_image(heatmap, output_path)
|
||
print(f"热图已保存为: {output_path}") |