Skip to content

Instantly share code, notes, and snippets.

@robinst
Created October 12, 2016 00:25
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robinst/faff19b13915373c1ac7ed2a2946639d to your computer and use it in GitHub Desktop.
Save robinst/faff19b13915373c1ac7ed2a2946639d to your computer and use it in GitHub Desktop.
Rust thread pool with worker reuse for tasks that have an expensive initialization
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let pool = Arc::new(Pool::new());
let mut handles = Vec::new();
for i in 0..3 {
// In an Iron server, the pool would be passed to the Handler.
// We simulate that here by spawning our own threads.
let pool = pool.clone();
let handle = thread::spawn(move || {
println!("{}", pool.calculate(i));
});
handles.push(handle);
}
// In a server process, it wouldn't be necessary to join the handles.
for handle in handles {
handle.join().expect("Joining thread failed");
}
}
struct Pool {
// MsQueue from the crossbeam crate could also be used instead to avoid the locking
workers: Mutex<Vec<Worker>>,
}
impl Pool {
fn new() -> Pool {
Pool { workers: Mutex::new(Vec::new()) }
}
fn calculate(&self, input: u32) -> u32 {
let worker = self.get_worker();
worker.in_tx.send(input).expect("Sending input failed");
let result = worker.out_rx.recv().expect("Unable to get output from worker");
self.reuse_worker(worker);
result
}
fn get_worker(&self) -> Worker {
let mut workers = self.workers.lock().expect("Unable to lock workers for pop");
if let Some(worker) = workers.pop() {
println!("Reusing worker");
worker
} else {
// Note that this is currently unbounded, so we'll just keep on creating workers if
// needed. We could also put a bound on how many workers we're going to create and
// block until a worker is available.
Self::create_worker()
}
}
fn create_worker() -> Worker {
println!("Creating new worker");
let (in_tx, in_rx) = mpsc::channel();
let (out_tx, out_rx) = mpsc::channel();
let worker = Worker {
in_tx: in_tx,
out_rx: out_rx,
};
thread::spawn(move || {
// TODO: Do expensive init once here
println!("Initialized worker");
loop {
if let Ok(input) = in_rx.recv() {
// TODO: Do the real calculation here
let result = input * 10;
out_tx.send(result).expect("Sending output failed");
} else {
// The sending half of the channel has been closed, terminate worker
break;
}
}
});
worker
}
fn reuse_worker(&self, worker: Worker) {
let mut workers = self.workers.lock().expect("Unable to lock workers for push");
workers.push(worker);
}
}
struct Worker {
in_tx: Sender<u32>,
out_rx: Receiver<u32>,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment