Skip to content

Instantly share code, notes, and snippets.

@lolgesten
Last active May 29, 2018 07:48
Show Gist options
  • Save lolgesten/fdeff42b00e3b1e23991c01db779c010 to your computer and use it in GitHub Desktop.
Save lolgesten/fdeff42b00e3b1e23991c01db779c010 to your computer and use it in GitHub Desktop.
Tokio streams (and futures) example without I/O
extern crate futures;
extern crate tokio;
// instead of tokio::prelude, which also re-exports streams and futures,
// we use the futures crate directly to get access to futures::sync::mspc
use futures::*;
use std::thread;
use std::time::Duration;
fn main() {
//
// tokio::run starts a standard reactor core. all further future/stream
// execution must be done inside this.
//
tokio::run(future::ok(()).map(|_| {
//
// multi producer, single consumer from the futures package bounded queue of 2.
//
let (sender, receiver) = sync::mpsc::channel::<u32>(2);
// the sender side goes into a thread and emits 14, 15, 16.
thread::spawn(move || {
//
// imagine we get a callback from c in some separate thread with some data
// to send into the stream.
// 1. send_all consumes the sender, hence .clone()
// 2. stream::once(Ok(v)) to send a single value.
// (this specific example could have been done better with stream::iter_ok())
//
sender.clone().send_all(stream::once(Ok(14))).wait().ok();
thread::sleep(Duration::from_millis(300));
sender.clone().send_all(stream::once(Ok(15))).wait().ok();
thread::sleep(Duration::from_millis(300));
sender.clone().send_all(stream::once(Ok(16))).wait().ok();
});
// the receiver side is a stream. we use tokio::spawn() to execute a future.
// and stream.collect() makes a future of a stream.
tokio::spawn(
// receiver stream
receiver
// here we would do other things with the data from the other thread.
.map(|x| println!("{}", x))
// turn the stream into a future that waits for the end of the stream.
.fold((), |_, _| Ok(()))
);
}));
}
@lolgesten
Copy link
Author

This is an example of how to use tokio without any network of file i/o.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment