力控调试第一阶段结束,需要针对越疆底层进行消息队列/验证的开发

This commit is contained in:
Ziwei.He 2025-05-15 16:39:33 +08:00
parent c42a4000ab
commit 2064831435
22 changed files with 299 additions and 130 deletions

View File

@ -14,6 +14,9 @@ import threading
import time
import os
from scipy.spatial.transform import Rotation as R
import ctypes
libc = ctypes.CDLL("libc.so.6")
import atexit
class MassageRobot:
def __init__(self,arm_config_path,is_log=False):
self.logger = CustomLogger(
@ -30,9 +33,11 @@ class MassageRobot:
self.arm_state = ArmState()
self.arm_config = read_yaml(arm_config_path)
# arm 实例化时机械臂类内部进行通讯连接
self.arm = dobot_nova5(arm_ip=self.arm_config['arm_ip'])
self.force_sensor = XjcSensor(host=self.arm_config['arm_ip'],port=60000)
self.arm = dobot_nova5(self.arm_config['arm_ip'])
self.arm.start()
# 传感器
self.force_sensor = XjcSensor(self.arm_config['arm_ip'],60000)
self.force_sensor.connect()
# 控制器初始化(初始化为导纳控制)
self.controller_manager = ControllerManager(self.arm_state)
self.controller_manager.add_controller(AdmittanceController,'admittance',self.arm_config['controller'][0])
@ -49,6 +54,16 @@ class MassageRobot:
self.playload_dict[play_load['name']] = play_load
self.current_head = 'none'
# 读取数据
self.gravity_base = None
self.force_zero = None
self.torque_zero = None
self.tool_position = None
self.mass_center_position = None
self.s_tf_matrix_t = None
arm_orientation_set0 = np.array([-180,0,-90])
self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix()
# 频率数据赋值
self.control_rate = Rate(self.arm_config['control_rate'])
self.sensor_rate = Rate(self.arm_config['sensor_rate'])
@ -94,7 +109,6 @@ class MassageRobot:
self.R_tcp = np.eye(6) * 0.1
# 传感器故障计数器
self.sensor_fail_count = 0
# 机械臂初始化,适配中间层
self.arm.init()
@ -117,13 +131,6 @@ class MassageRobot:
return x_update, P_update
def update_wrench(self):
compensation_config = self.playload_dict[self.current_head]
# 读取数据
gravity_base = np.array(compensation_config['gravity_base'])
force_zero = np.array(compensation_config['force_zero'])
torque_zero = np.array(compensation_config['torque_zero'])
tool_position = np.array(compensation_config['tcp_offset'])
mass_center_position = np.array(compensation_config['mass_center_position'])
# 当前的机械臂到末端转换 (实时)
b_rotation_s = R.from_quat(self.arm_state.arm_orientation).as_matrix()
# 读取数据
@ -132,31 +139,34 @@ class MassageRobot:
self.force_sensor.stop_background_reading()
self.logger.log_error("传感器数据读取失败")
return -1
# 反重力补偿
sensor_data[:3] = sensor_data[:3] + self.force_zero + self.b_rotation_s_set0.T @ self.gravity_base
sensor_data[3:] = sensor_data[3:] + self.torque_zero + np.cross(self.mass_center_position, self.b_rotation_s_set0.T @ self.gravity_base)
# 传感器数据通过矫正计算得到外来施加力 传感器坐标系下
# 重力和零位
gravity_in_sensor = b_rotation_s.T @ gravity_base
s_force = sensor_data[:3] - force_zero - gravity_in_sensor
# 重力
gravity_in_sensor = b_rotation_s.T @ self.gravity_base
s_force = sensor_data[:3] - self.force_zero - gravity_in_sensor
# 力矩
s_torque = sensor_data[3:] - torque_zero - np.cross(mass_center_position,gravity_in_sensor)
s_torque = sensor_data[3:] - self.torque_zero - np.cross(self.mass_center_position,gravity_in_sensor)
wrench = np.concatenate([s_force,s_torque])
# 传感器工具转换
s_tf_matrix_t = self.get_tf_matrix(tool_position[:3], R.from_euler('xyz',tool_position[3:],degrees=False).as_quat())
# 传感器到TCP
wrench = self.wrench_coordinate_conversion(s_tf_matrix_t,wrench)
wrench = self.wrench_coordinate_conversion(self.s_tf_matrix_t,wrench)
# 交给ARM STATE集中管理
self.arm_state.external_wrench_tcp = wrench
self.arm_state.external_wrench_base = np.concatenate([b_rotation_s @ self.arm_state.external_wrench_tcp[:3],
b_rotation_s @ self.arm_state.external_wrench_tcp[3:]])
# self.arm_state.external_wrench_base = np.concatenate([b_rotation_s @ self.arm_state.external_wrench_tcp[:3],
# b_rotation_s @ self.arm_state.external_wrench_tcp[3:]])
# 卡尔曼滤波
x_base_predict, P_base_predict = self.kalman_predict(x = self.x_base,
P = self.P_base,
Q = self.Q_base)
self.x_base, self.P_base = self.kalman_update(x_predict = x_base_predict,
P_predict = P_base_predict,
z = self.arm_state.external_wrench_base,
R = self.R_base)
self.arm_state.external_wrench_base = self.x_base
self.arm_state.last_external_wrench_base = self.arm_state.external_wrench_base
# x_base_predict, P_base_predict = self.kalman_predict(x = self.x_base,
# P = self.P_base,
# Q = self.Q_base)
# self.x_base, self.P_base = self.kalman_update(x_predict = x_base_predict,
# P_predict = P_base_predict,
# z = self.arm_state.external_wrench_base,
# R = self.R_base)
# self.arm_state.external_wrench_base = self.x_base
# self.arm_state.last_external_wrench_base = self.arm_state.external_wrench_base
# 对tcp坐标系下的外力外矩进行平滑
x_tcp_predict, P_tcp_predict = self.kalman_predict(x = self.x_tcp,
P = self.P_tcp,
@ -174,9 +184,11 @@ class MassageRobot:
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
try:
if not self.is_waitting:
self.arm_state.arm_position,
self.arm_state.arm_orientation = self.arm.get_arm_position()
code = self.update_wrench()
self.arm_state.arm_position,self.arm_state.arm_orientation = self.arm.get_arm_position()
code = self.update_wrench()
self.logger.log_blue(f"measure position: {self.arm_state.arm_position}")
self.logger.log_blue(f"measure orientation: {self.arm_state.arm_orientation}")
self.logger.log_blue(f"measure update wrench:{self.arm_state.external_wrench_tcp}")
if code == -1:
self.sensor_fail_count += 1
self.logger.log_error(f"传感器线程数据读取失败-{self.sensor_fail_count}")
@ -193,13 +205,34 @@ class MassageRobot:
def arm_command_loop(self):
self.logger.log_info("机械臂控制线程启动")
self.arm.filter_matirx = np.zeros((8,7)) # 滑动窗口
while (not self.arm.is_exit) and (not self.exit_event.is_set()):
try:
if not self.is_waitting:
process_start_time = time.time()
self.controller_manager.step(self.control_rate.to_sec())
self.last_command_time += 1
code = self.arm.ServoPose(self.arm_state.arm_position,
self.arm_state.arm_orientation_command)
status, joints_processed = self.arm.process_command(self.arm_state.arm_position_command,
self.arm_state.arm_orientation_command)
self.logger.log_blue(f"处理后的关节角度: {joints_processed}")
run_time = time.time() - process_start_time
sleep_duration = self.control_rate.to_sec() - run_time
delta_command = np.linalg.norm(self.arm_state.arm_position_command - self.arm_state.arm_position * 1000)
self.logger.log_blue(f"期望位置:{self.arm_state.desired_position}")
self.logger.log_blue(f"现在的位置:{self.arm_state.arm_position}")
self.logger.log_blue(f"指令位置:{self.arm_state.arm_position_command}")
self.logger.log_blue(f"位置变化量: {delta_command}")
if delta_command > 20: # 20 mm
self.logger.log_error("机械臂位置变化过大,可能是数据异常")
break
if sleep_duration > 0:
libc.usleep(int(sleep_duration * 1e6))
if status == 0:
# self.logger.log_blue("不进行伺服运动,只计算")
self.logger.log_info("进行伺服运动")
code = self.arm.ServoJoint(joints_processed,t=0.008,aheadtime=20,gain=200)
else:
self.logger.log_error("滑动窗口数据处理失败")
self.exit_event.set()
if code == -1:
self.logger.log_error("机械臂急停")
# self.stop() # 底层已经做了急停处理
@ -207,20 +240,39 @@ class MassageRobot:
except Exception as e:
self.logger.log_error(f"机械臂控制失败:{e}")
self.exit_event.set()
self.control_rate.precise_sleep()
def start(self):
if self.exit_event.is_set():
# 线程
self.exit_event.clear()
self.arm_measure_thread = threading.Thread(target=self.arm_measure_loop)
self.arm_control_thread = threading.Thread(target=self.arm_command_loop)
self.arm_measure_thread.start()
# 线程启动
self.arm_measure_thread.start() ## 测量线程
position,quat_rot = self.arm.get_arm_position()
# 初始值
self.arm_state.desired_position = position
self.arm_state.arm_position_command = position
self.arm_state.desired_orientation = quat_rot
self.arm_state.arm_orientation_command = quat_rot
self.arm_state.arm_position = position
self.arm_state.arm_orientation = quat_rot
for i in range(20):
self.controller_manager.step(self.control_rate.to_sec())
self.logger.log_blue(f"position command: {self.arm_state.arm_position_command}")
self.logger.log_blue(f"orientation command: {R.from_quat(self.arm_state.arm_orientation_command).as_euler('xyz',degrees=True)}")
position, quat_rot = self.arm.get_arm_position()
self.logger.log_blue(f"position current:{position}")
self.logger.log_blue(f"orientation current: {R.from_quat(quat_rot).as_euler('xyz',degrees=False)}")
self.command_rate.precise_sleep()
poistion ,quat_rot = self.arm.get_arm_position()
self.arm_state.desired_position = poistion
self.arm_state.arm_position_command = poistion
self.arm_state.desired_orientation = quat_rot
self.arm_state.arm_orientation_command = quat_rot
# 线程启动
self.arm_control_thread.start() ## 赋初始值后控制线程
self.logger.log_info("MassageRobot启动")
time.sleep(0.5)
@ -239,27 +291,31 @@ class MassageRobot:
def init_hardwares(self,ready_pose):
self.ready_pose = np.array(ready_pose)
self.switch_payload(self.current_head)
self.arm_state.desired_orientation = self.ready_pose[:3]
euler_angles = self.ready_pose
self.arm_state.desired_orientation = R.from_euler('xyz',euler_angles).as_quat()
print(self.arm.get_arm_position())
time.sleep(0.5)
def switch_payload(self,name):
if name in self.playload_dict:
self.stop()
self.current_head = name
compensation_config = self.playload_dict[self.current_head]
# 读取数据
self.gravity_base = np.array(compensation_config['gravity_base'])
self.force_zero = np.array(compensation_config['force_zero'])
self.torque_zero = np.array(compensation_config['torque_zero'])
self.tool_position = np.array(compensation_config['tcp_offset'])
self.mass_center_position = np.array(compensation_config['mass_center_position'])
self.s_tf_matrix_t = self.get_tf_matrix(self.tool_position[:3], R.from_euler('xyz',self.tool_position[3:],degrees=False).as_quat())
tcp_offset = self.playload_dict[name]["tcp_offset"]
tcp_offset_str = "{" + ",".join(map(str, tcp_offset)) + "}"
print(tcp_offset_str)
self.arm.setEndEffector(i=1,tool_i=tcp_offset_str)
self.arm.chooseEndEffector(i=1)
print("tcp_offset_str",tcp_offset_str)
self.arm.setEndEffector(i=9,tool_i=tcp_offset_str)
self.arm.chooseEndEffector(i=9)
print(self.arm.get_arm_position())
self.logger.log_info(f"切换到{name}按摩头")
R_matrix = R.from_euler('xyz',self.ready_pose[3:] ,degrees=False).as_matrix()
ready_position = self.ready_pose[:3] + R_matrix @ self.playload_dict[self.current_head]['tcp_offset'][:3]
ready_orientation = R_matrix @ R.from_euler('xyz',self.playload_dict[self.current_head]['tcp_offset'][3:],degrees=False).as_matrix()
ready_orientation_euler = R.from_matrix(ready_orientation).as_euler('xyz',degrees=False)
self.arm_state.desired_position = ready_position
self.arm_state.desired_orientation = R.from_euler('xyz',ready_orientation_euler,degrees=False).as_quat()
self.controller_manager.switch_controller('position')
else:
self.logger.log_error(f"未找到{name}按摩头")
@ -274,13 +330,13 @@ class MassageRobot:
time.sleep(1)
# 工具函数
def get_tf_matrix(position,orientation):
def get_tf_matrix(self,position,orientation):
tf_matrix = np.eye(4)
rotation_matrix = R.from_quat(orientation).as_matrix()
tf_matrix[:3,3] = position
tf_matrix[:3,:3] = rotation_matrix
return tf_matrix
def wrench_coordinate_conversion(tf_matrix, wrench):
def wrench_coordinate_conversion(self,tf_matrix, wrench):
rot_matrix = tf_matrix[:3, :3]
vector_p = tf_matrix[:3, 3]
temp_force = wrench[:3]
@ -290,4 +346,33 @@ class MassageRobot:
return np.concatenate([force, torque])
if __name__ == "__main__":
robot = MassageRobot(arm_config_path="MassageControl/config/robot_config.yaml")
robot = MassageRobot(arm_config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml")
ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0]
robot.current_head = 'finger_head'
robot.force_sensor.disable_active_transmission()
time.sleep(0.5)
robot.force_sensor.disable_active_transmission()
time.sleep(0.5)
robot.force_sensor.disable_active_transmission()
time.sleep(0.5)
robot.force_sensor.set_zero()
time.sleep(0.5)
robot.force_sensor.enable_active_transmission()
robot.init_hardwares(ready_pose)
time.sleep(3)
robot.controller_manager.switch_controller('admittance')
atexit.register(robot.force_sensor.disconnect)
robot.arm_state.desired_wrench = np.array([0,0,-0.5,0,0,0])
try:
robot_thread = threading.Thread(target=robot.start)
robot_thread.start()
except KeyboardInterrupt:
print("用户中断操作。")
except Exception as e:
print("Exception occurred at line:", e.__traceback__.tb_lineno)
print("发生异常:", e)
# robot.arm.disableRobot()

View File

@ -40,13 +40,11 @@ class AdmittanceController(BaseController):
def step(self,dt):
# 计算误差 位置直接作差,姿态误差以旋转向量表示
temp_pose_error = self.state.arm_position - self.state.desired_position
# if time.time() - self.laset_print_time > 5:
# print(f'temp_pose_error: {temp_pose_error} ||| arm_position: {self.state.arm_position} ||| desired_position: {self.state.desired_position}')
if self.state.desired_orientation.dot(self.state.arm_orientation) < 0:
self.state.arm_orientation = -self.state.arm_orientation
self.state.pose_error[:3] = R.from_quat(self.state.arm_orientation).as_matrix().T @ temp_pose_error
# if time.time() - self.laset_print_time > 5:
# print("pose_error:",self.state.pose_error[:3])
@ -97,16 +95,17 @@ class AdmittanceController(BaseController):
# self.state.arm_orientation_command = R.from_matrix(arm_ori_mat).as_quat()
self.state.arm_position_command = self.state.arm_position + delta_pose[:3]
# if time.time() - self.laset_print_time > 0.1:
# print("-------------admittance_1-------------------------------")
# print("arm_position:",self.state.arm_position)
# print("desired_position:",self.state.desired_position)
# print("arm_orientation",R.from_quat(self.state.arm_orientation).as_euler('xyz',degrees=True))
# print("desired_orientation",R.from_quat(self.state.desired_orientation).as_euler('xyz',degrees=True))
# print("arm_position_command",self.state.arm_position_command)
# print("arm_orientation_command",R.from_quat(self.state.arm_orientation_command).as_euler('xyz',degrees=True))
# print("delta_pose:",delta_pose)
# self.laset_print_time = time.time()
if time.time() - self.laset_print_time > 0.1:
print("-------------admittance_1-------------------------------")
print("arm_position:",self.state.arm_position)
print("desired_position:",self.state.desired_position)
print("arm_orientation",R.from_quat(self.state.arm_orientation).as_euler('xyz',degrees=True))
print("desired_orientation",R.from_quat(self.state.desired_orientation).as_euler('xyz',degrees=True))
print("arm_position_command",self.state.arm_position_command)
print("arm_orientation_command",R.from_quat(self.state.arm_orientation_command).as_euler('xyz',degrees=True))
print("delta_pose:",delta_pose)
print("delta_ori_mat",delta_ori_mat)
self.laset_print_time = time.time()

View File

@ -0,0 +1,28 @@
name: 'finger_head'
sensor_mass: 0.334
tool_type: Cylinder
tcp_offset:
- 0.0
- 0.0
- 154.071
- 0.0
- 0.0
- 0.0
tool_radius: 0.04
tool_mass: 0.57
mass_center_position:
- 0.0023247590120227682
- -0.0015892432698610681
- 0.04139459803098493
force_zero:
- 0.07030932685649408
- -0.329927477170834
- 0.71291678232261
torque_zero:
- 0.024149745145239786
- -0.015198458671894979
- -0.014812358735378579
gravity_base:
- -0.17234130565475442
- -0.0796927884554296
- -6.980274002014547

View File

@ -4,25 +4,25 @@ tool_type: Cylinder
tcp_offset:
- 0
- 0
- 0.031
- 0.028
- 0
- 0
- 0
tool_radius: 0.04
tool_mass: 0
mass_center_position:
- 0.021919029914178913
- -0.010820480799427892
- 0.011034252651402962
- 0.004926946239200975
- -0.008545471637774995
- 0.014413364622196482
force_zero:
- 0.8846671183185211
- -0.6473878547983709
- -2.1312346218888862
- -1.8509176581629614
- 0.9174475966495169
- 0.8638814839798294
torque_zero:
- 0.017893715524241308
- 0.04546799757174578
- -0.029532236049108707
- -0.049990966029271326
- 0.03568758430423695
- -0.046932565285401684
gravity_base:
- 0.9041730658541057
- 1.6570854791729466
- -1.8745612276068087
- -0.2705897753073113
- 0.1628143673462279
- -0.10555453347781811

View File

@ -0,0 +1,9 @@
name: position
Kp: [70,70,70,7,7,7]
Ki: [0, 0, 0, 0, 0, 0]
Kd: [5, 5, 5, 1, 1, 1]
# Kp: [60,60,60,6,6,6]
# Ki: [10, 10, 10, 1, 1, 1]
# Kd: [10, 10, 10,4,4,4]

View File

@ -2,11 +2,12 @@
arm_ip: '192.168.5.1'
# controller
controller: ['Massage/MassageControl/config/admittance.yaml']
controller: ['Massage/MassageControl/config/admittance.yaml',
'Massage/MassageControl/config/position.yaml']
# massage heads diretory
massage_head_dir: 'Massage/MassageControl/config/massage_head'
control_rate: 50
sensor_rate: 100
control_rate: 120
sensor_rate: 120
command_rate: 120

View File

@ -860,7 +860,7 @@ class DobotApiDashboard(DobotApi):
if useJointNear != -1:
params.append('useJointNear={:d}'.format(useJointNear))
if JointNear != '':
params.append('JointNear={:s}'.format(JointNear))
params.append('jointNear={:s}'.format(JointNear))
for ii in params:
string = string + ','+ii
string = string + ')'

View File

@ -1,9 +1,14 @@
from dobot_api import DobotApiFeedBack,DobotApiDashboard
try:
from dobot_api import DobotApiFeedBack,DobotApiDashboard
except:
from hardware.dobot_api import DobotApiFeedBack,DobotApiDashboard
import threading
from time import sleep,time
import re
import copy
import numpy as np
from scipy.spatial.transform import Rotation as R
import math
'''
MODE DESCRIPTION CN
@ -79,6 +84,7 @@ class dobot_nova5:
def start(self):
self.dashboard = DobotApiDashboard(self.ip,self.dashboardPort)
self.feedFour = DobotApiFeedBack(self.ip,self.feedFourPort)
self.dashboard.EmergencyStop(mode=0) # 松开急停
self.clearError() # 清除报警
if self.parseResultId(self.dashboard.EnableRobot())[0] != 0:
print("使能失败:检查29999端口是否被占用")
@ -118,7 +124,7 @@ class dobot_nova5:
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
@ -149,12 +155,12 @@ class dobot_nova5:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
@ -183,7 +189,7 @@ class dobot_nova5:
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
@ -212,7 +218,7 @@ class dobot_nova5:
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
@ -233,7 +239,7 @@ class dobot_nova5:
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
return 0
@ -250,7 +256,7 @@ class dobot_nova5:
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
@ -268,7 +274,8 @@ class dobot_nova5:
return
res = self.dashboard.SetTool(i,tool_i)
code = int(res[0])
print(res)
code = self.parseResultId(res)[0]
if code == 0:
print(f"设置工具坐标系{i}的偏移量成功")
else:
@ -283,7 +290,8 @@ class dobot_nova5:
print("工具坐标系序号超出范围")
return
res = self.dashboard.Tool(i)
code = int(res[0])
print(res)
code = self.parseResultId(res)[0]
if code == 0:
print(f"切换为工具坐标系{i}成功")
else:
@ -304,7 +312,7 @@ class dobot_nova5:
return
res = self.dashboard.SetUser(i,base_i)
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print(f"设置用户坐标系{i}成功")
else:
@ -319,7 +327,7 @@ class dobot_nova5:
print("工具坐标系序号超出范围")
return
res = self.dashboard.User(i)
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print(f"切换为用户基坐标系{i}成功")
else:
@ -330,7 +338,8 @@ class dobot_nova5:
if (user == -1) != (tool == -1): ## XOR 必须同时设置或者同时不设置
print("必须同时设置或同时不设置坐标系参数")
res = self.dashboard.GetPose(user,tool) # 返回字符串
code = int(res[0]) # 拿到的是字符串
# print("get pose",res)
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
@ -346,7 +355,7 @@ class dobot_nova5:
def getAngel(self):
res = self.dashboard.GetAngle()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
@ -357,12 +366,13 @@ class dobot_nova5:
start = res.find("{")+1
end = res.find("}")
angle_str = res[start:end]
print("res:",res)
angle = [float(x.strip()) for x in angle_str.split(",")]
return angle
def getForwardKin(self,joints_list,user=-1,tool=-1):
res = self.dashboard.PositiveKin(*joints_list,user,tool)
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
@ -381,10 +391,11 @@ class dobot_nova5:
poses_list X Y Z RX RY RZ
useJointNear 0 或不携带 表示 JointNear无效
1 表示根据JointNear就近选解
jointNear string "{j1,j2,j3,j4,j5,j6}"
jointNear string "jointNear={j1,j2,j3,j4,j5,j6}"
'''
res = self.dashboard.InverseKin(*poses_list,user,tool,useJointNear,JointNear)
code = int(res[0])
print(res)
code = self.parseResultId(res)[0]
if code == 0:
pass
# print("success")
@ -400,7 +411,7 @@ class dobot_nova5:
def disableRobot(self):
res = self.dashboard.DisableRobot()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("下使能机械臂成功")
else:
@ -409,7 +420,7 @@ class dobot_nova5:
def clearError(self):
res = self.dashboard.ClearError()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("清楚报警成功")
else:
@ -418,7 +429,7 @@ class dobot_nova5:
def start_drag(self):
res = self.dashboard.StartDrag()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("拖拽模式开启成功")
else:
@ -427,7 +438,7 @@ class dobot_nova5:
def stop_drag(self):
res = self.dashboard.StopDrag()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("拖拽模式关闭成功")
else:
@ -436,7 +447,7 @@ class dobot_nova5:
def stop_motion(self):
res = self.dashboard.Stop()
code = int(res[0])
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂停止下发运动指令队列")
else:
@ -457,6 +468,7 @@ class dobot_nova5:
self.last_input_command = None
self.tcp_offset = None
self.init_J = [0,30,-120,0,90,0]
self.filter_matirx = np.zeros((8,7))
sleep(1)
self.is_standby = False
code = self.RunPoint_P_inJoint(self.init_J)
@ -470,24 +482,55 @@ class dobot_nova5:
def get_arm_position(self):
pose = self.getPose()
x,y,z,rx,ry,rz = pose # x y z in mm; rx ry rz in degree
x /= 1000
y /= 1000
z /= 1000
pos = np.array([x,y,z])
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
return pos,quat_rot
def process_command(self,arm_position_command, arm_orientation_command):
rot_euler = R.from_quat(arm_orientation_command).as_euler('xyz',degrees=True)
rot_euler = np.clip(rot_euler,-180,180)
arm_position_command *= 1000
pose_command = np.concatenate([arm_position_command, rot_euler])
print("pose_command",pose_command)
if (not self.is_exit) and (not self.is_standby):
cur = self.getAngel() # 参考关节角度
target = self.getInverseKin(pose_command,-1,-1,1,"{" + ",".join(map(str, cur)) + "}")
# print("target",target)
self.joint_current = copy.deepcopy(cur)
self.joint_target = target
ress = np.array(target)
ress = np.append(ress,[1])
self.filter_matirx = np.append(self.filter_matirx,[ress],axis=0)
self.filter_matirx = np.delete(self.filter_matirx,0,axis=0)
sum_joint = np.sum(self.filter_matirx,axis=0)
sum_joint[:6] = sum_joint[:6]/sum_joint[6]
result = sum_joint[:6]
return 0, result
return -1, None
if __name__ == "__main__":
# sleep(5)
dobot = dobot_nova5("192.168.5.1")
dobot.start()
posJ = [0,30,-120,0,90,0]
dobot.RunPoint_P_inJoint(posJ)
sleep(1)
dobot.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
dobot.chooseEndEffector(i=9)
print(dobot.getPose())
# pos,rot_quat = dobot.get_arm_position()
# print("pos",pos)
# print("rot_quat",rot_quat)
for i in range(400):
posJ[2] += 0.04
dobot.ServoJoint(posJ,t=0.008,aheadtime=20,gain=200)
sleep(0.007)
sleep(0.5)
# for i in range(400):
# posJ[2] += 0.04
# dobot.ServoJoint(posJ,t=0.008,aheadtime=20,gain=200)
# sleep(0.007)
dobot.disableRobot()

View File

@ -28,9 +28,6 @@ class XjcSensor:
self._stop_background = False
self._last_reading = None
self._reading_lock = threading.Lock()
# 建立 TCP 连接
self.connect()
# 注册退出时的清理函数
atexit.register(self.disconnect)
@ -261,7 +258,7 @@ class XjcSensor:
return (crc >> 8) & 0xFF, crc & 0xFF
def __del__(self):
"""析构时自动断开连接"""
"""析构时断开连接"""
self.disconnect()
def disable_active_transmission(self) -> int:
@ -544,7 +541,7 @@ def save_intervals_to_csv(timestamps, filename="sensor_intervals.csv"):
if __name__ == "__main__":
# 替换为你的传感器 IP 和端口
sensor = XjcSensor("192.168.5.1", 60000)
sensor.connect()
# 示例操作
sensor.disable_active_transmission()

View File

@ -97,6 +97,7 @@ class Identifier:
pose_command = np.concatenate([[x,y,z], [roll,pitch,yaw]])
cur = self.arm.getAngel() # 接口调用: 获取机器人的关节位置
cur_str = "{" + ",".join(map(str, cur)) + "}"
# print("cur_str",cur_str)
target_J = self.arm.getInverseKin(poses_list=pose_command,useJointNear=1,JointNear=cur_str)
print(target_J)
self.arm.RunPoint_P_inJoint(target_J)
@ -111,7 +112,7 @@ class Identifier:
roll=desired_cart_pose[3],pitch=desired_cart_pose[4],yaw=desired_cart_pose[5])
# self.sensor.set_zero()
time.sleep(1)
delta = 60/cnt*6
delta = 50/cnt*6
quarter = cnt // 6
for i in range(cnt):
if i % 2 == 0:
@ -120,19 +121,21 @@ class Identifier:
control_pose[4]+= delta * np.where((i <= quarter) | (3* quarter< i <= 5*quarter), -1, 1)
desired_cart_pose = control_pose.copy()
print("desired_cart_pose:",desired_cart_pose)
self.set_position(x=desired_cart_pose[0],
y=desired_cart_pose[1],
z=desired_cart_pose[2],
roll=desired_cart_pose[3],
pitch=desired_cart_pose[4],
yaw=desired_cart_pose[5])
self.set_position(
x=desired_cart_pose[0],
y=desired_cart_pose[1],
z=desired_cart_pose[2],
roll=desired_cart_pose[3],
pitch=desired_cart_pose[4],
yaw=desired_cart_pose[5]
)
print("move done")
time.sleep(1)
wrench = self.__read_data_once(sample_num)
print(f"wrench:{wrench}")
print("read done")
cur_pose = self.arm.getPose()
b_R_e = R.from_euler('xyz', cur_pose[3:], degrees=False).as_matrix()
b_R_e = R.from_euler('xyz', cur_pose[3:], degrees=True).as_matrix()
self.__add_data(wrench, b_R_e)
print(f"=======添加了第{i+1}组数据=======")
time.sleep(0.5)
@ -157,7 +160,9 @@ class Identifier:
with open(self.config_path, 'w') as file:
pass
update_yaml(self.config_path, dict_data)
# 逐个更新键值对
for key, value in dict_data.items():
update_yaml(self.config_path, key, value)
print("更新操作已完成。")
else:
print("更新操作已取消。")
@ -165,17 +170,19 @@ class Identifier:
if __name__ == '__main__':
arm = dobot_nova5()
arm.setEndEffector(i=9,tool_i="x,y,z,rx,ry,rz")
arm = dobot_nova5("192.168.5.1")
arm.start()
arm.setEndEffector(i=9,tool_i="{0.0,0.0,154.071,0.0,0.0,0.0}")
arm.chooseEndEffector(i=9)
arm.init()
sensor = XjcSensor()
sensor = XjcSensor("192.168.5.1", 60000)
sensor.connect()
sensor.disable_active_transmission()
sensor.disable_active_transmission()
sensor.disable_active_transmission()
atexit.register(sensor.disconnect)
identifier = Identifier(arm=arm,sensor=sensor,config_path="/home/jsfb/jsfb_ws/global_config/massage_head/none_playload.yaml")
ready_pose = [-1,-1,-1,-1,-1,-1]
time.sleep(1)
identifier.identify_param_auto(ready_pose,45)
# identifier = Identifier(arm=arm,sensor=sensor,config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/massage_head/finger_playload.yaml")
# ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0]
# time.sleep(1)
# identifier.identify_param_auto(ready_pose,45)
arm.disableRobot()

View File