Skip to content

Instantly share code, notes, and snippets.

@rust-play
Created October 29, 2018 00:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rust-play/53c6255e6c7b33969fd89ba792d1b877 to your computer and use it in GitHub Desktop.
Save rust-play/53c6255e6c7b33969fd89ba792d1b877 to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
#![allow(dead_code)]
use std::cell::RefCell;
use std::rc::Rc;
/// A push-based source of values.
///
/// A stream is consumed by [`subscribe`](Stream::subscribe), which hands it a
/// callback; what the stream does with that callback (store it, call it
/// immediately, wrap it) is up to the implementor. `'a` bounds how long the
/// callbacks, and any data they borrow, must live.
trait Stream<'a> {
    /// The type of value passed, by reference, to subscribers.
    ///
    /// May be unsized (for example a slice).
    type Item: ?Sized + 'a;

    /// Consumes the stream and registers `f` as its subscriber.
    fn subscribe<F>(self, f: F)
    where
        F: FnMut(&Self::Item) + 'a;

    /// Converts this stream into a [`Multicast`] hub by passing it to
    /// `Multicast::new`, so that several subscriptions can be forked from it.
    fn share(self) -> Multicast<'a, Self::Item>
    where
        Self: Sized,
    {
        Multicast::<'a, Self::Item>::new(self)
    }
}
/// Shared, interior-mutable list of boxed observer callbacks.
///
/// Cloning the `Rc` yields another handle onto the *same* list, which is how
/// a source and the subscriptions forked from it stay connected. The trait
/// object is spelled with `dyn`: the bare `Box<FnMut(..)>` form is deprecated
/// and rejected outright from the 2021 edition onwards.
type ObserverBundle<'a, T> = Rc<RefCell<Vec<Box<dyn FnMut(&T) + 'a>>>>;
/// Fan-out hub: subscribes once to an upstream [`Stream`] and relays every
/// value to all observers registered through [`fork`](Multicast::fork).
struct Multicast<'a, T>
where
    T: ?Sized,
{
    /// Observer list shared with the relay closure and with every fork.
    observers: ObserverBundle<'a, T>,
}

impl<'a, T: 'a + ?Sized> Multicast<'a, T> {
    /// Builds a hub on top of `stream`.
    ///
    /// The upstream receives a single subscriber: a closure that walks the
    /// shared observer list and calls each observer with the incoming value.
    ///
    /// The list stays mutably borrowed while the observers run, so an observer
    /// that registers a new observer on the same list during dispatch hits a
    /// `RefCell` double-borrow panic.
    fn new<S>(stream: S) -> Self
    where
        S: Stream<'a, Item = T>,
    {
        let hub: Self = Multicast {
            observers: Rc::new(RefCell::new(Vec::new())),
        };
        let listeners = Rc::clone(&hub.observers);
        stream.subscribe(move |value: &T| {
            listeners
                .borrow_mut()
                .iter_mut()
                .for_each(|notify| (*notify)(value));
        });
        hub
    }

    /// Returns a new [`Subscription`] attached to this hub's observer list.
    fn fork(&self) -> Subscription<'a, T> {
        Subscription::new(Rc::clone(&self.observers))
    }
}
/// Manually driven source: values pushed in with [`feed`](Input::feed) are
/// delivered to every observer registered through [`fork`](Input::fork).
struct Input<'a, T> {
    /// Observer list shared with every subscription forked from this input.
    observers: ObserverBundle<'a, T>,
}

// `T` needs no `Clone` bound: `feed` only lends `&value` to the observers and
// never duplicates it.
impl<'a, T: 'a> Input<'a, T> {
    /// Creates an input with no observers.
    fn new() -> Self {
        Input {
            observers: Rc::new(RefCell::new(Vec::new())),
        }
    }

    /// Delivers `value`, by reference, to each registered observer in
    /// registration order.
    ///
    /// # Panics
    ///
    /// The observer list is mutably borrowed for the whole dispatch, so an
    /// observer that calls `feed` again or registers a new observer on the
    /// same list while being invoked triggers a `RefCell` double-borrow panic.
    fn feed(&self, value: T) {
        for observer in self.observers.borrow_mut().iter_mut() {
            observer(&value);
        }
    }

    /// Returns a new [`Subscription`] attached to this input's observer list.
    fn fork(&self) -> Subscription<'a, T> {
        Subscription::new(Rc::clone(&self.observers))
    }
}
/// Handle onto a shared observer list; subscribing through it appends a
/// callback to that list.
struct Subscription<'a, T: ?Sized> {
    /// The list this handle will push its subscriber onto.
    observers: ObserverBundle<'a, T>,
}

impl<'a, T: ?Sized> Subscription<'a, T> {
    /// Wraps an existing observer list.
    fn new(observers: ObserverBundle<'a, T>) -> Self {
        Subscription { observers }
    }
}

// The bound is `?Sized` to match the struct and `new`: a subscription forked
// from a hub of unsized items (such as slices) must be subscribable too.
impl<'a, T> Stream<'a> for Subscription<'a, T>
where
    T: 'a + ?Sized,
{
    type Item = T;

    /// Consumes the handle and appends `f` to the shared observer list.
    ///
    /// # Panics
    ///
    /// Panics if the list is already borrowed, for example when called from
    /// inside an observer while the owning source is dispatching a value.
    fn subscribe<F>(self, f: F)
    where
        F: FnMut(&Self::Item) + 'a,
    {
        self.observers.borrow_mut().push(Box::new(f));
    }
}
/// Stream adapter that passes each upstream value through a function before
/// handing the result to the subscriber.
struct Map<S, M> {
    /// The upstream stream.
    stream: S,
    /// The transformation applied to every upstream value.
    func: M,
}

impl<'a, S, M, T> Stream<'a> for Map<S, M>
where
    S: Stream<'a>,
    M: FnMut(&S::Item) -> T + 'a,
    T: 'a,
{
    type Item = T;

    /// Subscribes to the upstream with a closure that applies the mapping
    /// function and forwards a reference to its result to `f`.
    fn subscribe<F>(self, mut f: F)
    where
        F: FnMut(&Self::Item) + 'a,
    {
        let Map {
            stream,
            func: mut transform,
        } = self;
        stream.subscribe(move |input| {
            let output = transform(input);
            f(&output)
        })
    }
}
/// Stream adapter that accumulates every upstream value and, on each new
/// value, emits the whole history so far as a slice.
struct Collect<S, T> {
    /// The upstream stream.
    stream: S,
    /// Values received so far, oldest first.
    data: Rc<RefCell<Vec<T>>>,
}

impl<'a, S, T> Stream<'a> for Collect<S, T>
where
    S: Stream<'a, Item = T>,
    T: Clone + 'a,
{
    type Item = [T];

    /// Subscribes to the upstream with a closure that clones each incoming
    /// value into the history buffer and then calls `f` with the full buffer.
    fn subscribe<F>(self, mut f: F)
    where
        F: FnMut(&Self::Item) + 'a,
    {
        let Collect {
            stream,
            data: history,
        } = self;
        stream.subscribe(move |value| {
            // The mutable borrow ends with this statement, before the
            // buffer is re-borrowed immutably for the subscriber.
            history.borrow_mut().push(value.clone());
            let snapshot = history.borrow();
            f(snapshot.as_slice());
        })
    }
}
/// Combinator methods available on every [`Stream`].
trait StreamExt<'a>: Stream<'a> {
    /// Wraps this stream in a [`Map`] that applies `func` to each value.
    fn map<M, T>(self, func: M) -> Map<Self, M>
    where
        Self: Sized,
        M: FnMut(&Self::Item) -> T + 'a,
    {
        Map { func, stream: self }
    }

    /// Wraps this stream in a [`Collect`] that starts with an empty history
    /// buffer. Only available when the item type is sized.
    fn collect(self) -> Collect<Self, Self::Item>
    where
        Self: Sized,
        Self::Item: Sized,
    {
        let data = Rc::new(RefCell::new(Vec::new()));
        Collect { stream: self, data }
    }
}
/// Blanket implementation: every [`Stream`] gets the [`StreamExt`]
/// combinators for free.
impl<'a, S> StreamExt<'a> for S where S: Stream<'a> {}
fn main() {
let input = Input::<i64>::new();
let s1 = input.fork();
s1.subscribe(|x| {
println!("s1: {}", x)
});
let s2 = input.fork();
s2.map(|x| x * 2).collect().subscribe(|x| {
println!("s2: {:?}", x);
});
input.feed(1);
input.feed(2);
input.feed(3)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment