This commit is contained in:
LiangShiyun 2025-06-03 21:22:04 +08:00
parent 961a86d423
commit 3981c448c4
28 changed files with 1090 additions and 157 deletions

View File

@ -1860,7 +1860,7 @@ class TaskBase(smach.State):
if self.resources.robot.force_sensor:
self.resources.robot.force_sensor.stop_background_reading()
self.resources.robot.force_sensor = XjcSensor(arm_ip=self.resources.robot.arm_config['arm_ip']) # 初始化实例化力传感器
self.resources.robot.force_sensor = XjcSensor() # 初始化实例化力传感器
time.sleep(0.2)
code = self.resources.robot.sensor_set_zero()
if code != 0:
@ -3037,7 +3037,7 @@ class EmergencyExitState(smach.State):
# 这里可以处理一些清理工作
self.resources.logger.log_error("Emergency exit triggered, shutting down...")
self.resources.robot.stop()
self.resources.robot.arm.power_off()
# self.resources.robot.arm.power_off()
self.resources.stop_event.clear()
# 或者使用 sys.exit() 直接退出程序
# sys.exit(1)

View File

@ -115,6 +115,7 @@ class MassageRobot:
# arm 实例化时机械臂类内部进行通讯连接
self.arm = dobot_nova5(arm_ip = self.arm_config['arm_ip'])
# self.arm.start()
# self.arm.chooseRightFrame()
self.arm.init()
self.thermotherapy = None
@ -164,16 +165,17 @@ class MassageRobot:
self.adjust_wrench = np.zeros(6,dtype=np.float64)
self.skip_pos = np.zeros(6,dtype=np.float64)
# 按摩头参数暂时使用本地数据
massage_head_dir = self.arm_config['massage_head_dir']
all_items = os.listdir(massage_head_dir)
head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))]
self.playload_dict = {}
for file in head_config_files:
file_address = massage_head_dir + '/' + file
play_load = read_yaml(file_address)
self.playload_dict[play_load['name']] = play_load
# self.playload_dict = self.vtxdb.get("robot_config", "massage_head")
#按摩头参数暂时使用本地数据
# massage_head_dir = self.arm_config['massage_head_dir']
# all_items = os.listdir(massage_head_dir)
# head_config_files = [f for f in all_items if os.path.isfile(os.path.join(massage_head_dir, f))]
# self.playload_dict = {}
# for file in head_config_files:
# file_address = massage_head_dir + '/' + file
# play_load = read_yaml(file_address)
# self.playload_dict[play_load['name']] = play_load
self.playload_dict = self.vtxdb.get("robot_config", "massage_head")
# print(self.playload_dict)
self.current_head = 'none'
self.is_waitting = False
@ -220,6 +222,7 @@ class MassageRobot:
# ---
self.last_time = -1
self.cur_time = -1
self.tool_num = 0
# 预测步骤
def kalman_predict(self,x, P, Q):
@ -417,8 +420,10 @@ class MassageRobot:
tcp_offset = self.playload_dict[name]["tcp_offset"]
tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}"
print("tcp_offset_str",tcp_offset_str)
self.arm.setEndEffector(i=9,tool_i=tcp_offset_str)
self.arm.chooseEndEffector(i=9)
self.tool_num = 9
self.arm.setEndEffector(i=self.tool_num,tool_i=tcp_offset_str)
self.arm.chooseEndEffector(i=self.tool_num)
print(self.arm.get_arm_position())
self.logger.log_info(f"切换到{name}按摩头")
@ -554,9 +559,9 @@ class MassageRobot:
time.sleep(sleep_duration)
# print(f"real sleep:{time.time()-b2}")
# print("self111:",self.arm.feedbackData.robotMode)
# print("self111:",self.arm.feedbackData.robotMode,tcp_command)
self.arm.dashboard.socket_dobot.sendall(tcp_command)
# print("self111222:",self.arm.feedbackData.robotMode)
# print("self111222:",self.arm.feedbackData.robotMode,tcp_command)
if self.arm.feedbackData.robotMode in [3, 4, 6, 8, 9, 11] or not self.arm.feedbackData.EnableState:
self.logger.log_error(f"机械臂异常:{self.arm.feedbackData.robotMode}")
self.arm.dashboard.Stop()
@ -665,14 +670,20 @@ class MassageRobot:
@track_function("set_position",True)
def set_position(self,pose,is_wait = False):
print("self.robotmode1:",self.arm.feedbackData.robotMode)
self.logger.log_info(f"set postion:{pose}")
x,y,z = pose[0],pose[1],pose[2]
roll,pitch,yaw = pose[3],pose[4],pose[5]
pose_command = np.concatenate([[x,y,z], [roll,pitch,yaw]])
pose_command = np.concatenate([np.array([x,y,z]), np.array([roll,pitch,yaw])])
time.sleep(0.1)
code = self.arm.RunPoint_P_inPose_M_RAD(poses_list = pose_command)
print("self.robotmode2:",self.arm.feedbackData.robotMode)
# inverse_joint = self.arm.getInverseKin(poses_list = pose_command,tool = self.tool_num)
code = self.arm.RunPoint_P_inPose_M_RAD(pose = pose_command)
time.sleep(0.1)
return code
# print("inverse_joint:",inverse_joint,np.array(inverse_joint)*np.pi/180)
# code = self.arm.move_joint(q = np.array(inverse_joint)*np.pi/180)
# return code
@track_function("move_to_point",True)
def move_to_point(self, t, pose, is_interrupt=True, wait=True, timeout=0.5, interpolation:Literal["linear","circle","cloud_point"] ='linear', algorithm:Literal["admittance","position","hybrid","admithybrid","hybridPid","hybridAdmit","admittanceZ"] = "admittance",is_switch_controller = False):
@ -1671,8 +1682,8 @@ if __name__ == "__main__":
# robot.dt = ts[1] - ts[0]
ready_pose = [0.1543467,-0.1349880,0.4553604,-180.0000*np.pi/180,-30.0000*np.pi/180,0.0042*np.pi/180]
robot.current_head = 'finger_head'
ready_pose = [0.4543467,-0.1349880,0.4553604,-180.0000*np.pi/180,-30.0000*np.pi/180,0.0042*np.pi/180]
robot.current_head = 'none'
robot.force_sensor.disable_active_transmission()
time.sleep(0.5)
@ -1683,15 +1694,24 @@ if __name__ == "__main__":
robot.force_sensor.set_zero()
time.sleep(0.5)
robot.force_sensor.enable_active_transmission()
robot.set_position(pose = ready_pose)
robot.init_hardwares(ready_pose)
time.sleep(3)
robot.switch_payload("none")
robot.switch_payload("finger_head")
robot.controller_manager.switch_controller('admittance')
position,quat_rot = robot.arm.get_arm_position()
euler_rot = R.from_quat(quat_rot).as_euler('xyz')
target_point = [position[0], position[1], position[2], euler_rot[0], euler_rot[1], euler_rot[2]]
robot.move_to_point(pose=target_point,t=60)
# target_point = [position[0], position[1], position[2], euler_rot[0], euler_rot[1], euler_rot[2]]
target_point = [0.12191350715303009, -0.1636467057977184, 0.007338312730729687, 8.37758040957278e-05, 1.5708137800874167, -1.5708329787091884]
# target_point = [0.3543467,-0.1349880,0.4553604,-180.0000*np.pi/180,-30.0000*np.pi/180,0.0042*np.pi/180]
# robot.move_to_point(pose=target_point,t=60)
robot.arm.move_joint(q = robot.arm.off_pos)
robot.arm.move_joint(q = robot.arm.init_pos)
pose1 = [0.353467,-0.1349880,0.403604,-180.0000*np.pi/180,-30.0000*np.pi/180,0.0042*np.pi/180]
robot.set_position(pose = pose1)
robot.set_position([0.12189544299531646, -0.31364670468715944, 0.00733569473686102, 2.153932270623249, 0.6209756952505509, 0.0])
# robot.controller_manager.switch_controller('position')

View File

@ -40,100 +40,108 @@ class AdmittanceController(BaseController):
self.laset_print_time = 0
# def step(self,dt):
# # 方向统一
# if self.state.desired_orientation.dot(self.state.arm_orientation) < 0:
# self.state.arm_orientation = -self.state.arm_orientation
# # 缓存常用计算
# arm_ori_quat = R.from_quat(self.state.arm_orientation)
# arm_ori_mat = arm_ori_quat.as_matrix()
# # 位置误差
# temp_pose_error = self.state.arm_position - self.state.desired_position
# self.state.pose_error[:3] = arm_ori_mat.T @ temp_pose_error
# # 姿态误差(四元数)
# rot_err_quat = arm_ori_quat.inv() * R.from_quat(self.state.desired_orientation)
# self.state.pose_error[3:] = -rot_err_quat.as_rotvec(degrees=False)
# # 期望加速度
# wrench_err = self.state.external_wrench_tcp - self.state.desired_wrench
# D_vel = self.D @ (self.state.arm_desired_twist - self.state.desired_twist)
# K_pose = self.K @ self.state.pose_error
# self.state.arm_desired_acc = self.M_inv @ (wrench_err - D_vel - K_pose)
# self.clip_command(self.state.arm_desired_acc, "acc")
# ## 更新速度和位姿
# self.state.arm_desired_twist += self.state.arm_desired_acc * dt
# self.clip_command(self.state.arm_desired_twist, "vel")
# # 计算位姿变化
# delta_pose = np.concatenate([
# arm_ori_mat @ (self.state.arm_desired_twist[:3] * dt),
# self.state.arm_desired_twist[3:] * dt
# ])
# self.clip_command(delta_pose, "pose")
# # 更新四元数
# delta_ori_quat = R.from_rotvec(delta_pose[3:]).as_quat()
# arm_ori_quat_new = arm_ori_quat * R.from_quat(delta_ori_quat)
# self.state.arm_orientation_command = arm_ori_quat_new.as_quat()
# # 归一化四元数
# self.state.arm_orientation_command /= np.linalg.norm(self.state.arm_orientation_command)
# # 更新位置
# self.state.arm_position_command = self.state.arm_position + delta_pose[:3]
def step(self,dt):
# 计算误差 位置直接作差,姿态误差以旋转向量表示
temp_pose_error = self.state.arm_position - self.state.desired_position
# if time.time() - self.laset_print_time > 5:
# print(f'temp_pose_error: {temp_pose_error} ||| arm_position: {self.state.arm_position} ||| desired_position: {self.state.desired_position}')
# 方向统一
if self.state.desired_orientation.dot(self.state.arm_orientation) < 0:
self.state.arm_orientation = -self.state.arm_orientation
self.state.pose_error[:3] = R.from_quat(self.state.arm_orientation).as_matrix().T @ temp_pose_error
# if time.time() - self.laset_print_time > 5:
# print("pose_error:",self.state.pose_error[:3])
# 计算误差 位置直接作差,姿态误差以旋转向量表示
#rot_err_mat = R.from_quat(self.state.arm_orientation).as_matrix() @ R.from_quat(self.state.desired_orientation).as_matrix().T
rot_err_mat = R.from_quat(self.state.arm_orientation).as_matrix().T @ R.from_quat(self.state.desired_orientation).as_matrix()
# print(f'rot_err_mat: {rot_err_mat} ||| arm_orientation: {R.from_quat(self.state.arm_orientation).as_euler('xyz',False)} ||| desired_orientation: {R.from_quat(self.state.desired_orientation).as_euler('xyz',False)}')
rot_err_rotvex = R.from_matrix(rot_err_mat).as_rotvec(degrees=False)
self.state.pose_error[3:] = -rot_err_rotvex
#wrench_err = self.state.external_wrench_base - self.state.desired_wrench
# 缓存常用计算
arm_ori_quat = R.from_quat(self.state.arm_orientation)
arm_ori_mat = arm_ori_quat.as_matrix()
# 位置误差
temp_pose_error = self.state.arm_position - self.state.desired_position
self.state.pose_error[:3] = arm_ori_mat.T @ temp_pose_error
# 姿态误差(四元数)
rot_err_quat = arm_ori_quat.inv() * R.from_quat(self.state.desired_orientation)
self.state.pose_error[3:] = -rot_err_quat.as_rotvec(degrees=False)
# 期望加速度
wrench_err = self.state.external_wrench_tcp - self.state.desired_wrench
if time.time() - self.laset_print_time > 5:
print(f'wrench_err: {wrench_err} ||| external_wrench_tcp: {self.state.external_wrench_tcp} ||| desired_wrench: {self.state.desired_wrench}')
self.state.arm_desired_acc = np.linalg.inv(self.M) @ (wrench_err - self.D @ (self.state.arm_desired_twist -self.state.desired_twist) - self.K @ self.state.pose_error)
# if time.time() - self.laset_print_time > 5:
# print("@@@:",wrench_err - self.D @ (self.state.arm_desired_twist -self.state.desired_twist) - self.K @ self.state.pose_error)
self.clip_command(self.state.arm_desired_acc,"acc")
self.state.arm_desired_twist = self.state.arm_desired_acc * dt + self.state.arm_desired_twist
self.clip_command(self.state.arm_desired_twist,"vel")
delta_pose = self.state.arm_desired_twist * dt
D_vel = self.D @ (self.state.arm_desired_twist - self.state.desired_twist)
K_pose = self.K @ self.state.pose_error
self.state.arm_desired_acc = self.M_inv @ (wrench_err - D_vel - K_pose)
self.clip_command(self.state.arm_desired_acc, "acc")
## 更新速度和位姿
self.state.arm_desired_twist += self.state.arm_desired_acc * dt
self.clip_command(self.state.arm_desired_twist, "vel")
# 计算位姿变化
delta_pose = np.concatenate([
arm_ori_mat @ (self.state.arm_desired_twist[:3] * dt),
self.state.arm_desired_twist[3:] * dt
])
self.clip_command(delta_pose, "pose")
delta_pose[:3] = self.pos_scale_factor * delta_pose[:3]
delta_pose[3:] = self.rot_scale_factor * delta_pose[3:]
# if time.time() - self.laset_print_time > 5:
# print("delta_pose:",delta_pose)
delta_pose[:3] = R.from_quat(self.state.arm_orientation).as_matrix() @ delta_pose[:3]
# # update position
# delta_pose[:3] = arm_ori_mat @ delta_pose[:3]
# if time.time() - self.laset_print_time > 5:
# print("tf_delta_pose:",delta_pose)
self.clip_command(delta_pose,"pose")
# testlsy
delta_ori_mat = R.from_rotvec(delta_pose[3:]).as_matrix()
#arm_ori_mat = delta_ori_mat @ R.from_quat(self.state.arm_orientation).as_matrix()
arm_ori_mat = R.from_quat(self.state.arm_orientation).as_matrix() @ delta_ori_mat
self.state.arm_orientation_command = R.from_matrix(arm_ori_mat).as_quat()
# arm_ori_mat = R.from_quat(self.state.arm_orientation).as_rotvec(degrees=False) + delta_pose[3:]
# self.state.arm_orientation_command = R.from_rotvec(arm_ori_mat).as_quat()
# self.state.arm_orientation_command = R.from_matrix(arm_ori_mat).as_quat()
# 更新四元数
delta_ori_quat = R.from_rotvec(delta_pose[3:]).as_quat()
arm_ori_quat_new = arm_ori_quat * R.from_quat(delta_ori_quat)
self.state.arm_orientation_command = arm_ori_quat_new.as_quat()
# 归一化四元数
self.state.arm_orientation_command /= np.linalg.norm(self.state.arm_orientation_command)
# 更新位置
self.state.arm_position_command = self.state.arm_position + delta_pose[:3]
if time.time() - self.laset_print_time > 1:
print("11111111111111111111111:",self.state.desired_position,self.state.arm_position_command,self.state.arm_desired_acc)
# if time.time() - self.laset_print_time > 1:
# print("11111111111111111111111:",self.state.desired_position,self.state.arm_position_command,self.state.arm_position)
# def step(self,dt):
# # 计算误差 位置直接作差,姿态误差以旋转向量表示
# temp_pose_error = self.state.arm_position - self.state.desired_position
# # if time.time() - self.laset_print_time > 5:
# # print(f'temp_pose_error: {temp_pose_error} ||| arm_position: {self.state.arm_position} ||| desired_position: {self.state.desired_position}')
# if self.state.desired_orientation.dot(self.state.arm_orientation) < 0:
# self.state.arm_orientation = -self.state.arm_orientation
# self.state.pose_error[:3] = R.from_quat(self.state.arm_orientation).as_matrix().T @ temp_pose_error
# # if time.time() - self.laset_print_time > 5:
# # print("pose_error:",self.state.pose_error[:3])
# # 计算误差 位置直接作差,姿态误差以旋转向量表示
# #rot_err_mat = R.from_quat(self.state.arm_orientation).as_matrix() @ R.from_quat(self.state.desired_orientation).as_matrix().T
# rot_err_mat = R.from_quat(self.state.arm_orientation).as_matrix().T @ R.from_quat(self.state.desired_orientation).as_matrix()
# # print(f'rot_err_mat: {rot_err_mat} ||| arm_orientation: {R.from_quat(self.state.arm_orientation).as_euler('xyz',False)} ||| desired_orientation: {R.from_quat(self.state.desired_orientation).as_euler('xyz',False)}')
# rot_err_rotvex = R.from_matrix(rot_err_mat).as_rotvec(degrees=False)
# self.state.pose_error[3:] = -rot_err_rotvex
# #wrench_err = self.state.external_wrench_base - self.state.desired_wrench
# wrench_err = self.state.external_wrench_tcp - self.state.desired_wrench
# if time.time() - self.laset_print_time > 5:
# print(f'wrench_err: {wrench_err} ||| external_wrench_tcp: {self.state.external_wrench_tcp} ||| desired_wrench: {self.state.desired_wrench}')
# self.state.arm_desired_acc = np.linalg.inv(self.M) @ (wrench_err - self.D @ (self.state.arm_desired_twist -self.state.desired_twist) - self.K @ self.state.pose_error)
# # if time.time() - self.laset_print_time > 5:
# # print("@@@:",wrench_err - self.D @ (self.state.arm_desired_twist -self.state.desired_twist) - self.K @ self.state.pose_error)
# self.clip_command(self.state.arm_desired_acc,"acc")
# self.state.arm_desired_twist = self.state.arm_desired_acc * dt + self.state.arm_desired_twist
# self.clip_command(self.state.arm_desired_twist,"vel")
# delta_pose = self.state.arm_desired_twist * dt
# delta_pose[:3] = self.pos_scale_factor * delta_pose[:3]
# delta_pose[3:] = self.rot_scale_factor * delta_pose[3:]
# # if time.time() - self.laset_print_time > 5:
# # print("delta_pose:",delta_pose)
# delta_pose[:3] = R.from_quat(self.state.arm_orientation).as_matrix() @ delta_pose[:3]
# # if time.time() - self.laset_print_time > 5:
# # print("tf_delta_pose:",delta_pose)
# self.clip_command(delta_pose,"pose")
# # testlsy
# delta_ori_mat = R.from_rotvec(delta_pose[3:]).as_matrix()
# #arm_ori_mat = delta_ori_mat @ R.from_quat(self.state.arm_orientation).as_matrix()
# arm_ori_mat = R.from_quat(self.state.arm_orientation).as_matrix() @ delta_ori_mat
# self.state.arm_orientation_command = R.from_matrix(arm_ori_mat).as_quat()
# # arm_ori_mat = R.from_quat(self.state.arm_orientation).as_rotvec(degrees=False) + delta_pose[3:]
# # self.state.arm_orientation_command = R.from_rotvec(arm_ori_mat).as_quat()
# # self.state.arm_orientation_command = R.from_matrix(arm_ori_mat).as_quat()
# self.state.arm_position_command = self.state.arm_position + delta_pose[:3]
# if time.time() - self.laset_print_time > 1:
# print("11111111111111111111111:",self.state.desired_position,self.state.arm_position_command,self.state.arm_position)
def step_traj(self,dt):
# 方向统一

View File

@ -8,10 +8,10 @@ mass_rot: [0.11, 0.11, 0.11]
# P
# stiff_tran: [300, 300, 300]
# stiff_rot: [7, 7, 7]
# stiff_tran: [270, 270, 270]
# stiff_rot: [11, 11, 11]
stiff_tran: [0, 0, 0]
stiff_tran: [270, 270, 270]
stiff_rot: [11, 11, 11]
# stiff_tran: [0, 0, 0]
# stiff_rot: [11, 11, 11]
# stiff_tran: [100, 100, 100]
# stiff_rot: [1, 1, 1]
# D

View File

@ -1893,6 +1893,7 @@ class DobotApiDashboard(DobotApi):
else:
print("coordinateMode param is wrong")
return ""
print("movJ:string,",string)
params = []
if user != -1:
params.append('user={:d}'.format(user))

View File

@ -148,8 +148,13 @@ class dobot_nova5:
self.feedFour = None
atexit.register(self.exit_task)
self.chooseRightFrame()
self.init_pos = [-45.0009079*np.pi/180,55.5785141*np.pi/180,-120.68821716*np.pi/180,5.11103201*np.pi/180,90.00195312*np.pi/180,-90.00085449*np.pi/180]
self.standby_pos = [-45.0009079*np.pi/180,55.5785141*np.pi/180,-120.68821716*np.pi/180,5.11103201*np.pi/180,90.00195312*np.pi/180,-90.00085449*np.pi/180]
self.off_pos = [-44.996*np.pi/180,87.0092*np.pi/180,-147.0087*np.pi/180,-0.0048*np.pi/180,90.0010*np.pi/180,-90.0021*np.pi/180]
self.cam_pose = [-45.0050*np.pi/180,30.0682*np.pi/180,-66.1887*np.pi/180,-21.8836*np.pi/180,-90.3064*np.pi/180,-89.9977*np.pi/180]
self.cam_length = 1.75
self.level_base = [-180.0000*np.pi/180,-30.0000*np.pi/180,0.0042*np.pi/180]
self.is_exit = False
self.is_standby = False
self.filter_matirx = np.zeros((1,7)) # 位姿伺服用
@ -281,6 +286,7 @@ class dobot_nova5:
def waitArrival(self, command):
sendCommandID = self.parseResultId(command)[1]
print("self.feedbackData.robotMode6:",self.feedbackData.robotMode)
# print("self.parseResultId(command)",self.parseResultId(command))
while True:
if self.feedbackData.robotMode in [3, 4, 6, 9, 10, 11]:
@ -290,7 +296,7 @@ class dobot_nova5:
self.dashboard.EmergencyStop(mode=1)
return -1
if self.feedbackData.EnableState:
print("self.feedbackData.robotCurrentCommandID",self.feedbackData.robotCurrentCommandID)
# print("self.feedbackData.robotCurrentCommandID",self.feedbackData.robotCurrentCommandID)
if self.feedbackData.robotCurrentCommandID > sendCommandID:
break
else:
@ -315,11 +321,13 @@ class dobot_nova5:
is_arrived = self.waitArrival(command)
if is_arrived == 0:
# self.dashboard.MovJ()
print(f'Arrived at the joint position: {q}')
return 0
else:
print(f'Failed to arrive at the joint position: {q}')
return -1
print(f'After {cnt} retries, still failed to arrive at the joint position: {q}')
return -1
@ -333,7 +341,7 @@ class dobot_nova5:
targetAngle = self.__transform_rad_2_deg(q)
currentAngle = np.array(currentAngle)
targetAngle = np.array(targetAngle)
self.dashboard.SpeedFactor(70)
self.dashboard.SpeedFactor(50)
for i in range(len(targetAngle)):
while np.abs(currentAngle[i] - targetAngle[i]) >threshold:
@ -356,12 +364,15 @@ class dobot_nova5:
# 单位 m/rad -> mm/deg
max_retry_count = params.get('max_retry_count', 3)
cnt = 0
print("self.robotmode3:",self.feedbackData.robotMode)
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(pose)
while cnt < max_retry_count:
cnt += 1
print("self.robotmode4:",self.feedbackData.robotMode)
command = self.dashboard.MovJ(*poses_list_mm_deg,0)
print("self.robotmode5:",self.feedbackData.robotMode)
is_arrived = self.waitArrival(command)
is_arrived = self.waitArrival(command = command)
if is_arrived == 0:
print(f'Arrived at the joint position: {pose}')
return 0
@ -588,7 +599,7 @@ class dobot_nova5:
pose = [float(x.strip()) for x in pose_str.split(",")]
return pose
def getInverseKin(self,poses_list,user=-1,tool=-1,useJointNear=-1,JointNear=''):
def getInverseKin(self,poses_list,user=1,tool=-1,useJointNear=1,JointNear=''):
'''
poses_list X Y Z RX RY RZ
useJointNear 0 或不携带 表示 JointNear无效
@ -747,7 +758,7 @@ class dobot_nova5:
pos: [x,y,z] in m
quat: [qx,qy,qz,qw] 四元数
'''
pose = self.getPose(user=0, tool=tool)
pose = self.getPose(user=1, tool=tool)
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
x /= 1000
y /= 1000
@ -842,6 +853,8 @@ class dobot_nova5:
if __name__ == "__main__":
# print(R.from_euler("xyz", np.array([180, 0, 0]), degrees=True).as_matrix())
# sleep(5)
dobot = dobot_nova5("192.168.5.1")
@ -852,22 +865,43 @@ if __name__ == "__main__":
# dobot.dashboard.MoveJog("J1+")
# time.sleep(1)
# dobot.dashboard.MoveJog("")
# print(dobot.getAngel())
print("dobot.getangle::",dobot.getAngel())
# # while True:
# # time.sleep(1)
# # dobot.start()
# posJ = [0,30,-120,0,90,0]
# time.sleep(1)
joint = [-45.0050*np.pi/180,30.0682*np.pi/180,-66.1887*np.pi/180,-21.8836*np.pi/180,-90.3064*np.pi/180,-89.9977*np.pi/180]
# dobot.move_joint(q = joint)
time.sleep(1)
joint1 = [-44.8416*np.pi/180, 39.5626*np.pi/180, -80.5683*np.pi/180, -18.0396*np.pi/180, -89.4168*np.pi/180, -90.0026*np.pi/180]
try:
dobot.start_drag()
# dobot.move_joint(q = joint)
# dobot.move_joint(q = joint1)
# pose1 = [0.353467,-0.1349880,0.403604,-180.0000*np.pi/180,-30.0000*np.pi/180,0.0042*np.pi/180]
# dobot.RunPoint_P_inPose_M_RAD(pose=pose1)
while True:
time.sleep(1)
except KeyboardInterrupt:
# robot_thread.join()
print("用户中断操作。")
except Exception as e:
# robot_thread.join()
print("Exception occurred at line:", e.__traceback__.tb_lineno)
print("发生异常:", e)
# dobot.RunPoint_P_inJoint(posJ)
# sleep(1)
# dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
# dobot.chooseEndEffector(i=9)
# print("Arm pose:",dobot.getPose())
# dobot.RunPoint_P_inPose([300, -135, 224.34, -180, 0, -90])
# dobot.start_drag()
# time.sleep(60)
# dobot.stop_drag()
# # dobot.RunPoint_P_inJoint(posJ)
# # sleep(1)
# # dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
# # dobot.chooseEndEffector(i=9)
# # print("Arm pose:",dobot.getPose())
# # dobot.RunPoint_P_inPose([300, -135, 224.34, -180, 0, -90])
# # dobot.start_drag()
# dobot.disableRobot()
dobot.disableRobot()

View File

@ -427,12 +427,12 @@ if __name__ == "__main__":
xjc_sensor.set_zero()
# time.sleep(1)
# xjc_sensor.enable_active_transmission()
while True:
# sensor_data = xjc_sensor.read()
sensor_data = xjc_sensor.read_data_f32()
current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
if sensor_data is None:
print('failed to get force sensor data!')
print(f"{current_time}-----{sensor_data}")
rate.sleep(False)
# break
# while True:
# # sensor_data = xjc_sensor.read()
# sensor_data = xjc_sensor.read_data_f32()
# current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
# if sensor_data is None:
# print('failed to get force sensor data!')
# print(f"{current_time}-----{sensor_data}")
# rate.sleep(False)
# # break

View File

@ -0,0 +1,235 @@
import requests
import base64
import cv2
import open3d as o3d
import numpy as np
import paramiko
import time
class ToolCamera:
def __init__(self, host):
"""
初始化 CameraClient
"""
self.host = host
self.port = 22 # SSH端口号默认是22
self.username = "jsfb" # SSH用户名
self.password = "jsfb" # SSH密码
self.root_password = "jsfb"
# 要执行的命令
self.start_command = "systemctl restart cam_server"
self.stop_command = "systemctl stop cam_server"
def start(self):
self.execute_command_on_remote(
host=self.host,
username=self.username,
password=self.password,
root_password=self.root_password,
command=self.start_command
)
def stop(self):
self.execute_command_on_remote(
host=self.host,
username=self.username,
password=self.password,
root_password=self.root_password,
command=self.stop_command
)
def execute_command_on_remote(self, host, username, password, root_password, command, port=22):
# 创建SSH客户端
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# 连接到远程服务器,指定端口
ssh.connect(hostname=host, port=port, username=username, password=password)
# 构建完整的命令字符串将root密码传递给sudo
sudo_command = f'echo {root_password} | sudo -S {command}'
# 获取通道对象
transport = ssh.get_transport()
channel = transport.open_session()
channel.get_pty() # 获取伪终端
channel.exec_command(sudo_command) # 执行命令
# 检查命令是否在后台执行
while True:
# 如果命令在后台运行, channel.exit_status_ready() 将会一直是 False
if channel.exit_status_ready():
break
# 非阻塞方式读取输出
if channel.recv_ready():
output = channel.recv(1024).decode('utf-8')
print(output)
# 非阻塞方式读取错误输出
if channel.recv_stderr_ready():
error = channel.recv_stderr(1024).decode('utf-8')
print(error)
# 延时防止CPU占用过高
time.sleep(0.1)
return channel.recv_exit_status()
finally:
# 关闭SSH连接
ssh.close()
def get_latest_frame(self):
"""
发送请求到服务器以获取最新的RGB和深度图像数据
返回:
tuple: 包含RGB图像和深度图像的元组 (rgb_image, depth_image)如果请求失败则返回 (None, None)
"""
try:
# 发送GET请求到服务器
response = requests.get("http://" + self.host + ":8000/get_latest_frame", timeout=(3, 5))
# 检查请求是否成功
if response.status_code == 200:
data = response.json()
# 获取Base64编码的RGB和深度图像数据
rgb_image_base64 = data['rgb_image']
depth_image_base64 = data['depth_image']
camera_intrinsics = data['camera_intrinsics']
# 解码Base64为二进制数据
rgb_image_data = base64.b64decode(rgb_image_base64)
depth_image_data = base64.b64decode(depth_image_base64)
# 转换为NumPy数组
rgb_image_np = np.frombuffer(rgb_image_data, np.uint8)
depth_image_np = np.frombuffer(depth_image_data, np.uint8)
# 解码为OpenCV图像
rgb_image = cv2.imdecode(rgb_image_np, cv2.IMREAD_COLOR)
depth_image = cv2.imdecode(depth_image_np, cv2.IMREAD_UNCHANGED)
return rgb_image, depth_image, camera_intrinsics
else:
print(f"Failed to retrieve data from server, status code: {response.status_code}")
return None, None, None
except Exception as e:
print(f"Exception occurred: {e}")
return None, None, None
def display_images(self, rgb_image, depth_image):
"""
显示RGB和深度图像
参数:
rgb_image (numpy.ndarray): RGB图像
depth_image (numpy.ndarray): 深度图像
"""
if rgb_image is not None and depth_image is not None:
cv2.imshow('RGB Image', rgb_image)
cv2.imshow('Depth Image', depth_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
else:
print("No image data to display.")
def display_rgb_point_cloud(self, rgb_image, depth_image, camera_intrinsics):
"""
显示RGB点云将RGB图像和深度图像转换为点云并显示
参数:
rgb_image (np.ndarray): RGB图像
depth_image (np.ndarray): 深度图像
camera_intrinsics (dict): 相机的内参字典包含 'fx', 'fy', 'cx', 'cy'
返回:
None
"""
# 获取RGB图像和深度图像的尺寸
rgb_h, rgb_w = rgb_image.shape[:2]
depth_h, depth_w = depth_image.shape[:2]
# 计算裁剪区域
start_x = (rgb_w - depth_w) // 2
start_y = (rgb_h - depth_h) // 2
# 裁剪RGB图像以匹配深度图像的尺寸
rgb_image_cropped = rgb_image[start_y:start_y + depth_h, start_x:start_x + depth_w]
rgb_image_cropped = cv2.cvtColor(rgb_image_cropped, cv2.COLOR_RGB2BGR)
# 将深度图像转换为浮点型并将单位从毫米转换为米(假设深度图像以毫米为单位)
depth_image = depth_image.astype(np.float32) / 1000.0
# 创建点云的空数组
points = []
colors = []
# 相机内参
fx = camera_intrinsics['fx']
fy = camera_intrinsics['fy']
cx = camera_intrinsics['cx']
cy = camera_intrinsics['cy']
# 遍历每个像素将其转换为3D点
for v in range(depth_h):
for u in range(depth_w):
z = depth_image[v, u]
if z > 0: # 排除无效深度
x = (u - cx) * z / fx
y = (v - cy) * z / fy
points.append([x, y, z])
colors.append(rgb_image_cropped[v, u] / 255.0) # 颜色归一化到[0,1]
# 将点云和颜色转换为NumPy数组
points = np.array(points)
colors = np.array(colors)
# 创建Open3D点云对象
point_cloud = o3d.geometry.PointCloud()
point_cloud.points = o3d.utility.Vector3dVector(points)
point_cloud.colors = o3d.utility.Vector3dVector(colors)
# 显示点云
o3d.visualization.draw_geometries([point_cloud])
if __name__ == "__main__":
import time
import os
# 初始化客户端并指定服务器地址
cam = ToolCamera(host='127.0.0.1')
# cam.stop()
cam.start()
time.sleep(2)
# 获取最新的RGB和深度图像
rgb_image, depth_image, camera_intrinsics = cam.get_latest_frame()
print(camera_intrinsics)
max_depth = np.max(depth_image)
print(np.min(depth_image),np.max(depth_image))
# print(depth_image[200, 320])
# depth_image = (depth_image / max_depth * 65535).astype(np.uint16)
# print(np.min(depth_image),np.max(depth_image))
# 对图像进行水平翻转
# rgb_image = cv2.flip(rgb_image, 1)
# depth_image = cv2.flip(depth_image, 1)
# 显示图像
cam.display_images(rgb_image, depth_image)
cv2.imwrite("aucpuncture2point/configs/using_img/color.png",rgb_image)
cv2.imwrite("aucpuncture2point/configs/using_img/depth.png",depth_image)
# 获取当前时间并格式化为文件夹名 (格式: 年-月-日_时-分-秒)
current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
base_dir = "/home/jsfb/jsfb_ws/collected_data/test_images"
# 创建一个根据时间命名的新文件夹
folder_path = os.path.join(base_dir, current_time)
os.makedirs(folder_path, exist_ok=True) # 如果文件夹不存在则创建
cv2.imwrite(os.path.join(folder_path, "color.png"), rgb_image)
cv2.imwrite(os.path.join(folder_path, "depth.png"), depth_image)
# 显示RGB点云
# cam.display_rgb_point_cloud(rgb_image, depth_image, camera_intrinsics)

View File

@ -0,0 +1,293 @@
import cv2
import os
from datetime import datetime
import numpy as np
import sys
from pathlib import Path
import time
from scipy.spatial.transform import Rotation as R
sys.path.append(str(Path(__file__).resolve().parent.parent))
sys.path.append('/home/jsfb/jsfb_ws/MassageRobot_aubo/Massage/MassageControl')
from hardware.remote_cam import ToolCamera
from hardware.aubo_C5 import AuboC5
np.set_printoptions(precision=8, suppress=True)
# 设置寻找亚像素角点的参数采用的停止准则是最大循环次数30和最大误差容限0.001
criteria = (cv2.TERM_CRITERIA_MAX_ITER | cv2.TERM_CRITERIA_EPS, 30, 0.001)
# 标定计算类
# -- 连接AuboC5
# -- 连接 ToolCamera
# -- 运动到初始位置 计算下一步运动位姿
# -- 开始手眼标定
# -- 输出结果
class Calibration:
def __init__(self, board_size, square_size, file_folder):
"""
初始化标定计算类
board_size: 标定板的格子数格式为 (width, height)
square_size: 格子的大小单位为米
"""
self.board_size = board_size
self.square_size = square_size
self.objp = self._create_object_points()
self.image_points = []
self.robot_poses = []
self.intrinsics = None
self.distortion_cofficients = None
self.cam = ToolCamera(host='127.0.0.1')
self.cam.start()
time.sleep(5)
self.size = (400,640)
rgb_image, depth_image, camera_intrinsics = self.cam.get_latest_frame()
if rgb_image is None :
print(f'===============相机连接失败请检查并重试==============')
sys.exit(1)
else:
print(f'===============相机连接成功==============')
self.size = rgb_image.shape[:2]
self.intrinsics = camera_intrinsics
self.arm = AuboC5()
self.arm.init()
self.obj_points = []
print(f'===============机械臂连接成功==============')
date_time_str = time.strftime("%Y%m%d%H%M%S", time.localtime())
folder_name = f"calibration_data_{date_time_str}"
file_folder+=folder_name
os.makedirs(file_folder, exist_ok=True)
print(f'===============创建文件夹成功=============')
self.file_address = file_folder
def _create_object_points(self):
"""创建标定板的3D点"""
objp = np.zeros((self.board_size[0] * self.board_size[1], 3), np.float32)
objp[:, :2] = np.mgrid[0:self.board_size[0], 0:self.board_size[1]].T.reshape(-1, 2) * self.square_size
return objp
def collect_data(self):
#移动到初始位置
pose = [(90) * (np.pi / 180), 4.23 * (np.pi / 180), 124.33 * (np.pi / 180), 61.56 * (np.pi / 180), -89.61 * (np.pi / 180), 0 * (np.pi / 180)]
code = self.arm.move_joint(pose,4,wait=True)
if code == -1:
print("运动到拍照位置失败")
#拍照
self.check_corners()
#运动到下一个位置
# 目前只设计了44条轨迹
for i in range (0,45):
next_pose = self.get_next_pose_ugly(i)
self.arm.move_joint(next_pose, 4,wait=True)
if code == -1:
print("运动到拍照位置失败")
else:
self.check_corners()
def check_corners(self):
rgb, depth, intrinsics = self.cam.get_latest_frame()
gray = cv2.cvtColor(rgb, cv2.COLOR_BGR2GRAY)
ret, corners = cv2.findChessboardCorners(gray, self.board_size, None)
if ret:
robot_pose = self.get_pose_homomat()
self.add_sample(gray,corners,robot_pose)
print(f"robot_pose = {robot_pose}")
pic_name = self.file_address + "/"+str(len(self.image_points)) + ".jpg"
print(f"save path as: {pic_name}")
# 绘制检测到的角点 (用于可视化,可选)
cv2.drawChessboardCorners(rgb, self.board_size, corners, ret)
cv2.imwrite(pic_name, rgb)
return True
return False
def add_sample(self, gray, corners,robot_pose):
"""
添加标定样本
image: 捕获的图像
robot_pose: 机械臂的位姿格式为 [x, y, z, roll, pitch, yaw]
"""
# 检测棋盘角点
self.obj_points.append(self.objp)
corners2 = cv2.cornerSubPix(gray, corners, (5, 5), (-1, -1), criteria) # 在原角点的基础上寻找亚像素角点
if [corners2]:
self.image_points.append(corners2)
else:
self.image_points.append(corners)
self.robot_poses.append(robot_pose)
def get_pose_homomat(self):
pose = self.arm.robot_rpc_client.getRobotInterface(self.arm.robot_name).getRobotState().getTcpPose()
x, y, z, rx, ry, rz = pose
r = R.from_euler('xyz', [rx, ry, rz])
rotation_matrix = r.as_matrix()
translation_vector = np.array([x, y, z]).reshape(3, 1)
homo_mat = np.eye(4) # 创建 4x4 单位矩阵
homo_mat[:3, :3] = rotation_matrix
homo_mat[:3, 3] = translation_vector.flatten()
return homo_mat
def camera_calib(self):
# print(f"\n\n\n\n\nsize is {self.size}")
ret, mtx, dist, rvecs, tvecs = cv2.calibrateCamera(self.obj_points, self.image_points, (self.size[1], self.size[0]), None, None)
# print("rvecs,tvecs:",rvecs,tvecs)
if ret:
print("内参矩阵:\n", mtx) # 内参数矩阵
print("畸变系数:\n", dist) # 畸变系数 distortion cofficients = (k_1,k_2,p_1,p_2,k_3)
print("++++++++++相机标定完成++++++++++++++")
# 从mtx中提取内参
fx = mtx[0, 0] # 焦距 fx
fy = mtx[1, 1] # 焦距 fy
cx = mtx[0, 2] # 主点 cx
cy = mtx[1, 2] # 主点 cy
# 从 distortion_array 提取畸变系数
k1 = dist[0][0] # 畸变系数 k1
k2 = dist[0][1] # 畸变系数 k2
p1 = dist[0][2] # 畸变系数 p1
p2 = dist[0][3] # 畸变系数 p2
k3 = dist[0][4] # 畸变系数 k3
# self.intrinsics = mtx
self.intrinsics = {
'fx': fx,
'fy': fy,
'cx': cx,
'cy': cy,
'distortion_coeffs': {
'k1': k1,
'k2': k2,
'p1': p1,
'p2': p2,
'k3': k3
}
}
self.distortion_cofficients = dist
else:
print("-----------相机标定失败--------------")
return rvecs, tvecs
def split_robot_pose(self):
"""
将包含 4x4 齐次矩阵的 robot_pose 列表拆分成 R_arm t_arm
Args:
robot_pose: 包含 4x4 齐次矩阵的 Python list
Returns:
R_arm: 旋转矩阵的 Python list, 每个元素是 3x3 numpy 数组
t_arm: 平移向量的 Python list, 每个元素是 3x1 numpy 数组
"""
R_arm = []
t_arm = []
for pose_matrix in self.robot_poses:
# # 1. 提取旋转矩阵 (3x3)
rotation_matrix = pose_matrix[:3, :3]
R_arm.append(rotation_matrix)
# R_arm.append(pose_matrix[:3])
# 2. 提取平移向量 (3x1)
translation_vector = pose_matrix[:3, 3] # 获取齐次矩阵的平移部分 (1x3 numpy 数组)
translation_vector = translation_vector.reshape(3, 1) # 转换为 3x1 的列向量
t_arm.append(translation_vector)
return R_arm, t_arm
def calibrate(self):
rvecs, tvecs = self.camera_calib()
r_arm, t_arm= self.split_robot_pose()
print(rvecs)
print(tvecs)
print(r_arm)
print(t_arm)
rvecs_array = np.array(rvecs)
tvecs_array = np.array(tvecs)
t_arma = np.array(t_arm)
r_arma = np.array(r_arm)
print(rvecs_array.shape)
print(tvecs_array.shape)
print(t_arma.shape)
print(r_arma.shape)
R, t = cv2.calibrateHandEye(r_arma, t_arma, rvecs_array, tvecs_array, cv2.CALIB_HAND_EYE_TSAI)
print("+++++++++++手眼标定完成+++++++++++++++")
self.arm.exit_task()
return R, t ,self.intrinsics
#得到图片以后 根据pose判断方向
def get_next_pose_future(self):
return None
def get_next_pose_ugly(self,index):
poselist =[
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0744246875277093, -1.5639895427121187, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.1267845650875392, -1.5116296651522887, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.1791444426473692, -1.4592697875924587, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0744246875277093, -1.5639895427121187, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0220648099678793, -1.5116296651522887, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 0.9697049324080494, -1.4592697875924587, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 0.9173450548482196, -1.4069099100326288, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0744246875277093, -1.5639895427121187, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.1267845650875392, -1.6163494202719486, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.1791444426473692, -1.6687092978317786, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.2315043202071991, -1.7210691753916085, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0744246875277093, -1.5639895427121187, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.109331272567596, -1.4941763726323454, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.1442378576074825, -1.424363202552572, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.1791444426473692, -1.3545500324727988, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.2140510276872558, -1.2847368623930255, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0744246875277093, -1.5639895427121187, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.109331272567596, -1.633802712791892, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.1442378576074825, -1.7036158828716652, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0744246875277093, -1.5639895427121187, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0395181024878226, -1.4941763726323454, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.004611517447936, -1.424363202552572, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 0.9697049324080493, -1.3545500324727988, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 0.9347983473681627, -1.2847368623930255, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0744246875277093, -1.5639895427121187, 0.0],
[1.5707963267948966, 0.06510078109938851, 2.16996785900455, 1.0395181024878226, -1.4941763726323454, 0.0],
[1.5707963267948966, 0.05637413483941686, 2.16996785900455, 1.004611517447936, -1.424363202552572, 0.0],
[1.5707963267948966, 0.047647488579445216, 2.16996785900455, 0.9697049324080493, -1.3545500324727988, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0744246875277093, -1.5639895427121187, 0.0],
[1.562069680534925, 0.07382742735936015, 2.16996785900455, 1.0395181024878226, -1.4941763726323454, 0.0],
[1.5533430342749535, 0.07382742735936015, 2.16996785900455, 1.004611517447936, -1.424363202552572, 0.0],
[1.544616388014982, 0.07382742735936015, 2.16996785900455, 0.9697049324080493, -1.3545500324727988, 0.0],
[1.5358897417550104, 0.07382742735936015, 2.16996785900455, 0.9347983473681627, -1.2847368623930255, 0.0],
[1.5271630954950388, 0.07382742735936015, 2.16996785900455, 0.8998917623282761, -1.2149236923132523, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0744246875277093, -1.5639895427121187, 0.0],
[1.579522973054868, 0.07382742735936015, 2.16996785900455, 1.109331272567596, -1.5988961277520053, 0.0],
[1.5882496193148397, 0.07382742735936015, 2.16996785900455, 1.1442378576074825, -1.633802712791892, 0.0],
[1.5969762655748112, 0.07382742735936015, 2.16996785900455, 1.1791444426473692, -1.6687092978317786, 0.0],
[1.6057029118347828, 0.07382742735936015, 2.16996785900455, 1.2140510276872558, -1.7036158828716652, 0.0],
[1.6144295580947543, 0.07382742735936015, 2.16996785900455, 1.2489576127271425, -1.7385224679115518, 0.0],
[1.5707963267948966, 0.07382742735936015, 2.16996785900455, 1.0744246875277093, -1.5639895427121187, 0.0],
[1.579522973054868, 0.07382742735936015, 2.16996785900455, 1.1267845650875392, -1.581442835232062, 0.0],
[1.5882496193148397, 0.07382742735936015, 2.16996785900455, 1.1791444426473692, -1.5988961277520053, 0.0],
[1.5969762655748112, 0.07382742735936015, 2.16996785900455, 1.2315043202071991, -1.6163494202719486, 0.0],
[1.6057029118347828, 0.07382742735936015, 2.16996785900455, 1.283864197767029, -1.633802712791892, 0.0],
[1.6144295580947543, 0.07382742735936015, 2.16996785900455, 1.336224075326859, -1.6512560053118353, 0.0]
]
return poselist[index]
if __name__ == '__main__':
# 标定板的width 对应的角点的个数 6
# 标定板的height 对应的角点的个数 3
calibration_board_size = [6,3]
calibration_board_square_size = 0.03 #unit - meter
systemPath = "/home/jsfb/Documents/"
calib = Calibration(calibration_board_size,calibration_board_square_size,systemPath)
calib.collect_data()
R, t,intrinsics = calib.calibrate()
print("旋转矩阵:")
print(R)
print("平移向量:")
print(t)
print("内参:")
print(intrinsics)

View File

@ -94,13 +94,21 @@ class Identifier:
x y z : mm
r p y : deg
'''
pose_command = np.concatenate([[x,y,z], [roll,pitch,yaw]])
# pose_command = np.concatenate([[x,y,z]/1000, [roll,pitch,yaw]*np.pi/180])
pose_command = np.concatenate([
np.array([x, y, z]) / 1000.0, # mm ➝ m
np.array([roll, pitch, yaw]) * np.pi / 180.0 # deg ➝ rad
])
cur = self.arm.getAngel() # 接口调用: 获取机器人的关节位置
cur_str = "{" + ",".join(map(str, cur)) + "}"
# print("cur_str",cur_str)
target_J = self.arm.getInverseKin(poses_list=pose_command,useJointNear=1,JointNear=cur_str)
print(target_J)
self.arm.RunPoint_P_inJoint(target_J)
print("pose_command:",pose_command)
self.arm.RunPoint_P_inPose_M_RAD(pose = pose_command)
# target_J = self.arm.getInverseKin(poses_list=pose_command,useJointNear=1,JointNear=cur_str)
# print(target_J)
# self.arm.RunPoint_P_inJoint(target_J)
def identify_param_auto(self,ready_pose,cnt,sample_num = 5):
if cnt < 15:
@ -170,19 +178,20 @@ class Identifier:
if __name__ == '__main__':
arm = dobot_nova5("192.168.5.1")
arm.start()
arm = dobot_nova5()
# arm.start()
arm.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
arm.chooseEndEffector(i=9)
arm.init()
sensor = XjcSensor("192.168.5.1", 60000)
sensor = XjcSensor()
sensor.connect()
sensor.disable_active_transmission()
sensor.disable_active_transmission()
sensor.disable_active_transmission()
atexit.register(sensor.disconnect)
# identifier = Identifier(arm=arm,sensor=sensor,config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/massage_head/finger_playload.yaml")
identifier = Identifier(arm=arm,sensor=sensor,config_path="/home/jsfb/jsfb_ws/global_config/massage_head/finger_playload.yaml")
ready_pose = [0.353467*1000,-0.1349880*1000,0.403604*1000,-180.0000,-30.0000,0.0042]
# ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0]
# time.sleep(1)
# identifier.identify_param_auto(ready_pose,45)
time.sleep(1)
identifier.identify_param_auto(ready_pose,15)
arm.disableRobot()

View File

@ -0,0 +1,328 @@
# cython: language_level=3
import time
import csv
import atexit
from pathlib import Path
import sys
import ctypes
import threading
import pygame # 添加pygame导入
sys.path.append(str(Path(__file__).resolve().parent.parent))
from hardware.aubo_C5 import AuboC5
from hardware.force_sensor_aubo import XjcSensor
# 初始化libc
try:
libc = ctypes.CDLL("libc.so.6")
except Exception as e:
print(f"Failed to load libc: {e}")
libc = None
class JointTrajectoryPlayer:
def __init__(self, csv_path: str, arm: AuboC5 = None, sensor: XjcSensor = None):
"""
初始化轨迹回放器
:param csv_path: 轨迹CSV文件路径
:param arm: 可选机械臂控制器
:param sensor: 可选力传感器
"""
self.csv_path = csv_path
self.arm = arm
self.sensor = sensor
self.file = None
self.reader = None
self.header = None
self.keep_playing = True
self.play_thread = None
def __enter__(self):
"""上下文管理器入口,打开文件"""
self.file = open(self.csv_path, 'r')
self.reader = csv.reader(self.file)
self.header = next(self.reader) # 读取表头
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器退出,关闭文件"""
if self.file:
self.file.close()
def read_next_joint_angles(self):
"""
读取下一行关节角度
:return: (timestamp, [j1,j2,j3,j4,j5,j6]) None(文件结束)
"""
try:
row = next(self.reader)
timestamp = float(row[0])
angles = [float(x) for x in row[1:7]] # 读取6个关节角度
return timestamp, angles
except (StopIteration, ValueError, IndexError):
return None
def playback(self, speed_factor: float = 1.0):
"""
回放轨迹
:param speed_factor: 回放速度因子(1.0为原速)
"""
while self.keep_playing: # 外部可以通过设置keep_playing为False来停止播放
# 每次循环开始时重新打开文件和读取器
self.file = open(self.csv_path, 'r')
self.reader = csv.reader(self.file)
self.header = next(self.reader) # 读取表头
start_time = time.time()
first_timestamp = None
while self.keep_playing: # 内部循环,播放单次轨迹
data = self.read_next_joint_angles()
if data is None:
break
timestamp, angles = data
# 初始化第一个时间戳
if first_timestamp is None:
first_timestamp = timestamp
continue
usleep(15000)
# 执行运动
if self.arm:
self.arm.mc.servoJoint(angles, 0, 0, 0.15, 0.1, 200)
print(f"时间: {timestamp:.3f}s, 角度: {angles}")
# 关闭当前文件
self.file.close()
return 0
def start(self, speed_factor: float = 1.0):
"""启动轨迹播放,返回是否成功启动"""
if self.play_thread and self.play_thread.is_alive():
return False # 已经在运行中
self.keep_playing = True
self.play_thread = threading.Thread(target=self.playback, kwargs={'speed_factor': speed_factor})
self.play_thread.daemon = True
self.play_thread.start()
return True
def stop(self):
"""停止轨迹播放,返回是否成功停止"""
if not self.play_thread or not self.play_thread.is_alive():
return False # 未在运行或已停止
self.keep_playing = False
return True
class RobotDanceController:
"""机器人舞蹈控制器,单例模式"""
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(RobotDanceController, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.arm = None
self.player = None
self.is_running = False
self._initialized = True
self.music_playing = False
def initialize(self):
"""初始化机械臂和轨迹播放器"""
try:
# 初始化机械臂
self.arm = AuboC5()
self.arm.init()
# 设置CSV路径
script_dir = Path(__file__).resolve().parent
csv_path = script_dir / "joint_processed_0.025s_6.csv"
# 创建轨迹播放器
self.player = JointTrajectoryPlayer(str(csv_path), self.arm)
return True
except Exception as e:
print(f"初始化失败: {e}")
return False
def play_music(self):
"""播放背景音乐,从头开始,循环播放"""
try:
# 初始化pygame混音器
pygame.mixer.init()
# 获取音乐文件路径
script_dir = Path(__file__).resolve().parent
# music_path = script_dir / "demo_music.mp3"
music_path = script_dir / "demo_music1.mp3"
# 加载并播放音乐从头开始设置循环播放loops=-1表示无限循环
pygame.mixer.music.load(str(music_path))
pygame.mixer.music.play(loops=-1)
self.music_playing = True
print("背景音乐已开始播放(循环模式)")
return True
except Exception as e:
print(f"播放音乐失败: {e}")
return False
def stop_music(self):
"""停止背景音乐"""
if self.music_playing:
try:
pygame.mixer.music.stop()
pygame.mixer.quit()
self.music_playing = False
print("背景音乐已停止")
except Exception as e:
print(f"停止音乐失败: {e}")
def start_demo(self):
"""开始演示,返回是否成功启动"""
if self.is_running:
return False
# 如果未初始化,先初始化
if not self.arm or not self.player:
if not self.initialize():
return False
try:
# 移动到舞蹈准备姿势
self.arm.move_joint([1.571,0.578,2.35,0.205,1.569,0.0])
time.sleep(2)
# 先播放背景音乐
self.play_music()
# 创建一个线程等待14秒后启动机器人运动
def start_robot_movement_at_music_mark():
try:
start_time = time.time()
# wait_time = 8.0 # 8秒
wait_time = 4.0 # 4秒
print(f"等待音乐播放到{wait_time}秒...")
time.sleep(wait_time)
# 检查是否仍在运行状态(可能用户已经停止了演示)
if self.is_running:
print(f"音乐已播放{wait_time}秒,开始机器人运动")
# 启动轨迹播放器
# 启动伺服
self.arm.enable_servo()
time.sleep(1)
self.arm.enable_servo()
time.sleep(1)
self.arm.enable_servo()
time.sleep(1)
self.player.start()
except Exception as e:
print(f"启动机器人运动失败: {e}")
# 设置为正在运行状态,这样线程中可以检查状态
self.is_running = True
# 启动等待线程
threading.Thread(target=start_robot_movement_at_music_mark, daemon=True).start()
return True
except Exception as e:
# 发生错误时,确保停止音乐
self.stop_music()
self.is_running = False
print(f"启动演示失败: {e}")
return False
def stop_demo(self):
"""停止演示,返回是否成功停止"""
if not self.is_running:
return False
try:
# 停止背景音乐
self.stop_music()
# 停止播放器
if self.player:
self.player.stop()
# 关闭伺服
if self.arm:
self.arm.disable_servo()
time.sleep(1)
self.arm.disable_servo()
time.sleep(1)
self.arm.disable_servo()
time.sleep(1)
# 回到初始位置
self.arm.move_joint(self.arm.off_pos)
self.is_running = False
return True
except Exception as e:
print(f"停止演示失败: {e}")
# 即使发生异常,也将状态设为未运行
self.is_running = False
return False
def cleanup(self):
"""清理资源"""
try:
if self.is_running:
self.stop_demo()
else:
# 确保音乐停止
self.stop_music()
if self.arm:
self.arm.power_off()
return True
except Exception as e:
print(f"清理资源失败: {e}")
return False
def usleep(microseconds):
"""Linux 下的微秒级休眠(比 time.sleep 更精确)"""
if libc and hasattr(libc, 'usleep'):
libc.usleep(microseconds)
else:
# 如果libc不可用使用time.sleep作为备选
time.sleep(microseconds / 1000000.0)
if __name__ == '__main__':
# 作为独立脚本运行时提供简单的CLI接口
controller = RobotDanceController()
try:
print("开始机器人舞蹈演示...")
if controller.start_demo():
print("演示已开始按Ctrl+C停止...")
# 等待用户按Ctrl+C
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n接收到停止信号,正在停止演示...")
finally:
controller.stop_demo()
controller.cleanup()
print("演示已停止,资源已清理。")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 376 KiB

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 376 KiB

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 376 KiB

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 376 KiB

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 376 KiB

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 376 KiB

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.0 MiB

After

Width:  |  Height:  |  Size: 1.8 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 455 KiB

After

Width:  |  Height:  |  Size: 417 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 112 KiB

After

Width:  |  Height:  |  Size: 106 KiB

View File

@ -64,8 +64,11 @@ class CoordinateTransformer:
self.camera_matrix = None
self.camera_trans = None
self.camera_matrix = np.array(self.vtxdb.get("robot_config","camera.camera_matrix"))
self.camera_trans = np.array(self.vtxdb.get("robot_config","camera.camera_trans"))
self.camera_matrix = np.array([[0.9996735,-0.0251487,-0.00260522],[-0.02535628,-0.9994429,0.02170123],[-0.00315538,-0.02162809,-0.99976111]])
self.camera_trans = np.array([-0.00767654,-0.00675844,-0.1762473])
# self.camera_matrix = np.array(self.vtxdb.get("robot_config","camera.camera_matrix"))
# self.camera_trans = np.array(self.vtxdb.get("robot_config","camera.camera_trans"))
camera_offset = self.vtxdb.get("robot_config","camera.camera_offset")
if camera_offset:
@ -342,12 +345,14 @@ class CoordinateTransformer:
camera_coordinate = np.array(camera_coordinate)
# 将相机坐标系转换到机械臂末端坐标系
arm_coordinate = [0, 0, 0]
if self.camera_matrix is not None and self.camera_trans is not None:
camera_matrix = copy.deepcopy(self.camera_matrix)
camera_trans = copy.deepcopy(self.camera_trans)
else:
camera_matrix = np.array([[-0.04169144,0.99888175,-0.02229508],[0.99912694,0.04174075,0.00175093],[0.00267958,-0.02220262,-0.9997499]])
camera_trans = np.array([-0.00209053,-0.01021561,-0.16877656])
camera_matrix = copy.deepcopy(self.camera_matrix)
camera_trans = copy.deepcopy(self.camera_trans)
# if self.camera_matrix is not None and self.camera_trans is not None:
# camera_matrix = copy.deepcopy(self.camera_matrix)
# camera_trans = copy.deepcopy(self.camera_trans)
# else:
# camera_matrix = np.array([[-0.04169144,0.99888175,-0.02229508],[0.99912694,0.04174075,0.00175093],[0.00267958,-0.02220262,-0.9997499]])
# camera_trans = np.array([-0.00209053,-0.01021561,-0.16877656])
arm_coordinate = camera_matrix @ (camera_coordinate/1000) + camera_trans
# 将机械臂末端坐标系转为机械臂基座坐标系
targetPosition = R.from_quat(self.quaternion).as_matrix() @ arm_coordinate + self.position

Binary file not shown.

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 366 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.8 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 417 KiB