Skip to content

Instantly share code, notes, and snippets.

@vi

vi/tcpclose.crs

Created Aug 2, 2018
Embed
What would you like to do?
Rust Tokio fast TCP socket close handling
#!/usr/bin/env run-cargo-script
//! ```cargo
//! [dependencies]
//! tokio = "=0.1.7"
//! tokio-codec = "*"
//! tokio-io = "=0.1.7"
//! futures-cpupool = "*"
//! mio = "*"
//!
//! [replace]
//! "tokio:0.1.7" = { path = "/mnt/src/git/tokio" }
//! "tokio-io:0.1.7" = { path = "/mnt/src/git/tokio/tokio-io" }
//! ```
extern crate tokio;
extern crate tokio_codec;
extern crate tokio_io;
extern crate futures_cpupool;
extern crate mio;
use tokio_codec::{Decoder, LinesCodec};
use tokio::net::TcpListener;
use tokio::prelude::*;
use std::env;
use std::net::SocketAddr;
use std::sync::{Mutex,Arc};
use std::io::{Error as IoError,ErrorKind};
use futures_cpupool::CpuPool;
use tokio::net::TcpStream;
/// Future that wraps a `TcpStream` handle and completes once the stream
/// reports read readiness for an empty interest mask.
///
/// NOTE(review): the code prints "HUP" on readiness, so this is presumably
/// meant to fire only when the peer hangs up; that depends on how the tokio
/// build in use treats an empty mask — confirm against that version.
struct WaitForSocketClosure(TcpStream);

impl Future for WaitForSocketClosure {
    type Item = ();
    type Error = ();

    /// Polls the wrapped stream's read readiness.
    ///
    /// * readiness reported — prints `HUP` and resolves with `()`;
    /// * readiness poll failed — prints `err` and fails with `()`;
    /// * otherwise — stays pending.
    fn poll(&mut self) -> Result<Async<()>,()> {
        let readiness = self.0.poll_read_ready(mio::Ready::empty());
        match readiness {
            Err(_) => {
                println!("err");
                Err(())
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Ok(Async::Ready(_)) => {
                println!("HUP");
                Ok(Async::Ready(()))
            }
        }
    }
}
fn main() -> Result<(),Box<std::error::Error>> {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>()?;
let socket = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr);
let pool = CpuPool::new(1);
let done = socket
.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
let socket2 = socket.try_clone().unwrap();
let framed = LinesCodec::new().framed(socket);
let (writer, reader) = framed.split();
let pool = pool.clone();
let abort_flag = Arc::new(Mutex::new(false));
let abort_flag2 = abort_flag.clone();
let abort_flag3 = abort_flag.clone();
let wait_for_socket_closed_future = WaitForSocketClosure(socket2);
let processor = reader
.and_then(move |bytes| {
println!("Received request: {:?}", bytes);
let abort_flag = abort_flag.clone();
pool.spawn_fn(move || {
for i in {0..19} {
match abort_flag.lock() {
Ok(ref x) if **x => {
println!("Request aborted: {:?}", bytes);
return future::err(
IoError::new(
ErrorKind::ConnectionAborted, "..."));
},
_ => (),
}
if i % 5 == 0 {
println!("Processing request: {:?} {}%", bytes, i*5);
}
::std::thread::sleep(std::time::Duration::from_millis(100));
}
println!("Processing finished: {:?}", bytes);
future::ok(bytes)
})
})
.forward(writer)
.and_then(|_| {
println!("Socket received FIN packet and closed connection");
Ok(())
})
.or_else(|err| {
println!("Socket closed with error: {:?}", err);
Err(err)
})
.then(move |result| {
println!("Socket closed with result: {:?}", result);
if let Ok(ref mut x) = abort_flag2.lock() { **x = true; };
Ok(())
});
let prog = processor.select(
wait_for_socket_closed_future
.map(move |()| {
if let Ok(ref mut x) = abort_flag3.lock() { **x = true; };
})
).map(drop).map_err(drop);
tokio::spawn(prog)
});
tokio::run(done);
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.