import atexit
import json
import logging
import time
import uuid
from datetime import datetime

import zmq


class VTXClient:
    """Small request/response client that talks JSON to a VTX server.

    Requests are JSON objects with ``table``, ``method``, ``params`` and
    ``id`` keys, sent over a ZeroMQ DEALER socket.  Replies are expected to
    be JSON objects carrying either a ``result`` or an ``error`` entry.

    The client can be used as a context manager; the socket and context are
    also released at interpreter exit through ``atexit``.
    """

    def __init__(self, use_logger=True, endpoint="ipc:///tmp/5555", timeout_ms=200):
        """Create the socket and connect it to the server.

        Args:
            use_logger: When true, messages go through the ``logging``
                module; otherwise they are printed with a timestamp.
            endpoint: ZeroMQ endpoint to connect to.
            timeout_ms: Send and receive timeout in milliseconds.
        """
        self.use_logger = use_logger
        self.endpoint = endpoint
        self.timeout_ms = timeout_ms
        if use_logger:
            logging.basicConfig(
                level=logging.INFO,
                format='%(asctime)s - %(levelname)s - %(message)s'
            )
            self.logger = logging.getLogger(__name__)

        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)
        # Options are applied before connecting so they are in force for the
        # connection from the start.
        self.socket.setsockopt(zmq.SNDTIMEO, timeout_ms)
        self.socket.setsockopt(zmq.RCVTIMEO, timeout_ms)
        # Drop unsent messages on close so terminating the context in
        # close() cannot block on a server that is not there.
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.connect(endpoint)

        atexit.register(self.close)

    def __enter__(self):
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client; exceptions from the ``with`` body propagate."""
        self.close()

    def _log(self, level, message):
        """Emit a message through the logger or through ``print``.

        Args:
            level: Log level name (debug, info, warning, error).
            message: Text to output.
        """
        if self.use_logger:
            getattr(self.logger, level)(message)
        else:
            # In print mode, prefix the message with a timestamp and level.
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S,%f')[:-3]
            print(f"{timestamp} - {level.upper()} - {message}")

    def _get_request_id(self):
        """Generate a unique request ID combining timestamp and UUID."""
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
        unique_id = uuid.uuid4().hex[:8]
        return f"{timestamp}-{unique_id}"

    def _drain_stale_replies(self):
        """Discard replies already queued on the socket.

        A reply that arrives after its request timed out would otherwise be
        handed to the next request, shifting every later reply by one.
        """
        while True:
            try:
                self.socket.recv_multipart(zmq.NOBLOCK)
            except zmq.Again:
                return
            self._log("warning", "Discarded a stale response from an earlier request")

    def _recv_reply(self, request_id):
        """Receive the reply belonging to ``request_id``.

        Replies without an ``id`` entry are accepted as-is.  Replies whose
        ``id`` differs from ``request_id`` are skipped.
        NOTE(review): assumes a server that sets ``id`` echoes the request
        id there -- confirm against the server implementation.

        Raises:
            zmq.error.Again: No matching reply arrived within the timeout.
        """
        while True:
            response = self.socket.recv_json()
            reply_id = response.get("id") if isinstance(response, dict) else None
            if reply_id is None or reply_id == request_id:
                return response
            self._log(
                "warning",
                f"Discarding response with unexpected id {reply_id!r} "
                f"(expected {request_id!r})"
            )

    def send_request(self, table, method, params):
        """Send one request and return the ``result`` entry of its reply.

        Args:
            table: Table name placed in the request.
            method: Method name placed in the request.
            params: Parameters placed in the request.

        Returns:
            The reply's ``result`` value, or ``None`` when the reply carries
            an ``error`` entry, the request times out, or any other error
            occurs (the failure is logged).
        """
        request_id = self._get_request_id()
        request = {
            "table": table,
            "method": method,
            "params": params,
            "id": request_id
        }

        try:
            self._drain_stale_replies()
            self._log("debug", f"Sending request: {request}")
            self.socket.send_json(request)

            response = self._recv_reply(request_id)
            self._log("debug", f"Received response: {response}")

            if "error" in response:
                error = response["error"]
                # Tolerate an error payload that is not an object.
                error_msg = error["message"] if isinstance(error, dict) else error
                self._log("error", f"Request failed: {error_msg}")
                return None

            return response["result"]

        except zmq.error.Again:
            self._log("error", "Request timed out. Server may be unavailable.")
            return None
        except zmq.ZMQError as e:
            self._log("error", f"ZMQ Error: {str(e)}")
            return None
        except Exception as e:
            self._log("error", f"Error in send_request: {str(e)}")
            return None

    def set(self, table, key, value):
        """Store ``value`` under ``key`` in ``table``.

        Returns:
            ``True`` when the request produced a result, ``False`` otherwise
            (a warning is logged).
        """
        result = self.send_request(table, "set", {"key": key, "value": value})
        if result is None:
            self._log("warning", f"Failed to set {key} in {table}")
            return False
        return True

    def get(self, table, key):
        """Return the ``value`` entry of the reply for ``key`` in ``table``.

        Returns:
            The value, or ``None`` when the request failed or the result has
            no ``value`` entry.
        """
        result = self.send_request(table, "get", {"key": key})
        if result is None:
            self._log("warning", f"Failed to get {key} from {table}")
            return None
        return result.get("value", None)

    def delete(self, table, key):
        """Delete ``key`` from ``table``.

        Returns:
            ``True`` when the request produced a result, ``False`` otherwise
            (a warning is logged).
        """
        result = self.send_request(table, "delete", {"key": key})
        if result is None:
            self._log("warning", f"Failed to delete {key} from {table}")
            return False
        return True

    def get_version(self):
        """Return the VTX server version, or ``None`` when the request fails."""
        result = self.send_request(None, "get_version", {})
        if result is None:
            self._log("warning", "Failed to get server version")
            return None
        return result.get("version")

    def close(self):
        """Close the socket and terminate the context; safe to call twice."""
        try:
            if self.socket and not self.socket.closed:
                self.socket.close()
                self._log("info", "Socket closed successfully")
            # LINGER is 0, so terminating does not wait for unsent messages.
            if self.context and not self.context.closed:
                self.context.term()
                self._log("info", "Context terminated successfully")
        except zmq.ZMQError as e:
            self._log("error", f"ZMQ Error during close: {str(e)}")
        except Exception as e:
            self._log("error", f"Error closing VTXClient: {str(e)}")


def main():
    """Demo: write, read back, update and delete a temporary robot config."""
    # Create an instance that prints instead of using the logging module.
    with VTXClient(use_logger=False) as vtxdb:
        print(vtxdb.get("robot_config", ""))

        # Basic robot information.
        vtxdb.set("robot_config", "temp_config.info.name", "VTX-1000")
        vtxdb.set("robot_config", "temp_config.info.model", "Industrial-X")
        vtxdb.set("robot_config", "temp_config.info.serial_number", "VTX2024001")

        # Motion parameters.
        vtxdb.set("robot_config", "temp_config.motion.max_speed", 100)
        vtxdb.set("robot_config", "temp_config.motion.acceleration", 2.5)
        vtxdb.set("robot_config", "temp_config.motion.deceleration", 3.0)
        vtxdb.set("robot_config", "temp_config.motion.joint_speed_limit", 80)

        # Safety parameters.
        vtxdb.set("robot_config", "temp_config.safety.collision_threshold", 50)
        vtxdb.set("robot_config", "temp_config.safety.protective_stop_distance", 0.5)
        vtxdb.set("robot_config", "temp_config.safety.reduced_mode_speed", 30)

        # Tool parameters.
        vtxdb.set("robot_config", "temp_config.tool.weight", 2.5)
        vtxdb.set("robot_config", "temp_config.tool.center_of_mass", [0.0, 0.0, 0.15])

        # Read back and display some of the configuration.
        robot_name = vtxdb.get("robot_config", "temp_config.info.name")
        max_speed = vtxdb.get("robot_config", "temp_config.motion.max_speed")
        safety_threshold = vtxdb.get("robot_config", "temp_config.safety.collision_threshold")

        # Compare against None so legitimate falsy values (0, "") still print.
        if all(v is not None for v in (robot_name, max_speed, safety_threshold)):
            vtxdb._log("info", f"Robot {robot_name} configured with:")
            vtxdb._log("info", f"- Max speed: {max_speed} mm/s")
            vtxdb._log("info", f"- Collision threshold: {safety_threshold} N")

        # Update one setting.
        vtxdb.set("robot_config", "temp_config.motion.max_speed", 120)
        updated_speed = vtxdb.get("robot_config", "temp_config.motion.max_speed")
        if updated_speed is not None:
            vtxdb._log("info", f"Updated max speed: {updated_speed} mm/s")

        # Remove the temporary configuration.
        vtxdb.delete("robot_config", "temp_config")
        vtxdb._log("info", "Cleaned up temporary configurations")


if __name__ == "__main__":
    main()