import json import base64 import platform import requests import sqlite3 import os import hashlib from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 from Crypto.Hash import SHA256 from datetime import datetime from flask import Flask, request, jsonify from cryptography.fernet import Fernet from pathlib import Path # 定义验证状态文件路径 VERIFICATION_STATUS_FILE = os.path.expanduser("~/.license_verification_enabled") # 设置密码(实际应用中应该使用更安全的方式存储) LICENSE_ADMIN_PASSWORD = "robotstorm@license" class LicenseStorage: def __init__(self, client_dir): self.client_dir = client_dir self.db_path = os.path.join(client_dir, ".license.db") self.cipher = None self._init_db_encryption() def _init_db_encryption(self): """Initialize database encryption with a device-specific key""" # Generate a device-specific encryption key device_info = { 'hostname': platform.node(), 'machine': platform.machine(), 'processor': platform.processor(), 'system': platform.system(), 'version': platform.version() } # Convert device info to a consistent string and create hash device_str = json.dumps(device_info, sort_keys=True) device_hash = hashlib.sha256(device_str.encode()).digest() # Create Fernet key (must be 32 bytes base64-encoded) key = base64.urlsafe_b64encode(device_hash) self.cipher = Fernet(key) def _init_db(self): """Initialize database""" with sqlite3.connect(self.db_path) as conn: conn.execute(''' CREATE TABLE IF NOT EXISTS license ( id INTEGER PRIMARY KEY, device_id TEXT UNIQUE, activation_code TEXT, activated_at INTEGER, usage_count INTEGER DEFAULT 0, license_version BIGINT ) ''') def save_license(self, device_id, activation_code, first_activation=False): """Save encrypted license data""" if not os.path.exists(self.db_path): self._init_db() try: # 解码激活码获取版本信息 decoded_data = json.loads(base64.b64decode(activation_code)) license_data = decoded_data['license'] new_version = license_data['generatedAt'] # 加密激活码 encrypted_code = self.cipher.encrypt(activation_code.encode()) # 获取当前许可证信息 current_activation_code, current_usage_count, current_activated_at = self.load_license(device_id) if current_activation_code: # 解码当前激活码获取版本信息 current_data = json.loads(base64.b64decode(current_activation_code)) current_version = current_data['license']['generatedAt'] # 如果是相同版本的许可证,保留使用次数 if current_version == new_version: usage_count = current_usage_count activated_at = current_activated_at else: # 如果是新版本的许可证,重置使用次数 usage_count = 0 activated_at = int(datetime.now().timestamp() * 1000) if first_activation else None else: # 首次激活 usage_count = 0 activated_at = int(datetime.now().timestamp() * 1000) if first_activation else None with sqlite3.connect(self.db_path) as conn: conn.execute(''' INSERT OR REPLACE INTO license (device_id, activation_code, activated_at, usage_count, license_version) VALUES (?, ?, ?, ?, ?) ''', (device_id, encrypted_code, activated_at, usage_count, new_version)) except Exception as e: print(f"Error saving license: {str(e)}") # 如果解析失败,使用默认值 encrypted_code = self.cipher.encrypt(activation_code.encode()) activated_at = int(datetime.now().timestamp() * 1000) if first_activation else None with sqlite3.connect(self.db_path) as conn: conn.execute(''' INSERT OR REPLACE INTO license (device_id, activation_code, activated_at, usage_count, license_version) VALUES (?, ?, ?, ?, ?) ''', (device_id, encrypted_code, activated_at, 0, 0)) def load_license(self, device_id): """Load and decrypt license data""" if not os.path.exists(self.db_path): return None, 0, None try: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute(''' SELECT activation_code, usage_count, activated_at FROM license WHERE device_id = ? ''', (device_id,)) result = cursor.fetchone() if result: encrypted_code, usage_count, activated_at = result try: decrypted_code = self.cipher.decrypt(encrypted_code) return decrypted_code.decode(), usage_count, activated_at except Exception: return None, 0, None return None, 0, None except sqlite3.Error: return None, 0, None def increment_usage(self, device_id): """Increment usage count for the device""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute(''' UPDATE license SET usage_count = usage_count + 1 WHERE device_id = ? RETURNING usage_count ''', (device_id,)) result = cursor.fetchone() return result[0] if result else 0 except sqlite3.Error: return 0 class LicenseClient: def __init__(self, client_dir, disable_license=None): self.client_dir = client_dir # 如果没有指定 disable_license,则从文件状态确定 if disable_license is None: self.disable_license = not os.path.exists(VERIFICATION_STATUS_FILE) else: self.disable_license = disable_license # Ensure client directory exists os.makedirs(client_dir, exist_ok=True) # Load public key from client directory public_key_path = os.path.join(client_dir, 'public.pem') with open(public_key_path, 'r') as f: self.public_key = RSA.import_key(f.read()) # Get device ID from hostname self.device_id = self._get_device_id() # Initialize storage self.storage = LicenseStorage(client_dir) # Initialize activation code and encryption key as None self._activation_code = None self._encryption_key = None self._usage_count = 0 self._activated_at = None # Try to load existing license self._load_existing_license() def _get_device_id(self): """Get device ID from hostname""" return platform.node() def _load_existing_license(self): """Try to load and verify existing license""" try: # 只验证许可证的有效性,不更新内存状态 activation_code, _, _ = self.storage.load_license(self.device_id) if activation_code: is_valid, _ = self.verify_license() if not is_valid: # 无效许可证的情况下,不需要任何操作 # 因为所有验证都会从数据库重新加载 pass except: # 出错时不需要任何操作 pass def increment_usage(self): """Increment usage count and verify license""" # 如果禁用许可证验证,直接返回成功 if self.disable_license: return True, "License verification disabled" # 从数据库加载最新状态 activation_code, usage_count, activated_at = self.storage.load_license(self.device_id) if not activation_code: return False, "No valid license found" # First verify the license is_valid, message = self.verify_license(activation_code) if not is_valid: return False, message # Get current license data decoded_data = json.loads(base64.b64decode(activation_code)) license_data = decoded_data['license'] # Check usage limit if usage_count >= license_data['usageLimit']: return False, "Usage limit exceeded" # Increment usage count new_count = self.storage.increment_usage(self.device_id) return True, f"Usage count: {new_count}/{license_data['usageLimit']}" def get_activation_code(self, server_url="http://localhost:3000"): """Request activation code from server""" response = requests.post( f"{server_url}/generate-license", json={"deviceId": self.device_id} ) if response.status_code == 200: self._activation_code = response.json()['activationCode'] # Save to encrypted storage self.storage.save_license(self.device_id, self._activation_code) self._usage_count = 0 return self._activation_code else: raise Exception(f"Failed to get activation code: {response.text}") def set_activation_code(self, activation_code): """Set activation code and save to storage""" try: # 验证激活码格式和签名 decoded_data = json.loads(base64.b64decode(activation_code)) original_license_data = decoded_data['license'] original_signature = base64.b64decode(decoded_data['signature']) # 首先验证原始数据的签名 # 确保JSON序列化方式与服务端一致 message = json.dumps(original_license_data, separators=(',', ':')) h = SHA256.new(message.encode('utf-8')) try: pkcs1_15.new(self.public_key).verify(h, original_signature) except (ValueError, TypeError): raise Exception("Invalid signature") # 获取当前许可证信息 current_activation_code, current_usage_count, current_activated_at = self.storage.load_license(self.device_id) # 检查是否是重复激活 is_repeated = False if current_activation_code: current_data = json.loads(base64.b64decode(current_activation_code)) current_version = current_data['license']['generatedAt'] new_version = original_license_data['generatedAt'] # 如果是相同版本的许可证 if current_version == new_version: is_repeated = True # 如果新激活码的生成时间早于当前激活码,拒绝激活 elif new_version < current_version: raise Exception("Cannot activate with an older license") # 检查必需的字段 required_fields = ['deviceId', 'generatedAt', 'activationPeriod', 'usagePeriod', 'usageLimit'] for field in required_fields: if field not in original_license_data: raise KeyError(field) # 保存激活码 self.storage.save_license(self.device_id, activation_code, True) # 返回重复激活的信息 if is_repeated: return "repeated", "License already activated, usage count preserved" return "new", "License activated successfully" except KeyError as e: raise Exception(f"Missing required field in activation code: {str(e)}") except json.JSONDecodeError: raise Exception("Invalid activation code format: not a valid JSON") except Exception as e: raise Exception(f"Invalid activation code format: {str(e)}") def get_license_info(self, activation_code=None): """Extract license information from activation code""" # 如果禁用许可证验证,返回模拟的许可证信息 if self.disable_license: current_time = datetime.now() future_time = datetime.fromtimestamp(current_time.timestamp() + 365 * 24 * 60 * 60) # 一年后 return { 'device_id': self.device_id, 'generated_at': current_time.strftime('%Y-%m-%d %H:%M:%S'), 'activated_at': current_time.strftime('%Y-%m-%d %H:%M:%S'), 'activation_deadline': future_time.strftime('%Y-%m-%d %H:%M:%S'), 'expiration_date': future_time.strftime('%Y-%m-%d %H:%M:%S'), 'activation_period': "365.0 days", 'usage_period': "365.0 days", 'is_activated': True, 'is_expired': False, 'status': 'Active', 'usage_count': 0, 'usage_limit': 999999, 'usage_remaining': 999999, 'license_version': int(current_time.timestamp() * 1000) } # 如果没有提供激活码,从数据库加载 if activation_code is None: activation_code, usage_count, activated_at = self.storage.load_license(self.device_id) if not activation_code: return None else: # 如果提供了激活码,使用当前数据库中的使用次数和激活时间 _, usage_count, activated_at = self.storage.load_license(self.device_id) try: decoded_data = json.loads(base64.b64decode(activation_code)) license_data = decoded_data['license'] # Convert timestamps to readable format generated_at = datetime.fromtimestamp(license_data['generatedAt'] / 1000) activated_at_time = datetime.fromtimestamp(activated_at / 1000) if activated_at else None # 计算激活截止时间 activation_deadline = datetime.fromtimestamp( (license_data['generatedAt'] + license_data['activationPeriod']) / 1000 ) # 计算使用期限 usage_expiration = None if activated_at: usage_expiration = datetime.fromtimestamp( (activated_at + license_data['usagePeriod']) / 1000 ) current_time = datetime.now().timestamp() * 1000 is_expired = (usage_expiration and current_time > usage_expiration.timestamp() * 1000) return { 'device_id': license_data['deviceId'], 'generated_at': generated_at.strftime('%Y-%m-%d %H:%M:%S'), 'activated_at': activated_at_time.strftime('%Y-%m-%d %H:%M:%S') if activated_at_time else None, 'activation_deadline': activation_deadline.strftime('%Y-%m-%d %H:%M:%S'), 'expiration_date': usage_expiration.strftime('%Y-%m-%d %H:%M:%S') if usage_expiration else None, 'activation_period': f"{license_data['activationPeriod'] / (24 * 60 * 60 * 1000):.1f} days", 'usage_period': f"{license_data['usagePeriod'] / (24 * 60 * 60 * 1000):.1f} days", 'is_activated': activated_at is not None, 'is_expired': is_expired, 'status': self._get_license_status(license_data, activated_at, usage_count), 'usage_count': usage_count, 'usage_limit': license_data['usageLimit'], 'usage_remaining': license_data['usageLimit'] - usage_count, 'license_version': license_data['generatedAt'] } except Exception as e: return None def verify_license(self, activation_code=None): """Verify the activation code""" # 如果禁用许可证验证,直接返回成功 if self.disable_license: return True, "License verification disabled" # 从数据库加载许可证数据 if activation_code is None: activation_code, usage_count, activated_at = self.storage.load_license(self.device_id) if not activation_code: return False, "No license found" else: # 如果提供了新的激活码,使用数据库中的使用次数和激活时间 _, usage_count, activated_at = self.storage.load_license(self.device_id) try: # Decode the activation code decoded_data = json.loads(base64.b64decode(activation_code)) license_data = decoded_data['license'] signature = base64.b64decode(decoded_data['signature']) # Verify device ID if license_data['deviceId'] != self.device_id: return False, "Invalid device ID (License is bound to a different device)" # 验证签名(使用原始数据,确保与服务端一致的序列化) message = json.dumps(license_data, separators=(',', ':')) h = SHA256.new(message.encode('utf-8')) try: pkcs1_15.new(self.public_key).verify(h, signature) except (ValueError, TypeError): return False, "Invalid signature" # 检查是否已激活 if not activated_at: # 检查激活期限 current_time = datetime.now().timestamp() * 1000 activation_deadline = license_data['generatedAt'] + license_data['activationPeriod'] if current_time > activation_deadline: return False, "Activation period has expired" return False, "License not activated" # 检查使用期限 current_time = datetime.now().timestamp() * 1000 usage_expiration = activated_at + license_data['usagePeriod'] if current_time > usage_expiration: return False, "License has expired" # Verify usage limit if usage_count >= license_data['usageLimit']: return False, "Usage limit exceeded" return True, "License is valid" except Exception as e: return False, f"License verification failed: {str(e)}" def _get_license_status(self, license_data, activated_at, usage_count): """Get detailed license status""" current_time = datetime.now().timestamp() * 1000 if not activated_at: # 检查激活期限 activation_deadline = license_data['generatedAt'] + license_data['activationPeriod'] if current_time > activation_deadline: return 'Activation Period Expired' return 'Not Activated' usage_expiration = activated_at + license_data['usagePeriod'] if current_time > usage_expiration: return 'Expired' if usage_count >= license_data['usageLimit']: return 'Usage Limit Exceeded' return 'Active' def check_license_usable(self): """Check if the license can be used""" # 如果禁用许可证验证,直接返回成功 if self.disable_license: return True, "License verification disabled" # 从数据库加载许可证数据 activation_code, usage_count, activated_at = self.storage.load_license(self.device_id) if not activation_code: return False, "No license found" try: # 解码激活码 decoded_data = json.loads(base64.b64decode(activation_code)) license_data = decoded_data['license'] # 验证签名 message = json.dumps(license_data, separators=(',', ':')) h = SHA256.new(message.encode('utf-8')) try: signature = base64.b64decode(decoded_data['signature']) pkcs1_15.new(self.public_key).verify(h, signature) except (ValueError, TypeError): return False, "Invalid signature" # 检查设备ID if license_data['deviceId'] != self.device_id: return False, "Invalid device ID" # 检查是否已激活 if not activated_at: # 检查激活期限 current_time = datetime.now().timestamp() * 1000 activation_deadline = license_data['generatedAt'] + license_data['activationPeriod'] if current_time > activation_deadline: return False, "Activation period has expired" return False, "License not activated" # 检查使用期限 current_time = datetime.now().timestamp() * 1000 usage_expiration = activated_at + license_data['usagePeriod'] if current_time > usage_expiration: return False, "License has expired" # 检查使用次数 if usage_count >= license_data['usageLimit']: return False, "Usage limit exceeded" # 更新内存中的状态 self._activation_code = activation_code self._usage_count = usage_count self._activated_at = activated_at return True, "License is valid and can be used" except Exception as e: return False, f"License verification failed: {str(e)}" def set_license_verification(self, enable_verification): """设置是否启用许可证验证""" if enable_verification: # 创建验证文件,启用验证 try: with open(VERIFICATION_STATUS_FILE, 'w') as f: f.write(str(datetime.now())) self.disable_license = False return True, "许可证验证已启用" except Exception as e: return False, f"启用许可证验证失败: {str(e)}" else: # 删除验证文件,禁用验证 try: if os.path.exists(VERIFICATION_STATUS_FILE): os.remove(VERIFICATION_STATUS_FILE) self.disable_license = True return True, "许可证验证已禁用" except Exception as e: return False, f"禁用许可证验证失败: {str(e)}" # Create Flask application app = Flask(__name__) license_client = None @app.route('/api/license/activate', methods=['POST']) def activate_license(): """Activate license with provided activation code""" if not request.is_json: return jsonify({'error': 'Content-Type must be application/json'}), 400 data = request.get_json() activation_code = data.get('activation_code') if not activation_code: return jsonify({'error': 'activation_code is required'}), 400 try: # Set and verify the activation code activation_type, message = license_client.set_activation_code(activation_code) is_valid, verify_message = license_client.verify_license() if not is_valid: return jsonify({'error': verify_message}), 400 response = { 'activation_type': activation_type, 'message': message, 'license_info': license_client.get_license_info() } # 如果是重复激活,使用 202 Accepted 状态码 status_code = 202 if activation_type == "repeated" else 200 return jsonify(response), status_code except Exception as e: return jsonify({'error': str(e)}), 400 @app.route('/api/license/info', methods=['GET']) def get_license_info(): """Get current license information and status""" license_info = license_client.get_license_info() if license_info is None: return jsonify({'error': 'No valid license found'}), 404 is_valid, message = license_client.verify_license() license_info['is_valid'] = is_valid license_info['message'] = message return jsonify(license_info) @app.route('/api/license/use', methods=['POST']) def use_license(): """Record a usage of the license""" is_valid, message = license_client.increment_usage() if not is_valid: return jsonify({'error': message}), 400 return jsonify({ 'message': message, 'license_info': license_client.get_license_info() }) @app.route('/api/license/check', methods=['GET']) def check_license(): """Check if the license can be used""" can_use, reason = license_client.check_license_usable() return jsonify({ 'can_use': can_use, 'reason': reason, 'license_info': license_client.get_license_info() if can_use else None }) @app.route('/api/license/config', methods=['POST']) def configure_license(): """配置许可证验证状态""" if not request.is_json: return jsonify({'error': '内容类型必须为application/json'}), 400 data = request.get_json() password = data.get('password') enable_verification = data.get('enable_verification') # 检查是否提供了所有必要参数 if password is None or enable_verification is None: return jsonify({'error': '缺少必要参数'}), 400 # 验证密码 if password != LICENSE_ADMIN_PASSWORD: return jsonify({'error': '密码错误'}), 403 # 设置验证状态 success, message = license_client.set_license_verification(enable_verification) if success: return jsonify({ 'message': message, 'verification_enabled': not license_client.disable_license }) else: return jsonify({'error': message}), 500 def main(): global license_client # Get the directory where the script is located script_dir = os.path.dirname(os.path.abspath(__file__)) # Initialize client with its own directory,根据验证文件状态决定是否启用验证 license_client = LicenseClient(script_dir) # Print current device ID and verify existing license print(f"Device ID (hostname): {license_client.device_id}") # Check if we have a valid license is_valid, message = license_client.verify_license() if is_valid: if license_client.disable_license: print("许可证验证已禁用,所有许可证检查将通过") else: print("存储中找到有效许可证") license_info = license_client.get_license_info() print(f"许可证状态: {license_info['status']}") print(f"到期日期: {license_info['expiration_date']}") print(f"使用次数: {license_info['usage_count']}/{license_info['usage_limit']}") else: print(f"未找到有效许可证: {message}") print("请使用/api/license/activate接口激活") print("\n启动许可证验证服务器...") # Run Flask application app.run(host='0.0.0.0', port=5288) if __name__ == "__main__": main()