-
-
Save zen3ger/9cf3bfb47c410543012784f6ed056994 to your computer and use it in GitHub Desktop.
Trying to figure out how to fork/observe a stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use ::futures::{Async, Poll, Stream}; | |
use ::std::sync::mpsc::{channel, Receiver, Sender}; | |
pub struct Fork<T> { | |
value: Receiver<T>, | |
} | |
impl<T> Stream for Fork<T> { | |
type Item = T; | |
type Error = String; | |
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |
match self.value.try_recv() { | |
Err(_) => Ok(Async::NotReady), | |
Ok(val) => Ok(Async::Ready(Some(val))), | |
} | |
} | |
} | |
pub struct Forkable<S> | |
where S: Stream, | |
{ | |
fork: Option<Sender<S::Item>>, | |
stream: S, | |
} | |
impl<S> Forkable<S> | |
where S: Stream | |
{ | |
pub fn new(s: S) -> Forkable<S> { | |
Forkable { | |
fork: None, | |
stream: s, | |
} | |
} | |
pub fn fork(&mut self) -> Fork<S::Item> { | |
let (tx, rx) = channel(); | |
self.fork = Some(tx); | |
Fork { | |
value: rx, | |
} | |
} | |
} | |
impl<S> Stream for Forkable<S> | |
where S: Stream, | |
S::Item: Clone, | |
{ | |
type Item = S::Item; | |
type Error = S::Error; | |
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |
use ::futures::Async; | |
let poll = self.stream.poll(); | |
match poll { | |
Err(e) => Err(e), | |
Ok(Async::Ready(Some(val))) => { | |
if let Some(ref fork) = self.fork { | |
fork.send(val.clone()); | |
} | |
Ok(Async::Ready(Some(val))) | |
} | |
Ok(other) => Ok(other), | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use ::std::{self}; | |
use ::futures::{Stream, Poll}; | |
/// A source of increasing tick numbers produced by a background thread.
#[derive(Debug)]
pub struct Interval {
    /// Receiving end of the channel the ticker thread writes to.
    recv: std::sync::mpsc::Receiver<u64>,
}

impl Interval {
    /// Starts a ticker thread and returns the `Interval` that receives from it.
    ///
    /// The thread sends `0` immediately, then sleeps `interval` milliseconds
    /// between each subsequent tick, incrementing the tick by one every time.
    ///
    /// The thread stops as soon as a send fails, which happens once the
    /// returned `Interval` (and with it the receiver) has been dropped.
    pub fn new(interval: u64) -> Interval {
        let (tx, rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            let duration = std::time::Duration::from_millis(interval);
            let mut tick: u64 = 0;
            // `send` only fails when the receiver is gone; ending the loop
            // then lets the thread exit instead of ticking forever.
            while tx.send(tick).is_ok() {
                tick += 1;
                std::thread::sleep(duration);
            }
        });
        Interval { recv: rx }
    }
}
impl Stream for Interval { | |
type Item = u64; | |
type Error = String; | |
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |
use futures::Async; | |
match self.recv.try_recv() { | |
Err(_) => Ok(Async::NotReady), | |
Ok(tick) => Ok(Async::Ready(Some(tick))), | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate futures; | |
mod interval; | |
mod fork; | |
use fork::{Forkable}; | |
use futures::{Async, Stream}; | |
use interval::{Interval}; | |
fn main() { | |
let mut interval = Forkable::new(Interval::new(100)); | |
let f1 = interval.fork() | |
.map(|tick| if tick % 2 == 0 { "even" } else { "odd" }); | |
let run = f1.zip(interval); | |
test_stream(run, |item| println!("{:?}", item)); | |
} | |
pub fn test_stream<S, F>(mut stream: S, func: F) | |
where S: Stream, | |
F: Fn(S::Item), | |
{ | |
'reactor: loop { | |
let val = stream.poll(); | |
match val { | |
Err(_) => { | |
println!("An error occurred!"); | |
break 'reactor; | |
}, | |
Ok(Async::NotReady) => {}, | |
Ok(Async::Ready(Some(t))) => func(t), | |
Ok(Async::Ready(None)) => { | |
println!("Stream finished!"); | |
break 'reactor; | |
} | |
} | |
std::thread::sleep(std::time::Duration::from_millis(100)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment