提问人:ysingh-1636 提问时间:10/24/2023 更新时间:10/25/2023 访问量:42
如何使用 Python 的 Asynchio 在服务器和两个客户端之间进行通信?
How to communicate between a server and two clients using Python's Asynchio?
问:
我正在开发一个基本的 Python 应用程序,该应用程序使用异步服务器和多个客户端,但我在与第二个客户端通信时遇到了一些问题。
client.py - 轮询目录中的新文件并将内容发送到服务器
# Currently just sends the contents of a known file to the server.
# The polling portion I plan on adding later
import asyncio
async def send_file(filename, server_host, server_port):
reader, writer = await asyncio.open_connection(server_host, server_port)
writer.write(filename.encode())
with open(filename, 'rb') as file:
while True:
data = file.read(1024)
if not data:
break
writer.write(data)
writer.close()
if __name__ == '__main__':
server_host = '127.0.0.1' # Change to the server's IP address or hostname.
server_port = 8888
filename = 'temp.txt' # Change to the file you want to send.
asyncio.run(send_file(filename, server_host, server_port))
server.py- 等待来自客户端 #1 的数据并将其写入文件。完成后向客户端 #2 发出警报
import asyncio
async def handle_client(reader, writer):
try:
data = await reader.read(1024)
if not data:
return
# Writes data from Client #1
filename = data.decode().strip()
print(f"Receiving file: {filename}")
with open(filename, 'wb') as file:
while True:
data = await reader.read(1024)
if not data:
break
file.write(data)
print(f"Received file: {filename}")
# Send message to the receiver that the server is done writing
response = "Server has written data from client.py"
writer.write(response.encode())
await writer.drain()
except asyncio.CancelledError:
pass
finally:
writer.close()
async def main():
server = await asyncio.start_server(
handle_client, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())
receiver.py - 从服务器打印的完成状态
import asyncio
async def receive_data():
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
response = await reader.read(100)
message = response.decode()
print(f"Received: {message}")
print("Closing the connection")
writer.close()
await writer.wait_closed()
async def main():
await receive_data()
if __name__ == '__main__':
server_host = '127.0.0.1' # Change to the server's IP address or hostname.
server_port = 8888
filename = 'temp.txt' # Change to the file you want to send.
asyncio.run(main())
我能够获得 client.py 和 server.py 成功通信,以至于位于客户端目录中的 temp.txt 的内容基本上被复制到 server.py 的目录中,但我的问题是 receiver.py 从不打印来自服务器的完成消息。我认为这可能与我如何设置连接有关,但我不确定,因为我对异步任务相当陌生。
非常感谢任何帮助,谢谢!
答:
每个程序将作为不同的客户端连接到您的服务器程序 - 这意味着“响应”消息将写入“client.py”持有的连接中,并且根本没有字节会在“receiver.py”持有的单独连接中传输。
Asyncio“执行其操作”,以便连接到服务器的每个客户端程序将对“handle_client”进行不同的调用,该调用将与任何其他调用同时运行。
因此,在您的代码中,第二个并发调用将 server.py 进行,这可能会永远等待某些数据到达该连接,因为您的“receiver.py”不会发送任何内容 - (或者只是完成,因为没有数据 - 不确定那里)。handle_client
如果你需要两个不同的程序与服务器通信,服务器必须“知道”它们正在与哪个程序通信,并且需要一些内部机制来在不同的“任务”之间传输其内部数据 - 每个任务处理一个连接。
您可以启动不同的服务器,使用不同的端口号,以便具有不同角色的程序将每个服务器连接到适当的端口号,或者将一些带内信息添加到临时协议中,以便服务器知道识别“接收方”程序,而不是客户端。
因此,我更改了您的代码以执行此操作:如果发送的文件名是“receiver”,则服务器会在另一个函数中单独处理它,该函数将仅发送“已完成”消息。我还加入了一个最小的计数器机制,以便任意数量的“receiver.py”程序可以同时连接到服务器,当第一个“客户端”程序连接并发送文件时,所有接收者都会收到通知。
(client.py 进行了最小的修改,只是在文件名后发送换行符。“receiver.py”也只是更改为发送“接收器”作为文件名,因此服务器会相应地处理它)
server.py
import asyncio
from queue import Empty
receivers = set()
receiver_msg_queue = asyncio.Queue()
async def handle_client(reader, writer):
global receivers
try:
# read only one line of text as the file name (readline(), not "read(1024)"
data = await reader.readline()
if not data:
return
# Writes data from Client #1
filename = data.decode().strip()
# if the connection is from a receiver program, handle it in a different function
# altogether:
if filename == "receiver":
receivers.add(asyncio.current_task())
print("receiver connected")
await handle_receiver(writer)
return
print(f"Receiving file: {filename}")
# I added a sufix to the file, so I can run things on the same directory:
with open(filename + ".copy", 'wb') as file:
while True:
data = await reader.read(1024)
if not data:
break
file.write(data)
print(f"Received file: {filename}")
# Send message to the receiver that the server is done writing
response = "Server has written data from client.py"
receiver_msg_queue.put_nowait((response, set()))
writer.write(response.encode())
await writer.drain()
except asyncio.CancelledError:
pass
finally:
writer.close()
async def handle_receiver(writer):
"""will be called once, concurrently, for each receiver program connected"""
try:
this_receiver = asyncio.current_task()
while True:
message, processed_at = await receiver_msg_queue.get()
if this_receiver in processed_at: # this receiver already processed this message,
# post message back
receiver_msg_queue.put_nowait((message, processed_at))
# wait a bit in order for other receives to conclude
await asyncio.sleep(0.005)
continue
# act on this message
writer.write(message.encode())
# add receiver to processed_list
processed_at.add(this_receiver) # NB: this is fine for asyncio. Multithreading would need
# to use a "Lock" for this update.
if processed_at == receivers: # we are the last receiver to process the message
continue # do not repost message to queue
receiver_msg_queue.put_nowait((message, processed_at))
# wait a bit in order for other receives to conclude processing
await asyncio.sleep(0.005)
except asyncio.CancelledError:
writer.close()
async def main():
server = await asyncio.start_server(
handle_client, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())
client.py
...
async def send_file(filename, server_host, server_port):
reader, writer = await asyncio.open_connection(server_host, server_port)
writer.write(filename.encode() + b"\r\n")
...
...
receiver.py
...
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
writer.write(b"receiver\r\n")
response = await reader.readline()
...
评论
while True
handle_receiver
while True
handle_receiver
try
评论