视觉标定

This commit is contained in:
liangweihao 2025-05-27 16:48:55 +08:00
parent 9ce868092f
commit 66ddb21f80
3 changed files with 283 additions and 13 deletions

View File

@ -0,0 +1,235 @@
import requests
import base64
import cv2
import open3d as o3d
import numpy as np
import paramiko
import time
class ToolCamera:
def __init__(self, host):
"""
初始化 CameraClient
"""
self.host = host
self.port = 22 # SSH端口号默认是22
self.username = "jsfb" # SSH用户名
self.password = "jsfb" # SSH密码
self.root_password = "jsfb"
# 要执行的命令
self.start_command = "systemctl restart cam_server"
self.stop_command = "systemctl stop cam_server"
def start(self):
self.execute_command_on_remote(
host=self.host,
username=self.username,
password=self.password,
root_password=self.root_password,
command=self.start_command
)
def stop(self):
self.execute_command_on_remote(
host=self.host,
username=self.username,
password=self.password,
root_password=self.root_password,
command=self.stop_command
)
def execute_command_on_remote(self, host, username, password, root_password, command, port=22):
# 创建SSH客户端
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# 连接到远程服务器,指定端口
ssh.connect(hostname=host, port=port, username=username, password=password)
# 构建完整的命令字符串将root密码传递给sudo
sudo_command = f'echo {root_password} | sudo -S {command}'
# 获取通道对象
transport = ssh.get_transport()
channel = transport.open_session()
channel.get_pty() # 获取伪终端
channel.exec_command(sudo_command) # 执行命令
# 检查命令是否在后台执行
while True:
# 如果命令在后台运行, channel.exit_status_ready() 将会一直是 False
if channel.exit_status_ready():
break
# 非阻塞方式读取输出
if channel.recv_ready():
output = channel.recv(1024).decode('utf-8')
print(output)
# 非阻塞方式读取错误输出
if channel.recv_stderr_ready():
error = channel.recv_stderr(1024).decode('utf-8')
print(error)
# 延时防止CPU占用过高
time.sleep(0.1)
return channel.recv_exit_status()
finally:
# 关闭SSH连接
ssh.close()
def get_latest_frame(self):
"""
发送请求到服务器以获取最新的RGB和深度图像数据
返回:
tuple: 包含RGB图像和深度图像的元组 (rgb_image, depth_image)如果请求失败则返回 (None, None)
"""
try:
# 发送GET请求到服务器
response = requests.get("http://" + self.host + ":8000/get_latest_frame", timeout=(3, 5))
# 检查请求是否成功
if response.status_code == 200:
data = response.json()
# 获取Base64编码的RGB和深度图像数据
rgb_image_base64 = data['rgb_image']
depth_image_base64 = data['depth_image']
camera_intrinsics = data['camera_intrinsics']
# 解码Base64为二进制数据
rgb_image_data = base64.b64decode(rgb_image_base64)
depth_image_data = base64.b64decode(depth_image_base64)
# 转换为NumPy数组
rgb_image_np = np.frombuffer(rgb_image_data, np.uint8)
depth_image_np = np.frombuffer(depth_image_data, np.uint8)
# 解码为OpenCV图像
rgb_image = cv2.imdecode(rgb_image_np, cv2.IMREAD_COLOR)
depth_image = cv2.imdecode(depth_image_np, cv2.IMREAD_UNCHANGED)
return rgb_image, depth_image, camera_intrinsics
else:
print(f"Failed to retrieve data from server, status code: {response.status_code}")
return None, None, None
except Exception as e:
print(f"Exception occurred: {e}")
return None, None, None
def display_images(self, rgb_image, depth_image):
"""
显示RGB和深度图像
参数:
rgb_image (numpy.ndarray): RGB图像
depth_image (numpy.ndarray): 深度图像
"""
if rgb_image is not None and depth_image is not None:
cv2.imshow('RGB Image', rgb_image)
cv2.imshow('Depth Image', depth_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
else:
print("No image data to display.")
def display_rgb_point_cloud(self, rgb_image, depth_image, camera_intrinsics):
"""
显示RGB点云将RGB图像和深度图像转换为点云并显示
参数:
rgb_image (np.ndarray): RGB图像
depth_image (np.ndarray): 深度图像
camera_intrinsics (dict): 相机的内参字典包含 'fx', 'fy', 'cx', 'cy'
返回:
None
"""
# 获取RGB图像和深度图像的尺寸
rgb_h, rgb_w = rgb_image.shape[:2]
depth_h, depth_w = depth_image.shape[:2]
# 计算裁剪区域
start_x = (rgb_w - depth_w) // 2
start_y = (rgb_h - depth_h) // 2
# 裁剪RGB图像以匹配深度图像的尺寸
rgb_image_cropped = rgb_image[start_y:start_y + depth_h, start_x:start_x + depth_w]
rgb_image_cropped = cv2.cvtColor(rgb_image_cropped, cv2.COLOR_RGB2BGR)
# 将深度图像转换为浮点型并将单位从毫米转换为米(假设深度图像以毫米为单位)
depth_image = depth_image.astype(np.float32) / 1000.0
# 创建点云的空数组
points = []
colors = []
# 相机内参
fx = camera_intrinsics['fx']
fy = camera_intrinsics['fy']
cx = camera_intrinsics['cx']
cy = camera_intrinsics['cy']
# 遍历每个像素将其转换为3D点
for v in range(depth_h):
for u in range(depth_w):
z = depth_image[v, u]
if z > 0: # 排除无效深度
x = (u - cx) * z / fx
y = (v - cy) * z / fy
points.append([x, y, z])
colors.append(rgb_image_cropped[v, u] / 255.0) # 颜色归一化到[0,1]
# 将点云和颜色转换为NumPy数组
points = np.array(points)
colors = np.array(colors)
# 创建Open3D点云对象
point_cloud = o3d.geometry.PointCloud()
point_cloud.points = o3d.utility.Vector3dVector(points)
point_cloud.colors = o3d.utility.Vector3dVector(colors)
# 显示点云
o3d.visualization.draw_geometries([point_cloud])
if __name__ == "__main__":
import time
import os
# 初始化客户端并指定服务器地址
cam = ToolCamera(host='127.0.0.1')
# cam.stop()
cam.start()
time.sleep(2)
# 获取最新的RGB和深度图像
rgb_image, depth_image, camera_intrinsics = cam.get_latest_frame()
print(camera_intrinsics)
max_depth = np.max(depth_image)
print(np.min(depth_image),np.max(depth_image))
# print(depth_image[200, 320])
# depth_image = (depth_image / max_depth * 65535).astype(np.uint16)
# print(np.min(depth_image),np.max(depth_image))
# 对图像进行水平翻转
# rgb_image = cv2.flip(rgb_image, 1)
# depth_image = cv2.flip(depth_image, 1)
# 显示图像
cam.display_images(rgb_image, depth_image)
cv2.imwrite("aucpuncture2point/configs/using_img/color.png",rgb_image)
cv2.imwrite("aucpuncture2point/configs/using_img/depth.png",depth_image)
# 获取当前时间并格式化为文件夹名 (格式: 年-月-日_时-分-秒)
current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
base_dir = "/home/jsfb/jsfb_ws/collected_data/test_images"
# 创建一个根据时间命名的新文件夹
folder_path = os.path.join(base_dir, current_time)
os.makedirs(folder_path, exist_ok=True) # 如果文件夹不存在则创建
cv2.imwrite(os.path.join(folder_path, "color.png"), rgb_image)
cv2.imwrite(os.path.join(folder_path, "depth.png"), depth_image)
# 显示RGB点云
# cam.display_rgb_point_cloud(rgb_image, depth_image, camera_intrinsics)

View File

@ -8,9 +8,9 @@ import time
from scipy.spatial.transform import Rotation as R
sys.path.append(str(Path(__file__).resolve().parent.parent))
sys.path.append('/home/jsfb/jsfb_ws/MassageRobot_aubo/Massage/MassageControl')
sys.path.append('/home/jsfb/jsfb_ws/MassageRobot_Dobot/Massage/MassageControl')
from hardware.remote_cam import ToolCamera
from hardware.aubo_C5 import AuboC5
from hardware.dobot_nova5 import dobot_nova5
np.set_printoptions(precision=8, suppress=True)
# 设置寻找亚像素角点的参数采用的停止准则是最大循环次数30和最大误差容限0.001
@ -49,8 +49,8 @@ class Calibration:
self.size = rgb_image.shape[:2]
self.intrinsics = camera_intrinsics
self.arm = AuboC5()
self.arm.init()
self.arm = dobot_nova5("192.168.5.1")
self.arm.start()
self.obj_points = []
print(f'===============机械臂连接成功==============')
@ -62,8 +62,8 @@ class Calibration:
def collect_data(self):
#移动到初始位置
pose = [(90) * (np.pi / 180), 4.23 * (np.pi / 180), 124.33 * (np.pi / 180), 61.56 * (np.pi / 180), -89.61 * (np.pi / 180), 0 * (np.pi / 180)]
code = self.arm.move_joint(pose,4,wait=True)
pose = [0,30,-120,0,90,0]
code = self.arm.RunPoint_P_inJoint(pose)
if code == -1:
print("运动到拍照位置失败")
#拍照
@ -72,7 +72,7 @@ class Calibration:
# 目前只设计了44条轨迹
for i in range (0,45):
next_pose = self.get_next_pose_ugly(i)
self.arm.move_joint(next_pose, 4,wait=True)
self.arm.RunPoint_P_inJoint(next_pose)
if code == -1:
print("运动到拍照位置失败")
else:
@ -115,14 +115,12 @@ class Calibration:
self.robot_poses.append(robot_pose)
def get_pose_homomat(self):
pose = self.arm.robot_rpc_client.getRobotInterface(self.arm.robot_name).getRobotState().getTcpPose()
x, y, z, rx, ry, rz = pose
r = R.from_euler('xyz', [rx, ry, rz])
rotation_matrix = r.as_matrix()
translation_vector = np.array([x, y, z]).reshape(3, 1)
arm_position,arm_orientation = self.arm.get_arm_position()
rotation_matrix = R.from_quat(arm_orientation).as_matrix()
translation_vector = arm_position.reshape(3, 1)
homo_mat = np.eye(4) # 创建 4x4 单位矩阵
homo_mat[:3, :3] = rotation_matrix
homo_mat[:3, 3] = translation_vector.flatten()
homo_mat[:3, 3] = translation_vector
return homo_mat
def camera_calib(self):
@ -132,6 +130,7 @@ class Calibration:
if ret:
print("内参矩阵:\n", mtx) # 内参数矩阵
print("畸变系数:\n", dist) # 畸变系数 distortion cofficients = (k_1,k_2,p_1,p_2,k_3)
print("重投影误差:\n",ret)
print("++++++++++相机标定完成++++++++++++++")
self.intrinsics_mtx = mtx
self.disorder = dist

View File

@ -0,0 +1,36 @@
from hardware.dobot_nova5 import dobot_nova5
import cv2
from hardware.remote_cam import ToolCamera
import threading
import keyboard
import numpy as np
class Getpose:
def __init__(self):
self.arm = dobot_nova5("192.168.5.1")
self.arm.start()
self.cam = ToolCamera(host='127.0.0.1')
self.cam.start()
self.thread1=threading.Thread(target=self.show)
self.thread1.start()
self.data=[]
self.arm.start_drag()
def show(self):
while True:
rgb, depth, intrinsics = self.cam.get_latest_frame()
cv2.imshow(rgb)
def add_data(self):
angle=self.arm.getAngel()
self.data=self.data.append(angle)
print(angle)
def save_data(self):
data=np.array(self.data)
np.savetxt('pose.txt', data ,fmt='%.5f')
if __name__ == "__main__":
sele=Getpose()
keyboard.add_hotkey('ctrl+shift+a', lambda: sele.add_data())
keyboard.add_hotkey('ctrl+shift+s', lambda: sele.save_data())
keyboard.add_hotkey('ctrl+shift+c', lambda: sele.arm.disableRobot())
# 阻塞监听(保持程序运行)
keyboard.wait('esc') # 按ESC退出