MassageRobot_Dobot/License/license_client.py
2025-05-27 15:46:31 +08:00

662 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import contextlib
import hashlib
import hmac
import json
import os
import platform
import sqlite3
from datetime import datetime
from pathlib import Path

import requests
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from cryptography.fernet import Fernet
from flask import Flask, request, jsonify
# Path of the marker file: verification is enabled while this file exists.
VERIFICATION_STATUS_FILE = os.path.expanduser("~/.license_verification_enabled")
# Admin password guarding the verification on/off switch. It can be supplied
# through the LICENSE_ADMIN_PASSWORD environment variable so deployments do not
# have to rely on the value committed to source; the historical default is kept
# as a fallback (also used when the variable is set but empty).
LICENSE_ADMIN_PASSWORD = os.environ.get("LICENSE_ADMIN_PASSWORD") or "robotstorm@license"
class LicenseStorage:
    """Persist one activation code per device in a local SQLite database.

    The activation code is encrypted with a Fernet key derived from local
    platform information before it is written to ``<client_dir>/.license.db``.
    Usage count, activation time and license version are stored unencrypted.
    """

    def __init__(self, client_dir):
        """Record the database location and derive the encryption key.

        Args:
            client_dir: Directory that holds (or will hold) ``.license.db``.
        """
        self.client_dir = client_dir
        self.db_path = os.path.join(client_dir, ".license.db")
        self.cipher = None
        self._init_db_encryption()

    def _init_db_encryption(self):
        """Build ``self.cipher`` from a key derived from platform details."""
        # NOTE(review): the key depends on the hostname and platform.version();
        # presumably a rename or OS update makes stored codes undecryptable
        # (load_license then reports "no license") — confirm this is intended.
        device_info = {
            'hostname': platform.node(),
            'machine': platform.machine(),
            'processor': platform.processor(),
            'system': platform.system(),
            'version': platform.version()
        }
        # Serialize with sorted keys so the same device always yields the same hash.
        device_str = json.dumps(device_info, sort_keys=True)
        device_hash = hashlib.sha256(device_str.encode()).digest()
        # Fernet expects a urlsafe-base64 encoding of 32 raw bytes.
        key = base64.urlsafe_b64encode(device_hash)
        self.cipher = Fernet(key)

    def _connect(self):
        """Return a context manager yielding a connection that is closed on exit.

        ``sqlite3.Connection`` used directly as a context manager only commits
        or rolls back; it does not close the connection, so it is wrapped in
        ``contextlib.closing`` here.
        """
        return contextlib.closing(sqlite3.connect(self.db_path))

    @staticmethod
    def _license_version(activation_code):
        """Return the ``generatedAt`` value embedded in an activation code."""
        decoded = json.loads(base64.b64decode(activation_code))
        return decoded['license']['generatedAt']

    def _init_db(self):
        """Create the ``license`` table if it does not exist yet."""
        with self._connect() as conn, conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS license (
                    id INTEGER PRIMARY KEY,
                    device_id TEXT UNIQUE,
                    activation_code TEXT,
                    activated_at INTEGER,
                    usage_count INTEGER DEFAULT 0,
                    license_version BIGINT
                )
            ''')

    def save_license(self, device_id, activation_code, first_activation=False):
        """Encrypt and store an activation code for ``device_id``.

        When the stored license has the same version (``generatedAt``) as the
        new one, the usage count and activation time are preserved; otherwise
        the usage count restarts at 0.

        Args:
            device_id: Identifier of the device the row belongs to.
            activation_code: Base64-encoded JSON activation code.
            first_activation: When true, stamp the activation time with the
                current time if the license has no activation time yet.
        """
        # Idempotent, and also covers a database file that exists without the table.
        self._init_db()

        encrypted_code = self.cipher.encrypt(activation_code.encode())
        fresh_activated_at = (
            int(datetime.now().timestamp() * 1000) if first_activation else None
        )
        try:
            new_version = self._license_version(activation_code)
            current_code, current_usage, current_activated_at = self.load_license(device_id)
            if current_code and self._license_version(current_code) == new_version:
                # Same license: keep the counters. If it was stored earlier
                # without being activated, activate it now when requested.
                usage_count = current_usage
                activated_at = current_activated_at or fresh_activated_at
            else:
                # First license for this device, or a different version.
                usage_count = 0
                activated_at = fresh_activated_at
        except (ValueError, KeyError, TypeError) as e:
            # The code could not be parsed: store it with default metadata.
            print(f"Error saving license: {str(e)}")
            new_version = 0
            usage_count = 0
            activated_at = fresh_activated_at

        with self._connect() as conn, conn:
            conn.execute('''
                INSERT OR REPLACE INTO license
                (device_id, activation_code, activated_at, usage_count, license_version)
                VALUES (?, ?, ?, ?, ?)
            ''', (device_id, encrypted_code, activated_at, usage_count, new_version))

    def load_license(self, device_id):
        """Load and decrypt the license stored for ``device_id``.

        Returns:
            ``(activation_code, usage_count, activated_at)``. When the database
            or row is missing, the row cannot be decrypted, or SQLite reports an
            error, ``(None, 0, None)`` is returned.
        """
        if not os.path.exists(self.db_path):
            return None, 0, None
        try:
            with self._connect() as conn:
                row = conn.execute('''
                    SELECT activation_code, usage_count, activated_at FROM license WHERE device_id = ?
                ''', (device_id,)).fetchone()
        except sqlite3.Error:
            return None, 0, None
        if not row:
            return None, 0, None
        encrypted_code, usage_count, activated_at = row
        try:
            return self.cipher.decrypt(encrypted_code).decode(), usage_count, activated_at
        except Exception:
            # Deliberately broad: any decrypt/decode failure means "no usable license".
            return None, 0, None

    def increment_usage(self, device_id):
        """Increment the usage count for ``device_id``.

        Returns:
            The new usage count, or 0 when no row exists or SQLite reports an
            error.
        """
        try:
            with self._connect() as conn, conn:
                # UPDATE then SELECT in one transaction instead of
                # "UPDATE ... RETURNING", which needs SQLite >= 3.35 and would
                # otherwise fail (and be swallowed) on older runtimes.
                updated = conn.execute('''
                    UPDATE license
                    SET usage_count = usage_count + 1
                    WHERE device_id = ?
                ''', (device_id,)).rowcount
                if not updated:
                    return 0
                row = conn.execute(
                    'SELECT usage_count FROM license WHERE device_id = ?',
                    (device_id,)
                ).fetchone()
                return row[0] if row else 0
        except sqlite3.Error:
            return 0
class LicenseClient:
    """Activate, verify and meter a signed license bound to this device.

    An activation code is base64-encoded JSON of the form
    ``{"license": {...}, "signature": "<base64>"}``. The signature is checked
    with the RSA public key read from ``<client_dir>/public.pem``. All
    timestamps and periods in the license are handled as milliseconds.
    """

    _REQUIRED_FIELDS = ('deviceId', 'generatedAt', 'activationPeriod', 'usagePeriod', 'usageLimit')

    def __init__(self, client_dir, disable_license=None):
        """Load the public key and open the license storage.

        Args:
            client_dir: Directory containing ``public.pem``; also used for the
                license database. Created if missing.
            disable_license: ``True``/``False`` to force verification off/on.
                When ``None``, verification is disabled unless the marker file
                ``VERIFICATION_STATUS_FILE`` exists.
        """
        self.client_dir = client_dir
        if disable_license is None:
            self.disable_license = not os.path.exists(VERIFICATION_STATUS_FILE)
        else:
            self.disable_license = disable_license
        os.makedirs(client_dir, exist_ok=True)
        public_key_path = os.path.join(client_dir, 'public.pem')
        with open(public_key_path, 'r') as f:
            self.public_key = RSA.import_key(f.read())
        self.device_id = self._get_device_id()
        self.storage = LicenseStorage(client_dir)
        # In-memory snapshot; refreshed by check_license_usable().
        self._activation_code = None
        self._encryption_key = None
        self._usage_count = 0
        self._activated_at = None
        self._load_existing_license()

    def _get_device_id(self):
        """Return the device ID, which is the machine's hostname."""
        return platform.node()

    def _load_existing_license(self):
        """Best-effort check of a stored license at start-up.

        Nothing is cached from the result because every public check reloads
        the license from storage; errors are ignored on purpose.
        """
        try:
            activation_code, _, _ = self.storage.load_license(self.device_id)
            if activation_code:
                self.verify_license()
        except Exception:
            # A broken stored license must not prevent the client from starting.
            pass

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _now_ms():
        """Return the current local time as milliseconds since the epoch."""
        return datetime.now().timestamp() * 1000

    @staticmethod
    def _decode_activation_code(activation_code):
        """Return ``(license_data, signature_bytes)`` parsed from an activation code."""
        decoded = json.loads(base64.b64decode(activation_code))
        license_data = decoded['license']
        signature = base64.b64decode(decoded['signature'])
        return license_data, signature

    def _signature_is_valid(self, license_data, signature):
        """Return whether ``signature`` matches the compact JSON form of ``license_data``."""
        # Compact separators must match the serialization used by the issuing server.
        message = json.dumps(license_data, separators=(',', ':'))
        digest = SHA256.new(message.encode('utf-8'))
        try:
            pkcs1_15.new(self.public_key).verify(digest, signature)
        except (ValueError, TypeError):
            return False
        return True

    def _period_or_usage_failure(self, license_data, activated_at, usage_count):
        """Return the reason the license cannot be used right now, or ``None``."""
        now = self._now_ms()
        if not activated_at:
            activation_deadline = license_data['generatedAt'] + license_data['activationPeriod']
            if now > activation_deadline:
                return "Activation period has expired"
            return "License not activated"
        if now > activated_at + license_data['usagePeriod']:
            return "License has expired"
        if usage_count >= license_data['usageLimit']:
            return "Usage limit exceeded"
        return None

    # --------------------------------------------------------------- public API

    def increment_usage(self):
        """Verify the stored license and record one use.

        Returns:
            ``(ok, message)``; ``ok`` is ``False`` when there is no valid
            license or the usage limit is reached.
        """
        if self.disable_license:
            return True, "License verification disabled"
        activation_code, usage_count, activated_at = self.storage.load_license(self.device_id)
        if not activation_code:
            return False, "No valid license found"
        is_valid, message = self.verify_license(activation_code)
        if not is_valid:
            return False, message
        license_data, _ = self._decode_activation_code(activation_code)
        if usage_count >= license_data['usageLimit']:
            return False, "Usage limit exceeded"
        new_count = self.storage.increment_usage(self.device_id)
        return True, f"Usage count: {new_count}/{license_data['usageLimit']}"

    def get_activation_code(self, server_url="http://localhost:3000", timeout=10):
        """Request an activation code for this device from the license server.

        Args:
            server_url: Base URL of the license server.
            timeout: Seconds to wait for the server; without a timeout the
                request could block indefinitely.

        Returns:
            The activation code, which is also saved to storage (not activated).

        Raises:
            Exception: If the server does not answer with HTTP 200.
        """
        response = requests.post(
            f"{server_url}/generate-license",
            json={"deviceId": self.device_id},
            timeout=timeout
        )
        if response.status_code != 200:
            raise Exception(f"Failed to get activation code: {response.text}")
        self._activation_code = response.json()['activationCode']
        self.storage.save_license(self.device_id, self._activation_code)
        self._usage_count = 0
        return self._activation_code

    def set_activation_code(self, activation_code):
        """Validate an activation code and store it as activated.

        The code is stored only after its signature, required fields, device
        binding, version ordering and activation deadline have been checked,
        so a rejected code never replaces the license already in storage.

        Returns:
            ``("repeated", message)`` when the same license version is already
            stored, otherwise ``("new", message)``.

        Raises:
            Exception: With a descriptive message when the code is rejected.
        """
        try:
            license_data, signature = self._decode_activation_code(activation_code)
            if not self._signature_is_valid(license_data, signature):
                raise Exception("Invalid signature")

            for field in self._REQUIRED_FIELDS:
                if field not in license_data:
                    raise KeyError(field)

            # Refuse licenses issued for another device before touching storage.
            if license_data['deviceId'] != self.device_id:
                raise Exception("Invalid device ID (License is bound to a different device)")

            current_code, _, current_activated_at = self.storage.load_license(self.device_id)
            is_repeated = False
            if current_code:
                current_data = json.loads(base64.b64decode(current_code))
                current_version = current_data['license']['generatedAt']
                new_version = license_data['generatedAt']
                if current_version == new_version:
                    is_repeated = True
                elif new_version < current_version:
                    # Never downgrade to a license generated earlier than the current one.
                    raise Exception("Cannot activate with an older license")

            # Enforce the activation window unless this license is already active.
            already_active = is_repeated and current_activated_at
            if not already_active:
                activation_deadline = license_data['generatedAt'] + license_data['activationPeriod']
                if self._now_ms() > activation_deadline:
                    raise Exception("Activation period has expired")

            self.storage.save_license(self.device_id, activation_code, True)

            if is_repeated:
                return "repeated", "License already activated, usage count preserved"
            return "new", "License activated successfully"
        except KeyError as e:
            raise Exception(f"Missing required field in activation code: {str(e)}") from e
        except json.JSONDecodeError as e:
            raise Exception("Invalid activation code format: not a valid JSON") from e
        except Exception as e:
            raise Exception(f"Invalid activation code format: {str(e)}") from e

    def get_license_info(self, activation_code=None):
        """Describe a license in human-readable form.

        Args:
            activation_code: Code to describe; defaults to the stored one.
                Usage count and activation time always come from storage.

        Returns:
            A dict of license details, a simulated always-active license when
            verification is disabled, or ``None`` when there is no license or
            it cannot be parsed.
        """
        if self.disable_license:
            current_time = datetime.now()
            # Simulated license valid for one year from now.
            future_time = datetime.fromtimestamp(current_time.timestamp() + 365 * 24 * 60 * 60)
            return {
                'device_id': self.device_id,
                'generated_at': current_time.strftime('%Y-%m-%d %H:%M:%S'),
                'activated_at': current_time.strftime('%Y-%m-%d %H:%M:%S'),
                'activation_deadline': future_time.strftime('%Y-%m-%d %H:%M:%S'),
                'expiration_date': future_time.strftime('%Y-%m-%d %H:%M:%S'),
                'activation_period': "365.0 days",
                'usage_period': "365.0 days",
                'is_activated': True,
                'is_expired': False,
                'status': 'Active',
                'usage_count': 0,
                'usage_limit': 999999,
                'usage_remaining': 999999,
                'license_version': int(current_time.timestamp() * 1000)
            }

        if activation_code is None:
            activation_code, usage_count, activated_at = self.storage.load_license(self.device_id)
            if not activation_code:
                return None
        else:
            _, usage_count, activated_at = self.storage.load_license(self.device_id)

        try:
            decoded_data = json.loads(base64.b64decode(activation_code))
            license_data = decoded_data['license']

            generated_at = datetime.fromtimestamp(license_data['generatedAt'] / 1000)
            activated_at_time = datetime.fromtimestamp(activated_at / 1000) if activated_at else None
            activation_deadline = datetime.fromtimestamp(
                (license_data['generatedAt'] + license_data['activationPeriod']) / 1000
            )
            usage_expiration = None
            if activated_at:
                usage_expiration = datetime.fromtimestamp(
                    (activated_at + license_data['usagePeriod']) / 1000
                )
            current_time = self._now_ms()
            # Always a real boolean, also for a license that is not activated yet.
            is_expired = bool(
                usage_expiration and current_time > usage_expiration.timestamp() * 1000
            )
            ms_per_day = 24 * 60 * 60 * 1000
            return {
                'device_id': license_data['deviceId'],
                'generated_at': generated_at.strftime('%Y-%m-%d %H:%M:%S'),
                'activated_at': activated_at_time.strftime('%Y-%m-%d %H:%M:%S') if activated_at_time else None,
                'activation_deadline': activation_deadline.strftime('%Y-%m-%d %H:%M:%S'),
                'expiration_date': usage_expiration.strftime('%Y-%m-%d %H:%M:%S') if usage_expiration else None,
                'activation_period': f"{license_data['activationPeriod'] / ms_per_day:.1f} days",
                'usage_period': f"{license_data['usagePeriod'] / ms_per_day:.1f} days",
                'is_activated': activated_at is not None,
                'is_expired': is_expired,
                'status': self._get_license_status(license_data, activated_at, usage_count),
                'usage_count': usage_count,
                'usage_limit': license_data['usageLimit'],
                'usage_remaining': license_data['usageLimit'] - usage_count,
                'license_version': license_data['generatedAt']
            }
        except Exception:
            # An unparseable code is reported the same way as a missing one.
            return None

    def verify_license(self, activation_code=None):
        """Verify an activation code against this device and the stored state.

        Args:
            activation_code: Code to verify; defaults to the stored one. Usage
                count and activation time always come from storage.

        Returns:
            ``(is_valid, message)``.
        """
        if self.disable_license:
            return True, "License verification disabled"
        if activation_code is None:
            activation_code, usage_count, activated_at = self.storage.load_license(self.device_id)
            if not activation_code:
                return False, "No license found"
        else:
            _, usage_count, activated_at = self.storage.load_license(self.device_id)
        try:
            license_data, signature = self._decode_activation_code(activation_code)
            if license_data['deviceId'] != self.device_id:
                return False, "Invalid device ID (License is bound to a different device)"
            if not self._signature_is_valid(license_data, signature):
                return False, "Invalid signature"
            failure = self._period_or_usage_failure(license_data, activated_at, usage_count)
            if failure:
                return False, failure
            return True, "License is valid"
        except Exception as e:
            return False, f"License verification failed: {str(e)}"

    def _get_license_status(self, license_data, activated_at, usage_count):
        """Return a status label for the license.

        One of ``'Activation Period Expired'``, ``'Not Activated'``,
        ``'Expired'``, ``'Usage Limit Exceeded'`` or ``'Active'``.
        """
        current_time = datetime.now().timestamp() * 1000
        if not activated_at:
            activation_deadline = license_data['generatedAt'] + license_data['activationPeriod']
            if current_time > activation_deadline:
                return 'Activation Period Expired'
            return 'Not Activated'
        if current_time > activated_at + license_data['usagePeriod']:
            return 'Expired'
        if usage_count >= license_data['usageLimit']:
            return 'Usage Limit Exceeded'
        return 'Active'

    def check_license_usable(self):
        """Check whether the stored license can be used right now.

        On success the in-memory snapshot (code, usage count, activation time)
        is refreshed from storage.

        Returns:
            ``(can_use, reason)``.
        """
        if self.disable_license:
            return True, "License verification disabled"
        activation_code, usage_count, activated_at = self.storage.load_license(self.device_id)
        if not activation_code:
            return False, "No license found"
        try:
            decoded_data = json.loads(base64.b64decode(activation_code))
            license_data = decoded_data['license']
            try:
                signature = base64.b64decode(decoded_data['signature'])
            except (ValueError, TypeError):
                # A signature that is not valid base64 is reported as an invalid signature.
                return False, "Invalid signature"
            if not self._signature_is_valid(license_data, signature):
                return False, "Invalid signature"
            if license_data['deviceId'] != self.device_id:
                return False, "Invalid device ID"
            failure = self._period_or_usage_failure(license_data, activated_at, usage_count)
            if failure:
                return False, failure
            self._activation_code = activation_code
            self._usage_count = usage_count
            self._activated_at = activated_at
            return True, "License is valid and can be used"
        except Exception as e:
            return False, f"License verification failed: {str(e)}"

    def set_license_verification(self, enable_verification):
        """Enable or disable license verification.

        Enabling creates the marker file ``VERIFICATION_STATUS_FILE``;
        disabling removes it. ``self.disable_license`` is updated to match.

        Returns:
            ``(success, message)``.
        """
        if enable_verification:
            try:
                with open(VERIFICATION_STATUS_FILE, 'w') as f:
                    f.write(str(datetime.now()))
                self.disable_license = False
                return True, "许可证验证已启用"
            except Exception as e:
                return False, f"启用许可证验证失败: {str(e)}"
        try:
            if os.path.exists(VERIFICATION_STATUS_FILE):
                os.remove(VERIFICATION_STATUS_FILE)
            self.disable_license = True
            return True, "许可证验证已禁用"
        except Exception as e:
            return False, f"禁用许可证验证失败: {str(e)}"
# Create the Flask application that serves the /api/license/* endpoints
app = Flask(__name__)
# Module-wide LicenseClient used by the route handlers; assigned in main()
license_client = None
@app.route('/api/license/activate', methods=['POST'])
def activate_license():
    """Activate the license with the activation code from the JSON body.

    Expects ``{"activation_code": "<code>"}``. Responds with 200 for a new
    activation, 202 when the same license was already stored, and 400 with an
    ``error`` message when the request or the code is rejected.
    """
    if not request.is_json:
        return jsonify({'error': 'Content-Type must be application/json'}), 400

    # silent=True: a malformed body yields None instead of raising inside the handler.
    data = request.get_json(silent=True)
    # A JSON array/string/number has no .get(); treat it like a missing field.
    activation_code = data.get('activation_code') if isinstance(data, dict) else None
    if not activation_code or not isinstance(activation_code, str):
        return jsonify({'error': 'activation_code is required'}), 400

    try:
        # Store the code, then confirm the stored license verifies.
        activation_type, message = license_client.set_activation_code(activation_code)
        is_valid, verify_message = license_client.verify_license()
        if not is_valid:
            return jsonify({'error': verify_message}), 400
        response = {
            'activation_type': activation_type,
            'message': message,
            'license_info': license_client.get_license_info()
        }
        # A repeated activation is reported with 202 Accepted.
        status_code = 202 if activation_type == "repeated" else 200
        return jsonify(response), status_code
    except Exception as e:
        return jsonify({'error': str(e)}), 400
@app.route('/api/license/info', methods=['GET'])
def get_license_info():
    """Return the current license details together with their validity.

    Responds with 404 when no license information is available; otherwise the
    license details extended with ``is_valid`` and ``message``.
    """
    info = license_client.get_license_info()
    if info is None:
        return jsonify({'error': 'No valid license found'}), 404

    valid, reason = license_client.verify_license()
    info.update(is_valid=valid, message=reason)
    return jsonify(info)
@app.route('/api/license/use', methods=['POST'])
def use_license():
    """Record one use of the license.

    Responds with the usage message and refreshed license details, or with
    400 and an ``error`` message when the use is refused.
    """
    ok, message = license_client.increment_usage()
    if ok:
        payload = {
            'message': message,
            'license_info': license_client.get_license_info()
        }
        return jsonify(payload)
    return jsonify({'error': message}), 400
@app.route('/api/license/check', methods=['GET'])
def check_license():
    """Report whether the license can currently be used, and why.

    ``license_info`` is included only when the license is usable.
    """
    can_use, reason = license_client.check_license_usable()
    if can_use:
        details = license_client.get_license_info()
    else:
        details = None
    return jsonify({
        'can_use': can_use,
        'reason': reason,
        'license_info': details
    })
@app.route('/api/license/config', methods=['POST'])
def configure_license():
    """Enable or disable license verification (password protected).

    Expects ``{"password": "<admin password>", "enable_verification": bool}``.
    Responds with 400 for a malformed request, 403 for a wrong password and
    500 when the verification state could not be changed.
    """
    if not request.is_json:
        return jsonify({'error': '内容类型必须为application/json'}), 400

    # silent=True: a malformed body yields None instead of raising inside the handler.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '缺少必要参数'}), 400

    password = data.get('password')
    enable_verification = data.get('enable_verification')
    # Both parameters are mandatory.
    if password is None or enable_verification is None:
        return jsonify({'error': '缺少必要参数'}), 400

    # Constant-time comparison; compared as bytes so non-ASCII input cannot raise.
    password_ok = isinstance(password, str) and hmac.compare_digest(
        password.encode('utf-8'), LICENSE_ADMIN_PASSWORD.encode('utf-8')
    )
    if not password_ok:
        return jsonify({'error': '密码错误'}), 403

    # Reject values such as the string "false", which would otherwise be
    # truthy and silently *enable* verification.
    if not isinstance(enable_verification, (bool, int)):
        return jsonify({'error': 'enable_verification 必须为布尔值'}), 400

    success, message = license_client.set_license_verification(bool(enable_verification))
    if success:
        return jsonify({
            'message': message,
            'verification_enabled': not license_client.disable_license
        })
    return jsonify({'error': message}), 500
def main():
    """Create the shared license client, print its status and start the server."""
    global license_client

    # The client keeps its key and database next to this script; whether
    # verification is enabled is decided from the marker file.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    license_client = LicenseClient(base_dir)

    print(f"Device ID (hostname): {license_client.device_id}")

    # Report the state of whatever license is currently stored.
    is_valid, message = license_client.verify_license()
    if not is_valid:
        print(f"未找到有效许可证: {message}")
        print("请使用/api/license/activate接口激活")
    elif license_client.disable_license:
        print("许可证验证已禁用,所有许可证检查将通过")
    else:
        print("存储中找到有效许可证")
        info = license_client.get_license_info()
        print(f"许可证状态: {info['status']}")
        print(f"到期日期: {info['expiration_date']}")
        print(f"使用次数: {info['usage_count']}/{info['usage_limit']}")

    print("\n启动许可证验证服务器...")
    # Serve the license API on all interfaces, port 5288.
    app.run(host='0.0.0.0', port=5288)


if __name__ == "__main__":
    main()