How to "fork" an AsyncReader into two in Rust?

Asked by: Starua | Asked: 7/11/2023 | Last edited by: Starua | Updated: 7/13/2023 | Views: 64

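Q:

I am currently implementing a TCP proxy in Rust that detects the server name in HTTPS connections and uses it to decide which proxy to forward to. Specifically, I need to copy the data from a TcpStream and pass it to tokio_rustls::LazyConfigAcceptor to detect the ClientHello message.

I am looking for a way to efficiently fork an AsyncReader into two independent readers, both of which can read from the underlying reader without blocking each other. I would also like the buffer to grow dynamically, to accommodate the different amounts of data read by each reader.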

let (reader, reader_copy) = fork(reader);
// ClientHello data will be read by detect_server_name *and* forwarded to the proxy
let server_name = detect_server_name(reader_copy).await;
proxy_map.get(&server_name).forward(reader).await;
rust proxy io buffer

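Comments

1 upvote | Chayim Friedman | 7/11/2023
Will both readers see the same data, or when one reads something, does the other no longer see it?
0 upvotes | Starua | 7/11/2023
@ChayimFriedman The same data.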

A:

1 upvote | Starua | 7/13/2023 | #1

Solved by sharing a single VecDeque buffer between the two forks and tracking one cursor index for each of them.
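The cursor bookkeeping can be looked at in isolation first. The following is only a minimal synchronous sketch of the idea, not the answer's implementation: `ForkBuffer`, `push`, `read`, and `buffered` are hypothetical names introduced for illustration, and incoming data is pushed explicitly instead of being polled from an async reader.

```rust
use std::collections::VecDeque;

/// Hypothetical, synchronous stand-in for the shared state:
/// one buffer and one read cursor per fork.
struct ForkBuffer {
    buffer: VecDeque<u8>,
    cursors: [usize; 2],
}

impl ForkBuffer {
    fn new() -> Self {
        Self { buffer: VecDeque::new(), cursors: [0, 0] }
    }

    /// Append bytes that arrived from the underlying source.
    fn push(&mut self, data: &[u8]) {
        self.buffer.extend(data.iter().copied());
    }

    /// Copy up to `max` unread bytes for fork `who` (0 or 1), then drop
    /// the prefix that both forks have already consumed.
    fn read(&mut self, who: usize, max: usize) -> Vec<u8> {
        let start = self.cursors[who];
        let len = max.min(self.buffer.len() - start);
        let out: Vec<u8> = self.buffer.range(start..start + len).copied().collect();
        self.cursors[who] += len;

        // Everything before the smaller cursor has been seen by both forks.
        let release = self.cursors[0].min(self.cursors[1]);
        self.buffer.drain(..release);
        self.cursors[0] -= release;
        self.cursors[1] -= release;
        out
    }

    /// Number of bytes currently held for the lagging fork.
    fn buffered(&self) -> usize {
        self.buffer.len()
    }
}
```

In this sketch the buffer only shrinks once the slower fork catches up, so it grows as far as the gap between the two forks requires.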

use std::cmp::min;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{ready, Context, Poll, Waker};
use tokio::io::{AsyncRead, ReadBuf};

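/// State shared by both forks.
struct Inner<R> {
    reader: R,
    /// Bytes that at least one fork has not consumed yet.
    buffer: VecDeque<u8>,
    /// Read position of each fork within `buffer` (left, right).
    cursors: (usize, usize),
    /// Waker last registered by each fork (left, right).
    wakers: (Option<Waker>, Option<Waker>),
}
/// One of the two handles onto the same underlying reader.
pub struct AsyncForkReader<R> {
    inner: Arc<Mutex<Inner<R>>>,
    is_left: bool,
}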
impl<R> AsyncForkReader<R> {
    pub fn new(reader: R) -> (Self, Self) {
        let inner = Arc::new(Mutex::new(Inner {
            reader,
            buffer: VecDeque::new(),
            cursors: (0, 0),
            wakers: (None, None),
        }));
        (
            Self {
                inner: inner.clone(),
                is_left: true,
            },
            Self {
                inner,
                is_left: false,
            },
        )
    }
}

impl<R: AsyncRead> AsyncRead for AsyncForkReader<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let mut inner = self.inner.lock().unwrap();
        let Inner {
            ref mut buffer,
            ref mut cursors,
            ref mut reader,
            ref mut wakers,
        } = *inner;
        // Pick this fork's cursor and waker, and the other fork's.
        let (cursor, other_cursor, waker, other_waker) = if self.is_left {
            (&mut cursors.0, &mut cursors.1, &mut wakers.0, &mut wakers.1)
        } else {
            (&mut cursors.1, &mut cursors.0, &mut wakers.1, &mut wakers.0)
        };

        if buffer.len() == *cursor {
            // This fork has consumed everything buffered, so poll the underlying reader.
            *waker = Some(cx.waker().clone());
            // According to the documentation, only the waker from the most recent poll is woken.
            // Wake the other fork manually so that its waker is not overwritten and lost,
            // which would leave it never woken again.
            // Note: if both forks are pending on the reader at once, they can wake each other on every poll.
            if let Some(other) = other_waker.take() {
                other.wake();
            }
            let origin_len = buf.filled().len();
            // Pin projection is not available through the Mutex guard.
            // Safety: `reader` lives inside the Arc allocation and is never moved out of it.
            ready!(unsafe { Pin::new_unchecked(reader) }.poll_read(cx, buf))?;
            // Keep a copy of the newly read bytes for the other fork.
            let slice = &buf.filled()[origin_len..];
            buffer.extend(slice.iter());
            *cursor = buffer.len();
        } else {
            *waker = None;
            // Serve bytes that the other fork has already pulled into the buffer.
            let len = min(buffer.len() - *cursor, buf.remaining());
            for byte in buffer.range(*cursor..*cursor + len) {
                buf.put_slice(&[*byte]);
            }
            *cursor += len;

            // Drop the prefix that both forks have already consumed.
            let release_len = min(*cursor, *other_cursor);
            if release_len > 0 {
                buffer.drain(0..release_len);
                *cursor -= release_len;
                *other_cursor -= release_len;
            }
        }
        Poll::Ready(Ok(()))
    }
}
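
The manual wake-up of the other fork in poll_read exists because a reader typically remembers only the waker from the most recent poll. The following std-only sketch illustrates that behaviour under stated assumptions: `CountingWaker`, `Source`, and `wake_counts` are hypothetical names, and `Source` is a simplified stand-in for an I/O source, not tokio's actual implementation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical waker that only counts how often it is woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for an I/O source that remembers only the
/// waker passed to the most recent pending poll.
struct Source {
    registered: Option<Waker>,
}

impl Source {
    /// Simulates a poll that returns `Pending`: the new waker replaces the old one.
    fn poll_pending(&mut self, waker: &Waker) {
        self.registered = Some(waker.clone());
    }

    /// Simulates data arriving: only the registered waker is notified.
    fn data_arrived(&mut self) {
        if let Some(waker) = self.registered.take() {
            waker.wake();
        }
    }
}

/// Two forks poll the same source in turn; returns (left wakes, right wakes).
fn wake_counts() -> (usize, usize) {
    let left = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let right = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let mut source = Source { registered: None };

    source.poll_pending(&Waker::from(left.clone()));
    // The right fork's waker overwrites the left fork's waker.
    source.poll_pending(&Waker::from(right.clone()));
    source.data_arrived();

    (left.0.load(Ordering::SeqCst), right.0.load(Ordering::SeqCst))
}
```

Here `wake_counts()` returns `(0, 1)`: the left fork is never notified by the source itself, which is why the answer stores each fork's waker and has the polling fork wake the other one.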