2025-05-27 15:46:31 +08:00

156 lines
6.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import zmq
import json
import logging
import time
import uuid
import atexit
from datetime import datetime
class VTXClient:
    """JSON request/response client for a VTX server over a ZeroMQ DEALER socket.

    Each request is a JSON object with ``table``, ``method``, ``params`` and a
    generated ``id``.  The reply is expected to be a JSON object carrying
    either a ``result`` entry or an ``error`` entry.

    All public operations are best-effort: transport or protocol problems are
    logged and reported through the return value instead of being raised.

    The client can be used as a context manager; the socket is closed on exit.
    """

    # Defaults match the values that were previously hard-coded.
    DEFAULT_ENDPOINT = "ipc:///tmp/5555"
    DEFAULT_TIMEOUT_MS = 200

    # Upper bound on replies discarded before one request, so a peer that
    # keeps sending cannot stall send_request() indefinitely.
    _MAX_STALE_REPLIES = 32

    def __init__(self, use_logger=True, endpoint=DEFAULT_ENDPOINT,
                 timeout_ms=DEFAULT_TIMEOUT_MS):
        """Create the socket, connect it and register cleanup at exit.

        Args:
            use_logger: When true, messages go through ``logging`` (and
                ``logging.basicConfig`` is called); otherwise they are
                printed with a timestamp and level prefix.
            endpoint: ZeroMQ endpoint to connect to.
            timeout_ms: Send and receive timeout in milliseconds.
        """
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.connect(endpoint)
        self.socket.setsockopt(zmq.SNDTIMEO, timeout_ms)
        self.socket.setsockopt(zmq.RCVTIMEO, timeout_ms)
        self.use_logger = use_logger
        if use_logger:
            logging.basicConfig(
                level=logging.INFO,
                format='%(asctime)s - %(levelname)s - %(message)s'
            )
            self.logger = logging.getLogger(__name__)
        # Make sure the socket is closed even if the caller never calls close().
        atexit.register(self.close)

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the socket; exceptions from the ``with`` body propagate."""
        self.close()
        return False

    def _log(self, level, message):
        """Emit a message through the logger or, if disabled, through print.

        Args:
            level: Log level name (debug, info, warning, error).
            message: Text to output.
        """
        if self.use_logger:
            getattr(self.logger, level)(message)
        else:
            # Print mode: prefix the message with a timestamp (millisecond
            # precision) and the upper-cased level.
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S,%f')[:-3]
            print(f"{timestamp} - {level.upper()} - {message}")

    def _get_request_id(self):
        """Generate a unique request ID combining timestamp and UUID."""
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
        unique_id = uuid.uuid4().hex[:8]
        return f"{timestamp}-{unique_id}"

    def _discard_stale_replies(self):
        """Drop replies that are already queued before a new request is sent.

        A DEALER socket does not pair replies with requests.  When an earlier
        receive gave up after the timeout, its reply can still arrive later
        and would otherwise be read as the answer to the next request.
        """
        for _ in range(self._MAX_STALE_REPLIES):
            # poll(0) returns immediately; a falsy value means nothing is queued.
            if not self.socket.poll(0):
                return
            stale = self.socket.recv_multipart()
            self._log("warning", f"Discarding stale reply: {stale}")

    def _extract_result(self, response):
        """Return the ``result`` entry of a decoded reply, or None on error.

        Never raises: malformed replies and server-reported errors are logged.
        """
        if not isinstance(response, dict):
            self._log("error", f"Error in send_request: unexpected response {response!r}")
            return None
        if "error" in response:
            error = response["error"]
            # Accept both {"message": ...} objects and bare error values.
            if isinstance(error, dict):
                error_msg = error.get("message", error)
            else:
                error_msg = error
            self._log("error", f"Request failed: {error_msg}")
            return None
        if "result" not in response:
            self._log("error", "Error in send_request: response has no 'result'")
            return None
        return response["result"]

    def send_request(self, table, method, params):
        """Send one request and return the ``result`` of its reply.

        Args:
            table: Table name placed in the request (may be None).
            method: Method name placed in the request.
            params: JSON-serializable parameters.

        Returns:
            The reply's ``result`` value, or None when the request timed out,
            a ZeroMQ or other error occurred, or the reply reported an error.
        """
        request_id = self._get_request_id()
        request = {
            "table": table,
            "method": method,
            "params": params,
            "id": request_id
        }
        try:
            self._discard_stale_replies()
            self._log("debug", f"Sending request: {request}")
            self.socket.send_json(request)
            response = self.socket.recv_json()
            self._log("debug", f"Received response: {response}")
        except zmq.error.Again:
            # Raised when the send/receive timeout expires.
            self._log("error", "Request timed out. Server may be unavailable.")
            return None
        except zmq.ZMQError as e:
            self._log("error", f"ZMQ Error: {str(e)}")
            return None
        except Exception as e:
            # Best-effort API: e.g. a reply that is not valid JSON ends up here.
            self._log("error", f"Error in send_request: {str(e)}")
            return None
        return self._extract_result(response)

    def set(self, table, key, value):
        """Store ``value`` under ``key`` in ``table``.

        Returns:
            True when the request produced a non-None result, False otherwise.
            A reply whose result is null is indistinguishable from a failure.
        """
        result = self.send_request(table, "set", {"key": key, "value": value})
        if result is None:
            self._log("warning", f"Failed to set {key} in {table}")
            return False
        return True

    def get(self, table, key):
        """Return the ``value`` entry of the reply for ``key``, or None on failure."""
        result = self.send_request(table, "get", {"key": key})
        if result is None:
            self._log("warning", f"Failed to get {key} from {table}")
            return None
        if not isinstance(result, dict):
            # Previously this raised AttributeError on result.get(...).
            self._log("warning", f"Unexpected result for {key} from {table}: {result!r}")
            return None
        return result.get("value", None)

    def delete(self, table, key):
        """Delete ``key`` from ``table``.

        Returns:
            True when the request produced a non-None result, False otherwise.
        """
        result = self.send_request(table, "delete", {"key": key})
        if result is None:
            self._log("warning", f"Failed to delete {key} from {table}")
            return False
        return True

    def get_version(self):
        """Return the VTX server version, or None when it cannot be obtained."""
        result = self.send_request(None, "get_version", {})
        if result is None:
            self._log("warning", "Failed to get server version")
            return None
        if not isinstance(result, dict):
            self._log("warning", f"Unexpected version result: {result!r}")
            return None
        return result.get("version")

    def close(self):
        """Close the socket if it is still open; safe to call repeatedly.

        NOTE(review): the context is not terminated, as before (that code was
        disabled in the original) -- confirm before enabling it.
        """
        try:
            socket = getattr(self, "socket", None)
            if socket is not None and not socket.closed:
                socket.close()
                self._log("info", "Socket closed successfully")
        except zmq.ZMQError as e:
            self._log("error", f"ZMQ Error during close: {str(e)}")
        except Exception as e:
            self._log("error", f"Error closing VTXClient: {str(e)}")
        # Drop the exit hook so a closed client is not kept alive until exit.
        atexit.unregister(self.close)
if __name__ == "__main__":
    # Demo script: create a client that prints its messages instead of
    # going through the logging module.
    vtxdb = VTXClient(use_logger=False)
    # NOTE(review): an empty key presumably fetches the whole table -- confirm.
    print(vtxdb.get("robot_config", ""))

    # Basic robot information.
    vtxdb.set("robot_config", "temp_config.info.name", "VTX-1000")
    vtxdb.set("robot_config", "temp_config.info.model", "Industrial-X")
    vtxdb.set("robot_config", "temp_config.info.serial_number", "VTX2024001")

    # Motion parameters.
    vtxdb.set("robot_config", "temp_config.motion.max_speed", 100)
    vtxdb.set("robot_config", "temp_config.motion.acceleration", 2.5)
    vtxdb.set("robot_config", "temp_config.motion.deceleration", 3.0)
    vtxdb.set("robot_config", "temp_config.motion.joint_speed_limit", 80)

    # Safety parameters.
    vtxdb.set("robot_config", "temp_config.safety.collision_threshold", 50)
    vtxdb.set("robot_config", "temp_config.safety.protective_stop_distance", 0.5)
    vtxdb.set("robot_config", "temp_config.safety.reduced_mode_speed", 30)

    # Tool parameters.
    vtxdb.set("robot_config", "temp_config.tool.weight", 2.5)
    vtxdb.set("robot_config", "temp_config.tool.center_of_mass", [0.0, 0.0, 0.15])

    # Read back and display some of the configuration.
    robot_name = vtxdb.get("robot_config", "temp_config.info.name")
    max_speed = vtxdb.get("robot_config", "temp_config.motion.max_speed")
    safety_threshold = vtxdb.get("robot_config", "temp_config.safety.collision_threshold")
    # get() signals failure with None; compare against None so that legitimate
    # falsy values (0, "") are still reported.
    if (robot_name is not None and max_speed is not None
            and safety_threshold is not None):
        vtxdb._log("info", f"Robot {robot_name} configured with:")
        vtxdb._log("info", f"- Max speed: {max_speed} mm/s")
        vtxdb._log("info", f"- Collision threshold: {safety_threshold} N")

    # Update one setting and read it back.
    vtxdb.set("robot_config", "temp_config.motion.max_speed", 120)
    updated_speed = vtxdb.get("robot_config", "temp_config.motion.max_speed")
    if updated_speed is not None:
        vtxdb._log("info", f"Updated max speed: {updated_speed} mm/s")

    # Remove the temporary configuration.
    vtxdb.delete("robot_config", "temp_config")
    vtxdb._log("info", "Cleaned up temporary configurations")