2025-05-27 15:46:31 +08:00

388 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import requests
import base64
import urllib.parse
import json
import cv2
import math
from PIL import Image, ImageDraw, ImageFont
try:
from .config import Config
except:
from config import Config
import time
import numpy as np
import sys
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(parent_dir)
from tools.log import CustomLogger
class LegAcupointsDetector:
def __init__(self):
"""
初始化 LegAcupointsDetector 类
"""
self.api_key = "D00xknhpZJg7oz0QN6lraJQc"
self.secret_key = "2z9nicevtdBEs4I1NXbMVhzGuvUz6yAT"
self.font_path = Config.get_font_path()
self.logger = CustomLogger(log_name="LegAcupointsDetector",propagate=True)
def get_access_token(self):
"""
获取百度 AI 的 access_token
:return: access_token
"""
url = "https://aip.baidubce.com/oauth/2.0/token"
params = {
"grant_type": "client_credentials",
"client_id": self.api_key,
"client_secret": self.secret_key
}
retries = 3
for i in range(retries):
try:
response = requests.post(url, params=params)
response.raise_for_status() # 如果响应状态码不是 200会抛出异常
return response.json().get("access_token")
except requests.exceptions.RequestException as e:
self.logger.log_error(f"请求失败,重试 {i+1}/{retries} 次: {e}")
if i < retries - 1:
time.sleep(3) # 等待 3 秒后重试
else:
self.logger.log_error("获取 access_token 失败,超过最大重试次数")
return None
def get_file_content_as_base64(self, path, urlencoded=False):
"""
获取文件的 base64 编码
:param path: 文件路径
:param urlencoded: 是否对结果进行 URL 编码
:return: base64 编码的字符串
"""
with open(path, "rb") as f:
content = base64.b64encode(f.read()).decode("utf8")
if urlencoded:
content = urllib.parse.quote_plus(content)
return content
def get_inital_6points_from_api(self, image_path):
"""
处理单张图片并返回腿部六个关键点坐标
:param image_path: 输入图片路径
:return: 腿部六个关键点坐标的字典
"""
self.logger.log_info(f"开始处理图片:{image_path}")
access_token = self.get_access_token()
if access_token is None:
self.logger.log_error(f"没有获取到token")
return None
url = f"https://aip.baidubce.com/rest/2.0/image-classify/v1/body_analysis?access_token={access_token}"
payload = 'image=' + self.get_file_content_as_base64(image_path, True)
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
# 发送请求
self.logger.log_info("正在发送 API 请求...")
response = requests.post(url, headers=headers, data=payload.encode("utf-8"))
# 解析 API 返回的结果
try:
output = json.loads(response.text)
if not isinstance(output, dict) or 'person_info' not in output or not isinstance(output['person_info'], list):
self.logger.log_error("API 返回结果格式无效")
return None
# 获取第一个人的信息
person_info = output['person_info'][0]
# 提取腿部六个关键点坐标
leg_points = {
"承扶左": (int(person_info['body_parts']['left_hip']['x']), int(person_info['body_parts']['left_hip']['y'])),
"委中左": (int(person_info['body_parts']['left_knee']['x']), int(person_info['body_parts']['left_knee']['y'])),
"昆仑左": (int(person_info['body_parts']['left_ankle']['x']), int(person_info['body_parts']['left_ankle']['y'])),
"承扶右": (int(person_info['body_parts']['right_hip']['x']), int(person_info['body_parts']['right_hip']['y'])),
"委中右": (int(person_info['body_parts']['right_knee']['x']), int(person_info['body_parts']['right_knee']['y'])),
"昆仑右": (int(person_info['body_parts']['right_ankle']['x']), int(person_info['body_parts']['right_ankle']['y']))
}
self.logger.log_info("成功解析 API 返回的关键点坐标")
self.logger.log_info(f"关键点坐标:{leg_points}")
return leg_points
except KeyError as e:
self.logger.log_error(f"API 返回结果缺少关键字段:{e}")
return None
except json.JSONDecodeError as e:
self.logger.log_error(f"API 返回结果解析失败:{e}")
return None
def error_process(self, leg_points, threshold=20):
"""
检查左右承扶部、膝盖、脚踝之间的距离是否过近,并确保左右两边的点没有反
:param leg_points: 腿部关键点坐标字典
:param threshold: 距离阈值(默认 10 像素)
:return: 如果检查通过返回 True否则返回 None
"""
self.logger.log_info("开始检查关键点距离和左右顺序...")
# 定义左右关键点名称
left_keys = ["承扶左", "委中左", "昆仑左"]
right_keys = ["承扶右", "委中右", "昆仑右"]
# 检查左右两边的点是否反了,如果反了则交换
for left_key, right_key in zip(left_keys, right_keys):
left_point = leg_points[left_key]
right_point = leg_points[right_key]
# 如果左边的 x 坐标大于右边的 x 坐标,则交换
if left_point[0] > right_point[0]:
leg_points[left_key], leg_points[right_key] = right_point, left_point
self.logger.log_warning(f"左右点反了,已交换:{left_key}{right_key}")
# 计算左右承扶部距离
left_hip = leg_points["承扶左"]
right_hip = leg_points["承扶右"]
hip_distance = math.sqrt((left_hip[0] - right_hip[0]) ** 2 + (left_hip[1] - right_hip[1]) ** 2)
# 计算左右委中盖距离
left_knee = leg_points["委中左"]
right_knee = leg_points["委中右"]
knee_distance = math.sqrt((left_knee[0] - right_knee[0]) ** 2 + (left_knee[1] - right_knee[1]) ** 2)
# 计算左右脚踝距离
left_ankle = leg_points["昆仑左"]
right_ankle = leg_points["昆仑右"]
ankle_distance = math.sqrt((left_ankle[0] - right_ankle[0]) ** 2 + (left_ankle[1] - right_ankle[1]) ** 2)
# 检查距离是否小于阈值
if hip_distance < threshold:
self.logger.log_error(f"左右承扶部距离过近:{hip_distance} < {threshold}")
return None
if knee_distance < threshold:
self.logger.log_error(f"左右委中盖距离过近:{knee_distance} < {threshold}")
return None
if ankle_distance < threshold:
self.logger.log_error(f"左右脚踝距离过近:{ankle_distance} < {threshold}")
return None
self.logger.log_info("关键点距离检查通过")
return True
def calculate_new_point(self, point1, point2, ratio):
"""
基于两个点,沿着两个点的连线移动一定比例的距离,计算新的点
:param point1: 第一个点 (x1, y1)
:param point2: 第二个点 (x2, y2)
:param ratio: 移动的比例(例如 2/5
:return: 新的点 (x, y)
"""
x1, y1 = point1
x2, y2 = point2
# 计算两点之间的向量
dx = x2 - x1
dy = y2 - y1
# 计算两点之间的距离
current_distance = math.sqrt(dx ** 2 + dy ** 2)
# 计算新点的坐标
if current_distance == 0:
raise ValueError("两点重合,无法计算新点")
scale = ratio # 比例直接作为缩放因子
new_x = x1 + dx * scale
new_y = y1 + dy * scale
return int(new_x), int(new_y)
def calculate_points_between_kunlun_and_weizhong(self, kunlun, weizhong):
"""
基于昆仑和委中两个点,沿着委中方向移动 1/5、2/5、4/5 的距离,计算三个新点
:param kunlun: 昆仑点 (x, y)
:param weizhong: 委中点 (x, y)
:return: 三个新点的列表 [(x1, y1), (x2, y2), (x3, y3)]
"""
# 计算昆仑和委中之间的距离
dx = weizhong[0] - kunlun[0]
dy = weizhong[1] - kunlun[1]
distance = math.sqrt(dx ** 2 + dy ** 2)
# 计算 1/5、2/5、4/5 的距离
# distances = [distance * 0.15, distance * 0.35, distance * 0.65]
distances = [distance * 0.4, distance * 0.65, distance * 0.85]
# 计算三个新点
new_points = []
for dist in distances:
scale = dist / distance
new_x = kunlun[0] + dx * scale
new_y = kunlun[1] + dy * scale
new_points.append((int(new_x), int(new_y)))
return new_points
def plot_acupoints_on_image(self, image_path, acupoints, output_path):
"""
在指定图片上绘制穴位点,并标注名称,保存结果。
参数:
image_path (str): 图片文件路径。
acupoints (dict): 包含穴位坐标的字典 {"穴位名称": (x, y), ...}。
output_path (str): 保存结果图片的路径。
"""
# 读取图片
image = cv2.imread(image_path)
if image is None:
raise FileNotFoundError(f"无法加载背景图:{image_path}")
# 将图像分辨率增大到原来的三倍
scale_factor = 2
image = cv2.resize(image, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_LINEAR)
# 调整穴位点坐标
acupoints = {name: (x * scale_factor, y * scale_factor) for name, (x, y) in acupoints.items()}
# (252, 229, 179)
# 定义蓝紫色渐变颜色BGR
purple_colors = [
(223, 87, 98), # 浅蓝紫色
(223, 87, 98), # 中等蓝紫色
(223, 87, 98),
]
# 绘制穴位点
for name, (x, y) in acupoints.items():
if x == 0 and y == 0:
continue
# 绘制蓝紫色带光圈的穴位点
radius = 4 # 穴位点半径
for r in range(radius, 0, -1):
alpha = r / radius # alpha 从 1中心到 0边缘
# 计算渐变颜色
color_index = int(alpha * (len(purple_colors) - 1))
color = purple_colors[color_index]
# 绘制渐变圆
cv2.circle(image, (x, y), r, color, -1, cv2.LINE_AA)
# 添加高光效果
highlight_radius = int(radius * 0.6)
# highlight_color = (200, 200, 200) # 白色高光
highlight_color = (0, 255, 185) # 白色高光
highlight_pos = (x, y)
cv2.circle(image, highlight_pos, highlight_radius, highlight_color, -1, cv2.LINE_AA)
# 使用 PIL 绘制文本
image_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(image_pil)
font = ImageFont.truetype(self.font_path, 8 * scale_factor) # 字体大小也放大三倍
for name, (x, y) in acupoints.items():
if x == 0 and y == 0:
continue
if name == "上委中左":
x_offset, y_offset = -40 * scale_factor, -5 * scale_factor
elif "" in name:
x_offset, y_offset = -30 * scale_factor, -5 * scale_factor
else:
x_offset, y_offset = 5 * scale_factor, -5 * scale_factor
# 绘制带白边的黑字
text_pos = (x + x_offset, y + y_offset)
for offset in [(-1,-1), (-1,1), (1,-1), (1,1)]:
draw.text((text_pos[0]+offset[0], text_pos[1]+offset[1]),
name, font=font, fill=(255,255,255)) # 白边
draw.text(text_pos, name, font=font, fill=(0,0,0)) # 黑字
# 将图像转换回 OpenCV 格式
image = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
# 保存结果
cv2.imwrite(output_path, image)
print(f"结果已保存到:{output_path}")
def process_image(self, input_image_path, output_coordinate_image_path):
"""
处理单张图片并返回腿部关键点坐标
:param input_image_path: 输入图片路径
:param output_coordinate_image_path: 输出图片保存路径
:return: 腿部关键点坐标字典
"""
self.logger.log_info(f"开始处理图片:{input_image_path}")
# 通过 API 获取初始 6 个关键点
leg_points = self.get_inital_6points_from_api(input_image_path)
if leg_points is None:
self.logger.log_error("无法获取初始关键点坐标")
return None
# 检查关键点距离是否过近
error = self.error_process(leg_points)
if error is None:
self.logger.log_error("关键点距离检查未通过")
return None
# 计算大腿部分的左右两个穴位
leg_points["殷门左"] = self.calculate_new_point(leg_points["承扶左"], leg_points["委中左"], ratio=2/5)
leg_points["殷门右"] = self.calculate_new_point(leg_points["承扶右"], leg_points["委中右"], ratio=2/5)
leg_points["上委中左"] = self.calculate_new_point(leg_points["承扶左"], leg_points["委中左"], ratio=5.5/7)
leg_points["上委中右"] = self.calculate_new_point(leg_points["承扶右"], leg_points["委中右"], ratio=5.5/7)
# 计算小腿部分的左右两边 6 个穴位点
leg_points["承山左"], leg_points["承筋左"], leg_points["合阳左"] = self.calculate_points_between_kunlun_and_weizhong(leg_points["昆仑左"], leg_points["委中左"])
leg_points["承山右"], leg_points["承筋右"], leg_points["合阳右"] = self.calculate_points_between_kunlun_and_weizhong(leg_points["昆仑右"], leg_points["委中右"])
# 绘制腿部关键点并保存图片
self.plot_acupoints_on_image(input_image_path, leg_points, output_coordinate_image_path)
self.logger.log_info(f"图片处理完成,关键点坐标:{leg_points}")
return leg_points
if __name__ == '__main__':
leg_point = LegAcupointsDetector()
input_image_path = Config.get_image_path("leg.png")
output_image_path = Config.get_output_path("leg_16points.png")
# input_image_path = "aucpuncture2point/configs/using_img/leg.png" # 替换为你的图片路径
# output_coordinate_image_path = "aucpuncture2point/configs/using_img/leg_16points.png" # 替换为输出图片路径
leg_acupoints_list = leg_point.process_image(input_image_path, output_image_path)
print(leg_acupoints_list)
# # 批量处理
# leg_point = LegAcupointsDetector()
# # 输入和输出文件夹路径
# input_folder = r"/home/kira/codes/datas/colors-0872E1"
# output_folder = r"/home/kira/codes/datas/results-0872E11"
# # 确保输出文件夹存在
# if not os.path.exists(output_folder):
# os.makedirs(output_folder)
# # 遍历输入文件夹中的所有文件
# for filename in os.listdir(input_folder):
# # 只处理图片文件(支持常见格式)
# if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
# # 输入图片路径
# input_image_path = os.path.join(input_folder, filename)
# # 输出图片路径
# output_image_name = f"processed_{filename}"
# output_coordinate_image_path = os.path.join(output_folder, output_image_name)
# # 处理单张图片
# try:
# leg_acupoints_list = leg_point.process_image(input_image_path, output_coordinate_image_path)
# print(f"处理完成:{filename} -> {output_image_name}")
# print(f"穴位点坐标:{leg_acupoints_list}")
# except Exception as e:
# print(f"处理失败:{filename},错误信息:{e}")