Skip to content

Instantly share code, notes, and snippets.

@rust-play
Created October 26, 2018 01:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rust-play/e277f8dbbf65cd9bd393f6e3fe141c26 to your computer and use it in GitHub Desktop.
Save rust-play/e277f8dbbf65cd9bd393f6e3fe141c26 to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
use std::cell::RefCell;
use std::rc::Rc;
/// A consumer of values of type `T`.
///
/// Every value is handed to the observer by value through `on_next`.
trait Observer<T> {
    /// Receives the next `item`.
    fn on_next(&mut self, item: T);
}
/// Lets any `FnMut(T)` callable (typically a closure) be used directly as an
/// `Observer<T>`.
impl<T, F> Observer<T> for F
where
    F: FnMut(T),
{
    /// Invokes the callable with `item`.
    fn on_next(&mut self, item: T) {
        self(item)
    }
}
/// A push-based source of items.
///
/// A stream is consumed when it is subscribed to: `subscribe` takes `self`
/// by value and hands the observer over to the stream.
trait Stream<'a>: Sized {
    /// The type of value delivered to observers.
    type Item: Clone + 'a; // TODO: ref instead of clone

    /// Consumes the stream and registers `f` to receive its items.
    fn subscribe<F>(self, f: F)
    where
        F: Observer<Self::Item> + 'a;

    /// Consumes the stream and wraps it in a `Multicast`, so that several
    /// downstream subscriptions can be forked from a single upstream
    /// subscription.
    fn share(self) -> Multicast<'a, Self::Item> {
        Multicast::new(self)
    }
}
type ObserverBundle<'a, T> = Rc<RefCell<Vec<Box<Observer<T> + 'a>>>>;
/// Fan-out point created from a single upstream stream.
///
/// Holds the shared list of downstream observers that upstream items are
/// forwarded to.
struct Multicast<'a, T> {
    /// Downstream observers, shared with every forked `Subscription`.
    observers: ObserverBundle<'a, T>,
}
impl<'a, T: Clone + 'a> Multicast<'a, T> {
    /// Subscribes to `stream` once and forwards a clone of every item to each
    /// observer currently registered in the shared list.
    ///
    /// # Panics
    ///
    /// The observer list stays mutably borrowed while an item is being
    /// forwarded, so an observer that subscribes to a fork of this same
    /// `Multicast` from inside its callback triggers a `RefCell` borrow panic.
    fn new<S>(stream: S) -> Self
    where
        S: Stream<'a, Item = T>,
    {
        let observers: ObserverBundle<'a, T> = Rc::new(RefCell::new(Vec::new()));
        // The upstream callback keeps its own handle to the same list.
        let downstream = Rc::clone(&observers);
        stream.subscribe(move |item: T| {
            let mut sinks = downstream.borrow_mut();
            sinks
                .iter_mut()
                .for_each(|sink| sink.on_next(item.clone())); // TODO: ref instead of clone
        });
        Multicast { observers }
    }

    /// Returns a new `Subscription` that registers into this multicast's
    /// observer list.
    fn fork(&self) -> Subscription<'a, T> {
        let shared = Rc::clone(&self.observers);
        Subscription::new(shared)
    }
}
/// A manually driven source: values are pushed in with `feed` and delivered
/// to the observers registered through forked subscriptions.
struct Input<'a, T> {
    /// Observers to notify, shared with every forked `Subscription`.
    observers: ObserverBundle<'a, T>,
}
impl<'a, T: Clone + 'a> Input<'a, T> {
    /// Creates an input with no observers.
    fn new() -> Self {
        let observers = Rc::new(RefCell::new(Vec::new()));
        Input { observers }
    }

    /// Delivers a clone of `value` to every registered observer, in the order
    /// in which they were registered.
    ///
    /// # Panics
    ///
    /// The observer list stays mutably borrowed for the whole dispatch, so an
    /// observer that feeds this input again, or subscribes to one of its
    /// forks, from inside its callback triggers a `RefCell` borrow panic.
    fn feed(&self, value: T) {
        let mut sinks = self.observers.borrow_mut();
        sinks
            .iter_mut()
            .for_each(|sink| sink.on_next(value.clone())); // TODO: ref instead of clone
    }

    /// Returns a new `Subscription` that registers into this input's
    /// observer list.
    fn fork(&self) -> Subscription<'a, T> {
        let shared = Rc::clone(&self.observers);
        Subscription::new(shared)
    }
}
/// A not-yet-subscribed handle onto a shared observer list.
///
/// It implements `Stream`; subscribing through it appends the observer to the
/// list it points at.
struct Subscription<'a, T> {
    /// The list a subscriber will be appended to.
    observers: ObserverBundle<'a, T>,
}
impl<'a, T> Subscription<'a, T> {
fn new(observers: ObserverBundle<'a, T>) -> Self {
Subscription { observers }
}
}
impl<'a, T: 'a + Clone> Stream<'a> for Subscription<'a, T> {
    type Item = T;

    /// Boxes `f` and appends it to the shared observer list.
    ///
    /// # Panics
    ///
    /// Panics if the observer list is already borrowed, which is the case
    /// while the owning source is dispatching an item.
    fn subscribe<F>(self, f: F)
    where
        F: Observer<Self::Item> + 'a,
    {
        let boxed: Box<dyn Observer<T> + 'a> = Box::new(f);
        self.observers.borrow_mut().push(boxed);
    }
}
/// Stream adapter pairing an upstream `stream` with a transformation `func`.
struct Map<S, M> {
    /// The upstream stream whose items are transformed.
    stream: S,
    /// The transformation applied to each upstream item.
    func: M,
}
impl<'a, S: Stream<'a>, M, T: 'a + Clone> Stream<'a> for Map<S, M> where M: 'a + FnMut(S::Item) -> T {
type Item = T;
fn subscribe<F>(self, mut f: F) where F: Observer<Self::Item> + 'a {
let mut func = self.func;
self.stream.subscribe(move |x| f.on_next(func(x)))
}
}
/// Combinator methods available on every `Stream`.
trait StreamExt<'a>: Stream<'a> {
    /// Wraps this stream in a `Map` that applies `func` to every item.
    ///
    /// Nothing is subscribed here; the stream and `func` are only stored in
    /// the returned adapter.
    fn map<M, T>(self, func: M) -> Map<Self, M>
    where
        M: FnMut(Self::Item) -> T + 'a,
    {
        Map { stream: self, func }
    }
}
/// Blanket implementation: every `Stream` gets the `StreamExt` combinators.
impl<'a, S: Stream<'a>> StreamExt<'a> for S {}
/// Demo: wires five subscribers onto one `Input` and feeds it two values.
fn main() {
    let source = Input::<i64>::new();

    // s1: prints each raw value.
    source.fork().subscribe(|value| println!("s1: {}", value));

    // s2: prints each value multiplied by three.
    source
        .fork()
        .map(|value| value * 3)
        .subscribe(|tripled| println!("s2: {}", tripled));

    // s3: converts to f64, offsets and logs the value, then shares the result
    // so the mapping closure runs once per fed value for both s4 and s5.
    let shared = source
        .fork()
        .map(|value| {
            let shifted = (value as f64) + 100.5;
            println!("s3: {}", shifted);
            shifted
        })
        .share();

    // s4 and s5 both hang off the shared s3 stream.
    shared
        .fork()
        .map(|value| value + 1.)
        .subscribe(|out| println!("s4: {}", out));
    shared
        .fork()
        .map(|value| value + 2.)
        .subscribe(|out| println!("s5: {}", out));

    source.feed(1);
    println!("---");
    source.feed(2);
}
/*
Expected output:

s1: 1
s2: 3
s3: 101.5
s4: 102.5
s5: 103.5
---
s1: 2
s2: 6
s3: 102.5
s4: 103.5
s5: 104.5
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment