提问人:Fabrex 提问时间:12/22/2022 最后编辑:Fabrex 更新时间:12/22/2022 访问量:376
在 Rust 中为 RX 和 TX 时间戳实现异步套接字 recvmsg
Implement async socket recvmsg for RX and TX timestamping in Rust
问:
目的
此项目是使用 AsyncFd(来自 tokio)包装套接字文件描述符的测试。
代码示例
套接字文件:
use nix::{
cmsg_space,
sys::{
socket::{ControlMessageOwned, MsgFlags, SockaddrIn},
time::TimeSpec,
},
};
use tokio::io::unix::{AsyncFd, TryIoError};
use std::{
io::{IoSlice, IoSliceMut},
marker::PhantomData,
os::unix::prelude::{AsRawFd, RawFd},
};
/// Socket wrapper that drives a raw file descriptor through tokio's [`AsyncFd`].
///
/// The type parameter `T` and the lifetime `'a` are not stored anywhere except the
/// `PhantomData` marker below.
/// NOTE(review): nothing visible here uses `T`; confirm whether the parameter is still needed.
pub struct AsyncSocket<'a, T: 'a> {
/// Reactor registration for the raw descriptor handed to the constructor.
inner: AsyncFd<RawFd>,
/// Zero-sized marker that ties the otherwise unused `'a` and `T` to the struct.
phantom: PhantomData<&'a T>,
}
impl<'a, T> AsyncSocket<'a, T> {
pub fn new(fd: RawFd) -> tokio::io::Result<Self> {
Ok(Self {
// inner: AsyncFd::new(fd)?,
inner: AsyncFd::new(fd)?,
phantom: PhantomData,
})
}
pub async fn write_to(
&'a self,
buffer: &'a [IoSlice<'_>; 1],
socket_address: &SockaddrIn,
) -> Result<usize, TryIoError> {
let mut guard = self.inner.writable().await.unwrap();
let flags = MsgFlags::empty();
let cmsgs = &mut [];
match guard.try_io(|inner| {
match nix::sys::socket::sendmsg(
inner.as_raw_fd(),
buffer,
cmsgs,
flags,
Some(socket_address),
) {
Ok(read_bytes) => Ok(read_bytes),
Err(would_block) => Err(std::io::Error::from_raw_os_error(would_block as i32)),
}
}) {
Ok(res) => match res {
Ok(read_bytes) => Ok(read_bytes),
Err(e) => {
eprintln!("Error {}", e);
Ok(0)
}
},
Err(e) => Err(e),
}
}
pub async fn read(
&'a self,
buffer: &'a mut [IoSliceMut<'_>; 1],
flags: MsgFlags,
) -> Result<usize, TryIoError> {
buffer[0].fill(0);
let mut guard = self.inner.readable().await.unwrap();
match guard.try_io(|inner| {
let sys_time = nix::time::clock_gettime(nix::time::ClockId::CLOCK_REALTIME).unwrap();
println!("Real clock {:?}", sys_time);
println!("FLAG = {:?}", flags);
match nix::sys::socket::recvmsg::<()>(
inner.as_raw_fd(),
buffer,
Some(&mut cmsg_space!(
nix::sys::socket::MsgFlags,
nix::sys::socket::TimestampingFlag,
nix::sys::socket::SockFlag
)),
flags,
) {
Ok(result) => {
let mut ts = TimeSpec::new(0, 0);
let mut _thw = TimeSpec::new(0, 0);
let control_messages: Vec<ControlMessageOwned> = result.cmsgs().collect();
println!("Control message length = {}", control_messages.len());
for c in control_messages {
match c {
ControlMessageOwned::ScmTimestampsns(timestamps) => {
_thw = timestamps.hw_raw;
ts = timestamps.system;
println!("Timestamps {:?}", timestamps);
}
ControlMessageOwned::ScmRights(_) => println!("ScmRights"),
ControlMessageOwned::ScmCredentials(_) => println!("ScmCredentials"),
ControlMessageOwned::ScmTimestamp(_) => println!("ScmTimestamp"),
ControlMessageOwned::ScmTimestampns(_) => println!("ScmTimestampns"),
ControlMessageOwned::Ipv4PacketInfo(_) => println!("Ipv4PacketInfo"),
ControlMessageOwned::Ipv6PacketInfo(_) => println!("Ipv6PacketInfo"),
ControlMessageOwned::Ipv4OrigDstAddr(_) => println!("Ipv4OrigDstAddr"),
ControlMessageOwned::Ipv6OrigDstAddr(_) => println!("Ipv6OrigDstAddr"),
ControlMessageOwned::UdpGroSegments(_) => println!("UdpGroSegments"),
ControlMessageOwned::RxqOvfl(_) => println!("RxqOvfl"),
ControlMessageOwned::Ipv4RecvErr(a, b) => {
println!("Received ipv4 Err {:?} from {:?}", a, b);
}
ControlMessageOwned::Ipv6RecvErr(_, _) => println!("Ipv6RecvErr"),
_ => println!("Other"),
}
}
let soft_diff = diff_systime(ts, sys_time);
// let hw_diff = diff_systime(thw, sys_time);
if soft_diff != sys_time {
let delta = std::time::Duration::from(soft_diff).as_micros();
println!("Soft Delta is {}", delta);
}
// } else if hw_diff != sys_time {
// // let delta = std::time::Duration::from(hw_diff).as_micros();
// // println!("Hard Delta is {}", delta);
// // }
return Ok(result.bytes);
}
Err(errno) => {
match errno {
nix::errno::Errno::EAGAIN => println!("EAGAIN Error"),
_ => println!("Other error {:?}", errno),
}
let error = std::io::Error::from_raw_os_error(errno as i32);
Err(error)
}
}
}) {
Ok(res) => match res {
Ok(read_bytes) => Ok(read_bytes),
Err(_e) => {
println!("Error from socket {:?}", std::io::Error::last_os_error());
Ok(0)
}
},
Err(e) => {
println!("Guard error {:?}", std::io::Error::last_os_error());
Err(e)
}
}
// }
}
}
impl<'a, T> AsRawFd for AsyncSocket<'a, T> {
    /// Returns the raw descriptor reported by the inner `AsyncFd`.
    fn as_raw_fd(&self) -> RawFd {
        AsRawFd::as_raw_fd(&self.inner)
    }
}
impl<'a, T> Drop for AsyncSocket<'a, T> {
    /// Closes the wrapped descriptor; the result of `close` is ignored.
    ///
    /// NOTE(review): struct fields are dropped after this method returns, so the descriptor
    /// is closed while `inner` is still alive. Confirm that is acceptable for the
    /// `AsyncFd` registration.
    fn drop(&mut self) {
        // SAFETY: `close` is called with the descriptor held by `inner`. This assumes the
        // wrapper is the only owner of that descriptor. TODO: confirm against callers.
        unsafe {
            nix::libc::close(self.inner.as_raw_fd());
        }
    }
}
/// Returns the absolute difference between two timestamps, whichever one is later.
fn diff_systime(first: TimeSpec, second: TimeSpec) -> TimeSpec {
    let (later, earlier) = if second > first {
        (second, first)
    } else {
        (first, second)
    };
    later - earlier
}
Error.rs
/// Error type that wraps the failures this crate converts from.
///
/// Each variant carries `#[from]`, so the wrapped error type converts into `LibError`.
/// Only `IO` has an `#[error(...)]` attribute; the other variants declare no message.
#[derive(thiserror::Error, Debug)]
pub enum LibError {
/// A `std::net::AddrParseError`.
AddrParseError(#[from] std::net::AddrParseError),
/// A `std::io::Error`, marked `#[error(transparent)]`.
#[error(transparent)]
IO(#[from] std::io::Error),
/// A `nix::Error`.
OSError(#[from] nix::Error),
}
/// `Display` for [`LibError`] prints the same text as its `Debug` representation.
impl std::fmt::Display for LibError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}
Main.rs:
use std::{
io::{IoSlice, IoSliceMut},
str::FromStr,
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
time::Duration,
};
mod error;
mod socket;
use nix::sys::socket::{
bind, setsockopt,
sockopt::{self},
AddressFamily, MsgFlags, SockFlag, SockProtocol, SockType, SockaddrIn, TimestampingFlag,
};
use socket::AsyncSocket;
/// Test driver: creates two UDP sockets with timestamping enabled, wraps them in
/// [`AsyncSocket`], and then loops forever over a `select!` that
///
/// * echoes a counter-sized payload back whenever the receive socket gets a datagram,
/// * sends a new payload from the send socket every 500 ms until the counter reaches 3,
/// * prints whatever the send socket receives.
///
/// # Errors
///
/// Returns an error if an address fails to parse or if socket creation, option setting,
/// binding, or reactor registration fails. Once the loop starts the function never returns.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let local_sock_addr = SockaddrIn::from_str("0.0.0.0:6790")?;
    let local_sock_addr1 = SockaddrIn::from_str("192.168.1.84:44581")?;
    let send_sock_addr = SockaddrIn::from_str("192.168.1.123:6790")?;
    let rsock = nix::sys::socket::socket(
        AddressFamily::Inet,
        SockType::Datagram,
        SockFlag::all(),
        SockProtocol::Udp,
    )?;
    let ssock = nix::sys::socket::socket(
        AddressFamily::Inet,
        SockType::Datagram,
        SockFlag::all(),
        SockProtocol::Udp,
    )?;
    // let sock_txtime = sock_txtime {
    //     clockid: nix::time::ClockId::CLOCK_MONOTONIC.as_raw(),
    //     flags: SOF_TXTIME_REPORT_ERRORS,
    // };
    setsockopt(rsock, sockopt::Timestamping, &TimestampingFlag::all())?;
    setsockopt(ssock, sockopt::Timestamping, &TimestampingFlag::all())?;
    // setsockopt(ssock, sockopt::ReuseAddr, &true)?;
    // setsockopt(rsock, sockopt::ReuseAddr, &true)?;
    // setsockopt(ssock, sockopt::TxTime, &sock_txtime)?;
    bind(ssock, &local_sock_addr1)?;
    bind(rsock, &local_sock_addr)?;
    let recv_socket: AsyncSocket<i32> = AsyncSocket::new(rsock)?;
    let send_socket: AsyncSocket<i32> = AsyncSocket::new(ssock)?;
    // Counter that sets the payload length: message `i` carries the bytes 1..=i.
    let atomic_i = Arc::new(AtomicU8::new(1));
    let mut read_buf = [0u8; 1024];
    let mut iov2 = [IoSliceMut::new(&mut read_buf)];
    // let mut rbuf1 = [0u8; 1024];
    let mut rbuf2 = [0u8; 1024];
    // let mut iov3 = [IoSliceMut::new(&mut rbuf1)];
    let mut iov4 = [IoSliceMut::new(&mut rbuf2)];
    loop {
        tokio::select! {
            read = recv_socket.read(&mut iov2, MsgFlags::empty()) => {
                match read {
                    Ok(v) => {
                        println!(
                            "Recv sock Received {} bytes in mes {:?}",
                            v,
                            iov2[0].iter().take(v).collect::<Vec<&u8>>()
                        );
                        let i = atomic_i.load(Ordering::Relaxed);
                        let sbuf: Vec<u8> = (1u8..=i).collect();
                        let iov1 = [IoSlice::new(&sbuf)];
                        tokio::time::sleep(Duration::from_millis(15)).await;
                        let _ = recv_socket.write_to(&iov1, &local_sock_addr1).await;
                    },
                    Err(e) => println!("Recv Err {:?}", e),
                }
            },
            _tick = tokio::time::sleep(Duration::from_millis(500)) => {
                // println!("Tick");
                let i = atomic_i.load(Ordering::Relaxed);
                if i == 3 {
                    continue;
                    // In case you want the sending to last forever
                    // atomic_i
                    //     .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| Some(n - n))
                    //     .unwrap();
                    // break;
                }
                let sbuf: Vec<u8> = (1u8..=i).collect();
                let iov1 = [IoSlice::new(&sbuf)];
                let _ = send_socket.write_to(&iov1, &send_sock_addr).await;
                // Calling read here results in a deadlock
                println!("Message {} sent", i);
                atomic_i.fetch_add(1, Ordering::Relaxed);
            },
            read2 = send_socket.read(&mut iov4, MsgFlags::empty()) => {
                match read2 {
                    Ok(v) => {
                        println!(
                            "Send sock Received {} bytes in mes {:?}",
                            v,
                            iov4[0].iter().take(v).collect::<Vec<&u8>>()
                        );
                        // This second read call is done to retrieve any messages present in the Error queue (timestamps are there)
                        // match send_socket.read(&mut iov3, MsgFlags::MSG_ERRQUEUE).await {
                        //     Ok(v) => println!("Send sock Received from Error queue {} bytes in mes {:?}", v, iov3[0].iter().take(v).collect::<Vec<&u8>>()),
                        //     Err(e) => println!("Send Err {:?}", e),
                        // }
                    },
                    Err(e) => println!("Send Err {:?}", e),
                }
            },
            // Adding this entry results in very inconsistent behavior for receiving Tx timestamps
            // read1 = send_socket.read(&mut iov3, MsgFlags::MSG_ERRQUEUE) => {
            //     match read1 {
            //         Ok(v) => println!("Send sock Received from Error queue {} bytes in mes {:?}", v, iov3[0].iter().take(v).collect::<Vec<&u8>>()),
            //         Err(e) => println!("Send Err {:?}", e),
            //     }
            // },
        }
        println!("\n")
    }
    // The loop above never exits, so no trailing `Ok(())` is needed.
}
Cargo.toml:
[package]
name = "socket-timestamp-test"
version = "0.1.0"
edition = "2021"
[dependencies]
bytes = "1.3.0"
futures = "0.3.25"
log = "0.4.17"
mio = { version = "0.8.5", features = ["os-ext"] }
nix = { version = "0.26.1", features = ["socket"] }
num_enum = "0.5.7"
thiserror = "1.0.37"
tokio = { version = "1.2", features = [
"sync",
"net",
"macros",
"rt-multi-thread",
"time",
] }
动机
其动机是能够访问常见用户网络库(如 tokio、std、socket2、smoltcp 等)中不存在的配置,例如检索 RX 和 TX 数据包的时间戳消息。
这些值可以通过在正确配置的套接字(配置有 SOF 时间戳标志)中调用 libc 来检索。
通过 recvmsg 访问套接字的控制消息,可以从套接字错误队列中检索 Tx 时间戳。
当前问题
Tx 时间戳存储在错误队列中(使用 MSG_ERRQUEUE 时),但套接字的轮询仅在收到消息缓冲区时发生。这会导致以下行为:
套接字 1 发送消息 时间戳内容已排队 ...不触发轮询,也不检索消息
套接字 1 接收响应 如果使用 MSG_ERRQUEUE 调用 recvmsg:
- 轮询套接字,直到收到已发送消息的所有时间戳
如果在没有MSG_ERRQUEUE的情况下调用 recvmsg:
- 套接字被轮询并接收响应的有效负载,但不会读取错误队列中的数据(这是预期行为)
如果在 select! 的分支中调用(带 MSG_ERRQUEUE 的 recvmsg),会导致不一致的行为:有时能收到错误队列中的消息,有时则收不到。这意味着需要再次调用 recvmsg 才能接收剩余的 Tx 时间戳。
预期的整体行为
如果 socket 配置为生成 TX 时间戳,并且 Error 队列已收到此类信息,则异步运行时应自动轮询结果。 当数据进入正常的 recv 缓冲区时,异步运行时也应轮询这些结果。
可能的解决方案
解决此问题的一种方法是在发送数据包后调用 recvmsg(..., MSG_ERRQUEUE)。但这很麻烦,并且由于调度,recvmsg 调用无法在发送后立即完成。理想情况下,我希望当 recv 缓冲区中有消息时,以及 recv 缓冲区中没有消息但错误队列中有排队条目时,都能进行轮询。 另一种解决方案是在不带 MSG_ERRQUEUE 的 recvmsg 之后,再调用一次带 MSG_ERRQUEUE 的 recvmsg。这里的问题是,如果套接字没有收到任何东西,我们也不会轮询错误队列。
征求意见
我希望得到帮助,弄清楚如何在不手动调用 recvmsg(..., MSG_ERRQUEUE) 的情况下获取 Tx 时间戳。
我愿意接受有关解决问题的不同方法的建议,即没有 tokio AsyncFd。
答: 暂无答案
评论