Created
August 2, 2018 16:29
-
-
Save vi/65e2c68fbed3baacce2a18b56d875070 to your computer and use it in GitHub Desktop.
Rust Tokio fast TCP socket close handling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env run-cargo-script | |
//! ```cargo | |
//! [dependencies] | |
//! tokio = "=0.1.7" | |
//! tokio-codec = "*" | |
//! tokio-io = "=0.1.7" | |
//! futures-cpupool = "*" | |
//! mio = "*" | |
//! | |
//! [replace] | |
//! "tokio:0.1.7" = { path = "/mnt/src/git/tokio" } | |
//! "tokio-io:0.1.7" = { path = "/mnt/src/git/tokio/tokio-io" } | |
//! ``` | |
extern crate tokio; | |
extern crate tokio_codec; | |
extern crate tokio_io; | |
extern crate futures_cpupool; | |
extern crate mio; | |
use tokio_codec::{Decoder, LinesCodec}; | |
use tokio::net::TcpListener; | |
use tokio::prelude::*; | |
use std::env; | |
use std::net::SocketAddr; | |
use std::sync::{Mutex,Arc}; | |
use std::io::{Error as IoError,ErrorKind}; | |
use futures_cpupool::CpuPool; | |
use tokio::net::TcpStream; | |
struct WaitForSocketClosure(TcpStream); | |
impl Future for WaitForSocketClosure { | |
type Item = (); | |
type Error = (); | |
fn poll(&mut self) -> Result<Async<()>,()> { | |
match self.0.poll_read_ready(mio::Ready::empty()) { | |
Ok(Async::Ready(_)) => { | |
println!("HUP"); | |
Ok(Async::Ready(())) | |
}, | |
Ok(Async::NotReady) => { | |
//println!("not_ready"); | |
Ok(Async::NotReady) | |
}, | |
Err(_) => { | |
println!("err"); | |
Err(()) | |
}, | |
} | |
} | |
} | |
fn main() -> Result<(),Box<std::error::Error>> { | |
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); | |
let addr = addr.parse::<SocketAddr>()?; | |
let socket = TcpListener::bind(&addr)?; | |
println!("Listening on: {}", addr); | |
let pool = CpuPool::new(1); | |
let done = socket | |
.incoming() | |
.map_err(|e| println!("failed to accept socket; error = {:?}", e)) | |
.for_each(move |socket| { | |
let socket2 = socket.try_clone().unwrap(); | |
let framed = LinesCodec::new().framed(socket); | |
let (writer, reader) = framed.split(); | |
let pool = pool.clone(); | |
let abort_flag = Arc::new(Mutex::new(false)); | |
let abort_flag2 = abort_flag.clone(); | |
let abort_flag3 = abort_flag.clone(); | |
let wait_for_socket_closed_future = WaitForSocketClosure(socket2); | |
let processor = reader | |
.and_then(move |bytes| { | |
println!("Received request: {:?}", bytes); | |
let abort_flag = abort_flag.clone(); | |
pool.spawn_fn(move || { | |
for i in {0..19} { | |
match abort_flag.lock() { | |
Ok(ref x) if **x => { | |
println!("Request aborted: {:?}", bytes); | |
return future::err( | |
IoError::new( | |
ErrorKind::ConnectionAborted, "...")); | |
}, | |
_ => (), | |
} | |
if i % 5 == 0 { | |
println!("Processing request: {:?} {}%", bytes, i*5); | |
} | |
::std::thread::sleep(std::time::Duration::from_millis(100)); | |
} | |
println!("Processing finished: {:?}", bytes); | |
future::ok(bytes) | |
}) | |
}) | |
.forward(writer) | |
.and_then(|_| { | |
println!("Socket received FIN packet and closed connection"); | |
Ok(()) | |
}) | |
.or_else(|err| { | |
println!("Socket closed with error: {:?}", err); | |
Err(err) | |
}) | |
.then(move |result| { | |
println!("Socket closed with result: {:?}", result); | |
if let Ok(ref mut x) = abort_flag2.lock() { **x = true; }; | |
Ok(()) | |
}); | |
let prog = processor.select( | |
wait_for_socket_closed_future | |
.map(move |()| { | |
if let Ok(ref mut x) = abort_flag3.lock() { **x = true; }; | |
}) | |
).map(drop).map_err(drop); | |
tokio::spawn(prog) | |
}); | |
tokio::run(done); | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment