388 lines
16 KiB
Python
388 lines
16 KiB
Python
import os
|
||
import requests
|
||
import base64
|
||
import urllib.parse
|
||
import json
|
||
import cv2
|
||
import math
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
try:
|
||
from .config import Config
|
||
except:
|
||
from config import Config
|
||
|
||
import time
|
||
|
||
import numpy as np
|
||
import sys
|
||
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||
sys.path.append(parent_dir)
|
||
from tools.log import CustomLogger
|
||
|
||
class LegAcupointsDetector:
|
||
def __init__(self):
|
||
"""
|
||
初始化 LegAcupointsDetector 类
|
||
"""
|
||
self.api_key = "D00xknhpZJg7oz0QN6lraJQc"
|
||
self.secret_key = "2z9nicevtdBEs4I1NXbMVhzGuvUz6yAT"
|
||
self.font_path = Config.get_font_path()
|
||
self.logger = CustomLogger(log_name="LegAcupointsDetector",propagate=True)
|
||
|
||
def get_access_token(self):
|
||
"""
|
||
获取百度 AI 的 access_token
|
||
:return: access_token
|
||
"""
|
||
url = "https://aip.baidubce.com/oauth/2.0/token"
|
||
params = {
|
||
"grant_type": "client_credentials",
|
||
"client_id": self.api_key,
|
||
"client_secret": self.secret_key
|
||
}
|
||
|
||
retries = 3
|
||
for i in range(retries):
|
||
try:
|
||
response = requests.post(url, params=params)
|
||
response.raise_for_status() # 如果响应状态码不是 200,会抛出异常
|
||
return response.json().get("access_token")
|
||
except requests.exceptions.RequestException as e:
|
||
self.logger.log_error(f"请求失败,重试 {i+1}/{retries} 次: {e}")
|
||
if i < retries - 1:
|
||
time.sleep(3) # 等待 3 秒后重试
|
||
else:
|
||
self.logger.log_error("获取 access_token 失败,超过最大重试次数")
|
||
return None
|
||
|
||
def get_file_content_as_base64(self, path, urlencoded=False):
|
||
"""
|
||
获取文件的 base64 编码
|
||
:param path: 文件路径
|
||
:param urlencoded: 是否对结果进行 URL 编码
|
||
:return: base64 编码的字符串
|
||
"""
|
||
with open(path, "rb") as f:
|
||
content = base64.b64encode(f.read()).decode("utf8")
|
||
if urlencoded:
|
||
content = urllib.parse.quote_plus(content)
|
||
return content
|
||
|
||
def get_inital_6points_from_api(self, image_path):
|
||
"""
|
||
处理单张图片并返回腿部六个关键点坐标
|
||
:param image_path: 输入图片路径
|
||
:return: 腿部六个关键点坐标的字典
|
||
"""
|
||
self.logger.log_info(f"开始处理图片:{image_path}")
|
||
access_token = self.get_access_token()
|
||
if access_token is None:
|
||
self.logger.log_error(f"没有获取到token")
|
||
return None
|
||
url = f"https://aip.baidubce.com/rest/2.0/image-classify/v1/body_analysis?access_token={access_token}"
|
||
payload = 'image=' + self.get_file_content_as_base64(image_path, True)
|
||
headers = {
|
||
'Content-Type': 'application/x-www-form-urlencoded',
|
||
'Accept': 'application/json'
|
||
}
|
||
|
||
# 发送请求
|
||
self.logger.log_info("正在发送 API 请求...")
|
||
response = requests.post(url, headers=headers, data=payload.encode("utf-8"))
|
||
|
||
# 解析 API 返回的结果
|
||
try:
|
||
output = json.loads(response.text)
|
||
if not isinstance(output, dict) or 'person_info' not in output or not isinstance(output['person_info'], list):
|
||
self.logger.log_error("API 返回结果格式无效")
|
||
return None
|
||
|
||
# 获取第一个人的信息
|
||
person_info = output['person_info'][0]
|
||
|
||
# 提取腿部六个关键点坐标
|
||
leg_points = {
|
||
"承扶左": (int(person_info['body_parts']['left_hip']['x']), int(person_info['body_parts']['left_hip']['y'])),
|
||
"委中左": (int(person_info['body_parts']['left_knee']['x']), int(person_info['body_parts']['left_knee']['y'])),
|
||
"昆仑左": (int(person_info['body_parts']['left_ankle']['x']), int(person_info['body_parts']['left_ankle']['y'])),
|
||
"承扶右": (int(person_info['body_parts']['right_hip']['x']), int(person_info['body_parts']['right_hip']['y'])),
|
||
"委中右": (int(person_info['body_parts']['right_knee']['x']), int(person_info['body_parts']['right_knee']['y'])),
|
||
"昆仑右": (int(person_info['body_parts']['right_ankle']['x']), int(person_info['body_parts']['right_ankle']['y']))
|
||
}
|
||
|
||
self.logger.log_info("成功解析 API 返回的关键点坐标")
|
||
self.logger.log_info(f"关键点坐标:{leg_points}")
|
||
return leg_points
|
||
|
||
except KeyError as e:
|
||
self.logger.log_error(f"API 返回结果缺少关键字段:{e}")
|
||
return None
|
||
except json.JSONDecodeError as e:
|
||
self.logger.log_error(f"API 返回结果解析失败:{e}")
|
||
return None
|
||
|
||
def error_process(self, leg_points, threshold=20):
|
||
"""
|
||
检查左右承扶部、膝盖、脚踝之间的距离是否过近,并确保左右两边的点没有反
|
||
:param leg_points: 腿部关键点坐标字典
|
||
:param threshold: 距离阈值(默认 10 像素)
|
||
:return: 如果检查通过返回 True,否则返回 None
|
||
"""
|
||
self.logger.log_info("开始检查关键点距离和左右顺序...")
|
||
|
||
# 定义左右关键点名称
|
||
left_keys = ["承扶左", "委中左", "昆仑左"]
|
||
right_keys = ["承扶右", "委中右", "昆仑右"]
|
||
|
||
# 检查左右两边的点是否反了,如果反了则交换
|
||
for left_key, right_key in zip(left_keys, right_keys):
|
||
left_point = leg_points[left_key]
|
||
right_point = leg_points[right_key]
|
||
|
||
# 如果左边的 x 坐标大于右边的 x 坐标,则交换
|
||
if left_point[0] > right_point[0]:
|
||
leg_points[left_key], leg_points[right_key] = right_point, left_point
|
||
self.logger.log_warning(f"左右点反了,已交换:{left_key} 和 {right_key}")
|
||
|
||
# 计算左右承扶部距离
|
||
left_hip = leg_points["承扶左"]
|
||
right_hip = leg_points["承扶右"]
|
||
hip_distance = math.sqrt((left_hip[0] - right_hip[0]) ** 2 + (left_hip[1] - right_hip[1]) ** 2)
|
||
|
||
# 计算左右委中盖距离
|
||
left_knee = leg_points["委中左"]
|
||
right_knee = leg_points["委中右"]
|
||
knee_distance = math.sqrt((left_knee[0] - right_knee[0]) ** 2 + (left_knee[1] - right_knee[1]) ** 2)
|
||
|
||
# 计算左右脚踝距离
|
||
left_ankle = leg_points["昆仑左"]
|
||
right_ankle = leg_points["昆仑右"]
|
||
ankle_distance = math.sqrt((left_ankle[0] - right_ankle[0]) ** 2 + (left_ankle[1] - right_ankle[1]) ** 2)
|
||
|
||
# 检查距离是否小于阈值
|
||
if hip_distance < threshold:
|
||
self.logger.log_error(f"左右承扶部距离过近:{hip_distance} < {threshold}")
|
||
return None
|
||
if knee_distance < threshold:
|
||
self.logger.log_error(f"左右委中盖距离过近:{knee_distance} < {threshold}")
|
||
return None
|
||
if ankle_distance < threshold:
|
||
self.logger.log_error(f"左右脚踝距离过近:{ankle_distance} < {threshold}")
|
||
return None
|
||
|
||
self.logger.log_info("关键点距离检查通过")
|
||
return True
|
||
|
||
def calculate_new_point(self, point1, point2, ratio):
|
||
"""
|
||
基于两个点,沿着两个点的连线移动一定比例的距离,计算新的点
|
||
:param point1: 第一个点 (x1, y1)
|
||
:param point2: 第二个点 (x2, y2)
|
||
:param ratio: 移动的比例(例如 2/5)
|
||
:return: 新的点 (x, y)
|
||
"""
|
||
x1, y1 = point1
|
||
x2, y2 = point2
|
||
|
||
# 计算两点之间的向量
|
||
dx = x2 - x1
|
||
dy = y2 - y1
|
||
|
||
# 计算两点之间的距离
|
||
current_distance = math.sqrt(dx ** 2 + dy ** 2)
|
||
|
||
# 计算新点的坐标
|
||
if current_distance == 0:
|
||
raise ValueError("两点重合,无法计算新点")
|
||
scale = ratio # 比例直接作为缩放因子
|
||
new_x = x1 + dx * scale
|
||
new_y = y1 + dy * scale
|
||
|
||
return int(new_x), int(new_y)
|
||
|
||
def calculate_points_between_kunlun_and_weizhong(self, kunlun, weizhong):
|
||
"""
|
||
基于昆仑和委中两个点,沿着委中方向移动 1/5、2/5、4/5 的距离,计算三个新点
|
||
:param kunlun: 昆仑点 (x, y)
|
||
:param weizhong: 委中点 (x, y)
|
||
:return: 三个新点的列表 [(x1, y1), (x2, y2), (x3, y3)]
|
||
"""
|
||
# 计算昆仑和委中之间的距离
|
||
dx = weizhong[0] - kunlun[0]
|
||
dy = weizhong[1] - kunlun[1]
|
||
distance = math.sqrt(dx ** 2 + dy ** 2)
|
||
|
||
# 计算 1/5、2/5、4/5 的距离
|
||
# distances = [distance * 0.15, distance * 0.35, distance * 0.65]
|
||
distances = [distance * 0.4, distance * 0.65, distance * 0.85]
|
||
|
||
# 计算三个新点
|
||
new_points = []
|
||
for dist in distances:
|
||
scale = dist / distance
|
||
new_x = kunlun[0] + dx * scale
|
||
new_y = kunlun[1] + dy * scale
|
||
new_points.append((int(new_x), int(new_y)))
|
||
|
||
return new_points
|
||
|
||
def plot_acupoints_on_image(self, image_path, acupoints, output_path):
|
||
"""
|
||
在指定图片上绘制穴位点,并标注名称,保存结果。
|
||
|
||
参数:
|
||
image_path (str): 图片文件路径。
|
||
acupoints (dict): 包含穴位坐标的字典 {"穴位名称": (x, y), ...}。
|
||
output_path (str): 保存结果图片的路径。
|
||
"""
|
||
# 读取图片
|
||
image = cv2.imread(image_path)
|
||
if image is None:
|
||
raise FileNotFoundError(f"无法加载背景图:{image_path}")
|
||
|
||
# 将图像分辨率增大到原来的三倍
|
||
scale_factor = 2
|
||
image = cv2.resize(image, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_LINEAR)
|
||
|
||
# 调整穴位点坐标
|
||
acupoints = {name: (x * scale_factor, y * scale_factor) for name, (x, y) in acupoints.items()}
|
||
# (252, 229, 179)
|
||
# 定义蓝紫色渐变颜色BGR
|
||
purple_colors = [
|
||
(223, 87, 98), # 浅蓝紫色
|
||
(223, 87, 98), # 中等蓝紫色
|
||
(223, 87, 98),
|
||
]
|
||
|
||
# 绘制穴位点
|
||
for name, (x, y) in acupoints.items():
|
||
if x == 0 and y == 0:
|
||
continue
|
||
|
||
# 绘制蓝紫色带光圈的穴位点
|
||
radius = 4 # 穴位点半径
|
||
for r in range(radius, 0, -1):
|
||
alpha = r / radius # alpha 从 1(中心)到 0(边缘)
|
||
|
||
# 计算渐变颜色
|
||
color_index = int(alpha * (len(purple_colors) - 1))
|
||
color = purple_colors[color_index]
|
||
|
||
# 绘制渐变圆
|
||
cv2.circle(image, (x, y), r, color, -1, cv2.LINE_AA)
|
||
|
||
# 添加高光效果
|
||
highlight_radius = int(radius * 0.6)
|
||
# highlight_color = (200, 200, 200) # 白色高光
|
||
highlight_color = (0, 255, 185) # 白色高光
|
||
highlight_pos = (x, y)
|
||
cv2.circle(image, highlight_pos, highlight_radius, highlight_color, -1, cv2.LINE_AA)
|
||
|
||
# 使用 PIL 绘制文本
|
||
image_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
|
||
draw = ImageDraw.Draw(image_pil)
|
||
font = ImageFont.truetype(self.font_path, 8 * scale_factor) # 字体大小也放大三倍
|
||
|
||
for name, (x, y) in acupoints.items():
|
||
if x == 0 and y == 0:
|
||
continue
|
||
if name == "上委中左":
|
||
x_offset, y_offset = -40 * scale_factor, -5 * scale_factor
|
||
elif "左" in name:
|
||
x_offset, y_offset = -30 * scale_factor, -5 * scale_factor
|
||
else:
|
||
x_offset, y_offset = 5 * scale_factor, -5 * scale_factor
|
||
|
||
|
||
# 绘制带白边的黑字
|
||
text_pos = (x + x_offset, y + y_offset)
|
||
for offset in [(-1,-1), (-1,1), (1,-1), (1,1)]:
|
||
draw.text((text_pos[0]+offset[0], text_pos[1]+offset[1]),
|
||
name, font=font, fill=(255,255,255)) # 白边
|
||
draw.text(text_pos, name, font=font, fill=(0,0,0)) # 黑字
|
||
|
||
# 将图像转换回 OpenCV 格式
|
||
image = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
|
||
|
||
# 保存结果
|
||
cv2.imwrite(output_path, image)
|
||
print(f"结果已保存到:{output_path}")
|
||
|
||
def process_image(self, input_image_path, output_coordinate_image_path):
|
||
"""
|
||
处理单张图片并返回腿部关键点坐标
|
||
:param input_image_path: 输入图片路径
|
||
:param output_coordinate_image_path: 输出图片保存路径
|
||
:return: 腿部关键点坐标字典
|
||
"""
|
||
self.logger.log_info(f"开始处理图片:{input_image_path}")
|
||
|
||
# 通过 API 获取初始 6 个关键点
|
||
leg_points = self.get_inital_6points_from_api(input_image_path)
|
||
if leg_points is None:
|
||
self.logger.log_error("无法获取初始关键点坐标")
|
||
return None
|
||
|
||
# 检查关键点距离是否过近
|
||
error = self.error_process(leg_points)
|
||
if error is None:
|
||
self.logger.log_error("关键点距离检查未通过")
|
||
return None
|
||
|
||
# 计算大腿部分的左右两个穴位
|
||
leg_points["殷门左"] = self.calculate_new_point(leg_points["承扶左"], leg_points["委中左"], ratio=2/5)
|
||
leg_points["殷门右"] = self.calculate_new_point(leg_points["承扶右"], leg_points["委中右"], ratio=2/5)
|
||
|
||
leg_points["上委中左"] = self.calculate_new_point(leg_points["承扶左"], leg_points["委中左"], ratio=5.5/7)
|
||
leg_points["上委中右"] = self.calculate_new_point(leg_points["承扶右"], leg_points["委中右"], ratio=5.5/7)
|
||
|
||
# 计算小腿部分的左右两边 6 个穴位点
|
||
leg_points["承山左"], leg_points["承筋左"], leg_points["合阳左"] = self.calculate_points_between_kunlun_and_weizhong(leg_points["昆仑左"], leg_points["委中左"])
|
||
leg_points["承山右"], leg_points["承筋右"], leg_points["合阳右"] = self.calculate_points_between_kunlun_and_weizhong(leg_points["昆仑右"], leg_points["委中右"])
|
||
|
||
# 绘制腿部关键点并保存图片
|
||
self.plot_acupoints_on_image(input_image_path, leg_points, output_coordinate_image_path)
|
||
|
||
self.logger.log_info(f"图片处理完成,关键点坐标:{leg_points}")
|
||
return leg_points
|
||
|
||
if __name__ == '__main__':
|
||
leg_point = LegAcupointsDetector()
|
||
input_image_path = Config.get_image_path("leg.png")
|
||
output_image_path = Config.get_output_path("leg_16points.png")
|
||
# input_image_path = "aucpuncture2point/configs/using_img/leg.png" # 替换为你的图片路径
|
||
# output_coordinate_image_path = "aucpuncture2point/configs/using_img/leg_16points.png" # 替换为输出图片路径
|
||
leg_acupoints_list = leg_point.process_image(input_image_path, output_image_path)
|
||
print(leg_acupoints_list)
|
||
|
||
# # 批量处理
|
||
# leg_point = LegAcupointsDetector()
|
||
|
||
# # 输入和输出文件夹路径
|
||
# input_folder = r"/home/kira/codes/datas/colors-0872E1"
|
||
# output_folder = r"/home/kira/codes/datas/results-0872E11"
|
||
|
||
# # 确保输出文件夹存在
|
||
# if not os.path.exists(output_folder):
|
||
# os.makedirs(output_folder)
|
||
|
||
# # 遍历输入文件夹中的所有文件
|
||
# for filename in os.listdir(input_folder):
|
||
# # 只处理图片文件(支持常见格式)
|
||
# if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
|
||
# # 输入图片路径
|
||
# input_image_path = os.path.join(input_folder, filename)
|
||
|
||
# # 输出图片路径
|
||
# output_image_name = f"processed_{filename}"
|
||
# output_coordinate_image_path = os.path.join(output_folder, output_image_name)
|
||
|
||
# # 处理单张图片
|
||
# try:
|
||
# leg_acupoints_list = leg_point.process_image(input_image_path, output_coordinate_image_path)
|
||
# print(f"处理完成:{filename} -> {output_image_name}")
|
||
# print(f"穴位点坐标:{leg_acupoints_list}")
|
||
# except Exception as e:
|
||
# print(f"处理失败:{filename},错误信息:{e}")
|
||
|