Skip to content

Instantly share code, notes, and snippets.

@drogus
Last active May 17, 2022
Embed
What would you like to do?
use crossbeam_deque::{Injector, Stealer, Worker};
use tokio::sync::Mutex;
use std::{iter, sync::Arc, time::Duration};
const NUMBER_OF_WORKERS: usize = 10;
#[derive(Debug)]
pub struct Job {}
// this is taken straight of the crossbeam::deque docs
fn find_task<T>(
local: &Worker<T>,
global: &Injector<T>,
stealers: &[Stealer<T>],
) -> Option<T> {
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a batch of tasks from the global queue.
global.steal_batch_and_pop(local)
// Or try stealing a task from one of the other threads.
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
}
pub fn start_workers() -> Arc<Injector<Job>> {
let mut stealers_vec = Vec::new();
let global_orig = Arc::new(Injector::new());
let mut workers = Vec::new();
// create a number of workers and fill the stealers vec
for i in 0..NUMBER_OF_WORKERS {
let worker: Worker<Job> = Worker::new_fifo();
stealers_vec.push(worker.stealer());
workers.push(Arc::new(Mutex::new(worker)));
}
for (i, worker) in workers.into_iter().enumerate() {
let global = global_orig.clone();
let stealers = stealers_vec.clone();
tokio::spawn(async move {
let task_number = i;
let stealers = &stealers[..];
loop {
// try to find task
let task = { find_task(&*worker.lock().await, &global, &stealers) };
match task {
Some(task) => {
// execute the task here
println!("JOB: {:?}, task_number: {:?}", task, task_number);
tokio::time::sleep(Duration::from_millis(10)).await;
},
None => break
}
}
});
}
global_orig
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let injector = auction_tracker::fetcher::start_workers();
for _ in 0..1000 {
injector.push(Job {});
}
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment