Created
June 29, 2020 01:25
-
-
Save dallonf/a093fbc72655bfa5cbd27cae04c46993 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// importing a dependency from Cargo (the package manager) | |
extern crate threadpool; | |
use std::sync::{self, mpsc}; | |
use std::thread; | |
use std::time; | |
// data type to keep a handle for a task | |
struct ThreadpoolTask<T> { | |
rx: mpsc::Receiver<T>, | |
} | |
impl<T: 'static + Send> ThreadpoolTask<T> { | |
// The meat happens in this function. It's essentially a constructor for ThreadpoolTask | |
// It takes a reference to a threadpool and a function that returns a T; | |
// this function will be executed in the thread | |
fn start<F>(threadpool: &threadpool::ThreadPool, task_func: F) -> ThreadpoolTask<T> | |
where | |
F: 'static + FnOnce() -> T + Send, | |
{ | |
// Make a channel for communicating between threads | |
let (tx, rx) = mpsc::channel(); | |
threadpool.execute(move || { | |
// execute the function in the thread | |
let result = task_func(); | |
// Send the result over the channel. Panic (crash) the thread if any I/O errors happen | |
tx.send(result).unwrap(); | |
}); | |
// Return a ThreadpoolTask struct that owns the receiver end of the channel | |
ThreadpoolTask { rx } | |
} | |
// Call this function when you want to get the task's result in the current thread | |
// Will block that thread until it receives that result. | |
// Can potentially return a handleable I/O error; such as if the thread panicked. | |
// | |
// This function consumes the ThreadpoolTask; | |
// using it after calling .join() is a compiler error. | |
fn join(self) -> Result<T, mpsc::RecvError> { | |
// Wait to receive a value over the channel, and return it | |
self.rx.recv() | |
} | |
} | |
// Just a utility to add a .start_task(task_func) method to | |
// the "threadpool" crate's (crate=library) Threadpool object | |
// for convenience | |
trait ThreadpoolSupportingTasks { | |
fn start_task<F, T>(&self, task_func: F) -> ThreadpoolTask<T> | |
where | |
F: 'static + FnOnce() -> T + Send, | |
T: 'static + Send; | |
} | |
impl ThreadpoolSupportingTasks for threadpool::ThreadPool { | |
fn start_task<F, T>(&self, task_func: F) -> ThreadpoolTask<T> | |
where | |
F: 'static + FnOnce() -> T + Send, | |
T: 'static + Send, | |
{ | |
ThreadpoolTask::start(self, task_func) | |
} | |
} | |
// Main function | |
fn main() { | |
// Create a threadpool that has as many threads as | |
// logical CPU cores | |
let threadpool = threadpool::ThreadPool::default(); | |
// Start a task that waits 2 seconds and returns a string | |
let task1 = threadpool.start_task(|| { | |
thread::sleep(time::Duration::from_millis(2000)); | |
"Task 1 complete" | |
}); | |
// Start a task that waits 1 second and returns a string | |
let task2 = threadpool.start_task(|| { | |
thread::sleep(time::Duration::from_millis(1000)); | |
"Task 2 complete" | |
}); | |
// Start a task that waits 2.5 seconds and returns a string | |
let task3 = threadpool.start_task(|| { | |
thread::sleep(time::Duration::from_millis(2500)); | |
"Task 3 complete" | |
}); | |
// Limitation: this doesn't support two threads waiting for the same task to complete, since the .join() | |
// method destroys the ThreadpoolTask when called on any thread | |
// So I grab it in a brand new task whose only purpose is to wait for task3 and start new tasks | |
// I'm sure this is possible, I just don't know how to do it yet. | |
// I'd love to have it work like a Promise where if you .join() after it's already done, | |
// it just gives you the value right away | |
let tasks4_and_5_task = { | |
let threadpool = threadpool.clone(); | |
threadpool.clone().start_task(move || { | |
// Grab the result of Task 3 onto this thread | |
// Make a reference counted-pointer to said result so multiple threads can use it | |
let task3result = sync::Arc::new(task3.join().unwrap()); | |
// Start a task that just uses task3result | |
let task4 = { | |
let task3result = task3result.clone(); | |
threadpool.start_task(move || task3result.to_string() + " and task 4 complete") | |
}; | |
// Start a task that waits for 3 seconds and then uses task3result | |
let task5 = { | |
let task3result = task3result.clone(); | |
threadpool.start_task(move || { | |
thread::sleep(time::Duration::from_millis(1000)); | |
task3result.to_string() + " and task 5 complete" | |
}) | |
}; | |
// Return a tuple of task4 and task5 | |
(task4, task5) | |
}) | |
}; | |
// Print a string in the main thread | |
println!("Sync task complete"); | |
println!("{}", task1.join().unwrap()); // Print Task 1, waits for 2 seconds | |
println!("{}", task2.join().unwrap()); // Print Task 2, happens immediately because it was already done when Task 1 completed | |
let (task4, task5) = tasks4_and_5_task.join().unwrap(); // grab those tasks from earlier | |
println!("{}", task4.join().unwrap()); // Print task 4, happens a half second later because Task 3 took that much more time than task 2 | |
println!("{}", task5.join().unwrap()); // Print task 5, happens a second later because task 5 waits for a second after task 3 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment