

@alamb
Last active July 17, 2024 01:49
// Please find the full, tested version in
// https://github.com/influxdata/influxdb_iox/blob/fe155e15fb2ad166aee66b0458e63c24a8128dd4/query/src/exec/task.rs#L101-L118
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use tokio::sync::oneshot::{self, error::RecvError, Receiver};

/// Runs futures (and any `tasks` that are `tokio::task::spawn`ed by
/// them) on a separate Tokio Executor
pub struct DedicatedExecutor {
    state: Arc<Mutex<State>>,
}

struct State {
    /// Channel for requests -- the dedicated executor takes requests
    /// from here and runs them.
    requests: Option<std::sync::mpsc::Sender<Task>>,
    /// Thread which has a different Tokio runtime
    /// installed and spawns tasks there
    thread: Option<std::thread::JoinHandle<()>>,
}

/// A type-erased future sent to the dedicated executor.
struct Task {
    fut: Pin<Box<dyn Future<Output = ()> + Send>>,
}

impl Task {
    /// Drives the wrapped future to completion.
    async fn run(self) {
        self.fut.await
    }
}

/// Placeholder: lowering a thread's OS priority is platform-specific
/// and its implementation is not shown in this gist.
fn set_current_thread_priority_low() {}

impl DedicatedExecutor {
    /// Creates a new `DedicatedExecutor` with a dedicated Tokio
    /// executor that is separate from the threadpool created via
    /// `#[tokio::main]`.
    pub fn new(thread_name: &str, num_threads: usize) -> Self {
        let thread_name = thread_name.to_string();
        let (tx, rx) = std::sync::mpsc::channel::<Task>();
        let thread = std::thread::spawn(move || {
            // Create a new Runtime to run tasks
            let runtime = tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .thread_name(&thread_name)
                .worker_threads(num_threads)
                // Lower OS priority of worker threads to prioritize main runtime
                .on_thread_start(move || set_current_thread_priority_low())
                .build()
                .expect("Creating Tokio runtime");
            // Pull task requests off the channel and send them to the executor;
            // the loop ends once every Sender has been dropped
            runtime.block_on(async move {
                while let Ok(task) = rx.recv() {
                    tokio::task::spawn(async move {
                        task.run().await;
                    });
                }
            });
        });
        let state = State {
            requests: Some(tx),
            thread: Some(thread),
        };
        Self {
            state: Arc::new(Mutex::new(state)),
        }
    }

    /// Runs the specified Future (and any tasks it spawns) on the
    /// `DedicatedExecutor`.
    pub fn spawn<T>(&self, task: T) -> Job<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let fut = Box::pin(async move {
            let task_output = task.await;
            // The caller may have dropped the `Job`; ignore the send error then
            let _ = tx.send(task_output);
        });
        let task = Task { fut };
        let state = self.state.lock().unwrap();
        if let Some(requests) = &state.requests {
            // would fail if someone has started shutdown
            requests.send(task).ok();
        } else {
            eprintln!("tried to schedule task on an executor that was shutdown");
        }
        Job { rx }
    }
}

/// Handle to a task running on a `DedicatedExecutor`. Resolves to the
/// task's output, or an error if the task was dropped before finishing.
pub struct Job<T> {
    rx: Receiver<T>,
}

impl<T> Future for Job<T> {
    type Output = Result<T, RecvError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `oneshot::Receiver` is `Unpin`, so it can be polled via `Pin::new`
        Pin::new(&mut self.rx).poll(cx)
    }
}
@Ciantic

Ciantic commented Jan 19, 2022

Hello, I read your article with great interest.

However, there is one thing missing: why don't you mention spawn_blocking? I thought it was ideal for CPU-bound tasks without await, turning them into awaitable tasks.

If the idea is to make another thread pool, I guess one could use spawn_blocking and rayon; I'm not sure yet how this additional runtime achieves the same thing. I must read your article a few more times.

@alamb
Author

alamb commented Jan 19, 2022

> why don't you mention the spawn_blocking? I thought it's ideal for CPU bound tasks without await, turning them to awaitable tasks.

Indeed it does, @Ciantic. However, each spawn_blocking task occupies its own thread, so if you launch 100 spawn_blocking tasks at once, Tokio may create up to 100 threads (bounded by max_blocking_threads, which defaults to 512). This may be problematic for some use cases, as there is non-trivial overhead in creating and managing OS threads.
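To make the thread-count concern concrete, here is a std-only sketch (no Tokio) contrasting one new OS thread per task, which is roughly what happens when many blocking tasks are in flight at once, with a fixed pool of long-lived workers pulling jobs from a shared queue. The helper names are hypothetical, and the sketch counts distinct `ThreadId`s rather than measuring cost.

```rust
use std::collections::HashSet;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread::{self, ThreadId};

/// Thread-per-task: every job gets a freshly spawned OS thread.
/// Returns how many distinct threads ran the jobs.
fn thread_per_task(num_tasks: usize) -> usize {
    let handles: Vec<_> = (0..num_tasks)
        .map(|_| thread::spawn(|| thread::current().id()))
        .collect();
    let ids: HashSet<ThreadId> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    ids.len()
}

/// Fixed pool: `num_workers` long-lived threads pull jobs from a shared queue.
/// Returns how many distinct threads ran the jobs (at most `num_workers`).
fn fixed_pool(num_tasks: usize, num_workers: usize) -> usize {
    let (job_tx, job_rx) = mpsc::channel::<usize>();
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (id_tx, id_rx) = mpsc::channel::<ThreadId>();
    let workers: Vec<_> = (0..num_workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let id_tx = id_tx.clone();
            thread::spawn(move || loop {
                // The lock is held only while receiving the next job
                let job = job_rx.lock().unwrap().recv();
                match job {
                    Ok(_) => id_tx.send(thread::current().id()).unwrap(),
                    Err(_) => break, // queue closed: no more jobs
                }
            })
        })
        .collect();
    for job in 0..num_tasks {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // close the queue so the workers exit
    drop(id_tx); // so `id_rx.iter()` ends once the workers are done
    for w in workers {
        w.join().unwrap();
    }
    let ids: HashSet<ThreadId> = id_rx.iter().collect();
    ids.len()
}
```

Running 100 jobs through `thread_per_task` uses 100 threads, while `fixed_pool(100, 4)` never uses more than 4.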

@inevity

inevity commented Jan 20, 2022

@Ciantic spawn_blocking is not ideal for CPU-bound tasks; it is intended for synchronous or blocking I/O. CPU-bound work only needs about one thread per core, so there is no need for as many threads as spawn_blocking allows (the blocking pool's default limit is 512 threads).
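A minimal std-only sketch of the "one thread per core" idea, using `std::thread::available_parallelism` to decide how to split the work and `std::thread::scope` for the worker threads. `parallel_sum` is a hypothetical example function, not something from Tokio or rayon.

```rust
use std::thread;

/// Sums `data` using one scoped thread per available core
/// (falls back to a single thread if the core count is unknown).
fn parallel_sum(data: &[u64]) -> u64 {
    let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    // ceil(len / cores), at least 1 so `chunks` never receives a zero size
    let chunk_size = ((data.len() + cores - 1) / cores).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().sum::<u64>()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}
```

The number of threads stays at roughly the core count no matter how large the input is, which is the property the comment above is after.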

@Ciantic

Ciantic commented Jan 20, 2022

@inevity if I need just one thread for a CPU-bound task, spawn_blocking works as is. If I need multiple CPU-bound tasks, I would use rayon inside the spawn_blocking call...

I haven't yet had time to figure out how this new runtime improves things, or even how to use it: CPU-bound tasks don't have await inside, so I don't see how having Futures helps.

@alamb
Author

alamb commented Jan 20, 2022

@Ciantic I would encourage you to look at how DataFusion runs its PhysicalPlans: each plan is a dataflow graph in which an operator must wait for its inputs to be ready before it can execute.

So in that case, await is used to 'wait' for the inputs to be computed (computing them may itself spawn parallel tasks or do network I/O).

I would love some ideas on how to use rayon instead.

@inevity

inevity commented Jan 22, 2022

@Ciantic please refer to https://ryhl.io/blog/async-what-is-blocking/. There is no need to use rayon inside a spawn_blocking call, which runs on Tokio's blocking thread pool; just use the rayon thread pool directly.

@Ciantic

Ciantic commented Jan 22, 2022

Thanks a lot, I learned something. Notably, the spawn_blocking docs mention rayon right after describing spawn_blocking:

> Hint: If using rayon, you can use a oneshot channel to send the result back to Tokio when the rayon task finishes.

I thought it meant that one would run the rayon task inside spawn_blocking, using a oneshot channel to send the result back.

It seems spawn_blocking is not required at all to run rayon tasks from Tokio; all that is needed is a oneshot channel and rayon::spawn.

@mooreniemi

Just noting that you can see this pattern in tokio-rayon: https://github.com/andybarron/tokio-rayon/blob/main/src/global.rs#L26

@3r1co

3r1co commented Dec 7, 2023

Is there any reason not to use the max_blocking_threads option when configuring the default Tokio runtime?
I'm using this approach and spawning all blocking tasks through tokio::task::spawn_blocking, but I'm wondering if I'm missing something.

@alamb
Author

alamb commented Dec 7, 2023

I do not know of any reason to avoid max_blocking_threads.

@3r1co

3r1co commented Dec 8, 2023

@alamb, so you think it's a viable alternative to the dedicated executor?

@alamb
Author

alamb commented Dec 8, 2023

If the proposal is to use tokio::task::spawn_blocking instead of a dedicated executor, that is also a possibility, but it comes with tradeoffs:

  1. Creating new threads is expensive (it requires several system calls and allocations) compared to scheduling a new task on an existing executor.
  2. As the number of threads grows past the number of available cores, the overhead of OS context switching becomes substantial as well.
