-
-
Save alamb/bd0e086448ef9b438aeebd6f550e23ed to your computer and use it in GitHub Desktop.
// Please find the full, tested version in
// https://github.com/influxdata/influxdb_iox/blob/fe155e15fb2ad166aee66b0458e63c24a8128dd4/query/src/exec/task.rs#L101-L118
pub struct DedicatedExecutor { | |
state: Arc<Mutex<State>>, | |
} | |
/// Runs futures (and any `tasks` that are `tokio::task::spawned` by | |
/// them) on a separate Tokio Executor | |
struct State { | |
/// Channel for requests -- the dedicated executor takes requests | |
/// from here and runs them. | |
requests: Option<std::sync::mpsc::Sender<Task>>, | |
/// Thread which has a different Tokio runtime | |
/// installed and spawns tasks there | |
thread: Option<std::thread::JoinHandle<()>>, | |
} | |
impl DedicatedExecutor { | |
/// Creates a new `DedicatedExecutor` with a dedicated Tokio | |
/// executor that is separate from the threadpool created via | |
/// `[tokio::main]`. | |
pub fn new(thread_name: &str, num_threads: usize) -> Self { | |
let thread_name = thread_name.to_string(); | |
let (tx, rx) = std::sync::mpsc::channel::<Task>(); | |
let thread = std::thread::spawn(move || { | |
// Create a new Runtime to run tasks | |
let runtime = tokio::runtime::Builder::new_multi_thread() | |
.enable_all() | |
.thread_name(&thread_name) | |
.worker_threads(num_threads) | |
// Lower OS priority of worker threads to prioritize main runtime | |
.on_thread_start(move || set_current_thread_priority_low()) | |
.build() | |
.expect("Creating Tokio runtime"); | |
// Pull task requests off the channel and send them to the executor | |
runtime.block_on(async move { | |
while let Ok(task) = rx.recv() { | |
tokio::task::spawn(async move { | |
task.run().await; | |
}); | |
} | |
let state = State { | |
requests: Some(tx), | |
thread: Some(thread), | |
}; | |
Self { | |
state: Arc::new(Mutex::new(state)), | |
} | |
} | |
/// Runs the specified Future (and any tasks it spawns) on the | |
/// `DedicatedExecutor`. | |
pub fn spawn<T>(&self, task: T) -> Job<T::Output> | |
where | |
T: Future + Send + 'static, | |
T::Output: Send + 'static, | |
{ | |
let (tx, rx) = tokio::sync::oneshot::channel(); | |
let fut = Box::pin(async move { | |
let task_output = task.await; | |
tx.send(task_output).ok() | |
}); | |
let mut state = self.state.lock(); | |
let task = Task { | |
fut, | |
}; | |
if let Some(requests) = &mut state.requests { | |
// would fail if someone has started shutdown | |
requests.send(task).ok(); | |
} else { | |
warn!("tried to schedule task on an executor that was shutdown"); | |
} | |
Job { rx, cancel } | |
} | |
#[pin_project(PinnedDrop)] | |
pub struct Job<T> { | |
#[pin] | |
rx: Receiver<T>, | |
} | |
impl<T> Future for Job<T> { | |
type Output = Result<T, Error>; | |
fn poll( | |
self: Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
) -> std::task::Poll<Self::Output> { | |
let this = self.project(); | |
this.rx.poll(cx) | |
} | |
} |
Thanks a lot, I learned something. Notably, the `spawn_blocking` docs mention rayon, and because they mention it right after `spawn_blocking`:
Hint: If using rayon, you can use a oneshot channel to send the result back to Tokio when the rayon task finishes.
I thought it meant that one would run the rayon task inside `spawn_blocking`, using a oneshot channel to talk back.
It seems like `spawn_blocking` is not required at all to run rayon tasks in Tokio. The only things needed are a oneshot channel and `rayon::spawn`...
Just noting that you can see this in `tokio-rayon`: https://github.com/andybarron/tokio-rayon/blob/main/src/global.rs#L26
Is there any reason why you don't use the `max_blocking_threads` option when configuring your default Tokio Runtime?
I'm using this approach and spawn all blocking tasks through `tokio::task::spawn_blocking` here, but I'm wondering if I'm missing something.
I do not know of any reason to avoid `max_blocking_threads`.
@alamb, so you think it's a viable alternative to the dedicated executor?
If the proposal is to use `tokio::task::spawn_blocking` instead of a dedicated executor, that is also a possibility, but it comes with tradeoffs:
- Creating new threads is expensive (it requires several sys calls and allocations) compared to scheduling a new task on an existing executor
- As the number of threads increases past the available cores, the overhead of context switching as the OS does the scheduling becomes substantial as well
@Ciantic please refer to https://ryhl.io/blog/async-what-is-blocking/. There is no need to use rayon inside the `spawn_blocking` call, which uses the Tokio runtime; just use the rayon runtime/thread pool on its own.