在websocket服务器运行时从用户获取输入

Take input from user while websocket server running

提问人:Andrex Youtube 提问时间:11/17/2023 最后编辑:Andrex Youtube 更新时间:11/17/2023 访问量:15

问:

所以,我需要制作类似僵尸网络的东西,但出于教育目的,只是为了演示僵尸网络是如何工作的,我需要它来制作视频,所以我试图编写自己的,但无法使 websocket 服务器既能进行通信,又能从用户那里获取输入,代码只是冻结在 websocket 上,不会继续接受用户的输入。这是我的代码:

from colorama import Fore, Style
import socket
import title
import os
import asyncio
import websockets
from aioconsole import ainput

connections = 0
started = False
started_lock = asyncio.Lock()

# ----------- TITLE ----------- #
print(title.generate_title())

# ----------- COMMANDS ----------- #
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)

async def handler(websocket, path):
    global connections
    connections += 1

    try:
        async for message in websocket:
            pass
    except websockets.exceptions.ConnectionClosed:
        connections -= 1

async def start_websocket_server():
    global started

    try:
        async with started_lock:
            started = True
        start_server = await websockets.serve(handler, "localhost", 5914)
        print(f"{Fore.BLUE}[*]: {Style.RESET_ALL}Websocket server started!")
        await start_server.wait_closed()
    except Exception as e:
        print(f"{Fore.RED}[ERR]: {Style.RESET_ALL}An error occurred while starting websocket server: {e}")

def clear():
    if os.name == 'nt':
        os.system('cls')
    else:
        os.system('clear')

async def user_input():
    while True:
        if started:
            try:
                user_input = await ainput(f"{ip}:{hostname}> ")
                if user_input.strip() == "help":
                    print("""List of available commands:
                    connections - number of connected bots.
                    sessions - get all bots and ids
                    getinfo {id} - get info about bot by id
                    exit - exit the program""")
                elif user_input.strip() == "cls" or user_input.strip() == "clear":
                    clear()
                elif user_input.strip() == "connections":
                    print(f"{Fore.BLUE}[*]: {Style.RESET_ALL}There are currently connected {connections} bots.")
                elif user_input.strip() == "exit":
                    print("Exiting...")
                    return
                else:
                    print(f"Unknown command '{user_input}'\nView all commands using 'help'")
            except EOFError:
                print("EOFError. Exiting...")
                break
            except KeyboardInterrupt:
                print("User interrupted. Exiting...")
                break
            except Exception as e:
                print(f"An error occurred: {e}")

async def main():
    print(f"{Fore.BLUE}[*]: {Style.RESET_ALL}Starting websocket server...")

    # Start WebSocket server
    await start_websocket_server()

    # Start user input
    await user_input()

if __name__ == "__main__":
    asyncio.run(main())

还有标题库:

from pyfiglet import Figlet
from colorama import Fore, Style
import random

title_fonts = ['cybermedium', 'rectangles', 'cyberlarge', '3-d', 'banner', 'banner3', 'banner4', 'chunky', 'colossal', 'computer', 'cosmic', 'crawford', 'cricket', 'doom', 'epic', 'poison']

try:
    import pyfiglet
except ImportError:
    import subprocess
    subprocess.run(['pip', 'install', 'pyfiglet'])
    import pyfiglet

# Generate and return the title with a random readable font
def generate_title():
    selected_font = random.choice(title_fonts)
    if selected_font == 'cosmic':
        fig = Figlet(font=selected_font, width=120)
        title = fig.renderText('Control Hub').strip()
    else:
        fig = Figlet(font=selected_font, width=100)
        title = fig.renderText('ControlHub').strip()
    return Fore.MAGENTA + title + f"\n" + Style.RESET_ALL

尝试添加多线程,它有所帮助,但没有太多原因,因为我每次都必须通过任务管理器杀死 python,因为它不会使用 KeyboardInterrupt 退出

Python 输入 WebSocket 僵尸网络

评论


答: 暂无答案