Skip to content

Instantly share code, notes, and snippets.

@gibizer
Created December 31, 2019 12:06
Show Gist options
  • Save gibizer/0d9e5b6120826e06778ac3edf6af7a89 to your computer and use it in GitHub Desktop.
Save gibizer/0d9e5b6120826e06778ac3edf6af7a89 to your computer and use it in GitHub Desktop.
//! # A generic threaded pipeline
use std::fmt::Display;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use log::{error, info};
use log_panics;
use rand::prelude::Rng;
/// Puts the current thread to sleep for a random number of milliseconds.
///
/// * `random_sleep!()` draws the duration from `gen_range(10, 100)`.
/// * `random_sleep!(min, max)` draws the duration from `gen_range(min, max)`.
///
/// The expansion calls `gen_range` on `rand::thread_rng()`, so the `rand::Rng`
/// trait must be in scope wherever the macro is invoked.
macro_rules! random_sleep {
    () => {
        random_sleep!(10, 100)
    };
    ($min:expr, $max:expr) => {{
        let sleep_ms = rand::thread_rng().gen_range($min, $max);
        std::thread::sleep(std::time::Duration::from_millis(sleep_ms));
    }};
}
/// A single pipeline stage.
///
/// An agent receives items on `rx`, applies `func` to each of them and
/// forwards the result on `tx`. `Pipeline::link_agent` moves every agent into
/// its own thread, where `Agent::run` drives it.
// NOTE: fields are dropped in declaration order, so `rx` is dropped before `tx`.
struct Agent<T> {
    /// Label used as the prefix of this agent's log messages (e.g. "agent0").
    name: String,
    /// Input: receiving half of the channel fed by the previous stage
    /// (or by `Pipeline::send` for the first stage).
    rx: mpsc::Receiver<T>,
    /// Output: sending half of the channel read by the next stage
    /// (or by `Pipeline::recv` for the last stage).
    tx: mpsc::Sender<T>,
    // NOTE: Box is needed as the closure's size is not known at compile time;
    // boxing stores it on the heap behind a (fat) pointer.
    // NOTE: dyn is needed as every closure has its own unique type, so two
    // closures with the same signature still have different types. `dyn Fn`
    // makes this a trait object, i.e. any value that implements the Fn trait.
    // NOTE: The Send bound is required as the Agent is moved into a thread.
    func: Box<dyn Fn(T) -> T + Send>,
}
impl<T: Display> Agent<T> {
fn new(
name: String,
rx: mpsc::Receiver<T>,
tx: mpsc::Sender<T>,
func: Box<dyn Fn(T) -> T + Send>,
) -> Agent<T> {
Agent { name, rx, tx, func }
}
fn run(&self) {
while let Ok(msg) = self.rx.recv() {
info!("{}: processing {}", self.name, msg);
let msg = (self.func)(msg);
random_sleep!();
info!("{}: processed {}", self.name, msg);
match self.tx.send(msg) {
Err(x) => {
error!("{}: sending failed: {}", self.name, x);
break;
}
Ok(_) => continue,
}
}
error!("{}: rx channel closed, exiting", self.name);
}
}
/// A threaded pipeline that applies an arbitrary chain of transformations to
/// each of many independent inputs.
///
/// Every transformation runs on its own thread, and items flow between the
/// threads through `mpsc` channels.
// NOTE: fields are dropped in declaration order: `tx` first, then `rx`, then
// `threads`. The asynchronous cleanup on drop starts from dropping `tx`.
pub struct Pipeline<T> {
    /// Entry point: sending half of the first stage's input channel. With no
    /// transformations it feeds `rx` directly.
    tx: mpsc::Sender<T>,
    /// Exit point: receiving half of the last stage's output channel.
    rx: mpsc::Receiver<T>,
    /// Join handles of the agent threads, one per transformation, stored in
    /// reverse pipeline order (last stage first).
    threads: Vec<thread::JoinHandle<()>>,
}
// NOTE: The Display trait bound is needed for logging only.
// NOTE: The 'static bound on T is needed as items are moved into threads that
// might outlive the Pipeline instance.
impl<T: Send + Display + 'static> Pipeline<T> {
    // NOTE: When a Pipeline instance is dropped, the tx to the first agent held
    // by the pipeline is dropped. Once its queue is drained, the first Agent
    // receives an Err, so Agent::run() returns and the thread it was spawned in
    // exits, dropping that Agent. When the first Agent drops its tx, the second
    // Agent goes through the same cleanup process, and so on until all threads
    // and Agents are gone.
    // This cleanup is safe but _async_, as the threads are not joined when the
    // pipeline is dropped. If you need synchronous cleanup, call shutdown(),
    // which joins the threads before it returns.

    /// Creates a new pipeline that runs each transformation function on its
    /// own independent thread.
    ///
    /// Every input sent into the pipeline goes through the transformations in
    /// the order they appear in `funcs`. With an empty `funcs`, items are
    /// passed through unchanged.
    pub fn new(funcs: Vec<Box<dyn Fn(T) -> T + Send>>) -> Pipeline<T> {
        let mut threads = Vec::with_capacity(funcs.len());
        // NOTE: we build up the pipeline from its end:
        //                  last ->
        //          prev -> last ->
        // prev2 -> prev -> last ->
        let (mut next_tx, last_rx) = mpsc::channel::<T>();
        for (i, func) in funcs.into_iter().enumerate().rev() {
            let (agent_tx, handle) = Self::link_agent(format!("agent{}", i), next_tx, func);
            next_tx = agent_tx;
            threads.push(handle);
        }
        Pipeline {
            tx: next_tx,
            rx: last_rx,
            threads,
        }
    }

    /// Spawns a thread running a new agent that applies `func` and forwards
    /// its results to `tx` (the input of the following stage).
    ///
    /// Returns the sending half of the new agent's input channel, which links
    /// the agent to the previous stage, together with the thread's handle.
    fn link_agent(
        name: String,
        tx: mpsc::Sender<T>,
        func: Box<dyn Fn(T) -> T + Send>,
    ) -> (mpsc::Sender<T>, thread::JoinHandle<()>) {
        let (next_tx, next_rx) = mpsc::channel::<T>();
        let agent = Agent::new(name, next_rx, tx, func);
        (next_tx, thread::spawn(move || agent.run()))
    }

    /// Sends an item into the pipeline to be processed.
    ///
    /// # Panics
    ///
    /// Panics if the first stage has exited, e.g. because a transformation
    /// function panicked and the failure propagated up the chain.
    pub fn send(&self, msg: T) {
        self.tx
            .send(msg)
            .expect("pipeline: first stage has exited, cannot send");
    }

    /// Receives a processed item from the pipeline.
    ///
    /// Blocks while no processed item is available. Note that it blocks forever
    /// if nothing is in flight.
    ///
    /// # Panics
    ///
    /// Panics if the output channel is closed and empty, e.g. because a
    /// transformation function panicked and the stages after it exited.
    pub fn recv(&self) -> T {
        self.rx
            .recv()
            .expect("pipeline: last stage has exited, cannot receive")
    }

    /// Deconstructs the pipeline synchronously: closes both ends of the
    /// pipeline, then joins every agent thread before returning.
    ///
    /// # Panics
    ///
    /// Panics if any agent thread panicked.
    pub fn shutdown(self) {
        // Close the exit first so processed-but-unreceived items are not kept.
        // Then close the entry, which lets the first agent drain its queue and
        // exit, cascading down the chain.
        drop(self.rx);
        drop(self.tx);
        for handle in self.threads {
            handle
                .join()
                .expect("pipeline: an agent thread panicked");
        }
    }
}
/// Demo entry point: runs two strings through a two-stage pipeline (trim, then
/// upper-case), logs what goes in and what comes out, then shuts it down.
fn main() {
    log_panics::init();
    env_logger::init();

    let trim: Box<dyn Fn(String) -> String + Send> = Box::new(|s: String| s.trim().to_owned());
    let upper: Box<dyn Fn(String) -> String + Send> = Box::new(|s: String| s.to_uppercase());
    let pipe = Pipeline::new(vec![trim, upper]);

    let inputs = [" hello ", " world! "];
    for input in inputs.iter() {
        let msg = input.to_string();
        info!("main: sending in: {}", msg);
        pipe.send(msg);
    }
    for _ in 0..inputs.len() {
        info!("main: receiving: {}", pipe.recv());
    }

    pipe.shutdown();
}
#[cfg(test)]
mod tests_pipeline {
    use super::*;
    use std::fmt::{Formatter, Result};
    use std::sync::{Arc, Mutex};

    #[test]
    fn test_empty_pipeline_does_nothing() {
        let pipeline = Pipeline::new(Vec::new());
        let input = "hello";
        pipeline.send(input);
        let result = pipeline.recv();
        assert_eq!(input, result);
    }

    #[test]
    fn test_multiple_transforms_multiple_input() {
        // NOTE: An explicit type is needed. Without it, type inference would fix
        // the element type to the Box of the first pushed closure. Every closure
        // has its own unique type, so the second push would then not compile.
        let mut transforms: Vec<Box<dyn Fn(u32) -> u32 + Send>> = Vec::new();
        transforms.push(Box::new(|x| x * x));
        transforms.push(Box::new(|x| x + 1));
        // This pipeline calculates f(x) -> x^2 + 1
        let pipeline = Pipeline::new(transforms);
        pipeline.send(13);
        let result = pipeline.recv();
        assert_eq!(13 * 13 + 1, result);
        pipeline.send(0);
        let result = pipeline.recv();
        // f(0) = 0^2 + 1
        assert_eq!(1, result);
    }

    struct TestItemWithDrop {
        id: u32,
        // Shared state between the struct instances and the test case. Arc is
        // needed for sharing between threads; Mutex is needed to mutate it
        // safely while shared.
        droplog: Arc<Mutex<Vec<u32>>>,
    }

    impl Drop for TestItemWithDrop {
        fn drop(&mut self) {
            self.droplog.lock().unwrap().push(self.id);
            info!("{} being dropped", self);
        }
    }

    impl Display for TestItemWithDrop {
        fn fmt(&self, f: &mut Formatter) -> Result {
            write!(f, "TestItemWithDrop({})", self.id)
        }
    }

    /// Builds a single-stage identity pipeline, sends in two items and receives
    /// only the first one, so the second item is left inside the pipe.
    /// Returns the pipe and the shared log of dropped item ids.
    fn pipe_with_leftover_item() -> (Pipeline<TestItemWithDrop>, Arc<Mutex<Vec<u32>>>) {
        let mut funcs: Vec<Box<dyn Fn(TestItemWithDrop) -> TestItemWithDrop + Send>> = Vec::new();
        funcs.push(Box::new(|s: TestItemWithDrop| s));
        let pipe = Pipeline::new(funcs);
        let droplog = Arc::new(Mutex::new(Vec::new()));
        for id in 1..=2 {
            pipe.send(TestItemWithDrop {
                id,
                droplog: Arc::clone(&droplog),
            });
        }
        // The received item is a temporary and is dropped at the end of this statement.
        info!("main received: {}", pipe.recv());
        (pipe, droplog)
    }

    /// Polls `droplog` until it holds at least `expected` entries or `timeout`
    /// elapses, then returns the number of entries seen. A deadline-bounded
    /// poll is faster and less flaky than one fixed sleep.
    fn wait_for_drops(droplog: &Mutex<Vec<u32>>, expected: usize, timeout: std::time::Duration) -> usize {
        let deadline = std::time::Instant::now() + timeout;
        loop {
            let dropped = droplog.lock().unwrap().len();
            if dropped >= expected || std::time::Instant::now() >= deadline {
                return dropped;
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
    }

    /// We want to test that the pipeline is cleaned up properly. One way to do
    /// this is to send an item into the pipe whose Drop changes a shared mutable
    /// state, and leave that item inside the pipe when the pipe is dropped. The
    /// test then checks the shared state to see whether the item was dropped.
    /// An alternative would be to send in an Arc, keep a Weak ref to it, and
    /// assert that `Weak::strong_count` is 0 after the pipe is dropped.
    /// `Weak::strong_count` was still unstable when this test was written; it
    /// was stabilized in Rust 1.41.
    #[test]
    fn test_threads_and_channels_behind_pipe_is_cleaned_when_dropped() {
        let (pipe, droplog) = pipe_with_leftover_item();
        info!("main dropping pipe");
        drop(pipe);
        // The thread cleanup after a drop is async (see Pipeline::shutdown() for
        // the sync variant), so wait until the leftover item has been dropped.
        let dropped = wait_for_drops(&droplog, 2, std::time::Duration::from_secs(5));
        assert_eq!(2, dropped);
    }

    #[test]
    fn test_threads_and_channels_behind_pipe_is_cleaned_when_shutdown() {
        let (pipe, droplog) = pipe_with_leftover_item();
        info!("main shutting down pipe");
        pipe.shutdown();
        // shutdown() joins the agent threads, so no waiting is needed here.
        assert_eq!(2, droplog.lock().unwrap().len());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment