提问人:Andrex Youtube 提问时间:11/17/2023 最后编辑:Andrex Youtube 更新时间:11/17/2023 访问量:15
在websocket服务器运行时从用户获取输入
Take input from user while websocket server running
问:
所以,我需要制作类似僵尸网络的东西,但出于教育目的,只是为了演示僵尸网络是如何工作的,我需要它来制作视频,所以我试图编写自己的,但无法使 websocket 服务器既能进行通信,又能从用户那里获取输入,代码只是冻结在 websocket 上,不会继续接受用户的输入。这是我的代码:
from colorama import Fore, Style
import socket
import title
import os
import asyncio
import websockets
from aioconsole import ainput
connections = 0
started = False
started_lock = asyncio.Lock()
# ----------- TITLE ----------- #
print(title.generate_title())
# ----------- COMMANDS ----------- #
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
async def handler(websocket, path):
global connections
connections += 1
try:
async for message in websocket:
pass
except websockets.exceptions.ConnectionClosed:
connections -= 1
async def start_websocket_server():
global started
try:
async with started_lock:
started = True
start_server = await websockets.serve(handler, "localhost", 5914)
print(f"{Fore.BLUE}[*]: {Style.RESET_ALL}Websocket server started!")
await start_server.wait_closed()
except Exception as e:
print(f"{Fore.RED}[ERR]: {Style.RESET_ALL}An error occurred while starting websocket server: {e}")
def clear():
if os.name == 'nt':
os.system('cls')
else:
os.system('clear')
async def user_input():
while True:
if started:
try:
user_input = await ainput(f"{ip}:{hostname}> ")
if user_input.strip() == "help":
print("""List of available commands:
connections - number of connected bots.
sessions - get all bots and ids
getinfo {id} - get info about bot by id
exit - exit the program""")
elif user_input.strip() == "cls" or user_input.strip() == "clear":
clear()
elif user_input.strip() == "connections":
print(f"{Fore.BLUE}[*]: {Style.RESET_ALL}There are currently connected {connections} bots.")
elif user_input.strip() == "exit":
print("Exiting...")
return
else:
print(f"Unknown command '{user_input}'\nView all commands using 'help'")
except EOFError:
print("EOFError. Exiting...")
break
except KeyboardInterrupt:
print("User interrupted. Exiting...")
break
except Exception as e:
print(f"An error occurred: {e}")
async def main():
print(f"{Fore.BLUE}[*]: {Style.RESET_ALL}Starting websocket server...")
# Start WebSocket server
await start_websocket_server()
# Start user input
await user_input()
if __name__ == "__main__":
asyncio.run(main())
还有标题库:
from pyfiglet import Figlet
from colorama import Fore, Style
import random
title_fonts = ['cybermedium', 'rectangles', 'cyberlarge', '3-d', 'banner', 'banner3', 'banner4', 'chunky', 'colossal', 'computer', 'cosmic', 'crawford', 'cricket', 'doom', 'epic', 'poison']
try:
import pyfiglet
except ImportError:
import subprocess
subprocess.run(['pip', 'install', 'pyfiglet'])
import pyfiglet
# Generate and return the title with a random readable font
def generate_title():
selected_font = random.choice(title_fonts)
if selected_font == 'cosmic':
fig = Figlet(font=selected_font, width=120)
title = fig.renderText('Control Hub').strip()
else:
fig = Figlet(font=selected_font, width=100)
title = fig.renderText('ControlHub').strip()
return Fore.MAGENTA + title + f"\n" + Style.RESET_ALL
尝试添加多线程,它有所帮助,但没有太多原因,因为我每次都必须通过任务管理器杀死 python,因为它不会使用 KeyboardInterrupt 退出
答: 暂无答案
评论