增加修饰

This commit is contained in:
swayneleong 2025-05-29 19:34:00 +08:00
parent f4450a32e3
commit e01e6452b4
3 changed files with 142 additions and 340 deletions

View File

@ -113,14 +113,9 @@ class MassageRobot:
self.arm_state = ArmState()
self.arm_config = read_yaml(arm_config_path)
# arm 实例化时机械臂类内部进行通讯连接
# 初始化坐标系 TOOL=0 BASE=1
self.arm = dobot_nova5(arm_ip = self.arm_config['arm_ip'])
self.arm.start()
<<<<<<< HEAD
self.arm.chooseRightFrame()
=======
self.arm.init()
>>>>>>> cc63eb47a5fd86a36d8af9fbf38a4b10a4c2ff7f
self.thermotherapy = None
self.shockwave = None
@ -142,22 +137,7 @@ class MassageRobot:
self.controller_manager.add_controller(HybridAdmitController,'hybridAdmit',self.arm_config['controller'][5])
self.controller_manager.add_controller(PositionerSensorController,'positionerSensor',self.arm_config['controller'][6])
self.controller_manager.add_controller(AdmittanceZController,'admittanceZ',self.arm_config['controller'][7])
self.controller_manager.switch_controller('admittance')
<<<<<<< HEAD
# 读取数据
self.gravity_base = None
self.force_zero = None
self.torque_zero = None
self.tool_position = None
self.mass_center_position = None
self.s_tf_matrix_t = None
arm_orientation_set0 = np.array([-180,-30,0])
self.b_rotation_s_set0 = R.from_euler('xyz',arm_orientation_set0,degrees=True).as_matrix()
=======
# 频率数据赋值
self.control_rate = Rate(self.arm_config['control_rate'])
@ -570,14 +550,21 @@ class MassageRobot:
b2 =time.time()
if sleep_duration > 0:
time.sleep(sleep_duration)
print(f"real sleep:{time.time()-b2}")
# print(f"real sleep:{time.time()-b2}")
self.arm.dashboard.socket_dobot.sendall(tcp_command)
if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.arm.dashboard.Continue()
code = self.arm.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.arm.feedbackData.robotMode in [3, 4, 6, 8, 9, 11] or not self.arm.feedbackData.EnableState:
self.logger.log_error(f"机械臂异常:{self.arm.feedbackData.robotMode}")
self.arm.dashboard.Stop()
self.arm.dashboard.EmergencyStop(mode=1)
return -1
if self.arm.feedbackData.EnableState and self.arm.feedbackData.robotMode == 10:
if self.arm.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.arm.dashboard.Continue()
code = self.arm.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
except Exception as e:
self.logger.log_error(f"机械臂控制失败:{e}")
@ -1630,32 +1617,32 @@ class MassageRobot:
if __name__ == "__main__":
# waypoints = [
# {"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0],
# "acceleration": [0, 0, 0],
# "orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()},
# {"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0],
# "acceleration": [0, 0, 0],
# "orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()},
# ] ## 单位 m deg
# myInterpolate = TrajectoryInterpolator(waypoints)
# ts = np.linspace(0,5,100)
waypoints = [
{"time": 0.0, "position": [0.25, -0.135, 0.3443], "velocity": [0, 0, 0],
"acceleration": [0, 0, 0],
"orientation": R.from_euler("xyz", [0, 0, 0], degrees=True).as_quat()},
{"time": 5.0, "position": [0.30, -0.135, 0.3043], "velocity": [0, 0, 0],
"acceleration": [0, 0, 0],
"orientation": R.from_euler("xyz", [0, 30, 30], degrees=True).as_quat()},
] ## 单位 m deg
myInterpolate = TrajectoryInterpolator(waypoints)
ts = np.linspace(0,5,100)
robot = MassageRobot(arm_config_path="/home/jsfb/jsfb/MassageRobot_Dobot/Massage/MassageControl/config/robot_config.yaml")
# xr_list, vr_list, ar_list = [], [], []
# for t in ts:
# xr, vr, ar = myInterpolate.interpolate(t)
# xr_list.append(xr)
# vr_list.append(vr)
# ar_list.append(ar)
# xr_array = np.array(xr_list)
# vr_array = np.array(vr_list)
# ar_array = np.array(ar_list)
xr_list, vr_list, ar_list = [], [], []
for t in ts:
xr, vr, ar = myInterpolate.interpolate(t)
xr_list.append(xr)
vr_list.append(vr)
ar_list.append(ar)
xr_array = np.array(xr_list)
vr_array = np.array(vr_list)
ar_array = np.array(ar_list)
# robot.xr = xr_array
# robot.vr = vr_array
# robot.ar = ar_array
# robot.ts = ts
# robot.dt = ts[1] - ts[0]
robot.xr = xr_array
robot.vr = vr_array
robot.ar = ar_array
robot.ts = ts
robot.dt = ts[1] - ts[0]
ready_pose = [250.0, -135.0, 344.3392, -180.0, 0.0, -90.0]

View File

@ -0,0 +1,39 @@
# 定义机器人模式字典
robot_mode_types = {
-1: "ROBOTMODE: 初始化状态,未检测到机械臂,请检查连接是否正确",
1: "INIT: 初始化",
2: "BREAK_OPEN: 抱闸松开",
3: "POWER_OFF: 本体下电",
4: "DISABLED: 未使能(抱闸未松)",
5: "ENABLE: 使能(空闲)",
6: "BACKDRIVE: 拖拽",
7: "RUNNING: 运行状态(脚本/TCP)",
8: "SINGLE_MOVE: 单次运动(点动)",
9: "ERROR: 错误状态",
10: "PAUSE: 暂停状态",
11: "COLLISION: 碰撞状态"
}
# 定义通用错误码字典
general_error_codes = {
0: "无错误:命令下发成功",
-1: "命令执行失败:已收到命令,但执行失败了",
-2: "机器人处于报警状态:机器人报警状态下无法执行指令,需要清除报警后重新下发指令。",
-3: "机器人处于急停状态:机器人急停状态下无法执行指令,需要松开急停并清除报警后重新下发指令。",
-4: "机器人处于下电状态:机器人下电状态下无法执行指令,需要先给机器人上电。",
-5: "机器人处于脚本运行状态:拒绝执行部分指令,需要先暂停/停止脚本。",
-6: "MoveJog指令运动轴与运动类型不匹配修改coordtype参数值详见MoveJog指令说明。",
-7: "机器人处于脚本暂停状态:拒绝执行部分指令,需要先停止脚本。",
-8: "机器人认证过期机器人处于不可用状态需联系FAE处理。",
-10000: "命令错误:下发的命令不存在",
-20000: "参数数量错误:下发命令中的参数数量错误",
-30001: "必选参数类型错误第1个有名称的为命名参数类型错误否则为第1个参数类型错误",
-30002: "必选参数类型错误第2个不带名称的第2个必选参数的类型错误",
-40001: "必选参数范围错误第1个带名称或第1个必选参数超出范围",
-40002: "必选参数范围错误第2个第2个必选参数的参数范围错误",
-50001: "可选参数类型错误第1个命名参数类型错误如 user=\"ss\"否则为第1个可选参数的类型错误",
-50002: "可选参数类型错误第2个不带名称的第2个可选参数类型错误",
-60001: "可选参数范围错误第1个命名参数如 a=200 超出范围或第1个参数超范围",
-60002: "可选参数范围错误第2个第2个可选参数超出范围"
}

View File

@ -9,6 +9,10 @@ try:
from dobot_api import DobotApiFeedBack,DobotApiDashboard
except:
from hardware.dobot_api import DobotApiFeedBack,DobotApiDashboard
try:
from .dobot_message import *
except:
from dobot_message import *
import threading
from time import sleep,time
import re
@ -60,8 +64,9 @@ class dobot_nova5:
self.robotMode = -1
self.robotCurrentCommandID = 0
self.MessageSize = -1
self.DigitalInputs = -1
self.DigitalOutputs = -1
# self.DigitalInputs = -1
# self.DigitalOutputs = -1
self.EnableState = False
self.QActual = -1
self.ActualQuaternion = -1
self.ToolVectorActual = -1
@ -106,6 +111,8 @@ class dobot_nova5:
time.sleep(1)
else:
if self.feedbackData.robotMode == 9 and self.parseResultId(stopId)[0] == -3:
print(robot_mode_types.get(self.feedbackData.robotMode, "未知模式"))
requests.post(
"http://127.0.0.1:5000/on_message", data={"message": "机械臂处于锁定状态<br>请解除锁定后再使用"}
)
@ -140,13 +147,6 @@ class dobot_nova5:
time.sleep(1)
# self.move_joint(self.off_pos)
def parseResultId(self,valueRecv):
# 解析返回值
if "Not Tcp" in valueRecv:
@ -164,8 +164,9 @@ class dobot_nova5:
# 基础字段
self.feedbackData.MessageSize = feedInfo['len'][0]
self.feedbackData.robotMode = feedInfo['RobotMode'][0]
self.feedbackData.DigitalInputs = feedInfo['DigitalInputs'][0]
self.feedbackData.DigitalOutputs = feedInfo['DigitalOutputs'][0]
# self.feedbackData.DigitalInputs = feedInfo['DigitalInputs'][0]
# self.feedbackData.DigitalOutputs = feedInfo['DigitalOutputs'][0]
self.feedbackData.EnableState = feedInfo['EnableStatus'][0]
self.feedbackData.robotCurrentCommandID = feedInfo['CurrentCommandId'][0]
self.feedbackData.QActual = feedInfo['QActual'][0]
self.feedbackData.ActualQuaternion = feedInfo['ActualQuaternion'][0]
@ -180,297 +181,72 @@ class dobot_nova5:
'''
def start_up(self):
# self.dashboard = DobotApiDashboard(self.ip,self.dashboardPort)
# self.feedFour = DobotApiFeedBack(self.ip,self.feedFourPort)
# self.dashboard.EmergencyStop(mode=0) # 松开急停
# self.clearError() # 清除报警
# self.dashboard.RequestControl()
if self.parseResultId(self.dashboard.EnableRobot())[0] != 0:
print("使能失败:检查29999端口是否被占用")
return False
print("机械臂使能成功")
return True
# # 状态反馈线程
# feedback_thread = threading.Thread(
# target=self.GetFeedback # 机器状态反馈线程
# )
# feedback_thread.daemon = True
# feedback_thread.start()
max_try = 3 # 尝试次数
cnt = 0 # 尝试次数计数
while cnt < max_try:
# 关闭碰撞检测
if self.parseResultId(self.dashboard.SetCollisionLevel(level=0))[0] == 0:
time.sleep(0.1)
self.logger.log_info("关闭碰撞检测成功")
code = self.parseResultId(self.dashboard.EnableRobot())[0]
if code == 0:
while 1:
if self.feedbackData.robotMode in [3, 6, 7, 8, 9, 10, 11]:
self.logger.log_error(f"使能失败,机械臂异常:{self.feedbackData.robotMode}")
return False
elif self.feedbackData.robotMode == 5:
self.logger.log_info("机械臂使能成功")
return True
time.sleep(0.1)
def disable_servo(self):
pass
# def disable_servo(self):
# pass
def enable_servo(self):
pass
def RunPoint_P_inPose(self,poses_list):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 0 通过位姿走点输入为[X Y Z RX RY RZ] unit: mm/deg
'''
# 走点
recvmovemess = self.dashboard.MovJ(*poses_list,0)
print("MovJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
sleep(0.02)
while True: # 完成判断循环
########## 调试注释 #############
# print("robot mode",self.feedbackData.robotMode)
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
# print("指令 ID:", currentCommandID)
########## 调试注释 #############
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
def waitArrival(self, command):
sendCommandID = self.parseResultId(command)[1]
while True:
if self.feedbackData.robotMode in [3, 4, 6, 8, 9, 10, 11]:
self.logger.log_error(f"机械臂异常:{self.feedbackData.robotMode}")
print(robot_mode_types.get(self.feedbackData.robotMode, "未知模式"))
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
return -1
if self.feedbackData.EnableState:
if self.feedbackData.robotCurrentCommandID > sendCommandID:
break
else:
isFinsh = (self.feedbackData.robotMode == 5)
if self.feedbackData.robotCurrentCommandID == sendCommandID and isFinsh:
break
sleep(0.1)
return 0
def RunPoint_P_inPose_M_RAD(self,poses_list):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 0 通过位姿走点输入为[X Y Z RX RY RZ] unit: M/RAD
'''
# 单位 m/rad -> mm/deg
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
# 走点
recvmovemess = self.dashboard.MovJ(*poses_list_mm_deg,0)
print("MovJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
sleep(0.02)
while True: # 完成判断循环
########## 调试注释 #############
# print("robot mode",self.feedbackData.robotMode)
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
# print("指令 ID:", currentCommandID)
########## 调试注释 #############
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
return -1
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
return -1
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
def RunPoint_P_inJoint(self,joints_list):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 1 通过关节角度走点输入为[J1 J2 J3 J4 J5 J6] unit: deg
'''
# 走点
recvmovemess = self.dashboard.MovJ(*joints_list,1)
print("MovJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
sleep(0.02)
while True: # 完成判断循环
########## 调试注释 #############
# print("robot mode",self.feedbackData.robotMode)
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
# print("指令 ID:", currentCommandID)
# print("30004",self.dashboard.GetCurrentCommandID())
########## 调试注释 #############
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1) # 避免 CPU 占用过高
return 0
def RunPoint_P_inJoint_RAD(self,joints_list):
def move_joint(self,q,**params):
'''
RUNPOINT_P 以关节运动方式运动到目标点
MovJ flag = 1 通过关节角度走点输入为[J1 J2 J3 J4 J5 J6] unit: rad
'''
# 单位 rad -> deg
joints_list_deg = self.__transform_rad_2_deg(joints_list)
# 走点
recvmovemess = self.dashboard.MovJ(*joints_list_deg,1)
print("MovJ:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
sleep(0.02)
while True: # 完成判断循环
########## 调试注释 #############
# print("robot mode",self.feedbackData.robotMode)
# print("feedback current ID", self.feedbackData.robotCurrentCommandID)
# print("指令 ID:", currentCommandID)
# print("30004",self.dashboard.GetCurrentCommandID())
########## 调试注释 #############
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1) # 避免 CPU 占用过高
return 0
def RunPoint_L_inPose(self,poses_list):
'''
RUNPOINT_L 以直线运动方式运动到目标点
MovJ flag = 0 通过位姿走点输入为[X Y Z RX RY RZ] unit: mm/deg
'''
# 走点
recvmovemess = self.dashboard.MovL(*poses_list,0)
print("MovL:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
def RunPoint_L_inPose_M_RAD(self,poses_list):
'''
RUNPOINT_L 以直线运动方式运动到目标点
MovJ flag = 0 通过位姿走点输入为[X Y Z RX RY RZ] unit: m/rad
'''
# 单位 m/rad -> mm/deg
poses_list_mm_deg = self.__transform_m_rad_2_mm_deg(poses_list)
# 走点
recvmovemess = self.dashboard.MovL(*poses_list_mm_deg,0)
print("MovL:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
def RunPoint_L_inJoint(self,joints_list):
'''
RUNPOINT_L 以直线运动方式运动到目标点
MovJ flag = 0 通过位姿走点输入为[X Y Z RX RY RZ] unit: mm/deg
'''
# 走点
recvmovemess = self.dashboard.MovL(*joints_list,0)
print("MovL:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
def RunPoint_L_inJoint_RAD(self,joints_list):
'''
RUNPOINT_L 以直线运动方式运动到目标点
MovJ flag = 0 通过位姿走点输入为[X Y Z RX RY RZ] unit: rad
'''
# 单位 rad -> deg
joints_list_deg = self.__transform_rad_2_deg(joints_list)
# 走点
recvmovemess = self.dashboard.MovL(*joints_list_deg,0)
print("MovL:",recvmovemess)
print(self.parseResultId(recvmovemess))
currentCommandID = self.parseResultId(recvmovemess)[1]
print("指令 ID:", currentCommandID)
while True:
currentCommandID = self.parseResultId(self.dashboard.GetCurrentCommandID())[1]
if self.feedbackData.robotMode == 9 or self.feedbackData.robotMode == 11:
self.dashboard.Stop()
self.dashboard.EmergencyStop(mode=1)
print("当前工作停止,机械臂处于急停状态")
if self.feedbackData.robotMode == 10: # 10 当前工程暂停
print("机械臂为暂停状态")
res = self.dashboard.Continue()
code = self.parseResultId(res)[0]
if code == 0:
print("机械臂继续已暂停的运动指令")
if self.feedbackData.robotMode == 5 and self.feedbackData.robotCurrentCommandID == currentCommandID:
print("该次运动完成")
break
sleep(0.1)
return 0
max_retry_count = params.get('max_retry_count', 3)
cnt = 0
joints_list_deg = self.__transform_rad_2_deg(q)
while cnt < max_retry_count:
cnt += 1
command = self.dashboard.MovJ(*joints_list_deg,1)
is_arrived = self.waitArrival(command)
if is_arrived == 0:
print(f'Arrived at the joint position: {q}')
return 0
else:
print(f'Failed to arrive at the joint position: {q}')
return -1
print(f'After {cnt} retries, still failed to arrive at the joint position: {q}')
return -1
def ServoJoint(self,joints_list):
'''
@ -840,7 +616,7 @@ class dobot_nova5:
quat_rot = R.from_euler('xyz', [rx, ry, rz], degrees=True).as_quat()
return pos,quat_rot
def get_pose_at_tool(self,tool=1):
def get_end_position(self,tool=1):
'''
工具默认为1
输出pos,quat