提问人:Mika 提问时间:7/26/2023 最后编辑:Mika 更新时间:7/27/2023 访问量:54
Python 套接字接收过时的数据,因为主线程 time.sleep( )ing
Python socket receives outdated data because main-thread time.sleep( )ing
问:
我正在从套接字接收数据。我连接到套接字并用 .while
socket.recv(size)
所有器件均在高接收频率下正常工作。
但是,如果我添加一个 in 循环,我每次都会开始收到过时的值。看起来套接字充满了旧数据,只要主机在一秒内发送数据最多 0.002 次,我只能以 1 秒的接收频率接收过时的数据(例如)。
我无法共享来自套接字的数据(只要它对于发布来说太宽太重),但这是代码:time.sleep(sec)
while
import ctypes
import datetime
import logging
import socket
import time
from app.service.c_structures import RTDStructure
logging.basicConfig(level=logging.DEBUG)
class RTDSerializer:
def __init__(self, ip: str, port: int = 29000, frequency: float = 0.002):
self.data: dict = {}
self.ip = ip
self.port = port
self.frequency = frequency
self.sock = None
self.struct_size = ctypes.sizeof(RTDStructure)
self.logger = logging
print(ctypes.sizeof(RTDStructure))
def connect(self):
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ip, self.port))
self.sock.settimeout(None)
logging.debug(f"Socket connect [{self.ip}:{self.port}] --> Ok")
while True:
c_structure = RTDStructure.from_buffer_copy(self.receive_raw_data() * ctypes.sizeof(RTDStructure))
self.data = self.ctypes_to_dict(c_structure)
print(datetime.datetime.now(), self.data['move_des_q'])
time.sleep(self.frequency)
except Exception as error:
logging.error(f"Socket connect [{self.ip}:{self.port}] --> False\n{error}")
return 0
def receive_raw_data(self) -> bytes or connect:
raw_data = self.sock.recv(self.struct_size)
if raw_data == b'':
logging.error('Connection lost')
return self.connect()
return raw_data
def ctypes_to_dict(self, ctypes_obj) -> dict or list:
if isinstance(ctypes_obj, ctypes.Structure):
data_dict = {}
for field_name, field_type in ctypes_obj.get_fields():
field_value = getattr(ctypes_obj, field_name)
if isinstance(field_value, (ctypes.Structure, ctypes.Array)):
data_dict[field_name] = self.ctypes_to_dict(field_value)
else:
data_dict[field_name] = field_value
return data_dict
elif isinstance(ctypes_obj, ctypes.Array):
data_list = []
for element in ctypes_obj:
if isinstance(element, (ctypes.Structure, ctypes.Array)):
data_list.append(self.ctypes_to_dict(element))
else:
data_list.append(element)
return data_list
if __name__ == '__main__':
rtd = RTDSerializer(ip='192.168.0.226', port=29000, frequency=0.05)
rtd.connect()
我从套接字接收数据。这是一个带有 C 值的字节串。我用ctypes结构序列化它并转换为字典。序列化算法对于本主题并不重要。
此外,套接字有时会返回 0 字节,因此,我每次都需要在 func 中检查此问题。receive_raw_data
我试图强制重新连接循环。这以某种方式解决了过时信息的问题。喜欢这个:while
def connect(self):
try:
logging.debug(f"Socket connect [{self.ip}:{self.port}] --> Ok")
while True:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ip, self.port))
self.sock.settimeout(None)
c_structure = RTDStructure.from_buffer_copy(self.receive_raw_data() * ctypes.sizeof(RTDStructure))
self.data = self.ctypes_to_dict(c_structure)
print(datetime.datetime.now(), self.data['move_des_q'])
time.sleep(self.frequency)
self.sock.close()
except Exception as error:
logging.error(f"Socket connect [{self.ip}:{self.port}] --> False\n{error}")
return 0
但是,当我使用非常高的接收频率时,它会对工作速度影响太大。就主机发送频率而言,这真的很重要。
那么,我该如何解决这个问题呢?
插座断开有什么问题?
为什么套接字要填满,只是不通过过时的数据?
我正在考虑函数,它将清除所有未使用的数据。当线程处于休眠状态时,但是,这将是有点不合理的复杂......有什么想法吗?clear_buffer
UPD:可能是我应该只写一个打印线程,它每次都会唤醒一次,按频率值设置,打印当前值,该值将被套接字填充,然后再次休眠,同时套接字线程将以高频运行并覆盖这个值?data_buffer
data_buffer
带有微示例的新小 UPD:
import ctypes
import datetime
import logging
import socket
import time
logging.basicConfig(level=logging.DEBUG)
class RTDReceiver:
def __init__(self, ip: str, port: int, frequency: float = 0.002):
self.data: dict = {}
self.ip = ip
self.port = port
self.frequency = frequency
self.sock = None
self.struct_size = 1064 # 1064 bytes in my case.
def connect(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ip, self.port))
self.sock.settimeout(None)
def receive_raw_data(self) -> bytes:
raw_data = self.sock.recv(self.struct_size)
if raw_data == b'':
logging.error('Connection lost')
self.connect()
time.sleep(self.frequency)
return raw_data
if __name__ == '__main__':
rec = RTDReceiver(ip='here your ip', port='here is your port', frequency=0.02)
rec.connect()
while True:
print(rec.receive_raw_data())
答:
为什么套接字要填满,只是不通过过时的数据?
因为这就是套接字的工作方式。尤其是像您正在使用的面向流的套接字。字节按发送顺序接收。必须先读取较早的字节,然后才能读取较晚的字节。
你说,
只要主机在一秒内发送数据最多 0.002 次
,但这将是一个非常低的数据速率,除非你的确实非常大。我猜你的意思是主机发送数据的频率为每 0.002 秒一次 == 每秒 500 次。无论是哪种方式,如果您希望能够读取最新数据,接收方都需要跟上发送方的步伐。RTDStructure
插座断开有什么问题?
很难确定,但我推测接收方远远落后于发送方,以至于套接字的接收缓冲区填满,因此机器停止接受来自发送方的新数据包。虽然这本身不会导致连接关闭,但如果它持续足够长的时间(当然可以),那么发送方很可能会从其末端关闭连接。
此外,套接字有时会返回 0 字节,因此,我每次都需要在 func 中检查此问题。
receive_raw_data
是的,你有。在一次调用中接收的字节数也可能比您请求的字节数少,因此为了实现稳健性,您需要提供通过两个或多个调用接收传输的功能。这就是流套接字的工作方式。recv()
那么,我该如何解决这个问题呢?
有几种可能性,其中包括:
尽可能快地接收消息,也许在专用于此的线程中。从客户端的那些中选择,以你想要的频率,丢弃未选择的那些。
使服务器的数据速率可按连接进行配置。客户端连接,告诉服务器发送数据的频率,然后以该速率接收。
更改消息交换模式。您目前有一个特别简单的方法,其中客户端只需要连接,然后服务器只需以它选择的任何速率推送数据,而无需来自客户端的任何进一步消息。如果切换到请求/响应模式,客户端必须向服务器发送某种消息(可能只是一个字节)才能让服务器发送数据,那么服务器就不能轻易地领先于客户端。但是,这需要对服务器进行最多的更改,并且请求延迟可能会降低您可以实现的最大数据速率。
作为前一个项目的变体,客户端可以在每次想要读取一个项目时建立新的连接,然后在只读取一个项目后断开连接。可能每次都能获得新数据,但它会增加更多的延迟。
评论
receive_raw_data()
self.connect()
self.receive_raw_data() * ctypes.sizeof(RTDStructure)