提问人:Starua 提问时间:7/11/2023 最后编辑:Starua 更新时间:7/13/2023 访问量:64
如何在 Rust 中将 AsyncReader “分叉”为两个?
How to "fork" an AsyncReader into two in Rust?
问:
我目前正在 Rust 中实现一个 TCP 代理,它可以检测 HTTPS 连接中的服务器名称并根据它决定使用哪个代理。具体来说,我需要从 TcpStream 复制数据并将其传递给 tokio_rustls::LazyConfigAcceptor 以检测 ClientHello 消息。
我正在寻找一种解决方案,了解如何有效地将 AsyncReader 分叉为两个独立的读取器,这两个读取器都可以从底层读取器读取数据而不会相互阻塞。此外,我希望缓冲区能够动态增长,以适应每个读取器读取的不同数量的数据。
例
let (reader, reader_copy) = fork(reader);
// ClientHello data will read by detect_server_name *and* forwarded to proxy
let server_name = detect_server_name(reader_copy).await;
proxy_map.get(&server_name).forward(reader).await;
答:
1赞
Starua
7/13/2023
#1
通过共享一个 VecDeq 缓冲区并标记两个索引来解决
use std::cmp::min;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{ready, Waker};
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
struct Inner<R> {
reader: R,
buffer: VecDeque<u8>,
cursors: (usize, usize),
wakers: (Option<Waker>, Option<Waker>),
}
pub struct AsyncForkReader<R> {
inner: Arc<Mutex<Inner<R>>>,
is_left: bool,
}
impl<R> AsyncForkReader<R> {
pub fn new(reader: R) -> (Self, Self) {
let inner = Arc::new(Mutex::new(Inner {
reader,
buffer: VecDeque::new(),
cursors: (0, 0),
wakers: (None, None),
}));
(
Self {
inner: inner.clone(),
is_left: true,
},
Self {
inner,
is_left: false,
},
)
}
}
impl<R: AsyncRead> AsyncRead for AsyncForkReader<R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let mut inner = self.inner.lock().unwrap();
let Inner {
ref mut buffer,
ref mut cursors,
ref mut reader,
ref mut wakers,
} = *inner;
let (ref mut cursor, ref mut other_cursor);
let (ref mut waker, ref mut other_waker);
if self.is_left {
cursor = &mut cursors.0;
waker = &mut wakers.0;
other_cursor = &mut cursors.1;
other_waker = &mut wakers.1;
} else {
cursor = &mut cursors.1;
waker = &mut wakers.1;
other_cursor = &mut cursors.0;
other_waker = &mut wakers.0;
};
if buffer.len() == *cursor {
*waker = Some(cx.waker().clone());
// According to the documentation, only the last waker should be called when poll is called many times
// So we need to wake another fork manualy to make sure waker another fork will not be coverd and nerver wake again
if let Some(waker) = other_waker.take() {
waker.wake();
}
// If reads all buffer then try poll more
let origin_len = buf.filled().len();
// Cannot use subject: Mutex
// Safety: Never moved out
ready!(unsafe { Pin::new_unchecked(reader) }.poll_read(cx, buf))?;
let slice = buf.filled().split_at(origin_len).1;
buffer.reserve(slice.len());
buffer.extend(slice.iter());
*cursor = buffer.len();
} else {
*waker = None;
// get datas in buffer
let len = min(buffer.len() - *cursor, buf.remaining());
buffer
.range(*cursor..(*cursor + len))
.for_each(|value| buf.put_slice(&[*value]));
*cursor += len;
// drop data that both unused by A and B
let release_len = min(*cursor, *other_cursor);
if release_len > 0 {
buffer.drain(0..release_len);
*cursor -= release_len;
*other_cursor -= release_len;
}
}
Poll::Ready(Ok(()))
}
}
评论