Passing data between a reactor and a thread pool, back and forth. Playground link: https://play.rust-lang.org/?gist=25f9b0c87b1d5bd39d9fc6ffe0d1840a
extern crate tokio_core;
extern crate futures_cpupool;
extern crate futures;
use futures::future::lazy;
use std::sync::Arc;
// Make sure Data is not Copy, so the value is really moved between systems.
#[derive(Debug)]
struct Data {
    inner: i32,
}
// This isn't very interesting and that's kind of the point.
// What this code does is take a value and pass it through
// multiple concurrent systems, handing it on whenever a
// suspension point is reached.
//
// It goes like this:
// 1. Run a future on a tokio core
// 2. When it finishes, pass the value to a thread on a thread pool
// 3. When that thread's work is done, pass the value back to the reactor
// 4. Once that future finishes, put the value on the pool again
//
// How is that useful? Imagine the following scenario:
// 1. You accept an HTTP request using async IO
// 2. You start handling the request in a thread
// 3. You start a couple of backend calls, using an async client
// 3b. You join on all of them (a sketch of this step follows after main)
// 4. You continue running
//
// Often, you'd block the request handling thread at point 3, waiting
// for the async operations to finish.
//
// Using this technique, you can release the request handling thread
// while waiting for the backends to respond and then hop back onto one
// when things are done.
//
// Obviously, depending on your scenario, this might or might not make
// sense.
fn main() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let pool = Arc::new(futures_cpupool::CpuPool::new(4));

    let data = Data { inner: 42 };

    // A `Remote` lets other threads hand futures to this reactor.
    let remote = core.remote();

    let on_reactor = lazy(move || {
        println!("on_reactor: {:?}", data);

        let remote_clone = remote.clone();
        let pool_handle = pool.clone();

        let on_pool = lazy(move || {
            println!("on_pool: {:?}", data);

            let another_remote = remote_clone.clone();

            let on_reactor_again = lazy(move || {
                println!("on_reactor_again: {:?}", data);

                let back_to_the_pool = lazy(move || {
                    println!("back_to_the_pool: {:?}", data);
                    let result: Result<(), ()> = Ok(());
                    result
                });

                // `CpuPool::spawn` returns a `CpuFuture`; spawning it on the
                // reactor keeps it alive until the pool has run it.
                let pool_future = pool_handle.spawn(back_to_the_pool);
                another_remote.spawn(|_| pool_future);

                let result: Result<(), ()> = Ok(());
                result
            });

            // Hand the next step back to the reactor thread.
            remote_clone.spawn(|_| on_reactor_again);

            let result: Result<(), ()> = Ok(());
            result
        });

        // Move the work (and `data`) over to the thread pool.
        let pool_future = pool.spawn(on_pool);
        remote.spawn(|_| pool_future);

        let result: Result<(), ()> = Ok(());
        result
    });

    core.run(on_reactor).unwrap();

    // Keep turning the reactor so the spawned futures get a chance to
    // finish before the program exits.
    core.turn(None);
    core.turn(None);
    core.turn(None);
}
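
// A minimal sketch of step 3b from the scenario above: joining on several
// backend calls on the reactor and hopping back onto the pool with the
// combined result. The "backend calls" here are stand-in `future::ok`
// values rather than a real async client, and `join_backends` is a
// hypothetical helper that is not wired into `main`.
#[allow(dead_code)]
fn join_backends(remote: &tokio_core::reactor::Remote, pool: &futures_cpupool::CpuPool) {
    use futures::Future;
    use futures::future::{join_all, ok};

    // Stand-ins for real backend futures.
    let backend_a = ok::<&'static str, ()>("backend a");
    let backend_b = ok::<&'static str, ()>("backend b");

    let pool = pool.clone();
    remote.spawn(move |_handle| {
        // Wait for both backends on the reactor; no pool thread is blocked
        // while the responses are outstanding.
        join_all(vec![backend_a, backend_b]).and_then(move |responses| {
            // Once everything is in, continue the work on the pool.
            pool.spawn_fn(move || {
                println!("joined responses: {:?}", responses);
                Ok::<(), ()>(())
            })
        })
    });
}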