-
-
Save cchantep/80cd18c8c32977d15fba768097d66955 to your computer and use it in GitHub Desktop.
🦀 Some rustlings (see repository: https://github.com/cchantep/rust_sandbox/)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Future and Stream experimentations. | |
/// | |
/// **Usage:** | |
/// | |
/// Implements the following functions and make the provided tests :satisfied:. | |
/// | |
/// > These rustlings can be checkout locally from [GitHub](https://github.com/cchantep/rust_sandbox). | |
/// | |
/// *See [solutions](https://github.com/cchantep/rust_sandbox/blob/solution/rust/src/future_stream.rs)* | |
/// | |
/// **Requirements:** (`Cargo.toml`) | |
/// | |
/// ```toml | |
/// [dependencies] | |
/// futures = "0.3" | |
/// tokio = { version = "0.2", features = [ "time" ] } | |
/// ``` | |
extern crate futures; // 0.3.5 | |
use std::convert::AsRef; | |
use std::io::Error; | |
use std::result::Result; | |
use std::time::Duration; | |
use futures::future::{ready, Future}; | |
use futures::io::AsyncWrite; | |
use futures::sink::Sink; | |
use futures::stream::Stream; | |
/// Function `result2try_future` with at least 2 type parameters `A` and `B`; | |
/// Returns a `Future` containing the input `result` mapped using `f`. | |
/// | |
/// # Arguments | |
/// | |
/// * `result` - The input `Result<A, Error>` | |
/// * `f` - The function applied within `result` if and only if it's `Ok`: `fn(A) -> B` | |
/// | |
// TODO: result2try_future<A, B> | |
/// Function `result2try_stream` with at least 2 type parameters `A` and `B`; | |
/// Returns a `Future` containing the input `result` mapped using `f`. | |
/// | |
/// # Arguments | |
/// | |
/// * `result` - The input `Result<A, Error>` | |
/// * `f` - The function applied within `result` if and only if it's `Ok` | |
/// | |
// TODO: result2try_stream<A, B> | |
/// Function `result_map_async` with at least 2 type parameters `A` and `B`; | |
/// Returns a `Future` containing the input `result` mapped using `f`. | |
/// | |
/// # Arguments | |
/// | |
/// * `result` - The input `Result<A, Error>` | |
/// * `f` - The asynchronous function applied within `result` if and only if it's `Ok`: `fn(A) -> Future<Output = Result<B, Error>>` | |
/// | |
// TODO: result_map_async<A, B> | |
/// Function `result2stream` with at least 2 type parameters `A` and `B`; | |
/// Returns a `Stream` containing the input `result` mapped using `f`. | |
/// | |
/// !! The `Stream` should stops as soon as an `Err` | |
/// item is encountered in `result`. | |
/// | |
/// # Arguments | |
/// | |
/// * `result` - The input `Result<A, Error>` | |
/// * `f` - The function applied within `result` if and only if it's `Ok`: `fn(A) -> B` | |
/// | |
// TODO: result2stream<A, B> | |
/// Function `stream_map_async_with_filter` | |
/// with at least 2 type parameters `A` and `B`; | |
/// | |
/// Returns a `Stream` containing the input `stream` mapped | |
/// from `A` to `Result<B, Error>` using `f`. | |
/// | |
/// Each `B` value is also checked with `is_errored` function. | |
/// | |
/// * If `Some(error)` is returned, then the `B` value should be mapped to `Err(error)`; | |
/// * Otherwise `Ok(b_value)` is kept. | |
/// | |
/// # Arguments | |
/// | |
/// * `stream` - The input `Stream<Result<A, Error>>` | |
/// * `f` - The function applied to `stream` items if and only if it's `Ok`: `fn(A) -> B` | |
/// | |
// TODO: stream_map_async_with_filter<A, B> | |
/// Function `stream_map_until` with at least 2 type parameters `A` and `B`. | |
/// | |
/// Each `A` item is mapped as `B` using the `f` function. | |
/// If `Some` mapped value is returned, it's emitted as `Stream` item; | |
/// Other if `None` is returned, the `Stream` terminates. | |
/// | |
/// # Arguments | |
/// | |
/// * `stream` - The input `Stream<A>` | |
/// * `f` - The function applied to `stream` items: `fn(A) -> Option<B>` | |
/// | |
// TODO: stream_map_until<A, B> | |
/// Function `stream_map_async_until` | |
/// with at least 2 type parameters `A` and `B`. | |
/// | |
/// Returns a `Stream` containing the input `stream` mapped | |
/// from `A` to `Result<B, Error>` using `f`; | |
/// If `Err(someError)` is returned, then the stream terminates. | |
/// | |
/// Each `B` value is also checked with `is_errored` function. | |
/// | |
/// * If `Some(error)` is returned, then the stream terminates; | |
/// * Otherwise the `B` value that's `Ok` is pushed as `Stream` item. | |
/// | |
/// # Arguments | |
/// | |
/// * `stream` - The input `Stream<A>` | |
/// * `f` - The function applied to `stream` items if and only if it's `Ok`: `fn(A) -> B` | |
/// | |
// TODO: stream_map_async_until<A, B> | |
/// Function `await_or_timeout` with at least 1 type parameters `A`; | |
/// | |
/// Returns the awaited `Ok(a_value)`, or timeout `Err`. | |
/// | |
/// # Arguments | |
/// | |
/// * `future` - The asynchronous `A` value | |
/// * `duration` - The timeout duration | |
/// | |
// TODO: await_or_timeout<A> | |
/// Function `contramap_sink` with at least 2 type parameters `A` and `B`; | |
/// | |
/// Returns a `Sink` that consumes `A` items, then converts each item as `B` | |
/// before writing to the `output`. | |
/// | |
/// # Arguments | |
/// | |
/// * `f` - The function that converts a `A` value into a `B` one | |
/// * `output` - The sink output | |
/// | |
// TODO: contramap_sink<A, B> | |
// --- | |
#[cfg(test)] | |
mod tests { | |
use super::*; | |
use futures::future::pending; | |
use std::io::ErrorKind; | |
// Test runtime | |
use tokio::runtime::Runtime; | |
// Fixtures | |
fn ok_input() -> Result<String, Error> { | |
Ok(String::from("lorem")) | |
} | |
fn err_input() -> Result<String, Error> { | |
Err(Error::new(ErrorKind::Other, "cause")) | |
} | |
#[test] | |
fn test_result2try_future() { | |
let mut rt: Runtime = Runtime::new().unwrap(); | |
assert_eq!( | |
rt.block_on(result2try_future(ok_input(), |s| s.len())) | |
.unwrap(), | |
5 | |
); | |
assert_eq!( | |
rt.block_on(result2try_future(err_input(), |s| s.len())) | |
.is_err(), | |
true | |
); | |
} | |
#[test] | |
fn test_result2try_stream() { | |
let mut rt: Runtime = Runtime::new().unwrap(); | |
use futures::stream::StreamExt; // provide 'next' | |
let ok_head: Option<Result<usize, Error>> = rt | |
.block_on( | |
result2try_stream(ok_input(), |s| s.len()).collect::<Vec<Result<usize, Error>>>(), | |
) | |
.pop(); | |
assert_eq!(ok_head.unwrap().unwrap(), 5); | |
let err_head: Option<Result<usize, Error>> = rt | |
.block_on( | |
result2try_stream(err_input(), |s| s.len()).collect::<Vec<Result<usize, Error>>>(), | |
) | |
.pop(); | |
assert_eq!(err_head.unwrap().is_err(), true); | |
} | |
#[test] | |
fn test_result_map_async() { | |
let mut rt: Runtime = Runtime::new().unwrap(); | |
assert_eq!( | |
rt.block_on(result_map_async(ok_input(), |s| ready(Ok(s.len())))) | |
.unwrap(), | |
5 | |
); | |
assert_eq!( | |
rt.block_on(result_map_async(err_input(), |s| ready(Ok(s.len())))) | |
.is_err(), | |
true | |
); | |
} | |
#[test] | |
fn test_result2stream() { | |
let mut rt: Runtime = Runtime::new().unwrap(); | |
use futures::stream::StreamExt; | |
let ok_head: Option<usize> = rt | |
.block_on(result2stream(ok_input(), |s| s.len()).collect::<Vec<usize>>()) | |
.pop(); | |
assert_eq!(ok_head.unwrap(), 5); | |
let err_head: Option<usize> = rt | |
.block_on(result2stream(err_input(), |s| s.len()).collect::<Vec<usize>>()) | |
.pop(); | |
assert_eq!(err_head.is_none(), true); | |
} | |
#[test] | |
fn test_stream_map_until() { | |
let mut rt: Runtime = Runtime::new().unwrap(); | |
use futures::stream::{iter, StreamExt}; | |
let mut usize_stream = stream_map_until(iter(vec!["lala", "lorem", "rolo"]), |s| { | |
let l = s.len(); | |
if l % 2 == 0 { | |
Some(l) | |
} else { | |
None | |
} | |
}); | |
assert_eq!(rt.block_on(usize_stream.next()).unwrap(), 4); // "lala" | |
assert_eq!(rt.block_on(usize_stream.next()).is_none(), true); // "lorem".len() % 2 != 0 | |
assert_eq!(rt.block_on(usize_stream.next()).is_none(), true); // "rolo"; stream terminated | |
} | |
#[test] | |
fn test_stream_map_async_with_filter() { | |
let mut rt: Runtime = Runtime::new().unwrap(); | |
fn f(s: String) -> impl Future<Output = Result<usize, Error>> { | |
let l = s.len(); | |
ready(if l == 0 { | |
Err(Error::new(ErrorKind::Other, "Empty")) | |
} else { | |
Ok(l) | |
}) | |
} | |
fn is_errored(sz: usize) -> Option<Error> { | |
if sz % 2 == 0 { | |
None | |
} else { | |
Some(Error::new(ErrorKind::Other, "Odd")) | |
} | |
} | |
// --- | |
use futures::stream::{iter, once, StreamExt}; | |
let ok_head = rt | |
.block_on( | |
stream_map_async_with_filter(once(ready(ok_input())), f, is_errored) | |
.collect::<Vec<Result<usize, Error>>>(), | |
) | |
.pop() | |
.unwrap(); | |
assert_eq!(ok_head.is_err(), true); // "lorem".len() % 2 != 0 | |
let head2 = rt | |
.block_on( | |
stream_map_async_with_filter(once(ready(Ok(String::from("lala")))), f, is_errored) | |
.collect::<Vec<Result<usize, Error>>>(), | |
) | |
.pop() | |
.unwrap() | |
.unwrap(); | |
assert_eq!(head2, 4); | |
let mut vec_stream1 = stream_map_async_with_filter( | |
iter(vec![ | |
Ok(String::from("lala")), | |
Ok(String::from("lorem")), | |
Err(Error::new(ErrorKind::Other, "Foo")), | |
Ok(String::from("rololo")), | |
]), | |
f, | |
is_errored, | |
); | |
assert_eq!(rt.block_on(vec_stream1.next()).unwrap().unwrap(), 4); // "lala" | |
assert_eq!(rt.block_on(vec_stream1.next()).unwrap().is_err(), true); // "lorem" | |
assert_eq!(rt.block_on(vec_stream1.next()).unwrap().is_err(), true); // Err | |
assert_eq!(rt.block_on(vec_stream1.next()).unwrap().unwrap(), 6); // "rololo" | |
} | |
#[test] | |
fn test_stream_map_async_until() { | |
let mut rt: Runtime = Runtime::new().unwrap(); | |
fn f(s: String) -> impl Future<Output = Result<usize, Error>> { | |
let l = s.len(); | |
ready(if l == 0 { | |
Err(Error::new(ErrorKind::Other, "Empty")) | |
} else { | |
Ok(l) | |
}) | |
} | |
fn is_errored(sz: usize) -> Option<Error> { | |
if sz % 2 == 0 { | |
None | |
} else { | |
Some(Error::new(ErrorKind::Other, "Odd")) | |
} | |
} | |
// --- | |
use futures::stream::{iter, once, StreamExt}; | |
let ok_head = rt | |
.block_on( | |
stream_map_async_until(once(ready(String::from("lorem"))), f, is_errored) | |
.collect::<Vec<usize>>(), | |
) | |
.pop(); | |
assert_eq!(ok_head.is_none(), true); // "lorem".len() % 2 != 0 | |
let head2 = rt | |
.block_on( | |
stream_map_async_until(once(ready(String::from("lala"))), f, is_errored) | |
.collect::<Vec<usize>>(), | |
) | |
.pop() | |
.unwrap(); | |
assert_eq!(head2, 4); | |
let mut vec_stream1 = stream_map_async_until( | |
iter(vec![ | |
String::from("lala"), | |
String::from("lorem"), | |
String::from("rololo"), | |
]), | |
f, | |
is_errored, | |
); | |
assert_eq!(rt.block_on(vec_stream1.next()).unwrap(), 4); // "lala" | |
assert_eq!(rt.block_on(vec_stream1.next()).is_none(), true); // "lorem" | |
assert_eq!(rt.block_on(vec_stream1.next()).is_none(), true); // stream terminated | |
assert_eq!(rt.block_on(vec_stream1.next()).is_none(), true); // "rololo" - stream terminated | |
} | |
#[test] | |
fn test_await_or_timeout() { | |
let mut rt: Runtime = Runtime::new().unwrap(); | |
let tmout = Duration::new(5, 0); | |
assert_eq!(rt.block_on(await_or_timeout(ready(1), tmout)).unwrap(), 1); | |
assert_eq!( | |
rt.block_on(await_or_timeout(pending::<u8>(), tmout)) | |
.is_err(), | |
true | |
); | |
} | |
#[test] | |
fn test_contramap_sink() { | |
use futures::stream::{self, StreamExt}; // provide 'forward' | |
let mut rt: Runtime = Runtime::new().unwrap(); | |
let mut out = vec![]; | |
fn f<'a>(s: &'a str) -> &'a [u8] { | |
s.as_bytes() | |
} | |
let source = stream::iter(vec![Ok("foo"), Ok("bar"), Ok("lorem")]); | |
let conv_sink = contramap_sink(f, &mut out); | |
let mat_val = rt.block_on(source.forward(conv_sink)); | |
assert_eq!(mat_val.is_ok(), true); | |
assert_eq!( | |
out, | |
vec![102, 111, 111, 98, 97, 114, 108, 111, 114, 101, 109] | |
); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod future_stream; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment