Skip to content

Instantly share code, notes, and snippets.

@alexcrichton
Created February 17, 2017 17:40
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexcrichton/52482525b06ce6efe12506e4b6fe9c5b to your computer and use it in GitHub Desktop.
Save alexcrichton/52482525b06ce6efe12506e4b6fe9c5b to your computer and use it in GitHub Desktop.
extern crate futures;
extern crate futures_cpupool;
extern crate rand;
use futures::{Future, Sink, Stream};
/// Sleep for a random time between 0 and 1 second.
fn sleep_random() {
let sleep_time_ms = rand::random::<u8>() as f64 / 255.0 * 1000.0;
std::thread::sleep(std::time::Duration::from_millis(sleep_time_ms as u64));
}
fn main() {
// produce input
let (mut tx, rx) = futures::sync::mpsc::channel(3);
let handle = std::thread::Builder::new()
.name("source".to_string())
.spawn(|| {
for i in 1..10 {
sleep_random();
println!("Source produced {} (in thread \"{}\")",
i,
std::thread::current().name().unwrap());
tx = tx.send(i).wait().unwrap();
}
})
.unwrap();
// generate a thread pool
let pool = futures_cpupool::Builder::new()
.name_prefix("pool-")
.create();
// process input
let rx = rx.map(|i| {
pool.spawn_fn(move || {
sleep_random();
println!("Stage 0 received {} (in thread \"{}\")",
i,
std::thread::current().name().unwrap());
Ok(i)
})
})
.buffered(10)
.map(|i| {
pool.spawn_fn(move || {
sleep_random();
println!("Stage 1 received {} (in thread \"{}\")",
i,
std::thread::current().name().unwrap());
Ok(i)
})
})
.buffered(10)
.map(|i| {
pool.spawn_fn(move || {
sleep_random();
println!("Stage 2 received {} (in thread \"{}\")",
i,
std::thread::current().name().unwrap());
Ok(i)
})
})
.buffered(10);
// consume stream
for (i, r) in rx.wait().enumerate() {
assert_eq!(r.unwrap(), i + 1);
}
handle.join().unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment