662 lines
27 KiB
Python
662 lines
27 KiB
Python
import json
|
||
import base64
|
||
import platform
|
||
import requests
|
||
import sqlite3
|
||
import os
|
||
import hashlib
|
||
from Crypto.PublicKey import RSA
|
||
from Crypto.Signature import pkcs1_15
|
||
from Crypto.Hash import SHA256
|
||
from datetime import datetime
|
||
from flask import Flask, request, jsonify
|
||
from cryptography.fernet import Fernet
|
||
from pathlib import Path
|
||
|
||
# Marker file for license verification: LicenseClient treats verification as
# enabled only while this file exists, and as disabled when it is absent.
VERIFICATION_STATUS_FILE = os.path.expanduser("~/.license_verification_enabled")

# Password required by the /api/license/config endpoint.
# A non-empty LICENSE_ADMIN_PASSWORD environment variable overrides the
# built-in default, so deployments do not have to rely on a secret that is
# committed to source control.
LICENSE_ADMIN_PASSWORD = os.environ.get("LICENSE_ADMIN_PASSWORD") or "robotstorm@license"
|
||
|
||
class LicenseStorage:
    """Encrypted, SQLite-backed persistence for a device's license record.

    The activation code is stored Fernet-encrypted in ``<client_dir>/.license.db``;
    the activation timestamp, usage counter and license version are stored
    next to it in plain columns.
    """

    def __init__(self, client_dir):
        """Record the database location and build the device-bound cipher.

        Args:
            client_dir: Directory that holds (or will hold) ``.license.db``.
        """
        self.client_dir = client_dir
        self.db_path = os.path.join(client_dir, ".license.db")
        self.cipher = None
        self._init_db_encryption()

    def _init_db_encryption(self):
        """Derive the Fernet key from platform attributes of this machine."""
        # NOTE(review): the key is a pure function of these values, so if any
        # of them changes (presumably platform.version() after an OS update --
        # confirm) previously stored rows can no longer be decrypted and
        # load_license() will report that no license exists.
        device_info = {
            'hostname': platform.node(),
            'machine': platform.machine(),
            'processor': platform.processor(),
            'system': platform.system(),
            'version': platform.version()
        }

        # sort_keys keeps the serialized form, and therefore the key, stable.
        device_str = json.dumps(device_info, sort_keys=True)
        device_hash = hashlib.sha256(device_str.encode()).digest()

        # A SHA-256 digest is 32 bytes, which is what Fernet expects once it
        # is url-safe base64 encoded.
        key = base64.urlsafe_b64encode(device_hash)
        self.cipher = Fernet(key)

    @staticmethod
    def _now_ms():
        """Return the current time as integer milliseconds since the epoch."""
        return int(datetime.now().timestamp() * 1000)

    def _init_db(self):
        """Create the ``license`` table if it does not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            # ``with conn`` only commits/rolls back; the connection has to be
            # closed explicitly.
            with conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS license (
                        id INTEGER PRIMARY KEY,
                        device_id TEXT UNIQUE,
                        activation_code TEXT,
                        activated_at INTEGER,
                        usage_count INTEGER DEFAULT 0,
                        license_version BIGINT
                    )
                ''')
        finally:
            conn.close()

    def _write_row(self, device_id, encrypted_code, activated_at, usage_count, version):
        """Insert or replace the single row kept for *device_id*."""
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                conn.execute('''
                    INSERT OR REPLACE INTO license
                    (device_id, activation_code, activated_at, usage_count, license_version)
                    VALUES (?, ?, ?, ?, ?)
                ''', (device_id, encrypted_code, activated_at, usage_count, version))
        finally:
            conn.close()

    def save_license(self, device_id, activation_code, first_activation=False):
        """Encrypt and persist *activation_code* for *device_id*.

        When the stored license has the same ``generatedAt`` version, its usage
        count and activation time are kept; otherwise the usage count restarts
        at 0. If the code cannot be parsed (or the write fails) the record is
        written once more with usage count 0 and version 0.

        Args:
            device_id: Key of the row to write.
            activation_code: Base64 text wrapping a JSON ``license`` object.
            first_activation: When true, stamp the activation time if the
                record does not already carry one.
        """
        # CREATE TABLE IF NOT EXISTS is cheap and also repairs a database file
        # that exists but has no table yet.
        self._init_db()

        encrypted_code = self.cipher.encrypt(activation_code.encode())

        try:
            # The license's generation timestamp doubles as its version.
            new_version = json.loads(base64.b64decode(activation_code))['license']['generatedAt']

            current_code, current_usage, current_activated_at = self.load_license(device_id)

            usage_count, activated_at = 0, None
            if current_code:
                current_version = json.loads(base64.b64decode(current_code))['license']['generatedAt']
                if current_version == new_version:
                    # Same license again: keep its counters.
                    usage_count, activated_at = current_usage, current_activated_at

            # Covers new licenses and a stored license that was saved earlier
            # without ever being activated.
            if activated_at is None and first_activation:
                activated_at = self._now_ms()

            self._write_row(device_id, encrypted_code, activated_at, usage_count, new_version)

        except Exception as e:
            print(f"Error saving license: {str(e)}")
            # Best-effort fallback with default counters.
            activated_at = self._now_ms() if first_activation else None
            self._write_row(device_id, encrypted_code, activated_at, 0, 0)

    def load_license(self, device_id):
        """Load and decrypt the record stored for *device_id*.

        Returns:
            ``(activation_code, usage_count, activated_at)``. When the database
            file or row is missing, decryption fails, or SQLite raises, the
            result is ``(None, 0, None)``.
        """
        if not os.path.exists(self.db_path):
            return None, 0, None

        try:
            conn = sqlite3.connect(self.db_path)
            try:
                with conn:
                    row = conn.execute('''
                        SELECT activation_code, usage_count, activated_at FROM license WHERE device_id = ?
                    ''', (device_id,)).fetchone()
            finally:
                conn.close()
        except sqlite3.Error:
            return None, 0, None

        if not row:
            return None, 0, None

        encrypted_code, usage_count, activated_at = row
        try:
            return self.cipher.decrypt(encrypted_code).decode(), usage_count, activated_at
        except Exception:
            # Undecryptable data is treated as "no license".
            return None, 0, None

    def increment_usage(self, device_id):
        """Add one to the usage counter of *device_id*.

        Returns:
            The new counter value, or 0 when the row does not exist or SQLite
            raises.
        """
        try:
            conn = sqlite3.connect(self.db_path)
            try:
                # UPDATE followed by SELECT in one transaction instead of
                # UPDATE ... RETURNING, which needs SQLite 3.35+ and would
                # otherwise fail (and be swallowed below) on older libraries.
                with conn:
                    conn.execute('''
                        UPDATE license
                        SET usage_count = usage_count + 1
                        WHERE device_id = ?
                    ''', (device_id,))
                    row = conn.execute('''
                        SELECT usage_count FROM license WHERE device_id = ?
                    ''', (device_id,)).fetchone()
            finally:
                conn.close()
            return row[0] if row else 0
        except sqlite3.Error:
            return 0
|
||
|
||
class LicenseClient:
    """Activate, validate and meter a signed license bound to this device.

    An activation code is base64 text wrapping a JSON object with a ``license``
    payload and a base64 ``signature`` (RSA PKCS#1 v1.5 over SHA-256 of the
    compact JSON of the payload). State is kept in a ``LicenseStorage``.

    When ``disable_license`` is true every check passes without looking at the
    stored license.
    """

    # Fields a license payload must carry before it is accepted.
    _REQUIRED_FIELDS = ('deviceId', 'generatedAt', 'activationPeriod', 'usagePeriod', 'usageLimit')

    def __init__(self, client_dir, disable_license=None):
        """Load the public key from *client_dir* and open the license storage.

        Args:
            client_dir: Directory containing ``public.pem``; created if missing.
            disable_license: Explicit on/off switch. When ``None`` verification
                is enabled only while ``VERIFICATION_STATUS_FILE`` exists.
        """
        self.client_dir = client_dir

        if disable_license is None:
            self.disable_license = not os.path.exists(VERIFICATION_STATUS_FILE)
        else:
            self.disable_license = disable_license

        os.makedirs(client_dir, exist_ok=True)

        public_key_path = os.path.join(client_dir, 'public.pem')
        with open(public_key_path, 'r') as f:
            self.public_key = RSA.import_key(f.read())

        self.device_id = self._get_device_id()
        self.storage = LicenseStorage(client_dir)

        # In-memory copies; check_license_usable() refreshes them on success.
        self._activation_code = None
        self._encryption_key = None
        self._usage_count = 0
        self._activated_at = None

        self._load_existing_license()

    def _get_device_id(self):
        """Return the device ID, which is this machine's hostname."""
        return platform.node()

    @staticmethod
    def _now_ms():
        """Return the current time in milliseconds since the epoch."""
        return datetime.now().timestamp() * 1000

    @staticmethod
    def _decode(activation_code):
        """Decode a base64 activation code into its JSON envelope."""
        return json.loads(base64.b64decode(activation_code))

    def _check_signature(self, license_data, signature):
        """Verify *signature* over *license_data* with the loaded public key.

        Any ``ValueError``/``TypeError`` raised by the verifier propagates to
        the caller, which treats it as an invalid signature.
        """
        # The payload is re-serialized with compact separators; this has to
        # match the issuer's serialization byte for byte.
        # NOTE(review): json.dumps escapes non-ASCII by default -- confirm the
        # issuer does the same if device IDs can contain non-ASCII characters.
        message = json.dumps(license_data, separators=(',', ':'))
        digest = SHA256.new(message.encode('utf-8'))
        pkcs1_15.new(self.public_key).verify(digest, signature)

    def _load_existing_license(self):
        """Run one verification pass over a stored license, ignoring the result."""
        try:
            activation_code, _, _ = self.storage.load_license(self.device_id)
            if activation_code:
                # Nothing to update here: every later check reloads from storage.
                self.verify_license()
        except Exception:
            # Start-up must not fail because of an unreadable stored license.
            pass

    def increment_usage(self):
        """Verify the stored license and, if valid, record one usage.

        Returns:
            ``(ok, message)``; on success the message reports the new count.
        """
        if self.disable_license:
            return True, "License verification disabled"

        activation_code, usage_count, _ = self.storage.load_license(self.device_id)
        if not activation_code:
            return False, "No valid license found"

        is_valid, message = self.verify_license(activation_code)
        if not is_valid:
            return False, message

        license_data = self._decode(activation_code)['license']

        if usage_count >= license_data['usageLimit']:
            return False, "Usage limit exceeded"

        new_count = self.storage.increment_usage(self.device_id)
        return True, f"Usage count: {new_count}/{license_data['usageLimit']}"

    def get_activation_code(self, server_url="http://localhost:3000", timeout=10):
        """Request an activation code from the license server and store it.

        The code is saved without being activated.

        Args:
            server_url: Base URL of the license server.
            timeout: Seconds to wait for the server before ``requests`` raises,
                so a dead server cannot block the caller indefinitely.

        Returns:
            The activation code string.

        Raises:
            Exception: If the server does not answer with HTTP 200.
        """
        response = requests.post(
            f"{server_url}/generate-license",
            json={"deviceId": self.device_id},
            timeout=timeout,
        )
        if response.status_code != 200:
            raise Exception(f"Failed to get activation code: {response.text}")

        self._activation_code = response.json()['activationCode']
        self.storage.save_license(self.device_id, self._activation_code)
        self._usage_count = 0
        return self._activation_code

    def set_activation_code(self, activation_code):
        """Validate *activation_code* and activate it in storage.

        Nothing is written unless the code carries all required fields, is
        bound to this device, has a valid signature, is not older than the
        stored license and (for a fresh activation) is still inside its
        activation period.

        Returns:
            ``("repeated", message)`` when the stored license has the same
            version, otherwise ``("new", message)``.

        Raises:
            Exception: With a descriptive message for every rejection.
        """
        try:
            decoded_data = self._decode(activation_code)
            license_data = decoded_data['license']
            signature = base64.b64decode(decoded_data['signature'])

            # Validate the payload shape before any field is used.
            for field in self._REQUIRED_FIELDS:
                if field not in license_data:
                    raise KeyError(field)

            # Reject foreign licenses before touching storage; otherwise a
            # validly signed license for another device would overwrite the
            # current one and only fail afterwards in verify_license().
            if license_data['deviceId'] != self.device_id:
                raise Exception("License is bound to a different device")

            try:
                self._check_signature(license_data, signature)
            except (ValueError, TypeError):
                raise Exception("Invalid signature")

            current_code, _, current_activated_at = self.storage.load_license(self.device_id)

            new_version = license_data['generatedAt']
            is_repeated = False
            if current_code:
                current_version = self._decode(current_code)['license']['generatedAt']
                if current_version == new_version:
                    is_repeated = True
                elif new_version < current_version:
                    raise Exception("Cannot activate with an older license")

            # A fresh activation must happen before the activation deadline.
            # A license that is already activated may be re-submitted later.
            if not (is_repeated and current_activated_at):
                if self._now_ms() > new_version + license_data['activationPeriod']:
                    raise Exception("Activation period has expired")

            self.storage.save_license(self.device_id, activation_code, True)

            if is_repeated:
                return "repeated", "License already activated, usage count preserved"
            return "new", "License activated successfully"

        except KeyError as e:
            raise Exception(f"Missing required field in activation code: {str(e)}")
        except json.JSONDecodeError:
            raise Exception("Invalid activation code format: not a valid JSON")
        except Exception as e:
            raise Exception(f"Invalid activation code format: {str(e)}")

    def get_license_info(self, activation_code=None):
        """Describe a license as a dict of display-ready values.

        Args:
            activation_code: Code to describe; the stored one when ``None``.
                Usage count and activation time always come from storage.

        Returns:
            A dict of license details, a placeholder one-year license when
            verification is disabled, or ``None`` when there is no stored
            license or the code cannot be parsed. The signature is not checked.
        """
        time_format = '%Y-%m-%d %H:%M:%S'

        if self.disable_license:
            current_time = datetime.now()
            # One year from now.
            future_time = datetime.fromtimestamp(current_time.timestamp() + 365 * 24 * 60 * 60)
            return {
                'device_id': self.device_id,
                'generated_at': current_time.strftime(time_format),
                'activated_at': current_time.strftime(time_format),
                'activation_deadline': future_time.strftime(time_format),
                'expiration_date': future_time.strftime(time_format),
                'activation_period': "365.0 days",
                'usage_period': "365.0 days",
                'is_activated': True,
                'is_expired': False,
                'status': 'Active',
                'usage_count': 0,
                'usage_limit': 999999,
                'usage_remaining': 999999,
                'license_version': int(current_time.timestamp() * 1000)
            }

        stored_code, usage_count, activated_at = self.storage.load_license(self.device_id)
        if activation_code is None:
            activation_code = stored_code
            if not activation_code:
                return None

        try:
            license_data = self._decode(activation_code)['license']
            ms_per_day = 24 * 60 * 60 * 1000

            generated_at = datetime.fromtimestamp(license_data['generatedAt'] / 1000)
            activation_deadline = datetime.fromtimestamp(
                (license_data['generatedAt'] + license_data['activationPeriod']) / 1000
            )

            activated_at_time = None
            usage_expiration = None
            is_expired = False  # always a real boolean, also when not activated
            if activated_at:
                expires_ms = activated_at + license_data['usagePeriod']
                activated_at_time = datetime.fromtimestamp(activated_at / 1000)
                usage_expiration = datetime.fromtimestamp(expires_ms / 1000)
                is_expired = self._now_ms() > expires_ms

            return {
                'device_id': license_data['deviceId'],
                'generated_at': generated_at.strftime(time_format),
                'activated_at': activated_at_time.strftime(time_format) if activated_at_time else None,
                'activation_deadline': activation_deadline.strftime(time_format),
                'expiration_date': usage_expiration.strftime(time_format) if usage_expiration else None,
                'activation_period': f"{license_data['activationPeriod'] / ms_per_day:.1f} days",
                'usage_period': f"{license_data['usagePeriod'] / ms_per_day:.1f} days",
                'is_activated': activated_at is not None,
                'is_expired': is_expired,
                'status': self._get_license_status(license_data, activated_at, usage_count),
                'usage_count': usage_count,
                'usage_limit': license_data['usageLimit'],
                'usage_remaining': license_data['usageLimit'] - usage_count,
                'license_version': license_data['generatedAt']
            }
        except Exception:
            return None

    def verify_license(self, activation_code=None):
        """Check device binding, signature, activation, expiry and usage limit.

        Args:
            activation_code: Code to verify; the stored one when ``None``.
                Usage count and activation time always come from storage.

        Returns:
            ``(is_valid, message)``.
        """
        if self.disable_license:
            return True, "License verification disabled"

        stored_code, usage_count, activated_at = self.storage.load_license(self.device_id)
        if activation_code is None:
            activation_code = stored_code
            if not activation_code:
                return False, "No license found"

        try:
            decoded_data = self._decode(activation_code)
            license_data = decoded_data['license']
            signature = base64.b64decode(decoded_data['signature'])

            if license_data['deviceId'] != self.device_id:
                return False, "Invalid device ID (License is bound to a different device)"

            try:
                self._check_signature(license_data, signature)
            except (ValueError, TypeError):
                return False, "Invalid signature"

            current_time = self._now_ms()

            if not activated_at:
                activation_deadline = license_data['generatedAt'] + license_data['activationPeriod']
                if current_time > activation_deadline:
                    return False, "Activation period has expired"
                return False, "License not activated"

            if current_time > activated_at + license_data['usagePeriod']:
                return False, "License has expired"

            if usage_count >= license_data['usageLimit']:
                return False, "Usage limit exceeded"

            return True, "License is valid"

        except Exception as e:
            return False, f"License verification failed: {str(e)}"

    def _get_license_status(self, license_data, activated_at, usage_count):
        """Return a short status label for the given license state."""
        current_time = self._now_ms()

        if not activated_at:
            activation_deadline = license_data['generatedAt'] + license_data['activationPeriod']
            if current_time > activation_deadline:
                return 'Activation Period Expired'
            return 'Not Activated'

        if current_time > activated_at + license_data['usagePeriod']:
            return 'Expired'
        if usage_count >= license_data['usageLimit']:
            return 'Usage Limit Exceeded'
        return 'Active'

    def check_license_usable(self):
        """Check whether the stored license may be used right now.

        On success the in-memory copies of the activation code, usage count
        and activation time are refreshed.

        Returns:
            ``(can_use, reason)``.
        """
        if self.disable_license:
            return True, "License verification disabled"

        activation_code, usage_count, activated_at = self.storage.load_license(self.device_id)
        if not activation_code:
            return False, "No license found"

        try:
            decoded_data = self._decode(activation_code)
            license_data = decoded_data['license']

            try:
                signature = base64.b64decode(decoded_data['signature'])
                self._check_signature(license_data, signature)
            except (ValueError, TypeError):
                return False, "Invalid signature"

            if license_data['deviceId'] != self.device_id:
                return False, "Invalid device ID"

            current_time = self._now_ms()

            if not activated_at:
                activation_deadline = license_data['generatedAt'] + license_data['activationPeriod']
                if current_time > activation_deadline:
                    return False, "Activation period has expired"
                return False, "License not activated"

            if current_time > activated_at + license_data['usagePeriod']:
                return False, "License has expired"

            if usage_count >= license_data['usageLimit']:
                return False, "Usage limit exceeded"

            self._activation_code = activation_code
            self._usage_count = usage_count
            self._activated_at = activated_at

            return True, "License is valid and can be used"

        except Exception as e:
            return False, f"License verification failed: {str(e)}"

    def set_license_verification(self, enable_verification):
        """Turn license verification on or off and persist the choice.

        Enabling writes ``VERIFICATION_STATUS_FILE``; disabling removes it.

        Returns:
            ``(success, message)``.
        """
        if enable_verification:
            try:
                with open(VERIFICATION_STATUS_FILE, 'w') as f:
                    f.write(str(datetime.now()))
                self.disable_license = False
                return True, "许可证验证已启用"
            except Exception as e:
                return False, f"启用许可证验证失败: {str(e)}"
        else:
            try:
                if os.path.exists(VERIFICATION_STATUS_FILE):
                    os.remove(VERIFICATION_STATUS_FILE)
                self.disable_license = True
                return True, "许可证验证已禁用"
            except Exception as e:
                return False, f"禁用许可证验证失败: {str(e)}"
|
||
|
||
# Flask application that serves the license HTTP API.
app = Flask(__name__)

# Module-wide LicenseClient used by the route handlers; None until main() assigns it.
license_client = None
|
||
|
||
@app.route('/api/license/activate', methods=['POST'])
def activate_license():
    """Activate the license carried in the JSON field ``activation_code``.

    Responds with 200 for a new activation, 202 when the same license was
    already stored, and 400 with an ``error`` message for any rejection.
    """
    if not request.is_json:
        return jsonify({'error': 'Content-Type must be application/json'}), 400

    data = request.get_json()
    # A JSON array/string/number has no .get(); treat it like a missing field
    # instead of letting the AttributeError surface as a 500.
    if not isinstance(data, dict):
        return jsonify({'error': 'activation_code is required'}), 400

    activation_code = data.get('activation_code')
    if not activation_code:
        return jsonify({'error': 'activation_code is required'}), 400

    try:
        # Store the code, then confirm the stored license verifies.
        activation_type, message = license_client.set_activation_code(activation_code)
        is_valid, verify_message = license_client.verify_license()

        if not is_valid:
            return jsonify({'error': verify_message}), 400

        response = {
            'activation_type': activation_type,
            'message': message,
            'license_info': license_client.get_license_info()
        }

        # A repeated activation is reported with 202 Accepted.
        status_code = 202 if activation_type == "repeated" else 200
        return jsonify(response), status_code

    except Exception as e:
        return jsonify({'error': str(e)}), 400
|
||
|
||
@app.route('/api/license/info', methods=['GET'])
def get_license_info():
    """Return the current license details plus a fresh validity verdict.

    Responds with 404 when the client has no license information to report.
    """
    info = license_client.get_license_info()
    if info is None:
        return jsonify({'error': 'No valid license found'}), 404

    valid, detail = license_client.verify_license()
    info.update(is_valid=valid, message=detail)
    return jsonify(info)
|
||
|
||
@app.route('/api/license/use', methods=['POST'])
def use_license():
    """Record one usage of the license; 400 when the usage is refused."""
    accepted, detail = license_client.increment_usage()
    if accepted:
        payload = {
            'message': detail,
            'license_info': license_client.get_license_info()
        }
        return jsonify(payload)

    return jsonify({'error': detail}), 400
|
||
|
||
@app.route('/api/license/check', methods=['GET'])
def check_license():
    """Report whether the license can be used, with the reason and details."""
    usable, why = license_client.check_license_usable()
    # License details are only included when the license is usable.
    details = license_client.get_license_info() if usable else None
    return jsonify({
        'can_use': usable,
        'reason': why,
        'license_info': details
    })
|
||
|
||
@app.route('/api/license/config', methods=['POST'])
def configure_license():
    """Enable or disable license verification (password protected).

    Expects a JSON object with ``password`` (string) and
    ``enable_verification`` (boolean). Responds with 400 for malformed
    requests, 403 for a wrong password and 500 if the state cannot be changed.
    """
    import hmac  # local import: only this handler needs it

    if not request.is_json:
        return jsonify({'error': '内容类型必须为application/json'}), 400

    data = request.get_json()
    # A non-object body has no fields at all.
    if not isinstance(data, dict):
        return jsonify({'error': '缺少必要参数'}), 400

    password = data.get('password')
    enable_verification = data.get('enable_verification')

    if password is None or enable_verification is None:
        return jsonify({'error': '缺少必要参数'}), 400

    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    password_ok = isinstance(password, str) and hmac.compare_digest(
        password.encode('utf-8'), LICENSE_ADMIN_PASSWORD.encode('utf-8')
    )
    if not password_ok:
        return jsonify({'error': '密码错误'}), 403

    # Only real booleans are accepted: a string such as "false" is truthy and
    # would otherwise switch verification on.
    if not isinstance(enable_verification, bool):
        return jsonify({'error': 'enable_verification必须为布尔值'}), 400

    success, message = license_client.set_license_verification(enable_verification)

    if success:
        return jsonify({
            'message': message,
            'verification_enabled': not license_client.disable_license
        })
    else:
        return jsonify({'error': message}), 500
|
||
|
||
def main():
    """Create the shared license client, print its status and start the server."""
    global license_client

    # The client keeps its key and database next to this script; whether
    # verification is enabled is decided by the verification marker file.
    here = os.path.dirname(os.path.abspath(__file__))
    license_client = LicenseClient(here)

    print(f"Device ID (hostname): {license_client.device_id}")

    # Report what is currently stored.
    valid, detail = license_client.verify_license()
    if not valid:
        print(f"未找到有效许可证: {detail}")
        print("请使用/api/license/activate接口激活")
    elif license_client.disable_license:
        print("许可证验证已禁用,所有许可证检查将通过")
    else:
        print("存储中找到有效许可证")
        info = license_client.get_license_info()
        print(f"许可证状态: {info['status']}")
        print(f"到期日期: {info['expiration_date']}")
        print(f"使用次数: {info['usage_count']}/{info['usage_limit']}")

    print("\n启动许可证验证服务器...")
    # Listens on all interfaces, port 5288.
    app.run(host='0.0.0.0', port=5288)
|
||
|
||
# Start the license server only when this file is run as a script.
if __name__ == "__main__":
    main()