Created
April 15, 2022 09:07
-
-
Save iambriccardo/bcfab067a3cc26c6018b799c7c7214f3 to your computer and use it in GitHub Desktop.
Rust Thread Pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate core; | |
use std::borrow::Borrow; | |
use std::cell::RefCell; | |
use std::collections::HashMap; | |
use std::fmt::format; | |
use std::io; | |
use std::os::macos::raw::{stat, time_t}; | |
use std::rc::Rc; | |
use std::sync::{Arc, Mutex}; | |
use std::sync::mpsc; | |
use std::sync::mpsc::{Receiver, Sender}; | |
use std::thread; | |
use std::thread::{JoinHandle, sleep}; | |
use std::time::Duration; | |
use rand::Rng; | |
use crate::ThreadState::{IDLE, WORKING}; | |
/// Availability of a single worker thread, as recorded in the pool's shared state.
///
/// The variant names are kept in upper case because other items in this file
/// import them by name (`use crate::ThreadState::{IDLE, WORKING}`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThreadState {
    /// The worker is not running a task.
    IDLE,
    /// The worker is running a task.
    WORKING,
}
struct PoolState { | |
thread_states: Mutex<Vec<ThreadState>>, | |
} | |
struct Pool<T> { | |
txs: Vec<Sender<T>>, | |
threads: Vec<JoinHandle<()>>, | |
state: Arc<PoolState> | |
} | |
/// The unit of work accepted by the pool: a callable taking no arguments and
/// returning nothing, which can be sent to another thread and borrows nothing
/// shorter-lived than `'static`.
type ThreadClosure = dyn Fn() + Send + 'static;
impl Pool<Box<ThreadClosure>> | |
{ | |
fn new(size: usize) -> Self { | |
let mut txs = vec![]; | |
let mut threads = vec![]; | |
let mut thread_states = vec![]; | |
for _ in 0..size { | |
thread_states.push(ThreadState::IDLE); | |
} | |
let state = Arc::new(PoolState { | |
thread_states: Mutex::from(thread_states), | |
}); | |
for id in 0..size { | |
let (tx, rx) = mpsc::channel(); | |
txs.push(tx); | |
let state = Arc::clone(&state); | |
threads.push(thread::spawn(move || { | |
Pool::worker_thread(state, id, rx); | |
})); | |
} | |
Pool { | |
txs, | |
threads, | |
state | |
} | |
} | |
fn execute(&self, block: Box<ThreadClosure>) { | |
let mut thread_id; | |
{ | |
let thread_states = self.state.thread_states.lock().unwrap(); | |
thread_id = thread_states.iter() | |
.position(|thread_state| match thread_state { | |
IDLE => true, | |
WORKING => false | |
}) | |
.or_else(|| Option::Some(0)) | |
.unwrap(); | |
} | |
println!("Submitting task to thread {}...", thread_id); | |
self.txs[thread_id].send(block); | |
// We sleep to give the opportunity the thread to acquire the lock of the thread state. If we don't | |
// wait, we will end up having the lock acquired by all the calls to "execute". | |
sleep(Duration::new(0, 100)) | |
} | |
fn worker_thread(state: Arc<PoolState>, id: usize, rx: Receiver<Box<ThreadClosure>>) { | |
loop { | |
for closure in rx.iter() { | |
{ | |
let mut state = state.thread_states.lock().unwrap(); | |
(*state)[id] = ThreadState::WORKING; | |
println!("Thread {} working...", id); | |
} | |
closure(); | |
{ | |
let mut state = state.thread_states.lock().unwrap(); | |
(*state)[id] = ThreadState::IDLE; | |
println!("Thread {} idle...", id); | |
} | |
} | |
} | |
} | |
} | |
fn main() { | |
let pool = Pool::new(5); | |
pool.execute(Box::new(move || { | |
println!("Performing heavy work"); | |
})); | |
loop {} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment