AsyncWriteExt 不会写入所有字节 Rust TcpStream

AsyncWriteExt does not write all bytes Rust TcpStream

提问人:Tutu ツ 提问时间:11/16/2023 最后编辑:Tutu ツ 更新时间:11/16/2023 访问量:75

问:

我正在实现一个TCP服务器来传递消息。

从库中,我可以与不同的渠道共享此 tcp 连接。这个库只允许我使用 和 方法来写入和读取字节。这不是问题,因为它适合我的需求。tokio-yamuxAsyncWriteExtAsyncReadExt

当我发送整条消息并且其中缺少字节时,问题就来了,因此应用程序中会发生全局块。客户端发送 X 个字节,在 tcp 发送之前,会添加一个长度为这些字节的标头。通过这种方式,服务器可以确切地知道要读取的字节。

出于这个原因,当我在本例中写入字节时,262144,只有262136是用 or 方法写入的。然后,如标头中所示,总共有262144它被阻止,因为仍有字节要读取。write_allwrite

我不明白为什么会这样。

小例子: 客户:

use futures::prelude::*;
use std::{error::Error, vec};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
};
use tokio_yamux::{config::Config, session::Session};

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let socket = TcpStream::connect("127.0.0.1:8080").await?;
    println!("[client] connected to server: {:?}", socket.peer_addr()?);
    println!("created stream");

    let mut session = Session::new_client(socket, Config::default());
    let ctrl = session.control();

    let mut handles = Vec::new();

    tokio::spawn(async move {
        loop {
            match session.next().await {
                Some(Ok(_)) => (),
                Some(Err(e)) => {
                    println!("{}", e);
                    break;
                }
                None => {
                    println!("closed");
                    break;
                }
            }
        }
    });

    for _i in 0..1 {
        let mut ctrl_clone = ctrl.clone();
        handles.push(tokio::spawn(async move {
            match ctrl_clone.open_stream().await {
                Ok(mut stream) => {


                    // This sections represents Check User Operation
                    // Args -> operation_id = I selected a random i32
                    //      -> queue_id = queue to write

                    // Writes to the server to identify the operation and get the queue
                    let operation_id = 0;
                    let queue_id = Some(2);

                    let data_to_send;

                    match queue_id {
                        Some(id) => {
                            data_to_send = vec![operation_id, id];
                        }
                        None => {
                            data_to_send = vec![operation_id];
                        }
                    }

                    let data_to_send: Vec<u8> = data_to_send
                        .clone()
                        .into_iter()
                        .flat_map(|x| i32::to_be_bytes(x))
                        .collect();

                    stream.write_all(&data_to_send).await.unwrap();
                    stream.flush().await.unwrap();

                    let sv_code: i32;
                    let mut buf = [1; 4];

                    // Reads from server to recieve an ACCEPTED MESSAGE
                    loop {
                        //stream.readable().await.unwrap();

                        match stream.read_exact(&mut buf).await {
                            Ok(0) => {}
                            Ok(n) => {
                                println!("Client: Reading Buffer: n_bytes {:?}", n);
                                sv_code = i32::from_be_bytes(buf);
                                break;
                            }
                            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                                //println!("Err: TCP -> SV (Write))");
                                continue;
                            }
                            Err(_e) => {
                                //return Err(e.into());
                            }
                        }
                    }

                    // Here I return a code to client just to Continue with the execution and start Sending messages
                    // Skipped

                    // This section represents SEND Operation
                    // Header 

                    let data: [u8; 262140] = [1; 262140];

                    let mut vec_data = data.to_vec();
                    
                    let len = data.len() as u32;
                    let len_slices = len.to_be_bytes();

                    for slice in len_slices {
                        vec_data.insert(0, slice);
                    }

                    println!("Total_len: {:?}", vec_data.len());
                    let n = stream.write_all(&vec_data).await.unwrap();
                    println!("N: {:?}", n);
                    stream.flush().await.unwrap();

                    println!("Fin write");
                }
                Err(e) => {
                    println!("{:?}", e);
                }
            }
        }));
    }

    for handle in handles {
        let _ = handle.await;
    }

    Ok(())
}

服务器:

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

use std::error::Error;

use futures::prelude::*;
use tokio_yamux::{config::Config, session::Session};

// https://github.com/nervosnetwork/tentacle

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = "127.0.0.1:8080".to_string();

    let listener = TcpListener::bind(&addr).await.unwrap();

    while let Ok((socket, _)) = listener.accept().await {
        println!("accepted a socket: {:?}", socket.peer_addr());
        let mut session = Session::new_server(socket, Config::default());
        tokio::spawn(async move {
            while let Some(Ok(mut stream)) = session.next().await {
                println!("Server accept a stream from client: id={}", stream.id());
                tokio::spawn(async move {
                    let mut buffer = Vec::<u8>::with_capacity(8);
                    
                    // Identify operation from Client
                    let op_code = stream.read_i32().await.unwrap();

                    println!("op_id: {:?}", op_code);

                    let queue_id = Some(stream.read_i32().await.unwrap());

                    println!("queue_id: {:?}", queue_id);

                    // Here I write to client -> Accepted
                    let mut sv_code: i32 = 0;
                    stream.write_all(&sv_code.to_be_bytes()).await.unwrap();
                    stream.flush().await.unwrap();

                    // Starting receiving messages
                    let mut total_bytes = 0;
                    let mut n_bytes_read = 0;

                    let mut chunk_id = 0;
                    let mut last_chunk = false;
                    let mut len_slices: [u8; 4] = [0; 4];
                    loop {

                        println!("n_bytes read: {:?}", n_bytes_read);

                        let mut capacity = 65535;
                
                        if n_bytes_read == 0 {
                            capacity = 65539;
                        }
                
                        let mut buffer = Vec::<u8>::with_capacity(capacity);
                                
                        println!("Blocked?");
                        match stream.read_buf(&mut buffer).await {
                            Ok(0) => continue,
                            Ok(n) => {
                                println!("N: {:?}", n);
                                if n_bytes_read == 0 {
                                    for i in 0..4 {
                                        len_slices[i] = buffer.remove(0);
                                    }
                                    total_bytes = u32::from_le_bytes(len_slices);
                                    total_bytes += 4;
                                }
                
                                buffer.truncate(n);
                
                                n_bytes_read += n;
                
                                if n_bytes_read == total_bytes.try_into().unwrap() {
                                    last_chunk = true;
                                }
                
                                chunk_id += 1;
                
                                if last_chunk {
                                    break;
                                }
                            }
                            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                                println!("Err: TCP -> SV (Write))");
                                continue;
                            }
                            Err(e) => {
                                break;
                            }
                        }
                    }

                    println!("Finished");

                });
            }
        });
    }

    Ok(())
}

输出:

客户:

[client] connected to server: 127.0.0.1:8080
created stream
Client: Reading Buffer: n_bytes 4
Total_len: 262144
Blocked here

服务器:

accepted a socket: Ok(127.0.0.1:52430)
Server accept a stream from client: id=1
n_bytes read: 0
Blocked?
N: 65539
n_bytes read: 65539
Blocked?
N: 65535
n_bytes read: 131074
Blocked?
N: 65535
n_bytes read: 196609
Blocked?
N: 65527
n_bytes read: 262136
Blocked?
TCP Rust-tokio

评论

1赞 Chayim Friedman 11/16/2023
write()可能确实不会写入所有字节。 应该。您说您已经对两者都进行了测试,因此请将 替换为 .另外,请发布一个最小的可重复示例write_all()write()write_all()
0赞 Tutu ツ 11/16/2023
我创建了一个小例子,并且工作得很好......我不明白为什么在我的真实代码中没有。我每次写作时都使用 write.flush()。我将添加一个小例子......
1赞 Chayim Friedman 11/16/2023
如果它工作完美,它对我们没有帮助。很高兴您发现了这一点,因为这意味着问题不在您向我们展示的代码中。因此,现在花点时间尝试剥离程序的越来越多的部分,直到找到同样存在相同问题的最小程序。之后,您可以在此处发布。谁知道呢,也许你甚至会一路上自己找到答案!(在这种情况下,请务必在此处发布自我回答)。
1赞 Chayim Friedman 11/16/2023
它靠运气工作,但请注意,您在服务器代码中的第一次使用是不正确的。它不妨少读一些。您需要 read_exact() 或更好的是两个 read_i32()。read_buf()
1赞 Chayim Friedman 11/16/2023
read_buf()会注意这一点,它不会比实际阅读的位置更进一步。

答:

2赞 Tutu ツ 11/16/2023 #1

错误出在多路复用器配置中。在这里你可以看到:tokio_yamux::config::Config,允许的最大窗口数为 262144 字节。在此示例中超出了此值,因此它通过 TcpStream 创建的流达到了设置的限制。如果创建了新配置,则不会再增加此错误的值。Config::default()max_stream_window_size

溶液:

let config = Config {
        accept_backlog: 256,
        enable_keepalive: true,
        keepalive_interval: Duration::from_secs(30),
        connection_write_timeout: Duration::from_secs(10),
        max_stream_count: 65535,
        max_stream_window_size: 4294967295,
    };

let mut session = Session::new_server(socket, config);

评论

0赞 MeetTitan 11/17/2023
我很感激你让它工作,并给你一个赞;但是有没有办法在不把 Y2K 罐子踢到路上的情况下解决它?你能保证你的消息永远不会超过这个新的最大值吗?