Skip to content

Instantly share code, notes, and snippets.

@gliderkite
Last active October 15, 2019 19:34
Show Gist options
  • Save gliderkite/9a084ed3405f08e473c346a0d0ff0ada to your computer and use it in GitHub Desktop.
Save gliderkite/9a084ed3405f08e473c346a0d0ff0ada to your computer and use it in GitHub Desktop.
tokio TCP read_until error

Steps to recreate the issue:

  1. Create a 20MB file to download named file.txt

    truncate -s 20MB file.txt
    
  2. Run the TCP server:

    cargo run --release --bin tcp_server
    
  3. Run the TCP client

    cargo run --release --bin tcp_client
    

The error will present itself when tokio::io::read will return 0, even if all the file data has been sent from the TCP server, for example as:

thread 'tokio-runtime-worker-4' panicked at 'Received less than expected: 19991908 bytes Take { inner: TcpStream { addr: V4(127.0.0.1:38606), peer: V4(127.0.0.1:49152), fd: 186 }, limit: 8092 }', src/tcp_client.rs:46:17
[package]
name = "tcp"
version = "0.1.0"
edition = "2018"
[dependencies]
failure = "0.1"
tokio = "0.1"
futures = "0.1"
[[bin]]
name = "tcp_server"
path = "src/tcp_server.rs"
[[bin]]
name = "tcp_client"
path = "src/tcp_client.rs"
use failure::Error;
use futures::future::Future;
use std::io::BufReader;
use tokio::net::TcpStream;
pub const EOF: u8 = 0;
pub const FILE_SIZE: u64 = 20_000_000; // 20MB
pub const SERVER_SOCKET_ADDR: &str = "127.0.0.1:49152";
const REQUEST_SIZE: usize = 100;
pub fn send(stream: TcpStream) -> impl Future<Item = TcpStream, Error = Error> {
let mut data = vec![b'a'; REQUEST_SIZE - 1];
data.push(EOF);
assert_eq!(REQUEST_SIZE, data.len());
assert_eq!(data[REQUEST_SIZE - 1], EOF);
tokio::io::write_all(stream, data)
.map(|(stream, _)| stream)
.map_err(Error::from)
}
pub fn read_until(stream: TcpStream) -> impl Future<Item = TcpStream, Error = Error> {
let buffer = Vec::new();
let reader = BufReader::new(stream);
tokio::io::read_until(reader, EOF, buffer)
.map_err(Error::from)
.map(move |(s, buff)| {
assert_eq!(buff.len(), REQUEST_SIZE);
s.into_inner()
})
}
pub fn read_exact(stream: TcpStream) -> impl Future<Item = TcpStream, Error = Error> {
let buffer = vec![0; REQUEST_SIZE];
tokio::io::read_exact(stream, buffer)
.map_err(Error::from)
.map(move |(s, _)| s)
}
use failure::Error;
use futures::future::{loop_fn, Future, Loop};
use futures::lazy;
use std::io::Read;
use std::net::SocketAddr;
use tokio::net::TcpStream;
mod common;
fn main() {
tokio::run(lazy(|| {
for _ in 0..100 {
tokio::spawn(download_file().map_err(|e| panic!("{:?}", e)));
}
Ok(())
}))
}
fn download_file() -> impl Future<Item = (), Error = Error> {
let socket_addr: SocketAddr = common::SERVER_SOCKET_ADDR.parse().unwrap();
let request_download = TcpStream::connect(&socket_addr)
.map_err(Error::from)
.and_then(move |stream| common::send(stream))
.and_then(move |stream| common::read_until(stream));
let parse_data_stream = move |stream| {
const BUFF_SIZE: usize = 4096;
let buffer = vec![0; BUFF_SIZE];
loop_fn((stream, buffer, 0), move |(stream, buffer, acc)| {
tokio::io::read(stream, buffer)
.map_err(Error::from)
.map(move |(stream, buffer, n)| {
if n > 0 {
Loop::Continue((stream, buffer, acc + n as u64))
} else {
Loop::Break((stream, acc))
}
})
})
};
request_download.and_then(move |stream| {
let stream = stream.take(common::FILE_SIZE);
parse_data_stream(stream).map(move |(stream, n)| {
if n != common::FILE_SIZE {
panic!("Received less than expected: {} bytes {:?}", n, stream);
}
})
})
}
use failure::Error;
use futures::future::Future;
use futures::stream::Stream;
use std::net::{ SocketAddr};
use std::path::PathBuf;
use tokio::net::{TcpListener, TcpStream};
mod common;
const FILE_PATH: &str = "file.txt";
fn main() -> Result<(), Error> {
let socket_addr: SocketAddr = common::SERVER_SOCKET_ADDR.parse().unwrap();
let listener = TcpListener::bind(&socket_addr)?
.incoming()
.map_err(Error::from)
.for_each(|stream| {
common::read_until(stream)
.and_then(|stream| common::send(stream))
.and_then(|stream| send_file(stream, FILE_PATH.into()))
.map(drop)
});
tokio::run(listener.map_err(drop));
Ok(())
}
fn send_file(stream: TcpStream, path: PathBuf) -> impl Future<Item = TcpStream, Error = Error> {
tokio::fs::File::open(path)
.and_then(move |file| {
tokio::io::copy(file, stream).map(move |(n, _, stream)| {
assert_eq!(n, common::FILE_SIZE);
stream
})
})
.map_err(Error::from)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment