156 lines
6.0 KiB
Python
156 lines
6.0 KiB
Python
import zmq
|
||
import json
|
||
import logging
|
||
import time
|
||
import uuid
|
||
import atexit
|
||
from datetime import datetime
|
||
|
||
class VTXClient:
|
||
def __init__(self, use_logger=True):
|
||
self.context = zmq.Context()
|
||
self.socket = self.context.socket(zmq.DEALER)
|
||
self.socket.connect("ipc:///tmp/5555")
|
||
self.socket.setsockopt(zmq.SNDTIMEO, 200)
|
||
self.socket.setsockopt(zmq.RCVTIMEO, 200)
|
||
self.use_logger = use_logger
|
||
if use_logger:
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s'
|
||
)
|
||
self.logger = logging.getLogger(__name__)
|
||
atexit.register(self.close)
|
||
|
||
def _log(self, level, message):
|
||
"""统一的日志输出方法
|
||
|
||
Args:
|
||
level: 日志级别 (debug, info, warning, error)
|
||
message: 要输出的消息
|
||
"""
|
||
if self.use_logger:
|
||
getattr(self.logger, level)(message)
|
||
else:
|
||
# 如果使用print,为消息添加时间戳和级别
|
||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S,%f')[:-3]
|
||
print(f"{timestamp} - {level.upper()} - {message}")
|
||
|
||
def _get_request_id(self):
|
||
"""Generate a unique request ID combining timestamp and UUID"""
|
||
timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
|
||
unique_id = str(uuid.uuid4().hex[:8])
|
||
return f"{timestamp}-{unique_id}"
|
||
|
||
def send_request(self, table, method, params):
|
||
request_id = self._get_request_id()
|
||
request = {
|
||
"table": table,
|
||
"method": method,
|
||
"params": params,
|
||
"id": request_id
|
||
}
|
||
try:
|
||
self._log("debug", f"Sending request: {request}")
|
||
self.socket.send_json(request)
|
||
response = self.socket.recv_json()
|
||
self._log("debug", f"Received response: {response}")
|
||
|
||
if "error" in response:
|
||
error_msg = response["error"]["message"]
|
||
self._log("error", f"Request failed: {error_msg}")
|
||
return None
|
||
|
||
return response["result"]
|
||
except zmq.error.Again:
|
||
self._log("error", "Request timed out. Server may be unavailable.")
|
||
return None
|
||
except zmq.ZMQError as e:
|
||
self._log("error", f"ZMQ Error: {str(e)}")
|
||
return None
|
||
except Exception as e:
|
||
self._log("error", f"Error in send_request: {str(e)}")
|
||
return None
|
||
|
||
def set(self, table, key, value):
|
||
result = self.send_request(table, "set", {"key": key, "value": value})
|
||
if result is None:
|
||
self._log("warning", f"Failed to set {key} in {table}")
|
||
|
||
def get(self, table, key):
|
||
result = self.send_request(table, "get", {"key": key})
|
||
if result is None:
|
||
self._log("warning", f"Failed to get {key} from {table}")
|
||
return None
|
||
return result.get("value", None)
|
||
|
||
def delete(self, table, key):
|
||
result = self.send_request(table, "delete", {"key": key})
|
||
if result is None:
|
||
self._log("warning", f"Failed to delete {key} from {table}")
|
||
|
||
def get_version(self):
|
||
"""获取VTX服务器版本"""
|
||
result = self.send_request(None, "get_version", {})
|
||
if result is None:
|
||
self._log("warning", "Failed to get server version")
|
||
return None
|
||
return result.get("version")
|
||
|
||
def close(self):
|
||
try:
|
||
if self.socket and not self.socket.closed:
|
||
self.socket.close()
|
||
self._log("info", "Socket closed successfully")
|
||
# if self.context and not self.context.closed:
|
||
# self.context.term()
|
||
# logger.info("Context terminated successfully")
|
||
except zmq.ZMQError as e:
|
||
self._log("error", f"ZMQ Error during close: {str(e)}")
|
||
except Exception as e:
|
||
self._log("error", f"Error closing VTXClient: {str(e)}")
|
||
|
||
if __name__ == "__main__":
|
||
# 创建一个使用print输出的实例
|
||
vtxdb = VTXClient(use_logger=False)
|
||
print(vtxdb.get("robot_config", ""))
|
||
# 设置机器人基础配置
|
||
vtxdb.set("robot_config", "temp_config.info.name", "VTX-1000")
|
||
vtxdb.set("robot_config", "temp_config.info.model", "Industrial-X")
|
||
vtxdb.set("robot_config", "temp_config.info.serial_number", "VTX2024001")
|
||
|
||
# 设置运动参数
|
||
vtxdb.set("robot_config", "temp_config.motion.max_speed", 100)
|
||
vtxdb.set("robot_config", "temp_config.motion.acceleration", 2.5)
|
||
vtxdb.set("robot_config", "temp_config.motion.deceleration", 3.0)
|
||
vtxdb.set("robot_config", "temp_config.motion.joint_speed_limit", 80)
|
||
|
||
# 设置安全参数
|
||
vtxdb.set("robot_config", "temp_config.safety.collision_threshold", 50)
|
||
vtxdb.set("robot_config", "temp_config.safety.protective_stop_distance", 0.5)
|
||
vtxdb.set("robot_config", "temp_config.safety.reduced_mode_speed", 30)
|
||
|
||
# 设置工具参数
|
||
vtxdb.set("robot_config", "temp_config.tool.weight", 2.5)
|
||
vtxdb.set("robot_config", "temp_config.tool.center_of_mass", [0.0, 0.0, 0.15])
|
||
|
||
# 读取并显示一些配置
|
||
robot_name = vtxdb.get("robot_config", "temp_config.info.name")
|
||
max_speed = vtxdb.get("robot_config", "temp_config.motion.max_speed")
|
||
safety_threshold = vtxdb.get("robot_config", "temp_config.safety.collision_threshold")
|
||
|
||
if robot_name and max_speed and safety_threshold:
|
||
vtxdb._log("info", f"Robot {robot_name} configured with:")
|
||
vtxdb._log("info", f"- Max speed: {max_speed} mm/s")
|
||
vtxdb._log("info", f"- Collision threshold: {safety_threshold} N")
|
||
|
||
# 更新某个配置
|
||
vtxdb.set("robot_config", "temp_config.motion.max_speed", 120)
|
||
updated_speed = vtxdb.get("robot_config", "temp_config.motion.max_speed")
|
||
if updated_speed:
|
||
vtxdb._log("info", f"Updated max speed: {updated_speed} mm/s")
|
||
|
||
# 删除临时配置
|
||
vtxdb.delete("robot_config", "temp_config")
|
||
vtxdb._log("info", "Cleaned up temporary configurations")
|