Python 套接字接收过时的数据,因为主线程 time.sleep( )ing

Python socket receives outdated data because main-thread time.sleep( )ing

提问人:Mika 提问时间:7/26/2023 最后编辑:Mika 更新时间:7/27/2023 访问量:54

问:

我正在从套接字接收数据。我连接到套接字并用 .whilesocket.recv(size)

所有器件均在高接收频率下正常工作。 但是,如果我添加一个 in 循环,我每次都会开始收到过时的值。看起来套接字充满了旧数据,只要主机在一秒内发送数据最多 0.002 次,我只能以 1 秒的接收频率接收过时的数据(例如)。 我无法共享来自套接字的数据(只要它对于发布来说太宽太重),但这是代码:time.sleep(sec)while

import ctypes
import datetime
import logging
import socket
import time

from app.service.c_structures import RTDStructure
logging.basicConfig(level=logging.DEBUG)


class RTDSerializer:
    def __init__(self, ip: str, port: int = 29000, frequency: float = 0.002):
        self.data: dict = {}
        self.ip = ip
        self.port = port
        self.frequency = frequency
        self.sock = None
        self.struct_size = ctypes.sizeof(RTDStructure)
        self.logger = logging
        print(ctypes.sizeof(RTDStructure))

    def connect(self):
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.ip, self.port))
            self.sock.settimeout(None)
            logging.debug(f"Socket connect [{self.ip}:{self.port}] --> Ok")
            while True:
                c_structure = RTDStructure.from_buffer_copy(self.receive_raw_data() * ctypes.sizeof(RTDStructure))
                self.data = self.ctypes_to_dict(c_structure)
                print(datetime.datetime.now(), self.data['move_des_q'])
                time.sleep(self.frequency)
        except Exception as error:
            logging.error(f"Socket connect [{self.ip}:{self.port}] --> False\n{error}")
            return 0


    def receive_raw_data(self) -> bytes or connect:
        raw_data = self.sock.recv(self.struct_size)
        if raw_data == b'':
            logging.error('Connection lost')
            return self.connect()
        return raw_data

    def ctypes_to_dict(self, ctypes_obj) -> dict or list:
        if isinstance(ctypes_obj, ctypes.Structure):
            data_dict = {}
            for field_name, field_type in ctypes_obj.get_fields():
                field_value = getattr(ctypes_obj, field_name)
                if isinstance(field_value, (ctypes.Structure, ctypes.Array)):
                    data_dict[field_name] = self.ctypes_to_dict(field_value)
                else:
                    data_dict[field_name] = field_value
            return data_dict
        elif isinstance(ctypes_obj, ctypes.Array):
            data_list = []
            for element in ctypes_obj:
                if isinstance(element, (ctypes.Structure, ctypes.Array)):
                    data_list.append(self.ctypes_to_dict(element))
                else:
                    data_list.append(element)
            return data_list


if __name__ == '__main__':

    rtd = RTDSerializer(ip='192.168.0.226', port=29000, frequency=0.05)
    rtd.connect()

我从套接字接收数据。这是一个带有 C 值的字节串。我用ctypes结构序列化它并转换为字典。序列化算法对于本主题并不重要。

此外,套接字有时会返回 0 字节,因此,我每次都需要在 func 中检查此问题。receive_raw_data

我试图强制重新连接循环。这以某种方式解决了过时信息的问题。喜欢这个:while

    def connect(self):
        try:
            logging.debug(f"Socket connect [{self.ip}:{self.port}] --> Ok")
            while True:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.ip, self.port))
                self.sock.settimeout(None)
                c_structure = RTDStructure.from_buffer_copy(self.receive_raw_data() * ctypes.sizeof(RTDStructure))
                self.data = self.ctypes_to_dict(c_structure)
                print(datetime.datetime.now(), self.data['move_des_q'])
                time.sleep(self.frequency)
                self.sock.close()
        except Exception as error:
            logging.error(f"Socket connect [{self.ip}:{self.port}] --> False\n{error}")
            return 0

但是,当我使用非常高的接收频率时,它会对工作速度影响太大。就主机发送频率而言,这真的很重要。

那么,我该如何解决这个问题呢?

插座断开有什么问题?

为什么套接字要填满,只是不通过过时的数据?

我正在考虑函数,它将清除所有未使用的数据。当线程处于休眠状态时,但是,这将是有点不合理的复杂......有什么想法吗?clear_buffer

UPD:可能是我应该只写一个打印线程,它每次都会唤醒一次,按频率值设置,打印当前值,该值将被套接字填充,然后再次休眠,同时套接字线程将以高频运行并覆盖这个值?data_bufferdata_buffer

带有微示例的新小 UPD:

import ctypes
import datetime
import logging
import socket
import time

logging.basicConfig(level=logging.DEBUG)


class RTDReceiver:
    def __init__(self, ip: str, port: int, frequency: float = 0.002):
        self.data: dict = {}
        self.ip = ip
        self.port = port
        self.frequency = frequency
        self.sock = None
        self.struct_size = 1064  # 1064 bytes in my case. 

    def connect(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.ip, self.port))
        self.sock.settimeout(None)

    def receive_raw_data(self) -> bytes:
        raw_data = self.sock.recv(self.struct_size)
        if raw_data == b'':
            logging.error('Connection lost')
            self.connect()
        time.sleep(self.frequency)
        return raw_data


if __name__ == '__main__':

    rec = RTDReceiver(ip='here your ip', port='here is your port',  frequency=0.02)
    rec.connect()
    while True:
        print(rec.receive_raw_data())
python c python-3.x 套接字

评论

0赞 quamrana 7/26/2023
你似乎有很多问题相互叠加。首先,可以调用的事实是使用递归而不是循环。下一篇: 什么是?您是否正在复制接收到的数据?(还有其他问题)receive_raw_data()self.connect()self.receive_raw_data() * ctypes.sizeof(RTDStructure)
0赞 Mika 7/26/2023
@quamrana,非常感谢。当我重新发明一些函数时,我没有注意到这个递归问题。关于下一个问题,这又是我的错。我对这篇文章中的答案有误解。 stackoverflow.com/questions/76730751/......感谢您的问题更正:)!
0赞 quamrana 7/26/2023
我试图了解你的问题,以便给你一个答案,但ATM的问题太多了。您是否有兴趣只接收 sleep() 之后某个时间点的最新可用数据?
0赞 Mika 7/27/2023
@quamrana .对于因我的词汇量少而造成的任何误解,我深表歉意。关于这个问题,是的,我正在尝试接收可用的最新数据。只要我不能轻松设置主机发送频率,我就想通过更改接收频率来设置它。所以,是的,我想接收最新的可用数据。我还添加了新的微示例。只是接收数据。套接字断开,仅此而已。
0赞 quamrana 7/27/2023
应该有一种方法可以丢弃旧信息并保留最新信息,同时仍能应对断开连接。

答:

0赞 John Bollinger 7/26/2023 #1

为什么套接字要填满,只是不通过过时的数据?

因为这就是套接字的工作方式。尤其是像您正在使用的面向流的套接字。字节按发送顺序接收。必须先读取较早的字节,然后才能读取较晚的字节。

你说,

只要主机在一秒内发送数据最多 0.002 次

,但这将是一个非常低的数据速率,除非你的确实非常大。我猜你的意思是主机发送数据的频率为每 0.002 秒一次 == 每秒 500 次。无论是哪种方式,如果您希望能够读取最新数据,接收方都需要跟上发送方的步伐。RTDStructure

插座断开有什么问题?

很难确定,但我推测接收方远远落后于发送方,以至于套接字的接收缓冲区填满,因此机器停止接受来自发送方的新数据包。虽然这本身不会导致连接关闭,但如果它持续足够长的时间(当然可以),那么发送方很可能会从其末端关闭连接。

此外,套接字有时会返回 0 字节,因此,我每次都需要在 func 中检查此问题。receive_raw_data

是的,你有。在一次调用中接收的字节数也可能比您请求的字节数少,因此为了实现稳健性,您需要提供通过两个或多个调用接收传输的功能。这就是流套接字的工作方式。recv()

那么,我该如何解决这个问题呢?

有几种可能性,其中包括:

  • 尽可能快地接收消息,也许在专用于此的线程中。从客户端的那些中选择,以你想要的频率,丢弃未选择的那些。

  • 使服务器的数据速率可按连接进行配置。客户端连接,告诉服务器发送数据的频率,然后以该速率接收。

  • 更改消息交换模式。您目前有一个特别简单的方法,其中客户端只需要连接,然后服务器只需以它选择的任何速率推送数据,而无需来自客户端的任何进一步消息。如果切换到请求/响应模式,客户端必须向服务器发送某种消息(可能只是一个字节)才能让服务器发送数据,那么服务器就不能轻易地领先于客户端。但是,这需要对服务器进行最多的更改,并且请求延迟可能会降低您可以实现的最大数据速率。

  • 作为前一个项目的变体,客户端可以在每次想要读取一个项目时建立新的连接,然后在只读取一个项目后断开连接。可能每次都能获得新数据,但它会增加更多的延迟。

评论

0赞 Mika 7/26/2023
谢谢。不幸的是,我不能重写主机,只要这是一个工作项目,并且团队领导不想重写这段......遗留代码...“我们没有时间”:“)
0赞 John Bollinger 7/26/2023
在这种情况下,@Mika,我认为您需要客户端跟上服务器(建议的第一个选项),或者每次它想要读取项目时打开一个新连接(我将其添加到此答案中作为额外的替代方案)。
0赞 Mika 7/26/2023
是的,我认为我需要跟上服务器。因为打开/关闭连接每个循环都会消耗大量时间(以毫秒为单位)。