Skip to content

Instantly share code, notes, and snippets.

@rust-play
Created August 1, 2018 20:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rust-play/743a330b1907964b52f9ec7846674d0d to your computer and use it in GitHub Desktop.
Save rust-play/743a330b1907964b52f9ec7846674d0d to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
#!/usr/bin/env run-cargo-script
// cargo-deps: tokio, tokio-codec, tokio-io, futures-cpupool
extern crate tokio;
extern crate tokio_codec;
extern crate tokio_io;
extern crate futures_cpupool;
use std::env;
use std::io::{Error as IoError,ErrorKind};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex,Arc};

use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_codec::{Decoder, LinesCodec};
fn main() -> Result<(),Box<std::error::Error>> {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>()?;
let socket = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr);
let pool = CpuPool::new(1);
let done = socket
.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
let framed = LinesCodec::new().framed(socket);
let (writer, reader) = framed.split();
let pool = pool.clone();
let abort_flag = Arc::new(Mutex::new(false));
let abort_flag2 = abort_flag.clone();
let abort_flag3 = abort_flag.clone();
let wait_for_socket_closed_future = {
// unimplemented!()
future::empty::<(),()>()
//future::ok::<(),()>(())
};
let processor = reader
.and_then(move |bytes| {
println!("Received request: {:?}", bytes);
let abort_flag = abort_flag.clone();
pool.spawn_fn(move || {
for i in {0..19} {
match abort_flag.lock() {
Ok(ref x) if **x => {
println!("Request aborted: {:?}", bytes);
return future::err(
IoError::new(
ErrorKind::ConnectionAborted, "..."));
},
_ => (),
}
if i % 5 == 0 {
println!("Processing request: {:?} {}%", bytes, i*5);
}
::std::thread::sleep(std::time::Duration::from_millis(100));
}
println!("Processing finished: {:?}", bytes);
future::ok(bytes)
})
})
.forward(writer)
.and_then(|_| {
println!("Socket received FIN packet and closed connection");
Ok(())
})
.or_else(|err| {
println!("Socket closed with error: {:?}", err);
Err(err)
})
.then(move |result| {
println!("Socket closed with result: {:?}", result);
if let Ok(ref mut x) = abort_flag2.lock() { **x = true; };
Ok(())
});
let prog = processor.select(
wait_for_socket_closed_future
.map(move |()| {
if let Ok(ref mut x) = abort_flag3.lock() { **x = true; };
})
).map(drop).map_err(drop);
tokio::spawn(prog)
});
tokio::run(done);
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment