Skip to content

Instantly share code, notes, and snippets.

@nohupped
Forked from alamb/dedicated_executor.rs
Created January 13, 2023 15:06
Show Gist options
  • Save nohupped/1e7e8222b74107eadaf6b9a77423d9ff to your computer and use it in GitHub Desktop.
Save nohupped/1e7e8222b74107eadaf6b9a77423d9ff to your computer and use it in GitHub Desktop.
// Please find the full, tested version in
// https://github.com/influxdata/influxdb_iox/blob/fe155e15fb2ad166aee66b0458e63c24a8128dd4/query/src/exec/task.rs#L101-L118
/// Handle to an executor that runs futures on its own Tokio runtime,
/// hosted on a dedicated OS thread (see `DedicatedExecutor::new`).
///
/// The handle itself only holds the shared, mutex-protected `State`.
pub struct DedicatedExecutor {
/// Shared state: the request channel feeding the dedicated thread and
/// that thread's join handle.
state: Arc<Mutex<State>>,
}
/// Shared state behind a `DedicatedExecutor`, which runs futures (and
/// any tasks that are `tokio::task::spawn`ed by them) on a separate
/// Tokio executor.
struct State {
/// Channel for requests -- the dedicated executor thread takes requests
/// from here and runs them. `DedicatedExecutor::spawn` treats `None`
/// as "the executor was shut down" and only logs a warning.
requests: Option<std::sync::mpsc::Sender<Task>>,
/// Join handle of the thread which has a different Tokio runtime
/// installed and spawns tasks there.
/// NOTE(review): presumably an `Option` so it can be taken and joined
/// during shutdown; the shutdown path is not shown here -- confirm.
thread: Option<std::thread::JoinHandle<()>>,
}
impl DedicatedExecutor {
/// Creates a new `DedicatedExecutor` with a dedicated Tokio
/// executor that is separate from the threadpool created via
/// `[tokio::main]`.
pub fn new(thread_name: &str, num_threads: usize) -> Self {
let thread_name = thread_name.to_string();
let (tx, rx) = std::sync::mpsc::channel::<Task>();
let thread = std::thread::spawn(move || {
// Create a new Runtime to run tasks
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name(&thread_name)
.worker_threads(num_threads)
// Lower OS priority of worker threads to prioritize main runtime
.on_thread_start(move || set_current_thread_priority_low())
.build()
.expect("Creating Tokio runtime");
// Pull task requests off the channel and send them to the executor
runtime.block_on(async move {
while let Ok(task) = rx.recv() {
tokio::task::spawn(async move {
task.run().await;
});
}
let state = State {
requests: Some(tx),
thread: Some(thread),
};
Self {
state: Arc::new(Mutex::new(state)),
}
}
/// Runs the specified Future (and any tasks it spawns) on the
/// `DedicatedExecutor`.
pub fn spawn<T>(&self, task: T) -> Job<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let fut = Box::pin(async move {
let task_output = task.await;
tx.send(task_output).ok()
});
let mut state = self.state.lock();
let task = Task {
fut,
};
if let Some(requests) = &mut state.requests {
// would fail if someone has started shutdown
requests.send(task).ok();
} else {
warn!("tried to schedule task on an executor that was shutdown");
}
Job { rx, cancel }
}
/// Handle to a task submitted via `DedicatedExecutor::spawn`.
///
/// Awaiting a `Job` polls the receiving half of the oneshot channel on
/// which the spawned task sends its output.
// No `PinnedDrop` argument: this type has no `#[pinned_drop]` impl, and
// `#[pin_project(PinnedDrop)]` does not compile without one.
#[pin_project]
pub struct Job<T> {
    /// Receiver for the task's output; structurally pinned so it can be
    /// polled through the projection.
    #[pin]
    rx: Receiver<T>,
}
/// A `Job` is itself a future: polling it polls the inner receiver.
impl<T> Future for Job<T> {
    /// Whatever the inner receiver resolves to: the task's output, or the
    /// receiver's error.
    type Output = Result<T, Error>;

    fn poll(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // `rx` is a `#[pin]` field, so the projection yields a pinned
        // reference that can be polled directly.
        self.project().rx.poll(cx)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment