Last active
May 17, 2022 14:08
-
-
Save drogus/90124f732b638dab911c5da29501184d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use crossbeam_deque::{Injector, Stealer, Worker}; | |
use tokio::sync::Mutex; | |
use std::{iter, sync::Arc, time::Duration}; | |
/// Number of worker tasks spawned by `start_workers`; each one owns its own
/// local crossbeam queue.
const NUMBER_OF_WORKERS: usize = 10;
/// A unit of work handed to the worker pool.
///
/// It currently carries no payload; it only exists so that something can be
/// pushed onto the queues and printed by a worker.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Job {}
// this is taken straight of the crossbeam::deque docs | |
fn find_task<T>( | |
local: &Worker<T>, | |
global: &Injector<T>, | |
stealers: &[Stealer<T>], | |
) -> Option<T> { | |
// Pop a task from the local queue, if not empty. | |
local.pop().or_else(|| { | |
// Otherwise, we need to look for a task elsewhere. | |
iter::repeat_with(|| { | |
// Try stealing a batch of tasks from the global queue. | |
global.steal_batch_and_pop(local) | |
// Or try stealing a task from one of the other threads. | |
.or_else(|| stealers.iter().map(|s| s.steal()).collect()) | |
}) | |
// Loop while no task was stolen and any steal operation needs to be retried. | |
.find(|s| !s.is_retry()) | |
// Extract the stolen task, if there is one. | |
.and_then(|s| s.success()) | |
}) | |
} | |
pub fn start_workers() -> Arc<Injector<Job>> { | |
let mut stealers_vec = Vec::new(); | |
let global_orig = Arc::new(Injector::new()); | |
let mut workers = Vec::new(); | |
// create a number of workers and fill the stealers vec | |
for i in 0..NUMBER_OF_WORKERS { | |
let worker: Worker<Job> = Worker::new_fifo(); | |
stealers_vec.push(worker.stealer()); | |
workers.push(Arc::new(Mutex::new(worker))); | |
} | |
for (i, worker) in workers.into_iter().enumerate() { | |
let global = global_orig.clone(); | |
let stealers = stealers_vec.clone(); | |
tokio::spawn(async move { | |
let task_number = i; | |
let stealers = &stealers[..]; | |
loop { | |
// try to find task | |
let task = { find_task(&*worker.lock().await, &global, &stealers) }; | |
match task { | |
Some(task) => { | |
// execute the task here | |
println!("JOB: {:?}, task_number: {:?}", task, task_number); | |
tokio::time::sleep(Duration::from_millis(10)).await; | |
}, | |
None => break | |
} | |
} | |
}); | |
} | |
global_orig | |
} | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
let injector = auction_tracker::fetcher::start_workers(); | |
for _ in 0..1000 { | |
injector.push(Job {}); | |
} | |
tokio::time::sleep(Duration::from_millis(100)).await; | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment