Skip to content

Instantly share code, notes, and snippets.

@zen3ger

zen3ger/fork.rs Secret

Last active July 24, 2017 00:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zen3ger/9cf3bfb47c410543012784f6ed056994 to your computer and use it in GitHub Desktop.
Save zen3ger/9cf3bfb47c410543012784f6ed056994 to your computer and use it in GitHub Desktop.
Trying to figure out how to fork/observe a stream
use ::futures::{Async, Poll, Stream};
use ::std::sync::mpsc::{channel, Receiver, Sender};
pub struct Fork<T> {
value: Receiver<T>,
}
impl<T> Stream for Fork<T> {
type Item = T;
type Error = String;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.value.try_recv() {
Err(_) => Ok(Async::NotReady),
Ok(val) => Ok(Async::Ready(Some(val))),
}
}
}
pub struct Forkable<S>
where S: Stream,
{
fork: Option<Sender<S::Item>>,
stream: S,
}
impl<S> Forkable<S>
where S: Stream
{
pub fn new(s: S) -> Forkable<S> {
Forkable {
fork: None,
stream: s,
}
}
pub fn fork(&mut self) -> Fork<S::Item> {
let (tx, rx) = channel();
self.fork = Some(tx);
Fork {
value: rx,
}
}
}
impl<S> Stream for Forkable<S>
where S: Stream,
S::Item: Clone,
{
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
use ::futures::Async;
let poll = self.stream.poll();
match poll {
Err(e) => Err(e),
Ok(Async::Ready(Some(val))) => {
if let Some(ref fork) = self.fork {
fork.send(val.clone());
}
Ok(Async::Ready(Some(val)))
}
Ok(other) => Ok(other),
}
}
}
use ::std::{self};
use ::futures::{Stream, Poll};
#[derive(Debug)]
pub struct Interval {
recv: std::sync::mpsc::Receiver<u64>,
}
impl Interval {
pub fn new(interval: u64) -> Interval {
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let mut tick = 0;
let duration = std::time::Duration::from_millis(interval);
loop {
tx.send(tick);
tick += 1;
std::thread::sleep(duration);
}
});
Interval {
recv: rx,
}
}
}
impl Stream for Interval {
type Item = u64;
type Error = String;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
use futures::Async;
match self.recv.try_recv() {
Err(_) => Ok(Async::NotReady),
Ok(tick) => Ok(Async::Ready(Some(tick))),
}
}
}
extern crate futures;
mod interval;
mod fork;
use fork::{Forkable};
use futures::{Async, Stream};
use interval::{Interval};
fn main() {
let mut interval = Forkable::new(Interval::new(100));
let f1 = interval.fork()
.map(|tick| if tick % 2 == 0 { "even" } else { "odd" });
let run = f1.zip(interval);
test_stream(run, |item| println!("{:?}", item));
}
pub fn test_stream<S, F>(mut stream: S, func: F)
where S: Stream,
F: Fn(S::Item),
{
'reactor: loop {
let val = stream.poll();
match val {
Err(_) => {
println!("An error occurred!");
break 'reactor;
},
Ok(Async::NotReady) => {},
Ok(Async::Ready(Some(t))) => func(t),
Ok(Async::Ready(None)) => {
println!("Stream finished!");
break 'reactor;
}
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment