Skip to content

Instantly share code, notes, and snippets.

@iambriccardo
Created April 15, 2022 09:07
Show Gist options
  • Save iambriccardo/bcfab067a3cc26c6018b799c7c7214f3 to your computer and use it in GitHub Desktop.
Save iambriccardo/bcfab067a3cc26c6018b799c7c7214f3 to your computer and use it in GitHub Desktop.
Rust Thread Pool
extern crate core;
use std::borrow::Borrow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::format;
use std::io;
use std::os::macos::raw::{stat, time_t};
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;
use std::thread::{JoinHandle, sleep};
use std::time::Duration;
use rand::Rng;
use crate::ThreadState::{IDLE, WORKING};
enum ThreadState {
IDLE,
WORKING,
}
struct PoolState {
thread_states: Mutex<Vec<ThreadState>>,
}
struct Pool<T> {
txs: Vec<Sender<T>>,
threads: Vec<JoinHandle<()>>,
state: Arc<PoolState>
}
type ThreadClosure = dyn Fn() + Send + 'static;
impl Pool<Box<ThreadClosure>>
{
fn new(size: usize) -> Self {
let mut txs = vec![];
let mut threads = vec![];
let mut thread_states = vec![];
for _ in 0..size {
thread_states.push(ThreadState::IDLE);
}
let state = Arc::new(PoolState {
thread_states: Mutex::from(thread_states),
});
for id in 0..size {
let (tx, rx) = mpsc::channel();
txs.push(tx);
let state = Arc::clone(&state);
threads.push(thread::spawn(move || {
Pool::worker_thread(state, id, rx);
}));
}
Pool {
txs,
threads,
state
}
}
fn execute(&self, block: Box<ThreadClosure>) {
let mut thread_id;
{
let thread_states = self.state.thread_states.lock().unwrap();
thread_id = thread_states.iter()
.position(|thread_state| match thread_state {
IDLE => true,
WORKING => false
})
.or_else(|| Option::Some(0))
.unwrap();
}
println!("Submitting task to thread {}...", thread_id);
self.txs[thread_id].send(block);
// We sleep to give the opportunity the thread to acquire the lock of the thread state. If we don't
// wait, we will end up having the lock acquired by all the calls to "execute".
sleep(Duration::new(0, 100))
}
fn worker_thread(state: Arc<PoolState>, id: usize, rx: Receiver<Box<ThreadClosure>>) {
loop {
for closure in rx.iter() {
{
let mut state = state.thread_states.lock().unwrap();
(*state)[id] = ThreadState::WORKING;
println!("Thread {} working...", id);
}
closure();
{
let mut state = state.thread_states.lock().unwrap();
(*state)[id] = ThreadState::IDLE;
println!("Thread {} idle...", id);
}
}
}
}
}
fn main() {
let pool = Pool::new(5);
pool.execute(Box::new(move || {
println!("Performing heavy work");
}));
loop {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment