import paramiko import time import os import pexpect import atexit import socket import subprocess def execute_command_on_remote(host, username, password, command, port=22): # 创建SSH客户端 ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: # 连接到远程服务器,指定端口 ssh.connect(hostname=host, port=port, username=username, password=password) # 执行命令 stdin, stdout, stderr = ssh.exec_command(command) # 获取命令输出和错误信息 output = stdout.read().decode('utf-8') error = stderr.read().decode('utf-8') return output, error finally: # 关闭SSH连接 ssh.close() def execute_command_on_remote_subprocess(host, username, password, command, port=22, timeout=30): # 使用 sshpass 来传递密码 ssh_command = [ "sshpass", "-p", password, # 使用提供的密码 "ssh", "-p", str(port), f"{username}@{host}", command ] try: # 执行命令并获取输出 result = subprocess.run(ssh_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout, text=True) # 获取命令输出和错误信息 output = result.stdout error = result.stderr return output, error except subprocess.TimeoutExpired: return None, "Command timed out" except Exception as e: return None, str(e) # # Ping 检查远程主机是否在线 # def is_host_down(host, timeout=60): # start_time = time.time() # while time.time() - start_time < timeout: # response = os.system(f"ping -c 1 {host}") # if response != 0: # ping 不通 # return True # time.sleep(5) # return False def is_host_down(host, port=22, timeout=20): def check_ssh_port(host, port): try: with socket.create_connection((host, port), timeout=1): return False # 端口开放,主机未关闭 except (socket.timeout, ConnectionRefusedError, OSError): return True # 端口不可连接,主机可能已关闭 start_time = time.time() while time.time() - start_time < timeout: if check_ssh_port(host, port): return True # 端口不可连接,主机可能已经关闭 time.sleep(0.1) return False # class ReverseSSH: # def __init__(self, server_ip, server_user, server_password, local_port=22, remote_port=2222): # self.server_ip = server_ip # self.server_user = server_user # self.server_password = server_password # self.local_port = local_port # self.remote_port = remote_port # self.tunnel_process = None # atexit.register(self.stop_tunnel) # def start_tunnel(self): # try: # # 构建反向隧道的 SSH 命令,去掉 '-f' # command = f"ssh -o StrictHostKeyChecking=no -N -R {self.remote_port}:localhost:{self.local_port} {self.server_user}@{self.server_ip}" # # 使用 pexpect 启动进程并等待密码提示 # self.tunnel_process = pexpect.spawn(command) # self.tunnel_process.expect("password:") # # 输入密码 # self.tunnel_process.sendline(self.server_password) # return {"status": "success", "message": "反向隧道已建立成功!"} # except Exception as e: # return {"status": "error", "message": str(e)} # def stop_tunnel(self): # if self.tunnel_process: # self.tunnel_process.terminate(force=True) # return {"status": "success", "message": "反向隧道已关闭。"} # return {"status": "error", "message": "隧道未启动,无法关闭。"} class ReverseSSH: def __init__(self, server_ip, server_user, server_password, local_port=22, max_retries=3, retry_interval=2): self.server_ip = server_ip self.server_user = server_user self.server_password = server_password self.local_port = local_port self.remote_port = None # 将在隧道建立后自动分配 self.tunnel_process = None self.max_retries = max_retries self.retry_interval = retry_interval atexit.register(self.stop_tunnel) def start_tunnel(self): for attempt in range(self.max_retries): print(f"Attempt {attempt + 1}/{self.max_retries} to establish reverse SSH tunnel") try: # 构建反向隧道的 autossh 命令,指定远程端口为0以自动分配 command = ( f"autossh -M 0 -o ServerAliveInterval=60 -o ServerAliveCountMax=3 " f"-o StrictHostKeyChecking=no -N -R 0:localhost:{self.local_port} {self.server_user}@{self.server_ip}" ) print(f"Reverse SSH command: {command}") # 使用 pexpect 启动进程并等待密码提示 self.tunnel_process = pexpect.spawn(command) self.tunnel_process.expect("password:") self.tunnel_process.sendline(self.server_password) # 获取服务器分配的端口 self.tunnel_process.expect(r"Allocated port (\d+)", timeout=5) self.remote_port = int(self.tunnel_process.match.group(1)) return {"status": "success", "message": f"反向隧道已成功建立在端口 {self.remote_port}!", "port": self.remote_port} except Exception as e: print(f"Failed to establish tunnel on attempt {attempt + 1}: {e}") self.stop_tunnel() # 确保关闭任何正在运行的进程 time.sleep(self.retry_interval) return {"status": "error", "message": "Failed to establish tunnel after multiple attempts."} def stop_tunnel(self): if self.tunnel_process: # 优雅地关闭隧道进程 self.tunnel_process.terminate() time.sleep(1) # 等待进程终止 # 检查是否完全关闭 if self.tunnel_process.isalive(): # 强制终止进程 self.tunnel_process.kill() time.sleep(1) # 等待进程完全释放资源 # 检查进程是否已经完全停止 if not self.tunnel_process.isalive(): self.tunnel_process = None return {"status": "success", "message": "反向隧道已成功关闭,端口已释放。"} else: return {"status": "error", "message": "反向隧道关闭失败,端口可能仍在占用。"} return {"status": "error", "message": "隧道未启动,无法关闭。"} def check_tunnel_status(self): """ 检查反向隧道的连接状态。 如果隧道进程仍在运行,返回连接正常的状态。 如果隧道进程已停止,返回连接断开的状态。 """ if self.tunnel_process and self.tunnel_process.isalive(): return {"status": "connected", "message": f"隧道仍然连接在端口 {self.remote_port}"} else: return {"status": "disconnected", "message": "隧道已断开"} if __name__ == '__main__': # SERVER_IP = "8.138.8.114" # 替换为您的服务器 IP # SERVER_USER = "root" # 替换为 root 用户 # SERVER_PASSWORD = "JuShenFengBao209" # 替换为 root 用户密码 # reverse_ssh = ReverseSSH(SERVER_IP, SERVER_USER, SERVER_PASSWORD) # result = reverse_ssh.start_tunnel() # print(result) # time.sleep(200) # result = reverse_ssh.stop_tunnel() remote_host = "192.168.100.20" # 远程主机的 IP 地址 remote_username = "root" # 远程主机的用户名 remote_password = "bestcobot" # 远程主机的密码 remote_port = 8822 # 远程主机的 SSH 端口 sudo_password = "jsfb" if not is_host_down(remote_host): print("Remote host is alive, processing shutdown") remote_command = "shutdown -h now" stdout, stderr = execute_command_on_remote( remote_host, remote_username, remote_password, remote_command, port=remote_port, )