Skip to content

Instantly share code, notes, and snippets.

@yoshuawuyts
Last active February 16, 2020 21:32
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yoshuawuyts/182721650d1858ea25f7590ac0384d8d to your computer and use it in GitHub Desktop.
Save yoshuawuyts/182721650d1858ea25f7590ac0384d8d to your computer and use it in GitHub Desktop.
use async_std::prelude::*;
use async_std::io;
use async_std::task;
use async_std::stream;
use async_std::sync;
// map (square), then sum
//
// ```
// let res = stream::repeat(12_u64)
// .take(10)
// .par()
// .map(|num| num * num)
// .map(|num| num / num)
// .sum()
// .await;
//
// dbg!(res);
// ```
fn main() -> io::Result<()> {
task::block_on(async {
let mut seq = stream::repeat(12_u64).take(10);
let (sender, receiver) = sync::channel(1);
task::spawn(async move {
while let Some(num) = seq.next().await {
let sender = sender.clone();
task::spawn(async move {
let mut s = stream::once(num)
.map(|num| num * num)
.map(|num| num / num);
// Send it back to the collector.
sender.send(s.next().await).await;
});
}
});
let output = receiver.sum().await;
dbg!(output);
Ok(())
})
}
@yoshuawuyts
Copy link
Author

yoshuawuyts commented Dec 23, 2019

What's missing here in terms of functionality is a way to cancel all tasks when the streams are dropped.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment