Skip to content

Instantly share code, notes, and snippets.

@dallonf
Created June 29, 2020 01:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dallonf/a093fbc72655bfa5cbd27cae04c46993 to your computer and use it in GitHub Desktop.
Save dallonf/a093fbc72655bfa5cbd27cae04c46993 to your computer and use it in GitHub Desktop.
// importing a dependency from Cargo (the package manager)
extern crate threadpool;
use std::sync::{self, mpsc};
use std::thread;
use std::time;
/// Handle to a task whose result of type `T` is delivered over a channel.
///
/// The handle owns the receiving half of that channel; whoever holds the handle is the
/// only party able to collect the value.
struct ThreadpoolTask<T> {
    /// Receiving end of the channel on which the task's result arrives.
    rx: mpsc::Receiver<T>,
}
impl<T: 'static + Send> ThreadpoolTask<T> {
    /// Schedules `task_func` on `threadpool` and returns a handle to its eventual result.
    ///
    /// This is essentially the constructor for `ThreadpoolTask`. The closure runs on one of
    /// the pool's threads and its return value is sent back over a channel whose receiving
    /// end is owned by the returned handle.
    ///
    /// Dropping the handle without calling [`join`](Self::join) is allowed: the task still
    /// runs, and its result is simply discarded.
    fn start<F>(threadpool: &threadpool::ThreadPool, task_func: F) -> ThreadpoolTask<T>
    where
        F: 'static + FnOnce() -> T + Send,
    {
        // Make a channel for communicating between threads
        let (tx, rx) = mpsc::channel();
        threadpool.execute(move || {
            // Execute the function in the pool thread
            let result = task_func();
            // `send` only fails when the receiver is gone, i.e. the handle was dropped
            // without being joined. Nobody wants the value in that case, so discard it
            // instead of panicking the worker thread.
            let _ = tx.send(result);
        });
        // Return a ThreadpoolTask struct that owns the receiver end of the channel
        ThreadpoolTask { rx }
    }

    /// Blocks the current thread until the task's result is available and returns it.
    ///
    /// This consumes the `ThreadpoolTask`; using the handle after calling `.join()` is a
    /// compile error.
    ///
    /// # Errors
    ///
    /// Returns `mpsc::RecvError` if the sending half of the channel was dropped without a
    /// value ever being sent, for example because the task function panicked.
    fn join(self) -> Result<T, mpsc::RecvError> {
        // Wait to receive a value over the channel, and return it
        self.rx.recv()
    }
}
/// Convenience extension trait: gives the `threadpool` crate's `ThreadPool` a
/// `.start_task(task_func)` method so tasks can be started with method-call syntax.
trait ThreadpoolSupportingTasks {
    /// Starts `task_func` as a task and returns the handle used to collect its result.
    fn start_task<F, T>(&self, task_func: F) -> ThreadpoolTask<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static;
}
impl ThreadpoolSupportingTasks for threadpool::ThreadPool {
    /// Forwards to `ThreadpoolTask::start`, passing this pool as the one to run on.
    fn start_task<F, T>(&self, task_func: F) -> ThreadpoolTask<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let pool = self;
        ThreadpoolTask::<T>::start(pool, task_func)
    }
}
// Main function: starts several tasks on a thread pool and prints each result once it has
// been joined. The timings mentioned in the comments assume the pool has enough free worker
// threads to run every task concurrently.
fn main() {
    // Create a threadpool that has as many threads as
    // logical CPU cores
    let threadpool = threadpool::ThreadPool::default();

    // Start a task that waits 2 seconds and returns a string
    let task1 = threadpool.start_task(|| {
        thread::sleep(time::Duration::from_millis(2000));
        "Task 1 complete"
    });

    // Start a task that waits 1 second and returns a string
    let task2 = threadpool.start_task(|| {
        thread::sleep(time::Duration::from_millis(1000));
        "Task 2 complete"
    });

    // Start a task that waits 2.5 seconds and returns a string
    let task3 = threadpool.start_task(|| {
        thread::sleep(time::Duration::from_millis(2500));
        "Task 3 complete"
    });

    // Limitation: two threads cannot wait for the same task, because .join() consumes the
    // ThreadpoolTask on whichever thread calls it. So task 3 is joined inside a brand new
    // task whose only purpose is to wait for task 3 and then start the tasks that depend on
    // it. It would be nicer if this worked like a Promise, where joining an already-finished
    // task just hands back the value right away.
    let tasks4_and_5_task = {
        // The closure needs its own handle to the pool so that it can start tasks 4 and 5
        // from inside a worker thread; one clone is enough for that.
        let pool = threadpool.clone();
        threadpool.start_task(move || {
            // Grab the result of Task 3 onto this thread and wrap it in a reference-counted
            // pointer so multiple threads can use it
            let task3result =
                sync::Arc::new(task3.join().expect("task 3 ended without producing a result"));

            // Start a task that just uses task3result
            let task4 = {
                let task3result = sync::Arc::clone(&task3result);
                pool.start_task(move || format!("{} and task 4 complete", task3result))
            };

            // Start a task that waits for 1 second and then uses task3result.
            // This is the last user, so the Arc itself is moved in rather than cloned.
            let task5 = pool.start_task(move || {
                thread::sleep(time::Duration::from_millis(1000));
                format!("{} and task 5 complete", task3result)
            });

            // Return a tuple of task4 and task5
            (task4, task5)
        })
    };

    // Print a string in the main thread; nothing has been waited for yet
    println!("Sync task complete");

    // Print Task 1: blocks until about 2 seconds after start
    println!(
        "{}",
        task1.join().expect("task 1 ended without producing a result")
    );

    // Print Task 2: returns immediately because it finished while we were waiting for Task 1
    println!(
        "{}",
        task2.join().expect("task 2 ended without producing a result")
    );

    // Grab the handles for tasks 4 and 5. This blocks until Task 3 is done, roughly half a
    // second after Task 1, because the wrapper task joins Task 3 before starting them.
    let (task4, task5) = tasks4_and_5_task
        .join()
        .expect("the task starting tasks 4 and 5 ended without producing a result");

    // Print Task 4: available almost immediately, it only formats a string
    println!(
        "{}",
        task4.join().expect("task 4 ended without producing a result")
    );

    // Print Task 5: about a second later, because it sleeps for a second after Task 3
    println!(
        "{}",
        task5.join().expect("task 5 ended without producing a result")
    );
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment