Created
December 31, 2019 12:06
-
-
Save gibizer/0d9e5b6120826e06778ac3edf6af7a89 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! # A generic threaded pipeline
use std::fmt::Display; | |
use std::sync::mpsc; | |
use std::thread; | |
use std::time::Duration; | |
use log::{error, info}; | |
use log_panics; | |
use rand::prelude::Rng; | |
/// Put the current thread to sleep for a randomly chosen number of
/// milliseconds.
///
/// * `random_sleep!()` draws the duration with `gen_range(10, 100)`.
/// * `random_sleep!(min, max)` draws the duration with `gen_range(min, max)`.
///
/// The expansion names `thread`, `Duration` and `rand` (and calls the
/// `gen_range` method) without full paths, so the call site must have those
/// names in scope.
macro_rules! random_sleep {
    // No bounds given: reuse the two-argument form with the default bounds.
    () => {
        random_sleep!(10, 100)
    };
    ($min:expr, $max:expr) => {
        thread::sleep(Duration::from_millis(
            rand::thread_rng().gen_range($min, $max),
        ))
    };
}
/// A single stage of the pipeline: it owns the receiving end of its input
/// channel, the sending end of its output channel and the transformation it
/// applies to the items travelling between the two.
struct Agent<T> {
    /// Label that prefixes every log line written by this agent.
    name: String,
    /// Where the agent picks up the items it has to transform.
    rx: mpsc::Receiver<T>,
    /// Where the agent hands over the transformed items.
    tx: mpsc::Sender<T>,
    /// The transformation itself.
    ///
    /// NOTE: a closure has no size known at compile time, hence the `Box`,
    /// which turns it into a heap allocated smart function pointer.
    /// NOTE: every closure has its own unique type, even when two closures
    /// share a signature. `dyn` erases that type by making this a trait
    /// object, i.e. "some value that implements `Fn(T) -> T`".
    /// NOTE: `Send` is required because the whole `Agent` is moved into a
    /// thread.
    func: Box<dyn Fn(T) -> T + Send>,
}
impl<T: Display> Agent<T> { | |
fn new( | |
name: String, | |
rx: mpsc::Receiver<T>, | |
tx: mpsc::Sender<T>, | |
func: Box<dyn Fn(T) -> T + Send>, | |
) -> Agent<T> { | |
Agent { name, rx, tx, func } | |
} | |
fn run(&self) { | |
while let Ok(msg) = self.rx.recv() { | |
info!("{}: processing {}", self.name, msg); | |
let msg = (self.func)(msg); | |
random_sleep!(); | |
info!("{}: processed {}", self.name, msg); | |
match self.tx.send(msg) { | |
Err(x) => { | |
error!("{}: sending failed: {}", self.name, x); | |
break; | |
} | |
Ok(_) => continue, | |
} | |
} | |
error!("{}: rx channel closed, exiting", self.name); | |
} | |
} | |
/// A threaded pipeline that runs an arbitrary chain of transformations over
/// many independent inputs.
pub struct Pipeline<T> {
    /// Entry of the pipeline: items sent here reach the first stage.
    tx: mpsc::Sender<T>,
    /// Exit of the pipeline: fully processed items arrive here.
    rx: mpsc::Receiver<T>,
    /// Handles of the spawned stage threads, kept so they can be joined.
    threads: Vec<thread::JoinHandle<()>>,
}
// NOTE: Display Trait bound is needed for logging only | |
// NOTE: 'static lifetime of T type is needed as it is passed to threads | |
// that might outlive the Pipeline type | |
impl<T: Send + Display + 'static> Pipeline<T> { | |
// NOTE: When a Pipeline instance is dropped the tx to the first agent | |
// held by the pipeline is dropped. This will cause that the first Agent | |
// will receive and Err and therefore the Agent::run() will return making | |
// the first thread, the run() was spawned in, to exit. So the first | |
// thread will be dropped. As that thread owns the first Agent | |
// instance that Agent will be dropped too. When the first Agent drops | |
// its tx it cause the second Agent to go through the above cleanup | |
// process. Eventually all the threads and Agents are dropped. | |
// This cleanup process safe but _async_ as the threads is not joined during | |
// drop of the pipe. If you need a sync cleanup the shutdown() call can be | |
// used as that will join the threads before return | |
/// Create a new pipeline with independent threads runinng each passed | |
/// transformation function for each input sent into the pipeline. The | |
/// transformations are executed in the same order as they appear in | |
/// `funcs`. | |
pub fn new(funcs: Vec<Box<dyn Fn(T) -> T + Send>>) -> Pipeline<T> { | |
let mut threads = Vec::new(); | |
// NOTE: we build up the pipeline from its end: | |
// last -> | |
// prev -> last -> | |
// prev2 -> prev -> last -> | |
let (mut next_tx, last_rx) = mpsc::channel::<T>(); | |
for (i, func) in funcs.into_iter().enumerate().rev() { | |
let res = Pipeline::<T>::link_agent(format!("agent{}", i), next_tx, func); | |
next_tx = res.0; | |
threads.push(res.1); | |
} | |
Pipeline::<T> { | |
tx: next_tx, | |
rx: last_rx, | |
threads, | |
} | |
} | |
/// Link the agent to the previous agent in the chain | |
fn link_agent( | |
name: String, | |
tx: mpsc::Sender<T>, | |
func: Box<dyn Fn(T) -> T + Send>, | |
) -> (mpsc::Sender<T>, thread::JoinHandle<()>) { | |
let (next_tx, next_rx) = mpsc::channel::<T>(); | |
let agent = Agent::new(name, next_rx, tx, func); | |
(next_tx, thread::spawn(move || agent.run())) | |
} | |
/// Send in a item to be processed by the pipeline | |
pub fn send(&self, msg: T) { | |
self.tx.send(msg).unwrap(); | |
} | |
/// Receive an processed item from the pipeline. This call blocks if the | |
/// pipeline is empty. | |
pub fn recv(&self) -> T { | |
self.rx.recv().unwrap() | |
} | |
/// Deconstructs the pipeline in a syncronous way by closing channels and then | |
/// joining threads | |
pub fn shutdown(self) { | |
drop(self.rx); | |
drop(self.tx); | |
for thread in self.threads { | |
thread.join().unwrap(); | |
} | |
} | |
} | |
fn main() { | |
log_panics::init(); | |
env_logger::init(); | |
let mut funcs: Vec<Box<dyn Fn(String) -> String + Send>> = Vec::new(); | |
funcs.push(Box::new(|s: String| s.trim().to_owned())); | |
funcs.push(Box::new(|s: String| s.to_uppercase())); | |
let pipe = Pipeline::new(funcs); | |
let mut msg = " hello ".to_string(); | |
info!("main: sending in: {}", msg); | |
pipe.send(msg); | |
msg = " world! ".to_string(); | |
info!("main: sending in: {}", msg); | |
pipe.send(msg); | |
info!("main: receiving: {}", pipe.recv()); | |
info!("main: receiving: {}", pipe.recv()); | |
pipe.shutdown(); | |
} | |
#[cfg(test)]
mod tests_pipeline {
    use super::*;
    use std::fmt::{Formatter, Result};
    use std::sync::{Arc, Mutex};
    use std::time::Instant;

    #[test]
    fn test_empty_pipeline_does_nothing() {
        let pipeline = Pipeline::new(Vec::new());
        let input = "hello";
        pipeline.send(input);
        let result = pipeline.recv();
        assert_eq!(input, result);
    }

    #[test]
    fn test_multiple_transforms_multiple_input() {
        // NOTE: the explicit type is needed as type inference works based on
        // the first item pushed to the vector, and that is a bit more
        // specific than what we really need here due to closure typing.
        let mut transforms: Vec<Box<dyn Fn(u32) -> u32 + Send>> = Vec::new();
        transforms.push(Box::new(|x| x * x));
        transforms.push(Box::new(|x| x + 1));
        // This pipeline calculates f(x) -> x^2 + 1
        let pipeline = Pipeline::new(transforms);

        pipeline.send(13);
        let result = pipeline.recv();
        assert_eq!(13 * 13 + 1, result);

        pipeline.send(0);
        let result = pipeline.recv();
        // f(0) = 0^2 + 1
        assert_eq!(1, result);
    }

    /// An item that records its `id` in a shared log when it is dropped.
    struct TestItemWithDrop {
        id: u32,
        // This is state shared between the struct instances and the test
        // case. `Arc` is needed for sharing between threads, `Mutex` is
        // needed to be able to safely mutate while shared.
        droplog: Arc<Mutex<Vec<u32>>>,
    }

    impl Drop for TestItemWithDrop {
        fn drop(&mut self) {
            self.droplog.lock().unwrap().push(self.id);
            info!("{} being dropped", self);
        }
    }

    impl Display for TestItemWithDrop {
        fn fmt(&self, f: &mut Formatter) -> Result {
            write!(f, "TestItemWithDrop({})", self.id)
        }
    }

    /// Build a one-stage identity pipeline and send in two tracked items
    /// (ids 1 and 2) that both report to `droplog`.
    fn pipe_with_two_items(droplog: &Arc<Mutex<Vec<u32>>>) -> Pipeline<TestItemWithDrop> {
        let mut funcs: Vec<Box<dyn Fn(TestItemWithDrop) -> TestItemWithDrop + Send>> = Vec::new();
        funcs.push(Box::new(|s: TestItemWithDrop| s));
        let pipe = Pipeline::new(funcs);
        for id in 1..=2 {
            pipe.send(TestItemWithDrop {
                id,
                droplog: droplog.clone(),
            });
        }
        pipe
    }

    /// Take one item out of the pipeline, log it and drop it right away.
    ///
    /// The receive is done outside of `info!` on purpose: the log macros only
    /// evaluate their arguments when the level is enabled, and no logger is
    /// installed in these tests, so a `pipe.recv()` passed as a macro
    /// argument would never run.
    fn receive_and_drop_one(pipe: &Pipeline<TestItemWithDrop>) {
        let item = pipe.recv();
        info!("main received: {}", item);
        drop(item);
    }

    /// Wait until `droplog` holds at least `expected` entries, giving up
    /// after five seconds. The caller asserts on the log afterwards.
    fn wait_for_drops(droplog: &Arc<Mutex<Vec<u32>>>, expected: usize) {
        let deadline = Instant::now() + Duration::from_secs(5);
        while droplog.lock().unwrap().len() < expected && Instant::now() < deadline {
            thread::sleep(Duration::from_millis(10));
        }
    }

    /// We want to test that the pipeline is cleaned up properly. One way to
    /// do this is to send in an item that changes a shared mutable state when
    /// it is dropped, and let the item sit in the pipe when the pipe is
    /// dropped. The test can then check the shared state to see if the item
    /// was dropped properly. An alternative would be to send in an `Arc` but
    /// keep a weak ref on it, then assert that the `strong_count` of the weak
    /// ref is 0 after the pipe is dropped. When this was written
    /// `Weak::strong_count` was still an unstable API.
    #[test]
    fn test_threads_and_channels_behind_pipe_is_cleaned_when_dropped() {
        let droplog = Arc::new(Mutex::new(Vec::new()));
        // send in two items but only receive the first so the second remains
        // in the pipe
        let pipe = pipe_with_two_items(&droplog);
        receive_and_drop_one(&pipe);
        info!("main dopping pipe");
        drop(pipe);
        // Dropping the pipe cleans up asynchronously (see
        // Pipeline::shutdown() for the sync variant), so wait for the second
        // item to be dropped instead of sleeping for a fixed time.
        wait_for_drops(&droplog, 2);
        assert_eq!(2, droplog.lock().unwrap().len());
    }

    #[test]
    fn test_threads_and_channels_behind_pipe_is_cleaned_when_shutdown() {
        let droplog = Arc::new(Mutex::new(Vec::new()));
        // send in two items but only receive the first so the second remains
        // in the pipe
        let pipe = pipe_with_two_items(&droplog);
        receive_and_drop_one(&pipe);
        info!("main shutting down pipe");
        pipe.shutdown();
        assert_eq!(2, droplog.lock().unwrap().len());
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment