Created
October 26, 2018 01:12
-
-
Save rust-play/e277f8dbbf65cd9bd393f6e3fe141c26 to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::cell::RefCell; | |
use std::rc::Rc; | |
trait Observer<T> { | |
fn on_next(&mut self, item: T); | |
} | |
impl<T, F> Observer<T> for F where F: FnMut(T) -> () { | |
fn on_next(&mut self, item: T) { | |
self(item) | |
} | |
} | |
trait Stream<'a>: Sized { | |
type Item: Clone + 'a; // TODO: ref instead of clone | |
fn subscribe<F>(self, f: F) where F: Observer<Self::Item> + 'a; | |
fn share(self) -> Multicast<'a, Self::Item> { | |
Multicast::new(self) | |
} | |
} | |
type ObserverBundle<'a, T> = Rc<RefCell<Vec<Box<Observer<T> + 'a>>>>; | |
struct Multicast<'a, T> { | |
observers: ObserverBundle<'a, T>, | |
} | |
impl<'a, T: Clone + 'a> Multicast<'a, T> { | |
fn new<S: Stream<'a, Item=T>>(stream: S) -> Self { | |
let observers: ObserverBundle<T> = Rc::new(RefCell::new(Vec::new())); | |
let dup = observers.clone(); | |
stream.subscribe(move |x: T| { | |
for observer in dup.borrow_mut().iter_mut() { | |
observer.on_next(x.clone()); // TODO: ref instead of clone | |
} | |
}); | |
Multicast { observers } | |
} | |
fn fork(&self) -> Subscription<'a, T> { | |
Subscription::new(self.observers.clone()) | |
} | |
} | |
struct Input<'a, T> { | |
observers: ObserverBundle<'a, T>, | |
} | |
impl<'a, T: Clone + 'a> Input<'a, T> { | |
fn new() -> Self { | |
Input { observers: Rc::new(RefCell::new(Vec::new())) } | |
} | |
fn feed(&self, value: T) { | |
for observer in self.observers.borrow_mut().iter_mut() { | |
observer.on_next(value.clone()); // TODO: ref instead of clone | |
} | |
} | |
fn fork(&self) -> Subscription<'a, T> { | |
Subscription::new(self.observers.clone()) | |
} | |
} | |
struct Subscription<'a, T> { | |
observers: ObserverBundle<'a, T>, | |
} | |
impl<'a, T> Subscription<'a, T> { | |
fn new(observers: ObserverBundle<'a, T>) -> Self { | |
Subscription { observers } | |
} | |
} | |
impl<'a, T: 'a + Clone> Stream<'a> for Subscription<'a, T> { | |
type Item = T; | |
fn subscribe<F>(self, f: F) where F: Observer<Self::Item> + 'a { | |
self.observers.borrow_mut().push(Box::new(f)); | |
} | |
} | |
struct Map<S, M> { | |
stream: S, | |
func: M, | |
} | |
impl<'a, S: Stream<'a>, M, T: 'a + Clone> Stream<'a> for Map<S, M> where M: 'a + FnMut(S::Item) -> T { | |
type Item = T; | |
fn subscribe<F>(self, mut f: F) where F: Observer<Self::Item> + 'a { | |
let mut func = self.func; | |
self.stream.subscribe(move |x| f.on_next(func(x))) | |
} | |
} | |
trait StreamExt<'a>: Stream<'a> { | |
fn map<M, T>(self, func: M) -> Map<Self, M> where M: FnMut(Self::Item) -> T + 'a{ | |
return Map { stream: self, func } | |
} | |
} | |
impl<'a, S: Stream<'a>> StreamExt<'a> for S where S: Stream<'a> {} | |
fn main() { | |
let input = Input::<i64>::new(); | |
let s1 = input.fork(); | |
s1.subscribe(|x| { | |
println!("s1: {}", x) | |
}); | |
let s2 = input.fork(); | |
s2.map(|x| x * 3).subscribe(|x| { | |
println!("s2: {}", x) | |
}); | |
let s3 = input.fork().map(|x| { | |
let s3 = (x as f64) + 100.5; | |
println!("s3: {}", s3); | |
s3 | |
}).share(); | |
s3.fork().map(|x| x + 1.).subscribe(|x| println!("s4: {}", x)); | |
s3.fork().map(|x| x + 2.).subscribe(|x| println!("s5: {}", x)); | |
input.feed(1); | |
println!("---"); | |
input.feed(2); | |
} | |
/* | |
s1: 1 | |
s2: 3 | |
s3: 101.5 | |
s4: 102.5 | |
s5: 103.5 | |
--- | |
s1: 2 | |
s2: 6 | |
s3: 102.5 | |
s4: 103.5 | |
s5: 104.5 | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment