Implement async socket recvmsg for RX and TX timestamping in Rust

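Asked by: Fabrex  Asked: 12/22/2022  Last edited by: Fabrex  Updated: 12/22/2022  Views: 376

Q:

Purpose

This project is a test of wrapping a socket file descriptor with AsyncFd (from tokio).

Code example

Socket file (socket.rs):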

use nix::{
    cmsg_space,
    sys::{
        socket::{ControlMessageOwned, MsgFlags, SockaddrIn},
        time::TimeSpec,
    },
};
use tokio::io::unix::{AsyncFd, TryIoError};

use std::{
    io::{IoSlice, IoSliceMut},
    marker::PhantomData,
    os::unix::prelude::{AsRawFd, RawFd},
};

pub struct AsyncSocket<'a, T: 'a> {
    inner: AsyncFd<RawFd>,
    phantom: PhantomData<&'a T>,
}

impl<'a, T> AsyncSocket<'a, T> {
    pub fn new(fd: RawFd) -> tokio::io::Result<Self> {
        Ok(Self {
            inner: AsyncFd::new(fd)?,
            phantom: PhantomData,
        })
    }

    pub async fn write_to(
        &'a self,
        buffer: &'a [IoSlice<'_>; 1],
        socket_address: &SockaddrIn,
    ) -> Result<usize, TryIoError> {
        let mut guard = self.inner.writable().await.unwrap();
        let flags = MsgFlags::empty();
        let cmsgs = &mut [];
        // `try_io` clears the cached readiness only when the closure returns
        // `WouldBlock`, so the errno is converted into an `io::Error` here.
        match guard.try_io(|inner| {
            nix::sys::socket::sendmsg(
                inner.as_raw_fd(),
                buffer,
                cmsgs,
                flags,
                Some(socket_address),
            )
            .map_err(|errno| std::io::Error::from_raw_os_error(errno as i32))
        }) {
            Ok(Ok(written_bytes)) => Ok(written_bytes),
            Ok(Err(e)) => {
                eprintln!("Error {}", e);
                Ok(0)
            }
            Err(would_block) => Err(would_block),
        }
    }

    pub async fn read(
        &'a self,
        buffer: &'a mut [IoSliceMut<'_>; 1],
        flags: MsgFlags,
    ) -> Result<usize, TryIoError> {
        buffer[0].fill(0);
        let mut guard = self.inner.readable().await.unwrap();

        match guard.try_io(|inner| {
            let sys_time = nix::time::clock_gettime(nix::time::ClockId::CLOCK_REALTIME).unwrap();
            println!("Real clock {:?}", sys_time);
            println!("FLAG = {:?}", flags);
            match nix::sys::socket::recvmsg::<()>(
                inner.as_raw_fd(),
                buffer,
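                // Size the control buffer for the messages the kernel attaches:
                // the SCM_TIMESTAMPING payload and, on the error queue, the
                // IP_RECVERR payload (extended error plus offender address).
                Some(&mut cmsg_space!(
                    nix::sys::socket::Timestamps,
                    nix::libc::sock_extended_err,
                    nix::libc::sockaddr_in
                )),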
                flags,
            ) {
                Ok(result) => {
                    let mut ts = TimeSpec::new(0, 0);
                    let mut _thw = TimeSpec::new(0, 0);
                    let control_messages: Vec<ControlMessageOwned> = result.cmsgs().collect();

                    println!("Control message length = {}", control_messages.len());
                    for c in control_messages {
                        match c {
                            ControlMessageOwned::ScmTimestampsns(timestamps) => {
                                _thw = timestamps.hw_raw;
                                ts = timestamps.system;
                                println!("Timestamps {:?}", timestamps);
                            }
                            ControlMessageOwned::ScmRights(_) => println!("ScmRights"),
                            ControlMessageOwned::ScmCredentials(_) => println!("ScmCredentials"),
                            ControlMessageOwned::ScmTimestamp(_) => println!("ScmTimestamp"),
                            ControlMessageOwned::ScmTimestampns(_) => println!("ScmTimestampns"),
                            ControlMessageOwned::Ipv4PacketInfo(_) => println!("Ipv4PacketInfo"),
                            ControlMessageOwned::Ipv6PacketInfo(_) => println!("Ipv6PacketInfo"),
                            ControlMessageOwned::Ipv4OrigDstAddr(_) => println!("Ipv4OrigDstAddr"),
                            ControlMessageOwned::Ipv6OrigDstAddr(_) => println!("Ipv6OrigDstAddr"),
                            ControlMessageOwned::UdpGroSegments(_) => println!("UdpGroSegments"),
                            ControlMessageOwned::RxqOvfl(_) => println!("RxqOvfl"),
                            ControlMessageOwned::Ipv4RecvErr(a, b) => {
                                println!("Received ipv4 Err {:?} from {:?}", a, b);
                            }
                            ControlMessageOwned::Ipv6RecvErr(_, _) => println!("Ipv6RecvErr"),
                            _ => println!("Other"),
                        }
                    }

                    let soft_diff = diff_systime(ts, sys_time);

                    // let hw_diff = diff_systime(thw, sys_time);

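                    // `ts` stays at zero when no timestamp control message arrived;
                    // the diff then equals `sys_time`, so the report is skipped.
                    if soft_diff != sys_time {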
                        let delta = std::time::Duration::from(soft_diff).as_micros();
                        println!("Soft Delta is {}", delta);
                    }
                    // } else if hw_diff != sys_time {
                    // //     let delta = std::time::Duration::from(hw_diff).as_micros();
                    // //     println!("Hard Delta is {}", delta);
                    // // }

                    return Ok(result.bytes);
                }
                Err(errno) => {
                    match errno {
                        nix::errno::Errno::EAGAIN => println!("EAGAIN Error"),
                        _ => println!("Other error {:?}", errno),
                    }
                    let error = std::io::Error::from_raw_os_error(errno as i32);
                    Err(error)
                }
            }
        }) {
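            Ok(res) => match res {
                Ok(read_bytes) => Ok(read_bytes),
                Err(e) => {
                    // Print the error returned by recvmsg itself; errno may have
                    // been overwritten by the time last_os_error() would run.
                    println!("Error from socket {:?}", e);
                    Ok(0)
                }
            },
            Err(e) => {
                // try_io reports that the call would block; readiness was cleared.
                println!("Guard error {:?}", e);
                Err(e)
            }
        }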
    }
}

impl<'a, T> AsRawFd for AsyncSocket<'a, T> {
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}

impl<'a, T> Drop for AsyncSocket<'a, T> {
    fn drop(&mut self) {
        let fd = self.inner.as_raw_fd();
        unsafe { nix::libc::close(fd) };
    }
}

fn diff_systime(first: TimeSpec, second: TimeSpec) -> TimeSpec {
    if second > first {
        second - first
    } else {
        first - second
    }
}

error.rs:

#[derive(thiserror::Error, Debug)]
pub enum LibError {
    AddrParseError(#[from] std::net::AddrParseError),
    #[error(transparent)]
    IO(#[from] std::io::Error),
    OSError(#[from] nix::Error),
}

impl std::fmt::Display for LibError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", &self)
    }
}

main.rs:

use std::{
    io::{IoSlice, IoSliceMut},
    str::FromStr,
    sync::{
        atomic::{AtomicU8, Ordering},
        Arc,
    },
    time::Duration,
};
mod error;
mod socket;

use nix::sys::socket::{
    bind, setsockopt,
    sockopt::{self},
    AddressFamily, MsgFlags, SockFlag, SockProtocol, SockType, SockaddrIn, TimestampingFlag,
};
use socket::AsyncSocket;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let local_sock_addr = SockaddrIn::from_str("0.0.0.0:6790").unwrap();
    let local_sock_addr1 = SockaddrIn::from_str("192.168.1.84:44581").unwrap();

    let send_sock_addr = SockaddrIn::from_str("192.168.1.123:6790").unwrap();

    let rsock = nix::sys::socket::socket(
        AddressFamily::Inet,
        SockType::Datagram,
        SockFlag::all(),
        SockProtocol::Udp,
    )?;

    let ssock = nix::sys::socket::socket(
        AddressFamily::Inet,
        SockType::Datagram,
        SockFlag::all(),
        SockProtocol::Udp,
    )?;

    // let sock_txtime = sock_txtime {
    //     clockid: nix::time::ClockId::CLOCK_MONOTONIC.as_raw(),
    //     flags: SOF_TXTIME_REPORT_ERRORS,
    // };
    setsockopt(rsock, sockopt::Timestamping, &TimestampingFlag::all())?;
    setsockopt(ssock, sockopt::Timestamping, &TimestampingFlag::all())?;
    // setsockopt(ssock, sockopt::ReuseAddr, &true)?;
    // setsockopt(rsock, sockopt::ReuseAddr, &true)?;

    // setsockopt(ssock, sockopt::TxTime, &sock_txtime)?;
    bind(ssock, &local_sock_addr1)?;
    bind(rsock, &local_sock_addr)?;

    let recv_socket: AsyncSocket<i32> = AsyncSocket::new(rsock)?;
    let send_socket: AsyncSocket<i32> = AsyncSocket::new(ssock)?;
    let atomic_i = Arc::new(AtomicU8::new(1));

    let mut read_buf = [0u8; 1024];
    let mut iov2 = [IoSliceMut::new(&mut read_buf)];

    // let mut rbuf1 = [0u8; 1024];
    let mut rbuf2 = [0u8; 1024];
    // let mut iov3 = [IoSliceMut::new(&mut rbuf1)];
    let mut iov4 = [IoSliceMut::new(&mut rbuf2)];

    loop {
        tokio::select! {
            read = recv_socket.read(&mut iov2, MsgFlags::empty()) => {
                match read {
                    Ok(v) => {
                        println!("Recv sock Received {} bytes in mes {:?}", v, iov2[0].iter().take(v).collect::<Vec<&u8>>());
                        let i = atomic_i.load(Ordering::Relaxed);
                        let sbuf: Vec<u8> = (1u8..=i).collect();

                        let iov1 = [IoSlice::new(&sbuf)];
                        tokio::time::sleep(Duration::from_millis(15)).await;

                        let _ = recv_socket.write_to(&iov1, &local_sock_addr1).await;

                    },
                    Err(e) => println!("Recv Err {:?}", e),
                }
            },
            _tick = tokio::time::sleep(Duration::from_millis(500)) => {
                // println!("Tick");
                let i = atomic_i.load(Ordering::Relaxed);
                if i == 3 {
                    continue;
                    // In case you want the sending to last forever

                    // atomic_i
                    //     .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| Some(n - n))
                    //     .unwrap();
                    // break;
                }
                let sbuf: Vec<u8> = (1u8..=i).collect();
                let iov1 = [IoSlice::new(&sbuf)];
                let _ = send_socket.write_to(&iov1, &send_sock_addr).await;
                // Calling read here results in a deadlock
                println!("Message {} sent", i);
                atomic_i.fetch_add(1, Ordering::Relaxed);
            },
            read2 = send_socket.read(&mut iov4, MsgFlags::empty()) => {
                match read2 {
                    Ok(v) => {
                        println!("Send sock Received {} bytes in mes {:?}", v, iov4[0].iter().take(v).collect::<Vec<&u8>>());
                        // This second read call is done to retrieve any messages present in the Error queue (timestamps are there)
                        // match send_socket.read(&mut iov3, MsgFlags::MSG_ERRQUEUE).await {
                        //     Ok(v) => println!("Send sock Received from Error queue {} bytes in mes {:?}", v, iov3[0].iter().take(v).collect::<Vec<&u8>>()),
                        //     Err(e) => println!("Send Err {:?}", e),
                        // }
                    },
                    Err(e) => println!("Send Err {:?}", e),
                }
            },
            // Adding this entry results in very inconsistent behavior for receiving Tx timestamps
            // read1 = send_socket.read(&mut iov3, MsgFlags::MSG_ERRQUEUE) => {
            //     match read1 {
            //         Ok(v) => println!("Send sock Received from Error queue {} bytes in mes {:?}", v, iov3[0].iter().take(v).collect::<Vec<&u8>>()),
            //         Err(e) => println!("Send Err {:?}", e),
            //     }
            // },

        }
        println!("\n")
    }

    // Ok(())
}

Cargo.toml:

[package]
name = "socket-timestamp-test"
version = "0.1.0"
edition = "2021"

[dependencies]
bytes = "1.3.0"
futures = "0.3.25"
log = "0.4.17"
mio = { version = "0.8.5", features = ["os-ext"] }
nix = { version = "0.26.1", features = ["socket"] }
num_enum = "0.5.7"
thiserror = "1.0.37"
tokio = { version = "1.2", features = [
    "sync",
    "net",
    "macros",
    "rt-multi-thread",
    "time",
] }

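Motivation

The motivation is to access configuration that common user-space networking libraries (tokio, std, socket2, smoltcp, etc.) do not expose, such as retrieving the timestamp messages for RX and TX packets. These values can be retrieved by calling libc's recvmsg on a properly configured socket (one with the SOF_TIMESTAMPING flags set). TX timestamps can be retrieved from the socket's error queue by reading the socket's control messages.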
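
To make the control-message payload concrete, here is a minimal sketch of decoding the raw SCM_TIMESTAMPING payload, the data nix surfaces as ControlMessageOwned::ScmTimestampsns. It assumes 64-bit Linux, where struct timespec is two native-endian 64-bit integers and the payload carries three of them (software, a deprecated slot, raw hardware). The names ScmTimestamping and parse_scm_timestamping are hypothetical and are not part of nix or libc.

```rust
use std::time::Duration;

/// Assumed size of one `struct timespec` on 64-bit Linux (tv_sec + tv_nsec).
const TIMESPEC_LEN: usize = 16;

/// The three slots of `struct scm_timestamping`, in kernel order.
#[derive(Debug, PartialEq, Eq)]
pub struct ScmTimestamping {
    pub software: Option<Duration>,
    pub legacy: Option<Duration>, // deprecated slot, normally all zero
    pub hardware_raw: Option<Duration>,
}

/// Read one native-endian i64 from the first 8 bytes of `bytes`.
fn read_i64(bytes: &[u8]) -> i64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&bytes[..8]);
    i64::from_ne_bytes(raw)
}

/// Decode one timespec; an all-zero or out-of-range slot yields `None`.
fn parse_timespec(bytes: &[u8]) -> Option<Duration> {
    let sec = read_i64(&bytes[0..8]);
    let nsec = read_i64(&bytes[8..16]);
    if (sec == 0 && nsec == 0) || sec < 0 || nsec < 0 || nsec >= 1_000_000_000 {
        return None;
    }
    Some(Duration::new(sec as u64, nsec as u32))
}

/// Decode a raw SCM_TIMESTAMPING payload (hypothetical helper).
pub fn parse_scm_timestamping(payload: &[u8]) -> Option<ScmTimestamping> {
    if payload.len() < 3 * TIMESPEC_LEN {
        return None; // truncated control message
    }
    Some(ScmTimestamping {
        software: parse_timespec(&payload[0..16]),
        legacy: parse_timespec(&payload[16..32]),
        hardware_raw: parse_timespec(&payload[32..48]),
    })
}
```

A slot that comes back as None corresponds to the zeroed TimeSpec values that the read function in socket.rs compares against.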

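Current problem

TX timestamps are stored in the error queue (read with MSG_ERRQUEUE), but the socket is only polled when a message arrives in the receive buffer. This leads to the following behavior:

Socket 1 sends a message. The timestamp data is queued ... no poll is triggered, and the message is not retrieved.

Socket 1 receives a response. If recvmsg is called with MSG_ERRQUEUE:

  • The socket is polled until all timestamps for the sent message have been received

If recvmsg is called without MSG_ERRQUEUE:

  • The socket is polled and the payload of the response is received, but the data in the error queue is not read (this is expected behavior)

If both reads are placed as branches of the same select! block, the behavior is inconsistent: sometimes you receive the messages queued on the error queue, and sometimes you do not. This means recvmsg has to be called again to receive the remaining TX timestamps.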
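
According to the Linux kernel's timestamping documentation, poll() returns POLLERR in revents when data is ready on the error queue, independently of POLLIN for the receive buffer. A minimal sketch of dispatching on those two bits follows. The constants are the Linux values from <poll.h>; ReadAction and plan_reads are hypothetical names used only for illustration and are not a tokio or nix API.

```rust
/// Linux poll(2) event bits (values taken from <poll.h> on Linux).
pub const POLLIN: i16 = 0x0001;
pub const POLLERR: i16 = 0x0008;

/// Which kind of recvmsg call a readiness event calls for.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ReadAction {
    /// Call recvmsg with MSG_ERRQUEUE to collect TX timestamps.
    ErrorQueue,
    /// Call recvmsg without flags to collect the payload and RX timestamp.
    Data,
}

/// Translate the `revents` of one poll() wakeup into the reads to perform.
/// The error queue is listed first so TX timestamps are not starved by
/// a steady stream of incoming datagrams.
pub fn plan_reads(revents: i16) -> Vec<ReadAction> {
    let mut actions = Vec::new();
    if (revents & POLLERR) != 0 {
        actions.push(ReadAction::ErrorQueue);
    }
    if (revents & POLLIN) != 0 {
        actions.push(ReadAction::Data);
    }
    actions
}
```

Whether the async runtime in use exposes the error bit at all is a separate question; this sketch only shows the decision once both bits are visible.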

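Expected overall behavior

If the socket is configured to generate TX timestamps and the error queue has received such information, the async runtime should poll for the results automatically. The async runtime should also poll for results when data arrives in the normal receive buffer.

Possible solutions

One way to solve this is to call recvmsg(... MSG_ERRQUEUE) after sending a packet. But this is cumbersome, and because of scheduling the recvmsg call cannot complete immediately after the send. Ideally, I would like polling to happen both when there is a message in the receive buffer and when the receive buffer is empty but the error queue has a queued entry. Another solution is to place a recvmsg with MSG_ERRQUEUE right after the recvmsg without MSG_ERRQUEUE. The problem here is that if the socket never receives anything, the error queue is never polled either.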
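
Both workarounds end up reading the error queue repeatedly until it is empty. The kernel documentation states that reads from the error queue never block, so an empty queue shows up as EAGAIN, which std maps to ErrorKind::WouldBlock. Below is a minimal sketch of such a drain loop with the actual recvmsg call hidden behind a closure. The name drain_nonblocking is hypothetical, and the closure stands in for whatever performs the MSG_ERRQUEUE read.

```rust
use std::io;

/// Call `recv_once` until it reports `WouldBlock`, collecting every item.
/// `recv_once` is assumed to perform one non-blocking read (for example a
/// recvmsg with MSG_ERRQUEUE) and return the decoded result.
pub fn drain_nonblocking<T, F>(mut recv_once: F) -> io::Result<Vec<T>>
where
    F: FnMut() -> io::Result<T>,
{
    let mut items = Vec::new();
    loop {
        match recv_once() {
            Ok(item) => items.push(item),
            // Queue is empty: hand back everything collected so far.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(items),
            // Interrupted by a signal: retry the same read.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            // Any other error is propagated; items read so far are dropped.
            Err(e) => return Err(e),
        }
    }
}
```

In the code from this question, the closure would wrap the nix recvmsg call with MsgFlags::MSG_ERRQUEUE and return the decoded timestamps.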

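Request for comments

I would like help figuring out how to receive the TX timestamps without manually calling recvmsg(... MSG_ERRQUEUE).

I am open to suggestions for different approaches to the problem, i.e. without tokio's AsyncFd.

sockets rust timestamp udp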

A: No answers yet