如何使用 Python 的 Asynchio 在服务器和两个客户端之间进行通信?

How to communicate between a server and two clients using Python's Asynchio?

提问人:ysingh-1636 提问时间:10/24/2023 更新时间:10/25/2023 访问量:42

问:

我正在开发一个基本的 Python 应用程序,该应用程序使用异步服务器和多个客户端,但我在与第二个客户端通信时遇到了一些问题。

client.py - 轮询目录中的新文件并将内容发送到服务器

# Currently just sends the contents of a known file to the server.
# The polling portion I plan on adding later
import asyncio

async def send_file(filename, server_host, server_port):
    reader, writer = await asyncio.open_connection(server_host, server_port)
    writer.write(filename.encode())
    with open(filename, 'rb') as file:
        while True:
            data = file.read(1024)
            if not data:
                break
            writer.write(data)

    writer.close()

if __name__ == '__main__':
    server_host = '127.0.0.1'  # Change to the server's IP address or hostname.
    server_port = 8888

    filename = 'temp.txt'  # Change to the file you want to send.

    asyncio.run(send_file(filename, server_host, server_port))

server.py- 等待来自客户端 #1 的数据并将其写入文件。完成后向客户端 #2 发出警报

import asyncio

async def handle_client(reader, writer):
    try:
        data = await reader.read(1024)
        if not data:
            return
        # Writes data from Client #1
        filename = data.decode().strip()
        print(f"Receiving file: {filename}")
        with open(filename, 'wb') as file:
            while True:
                data = await reader.read(1024)
                if not data:
                    break
                file.write(data)
        print(f"Received file: {filename}")
        
        # Send message to the receiver that the server is done writing
        response = "Server has written data from client.py"
        writer.write(response.encode())
        await writer.drain()
        
    except asyncio.CancelledError:
        pass
    finally:
        writer.close()

async def main():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

if __name__ == '__main__':
    asyncio.run(main())

receiver.py - 从服务器打印的完成状态

import asyncio

async def receive_data():
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    response = await reader.read(100)
    message = response.decode()
    print(f"Received: {message}")

    print("Closing the connection")
    writer.close()
    await writer.wait_closed()
    
async def main():
    await receive_data()

if __name__ == '__main__':
    server_host = '127.0.0.1'  # Change to the server's IP address or hostname.
    server_port = 8888

    filename = 'temp.txt'  # Change to the file you want to send.

    asyncio.run(main())

我能够获得 client.py 和 server.py 成功通信,以至于位于客户端目录中的 temp.txt 的内容基本上被复制到 server.py 的目录中,但我的问题是 receiver.py 从不打印来自服务器的完成消息。我认为这可能与我如何设置连接有关,但我不确定,因为我对异步任务相当陌生。

非常感谢任何帮助,谢谢!

python-3.x 异步 python-asyncio

评论


答:

0赞 jsbueno 10/25/2023 #1

每个程序将作为不同的客户端连接到您的服务器程序 - 这意味着“响应”消息将写入“client.py”持有的连接中,并且根本没有字节会在“receiver.py”持有的单独连接中传输。

Asyncio“执行其操作”,以便连接到服务器的每个客户端程序将对“handle_client”进行不同的调用,该调用将与任何其他调用同时运行。 因此,在您的代码中,第二个并发调用将 server.py 进行,这可能会永远等待某些数据到达该连接,因为您的“receiver.py”不会发送任何内容 - (或者只是完成,因为没有数据 - 不确定那里)。handle_client

如果你需要两个不同的程序与服务器通信,服务器必须“知道”它们正在与哪个程序通信,并且需要一些内部机制来在不同的“任务”之间传输其内部数据 - 每个任务处理一个连接。

您可以启动不同的服务器,使用不同的端口号,以便具有不同角色的程序将每个服务器连接到适当的端口号,或者将一些带内信息添加到临时协议中,以便服务器知道识别“接收方”程序,而不是客户端。

因此,我更改了您的代码以执行此操作:如果发送的文件名是“receiver”,则服务器会在另一个函数中单独处理它,该函数将仅发送“已完成”消息。我还加入了一个最小的计数器机制,以便任意数量的“receiver.py”程序可以同时连接到服务器,当第一个“客户端”程序连接并发送文件时,所有接收者都会收到通知。

(client.py 进行了最小的修改,只是在文件名后发送换行符。“receiver.py”也只是更改为发送“接收器”作为文件名,因此服务器会相应地处理它)

server.py

import asyncio
from queue import Empty


receivers = set()
receiver_msg_queue = asyncio.Queue()

async def handle_client(reader, writer):
    global receivers
    try:
        # read only one line of text as the file name (readline(), not "read(1024)"
        data = await reader.readline()
        if not data:
            return
        # Writes data from Client #1
        filename = data.decode().strip()

        # if the connection is from a receiver program, handle it in a different function
        # altogether:
        if filename == "receiver":
            receivers.add(asyncio.current_task())
            print("receiver connected")
            await handle_receiver(writer)
            return
        print(f"Receiving file: {filename}")
        # I added a sufix to the file, so I can run things on the same directory:
        with open(filename + ".copy", 'wb') as file:
            while True:
                data = await reader.read(1024)
                if not data:
                    break
                file.write(data)
        print(f"Received file: {filename}")

        # Send message to the receiver that the server is done writing
        response = "Server has written data from client.py"
        receiver_msg_queue.put_nowait((response, set()))
        writer.write(response.encode())
        await writer.drain()

    except asyncio.CancelledError:
        pass
    finally:
        writer.close()

async def handle_receiver(writer):
    """will be called once, concurrently, for each receiver program connected"""
    try:
        this_receiver = asyncio.current_task()
        while True:
            message, processed_at = await receiver_msg_queue.get()
            if this_receiver in processed_at: # this receiver already processed this message,
                # post message back
                receiver_msg_queue.put_nowait((message, processed_at))
                # wait a bit in order for other receives to conclude
                await asyncio.sleep(0.005)
                continue
            # act on this message
            writer.write(message.encode())
            # add receiver to processed_list
            processed_at.add(this_receiver)  # NB: this is fine for asyncio. Multithreading would need 
                                             # to use a "Lock" for this update.
            if processed_at == receivers: # we are the last receiver to process the message
                continue  # do not repost message to queue
            receiver_msg_queue.put_nowait((message, processed_at))
            # wait a bit in order for other receives to conclude processing
            await asyncio.sleep(0.005)

    except asyncio.CancelledError:
        writer.close()


async def main():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

if __name__ == '__main__':
    asyncio.run(main())

client.py

...

async def send_file(filename, server_host, server_port):
    reader, writer = await asyncio.open_connection(server_host, server_port)
    writer.write(filename.encode() + b"\r\n")
    ...

...

receiver.py

...
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    writer.write(b"receiver\r\n")
    response = await reader.readline()
...

评论

0赞 ysingh-1636 10/25/2023
感谢您的澄清!接收器永远不会得到任何东西是有道理的,因为我对服务器如何知道它将数据发送到哪个连接感到困惑。我想知道接收器是否需要在客户端之前或之后启动。如果我在客户端之前启动它,它似乎会收到一个空字符串并关闭,但在客户端之后启动它似乎工作得很好。此外,我认为接收器连接可以永远保持打开状态,对吧?因此,它不会在第一条消息后关闭。
0赞 ysingh-1636 10/25/2023
经过修补,似乎应该在顶级协程中创建recevier_msg_queue。因此,将创建msg_queue移动到 asynch def main() 并将队列传递给 handle_client 函数......receiver_msg_queue = 异步。Queue() 服务器 = await asyncio.start_server( lambda r, w: handle_client(r, w, receiver_msg_queue), '127.0.0.1', 8888)
0赞 jsbueno 10/25/2023
hat 使接收器结尾终止在此代码中是它确实设计用于这样做的。只需在正文中添加一个即可。创建变量的重组对于较大的项目有好处,它们在这里处于模块级别,因为它更简单。while Truehandle_receiver
0赞 ysingh-1636 10/25/2023
添加 to 似乎不会向接收者发送任何内容。接收器似乎从未打印过响应,即使我在 .有没有办法让接收器循环永远在自己的脚本中等待数据?while Truehandle_receivertry
0赞 jsbueno 10/25/2023
是的,当然有,我没有让代码完全正常工作的原因是因为它不是你要求的(出于研究目的,我决定添加多个接收器的功能,这让事情变得有点复杂对于不止一条消息)