Skip to content

Instantly share code, notes, and snippets.

@jsgf
Created January 20, 2017 00:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jsgf/acb074cfbd7aa19a9927ed7038874177 to your computer and use it in GitHub Desktop.
Save jsgf/acb074cfbd7aa19a9927ed7038874177 to your computer and use it in GitHub Desktop.
use futures::{Async, AsyncSink, Future, Poll};
use futures::stream::{Fuse, Stream};
use futures::sink::Sink;
/// Fork a Stream into two
///
/// Returns a Future for a process that consumes items from a Stream and
/// forwards them to two sinks depending on a predicate. If the predicate
/// returns false, send the value to out1, otherwise out2.
pub fn streamfork<In, Out1, Out2, F, E>(inp: In, out1: Out1, out2: Out2, pred: F)
-> Forker<In, Out1, Out2, F>
where In: Stream,
Out1: Sink,
Out2: Sink,
F: FnMut(&In::Item) -> Result<bool, E> {
Forker {
inp: Some(inp.fuse()),
out1: Out::new(out1),
out2: Out::new(out2),
pred: pred,
}
}
pub struct Forker<In, Out1, Out2, F>
where In: Stream,
Out1: Sink,
Out2: Sink
{
inp: Option<Fuse<In>>,
out1: Out<Out1>,
out2: Out<Out2>,
pred: F,
}
struct Out<O> where O: Sink {
out: Option<O>,
buf: Option<O::SinkItem>,
}
impl<S: Sink> Out<S> {
fn new(s: S) -> Self {
Out { out: Some(s), buf: None }
}
fn out_mut(&mut self) -> &mut S {
self.out.as_mut().take().expect("Out after completion")
}
fn take_result(&mut self) -> S {
self.out.take().expect("Out missing")
}
fn try_start_send(&mut self, item: S::SinkItem) -> Poll<(), S::SinkError> {
debug_assert!(self.buf.is_none());
if let AsyncSink::NotReady(item) = try!(self.out_mut().start_send(item)) {
self.buf = Some(item);
return Ok(Async::NotReady)
}
Ok(Async::Ready(()))
}
fn push(&mut self) -> Option<Poll<(), S::SinkError>> {
if let Some(item) = self.buf.take() {
Some(self.try_start_send(item))
} else {
None
}
}
fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
self.out_mut().poll_complete()
}
}
impl<In, Out1, Out2, F> Forker<In, Out1, Out2, F>
where In: Stream,
Out1: Sink,
Out2: Sink
{
fn inp_mut(&mut self) -> &mut Fuse<In> {
self.inp.as_mut().take().expect("Input after completion")
}
fn take_result(&mut self) -> (In, Out1, Out2) {
let inp = self.inp.take().expect("Input missing in result");
let out1 = self.out1.take_result();
let out2 = self.out2.take_result();
(inp.into_inner(), out1, out2)
}
}
#[derive(Debug)]
pub enum ForkError<Einp, Epred, Eout1, Eout2> {
Input(Einp),
Predicate(Epred),
Out1(Eout1),
Out2(Eout2),
}
impl<Einp, Epred, Eout1, Eout2> ForkError<Einp, Epred, Eout1, Eout2> {
fn input(err: Einp) -> Self { ForkError::Input(err) }
fn pred(err: Epred) -> Self { ForkError::Predicate(err) }
fn out1(err: Eout1) -> Self { ForkError::Out1(err) }
fn out2(err: Eout2) -> Self { ForkError::Out2(err) }
}
impl<In, Out1, Out2, F, Epred> Future for Forker<In, Out1, Out2, F>
where In: Stream,
Out1: Sink<SinkItem=In::Item>,
Out2: Sink<SinkItem=In::Item>,
F: FnMut(&In::Item) -> Result<bool, Epred>
{
type Item = (In, Out1, Out2);
type Error = ForkError<In::Error, Epred, Out1::SinkError, Out2::SinkError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.out1.push() {
Some(Err(e)) => return Err(ForkError::out1(e)),
Some(Ok(Async::NotReady)) => return Ok(Async::NotReady),
_ => (),
}
match self.out2.push() {
Some(Err(e)) => return Err(ForkError::out2(e)),
Some(Ok(Async::NotReady)) => return Ok(Async::NotReady),
_ => (),
}
loop {
match try!(self.inp_mut().poll().map_err(ForkError::input)) {
Async::Ready(Some(item)) => {
match (self.pred)(&item) {
Ok(false) => try_ready!(self.out1.try_start_send(item).map_err(ForkError::out1)),
Ok(true) => try_ready!(self.out2.try_start_send(item).map_err(ForkError::out2)),
Err(e) => return Err(ForkError::pred(e)),
}
}
Async::Ready(None) => {
try_ready!(self.out1.poll_complete().map_err(ForkError::out1));
try_ready!(self.out2.poll_complete().map_err(ForkError::out2));
return Ok(Async::Ready(self.take_result()));
}
Async::NotReady => {
try_ready!(self.out1.poll_complete().map_err(ForkError::out1));
try_ready!(self.out2.poll_complete().map_err(ForkError::out2));
return Ok(Async::NotReady);
}
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use futures::Future;
use futures::stream::iter;
#[test]
fn simple() {
let even = Vec::new();
let odd = Vec::new();
let nums = iter((0i32..10).into_iter().map(Ok::<_,()>));
let (_, even, odd) = streamfork(nums, even, odd, |n| Ok::<_,()>(*n % 2 == 1)).wait().unwrap();
println!("even={:?}", even);
println!("odd={:?}", odd);
assert_eq!(even, vec![0,2,4,6,8]);
assert_eq!(odd, vec![1,3,5,7,9]);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment