Skip to content

Instantly share code, notes, and snippets.

@lolgesten
Last active May 28, 2018 09:38
Show Gist options
  • Save lolgesten/dcdbdc9eb1eb38d6f99ff989479bb82a to your computer and use it in GitHub Desktop.
Save lolgesten/dcdbdc9eb1eb38d6f99ff989479bb82a to your computer and use it in GitHub Desktop.
extern crate futures;
extern crate tokio;
use futures::*;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
/// A demo stream whose values are produced on background threads.
struct TestStream {
    /// Shared, mutex-guarded state. It sits behind an `Arc<Mutex<_>>`
    /// because it is modified on a different thread from the one that
    /// polls the stream.
    state: Arc<Mutex<State>>,
}
/// The data guarded by the stream's mutex.
struct State {
    /// The most recent value that was produced.
    last: u64,
    /// The value waiting to be emitted, or `None` once it has been taken.
    next: Option<u64>,
}
impl TestStream {
    /// Spawns a thread that produces the next value after a one second delay.
    ///
    /// The thread increments `last`, stores the new value in `next`, and then
    /// notifies `task` so that the stream gets polled again.
    ///
    /// `task` is the handle of the task to wake once the value is available.
    ///
    /// # Panics
    ///
    /// The spawned thread panics if the state mutex is poisoned.
    fn schedule_next(&self, task: task::Task) {
        // Own a reference to the shared state so the thread does not need `self`.
        let state = Arc::clone(&self.state);
        thread::spawn(move || {
            // Simulate slow work: sleep for a second.
            thread::sleep(Duration::from_millis(1000));
            {
                // Hold the lock only while the state is being updated.
                let mut lock = state.lock().unwrap();
                let produced = lock.last + 1;
                lock.last = produced;
                lock.next = Some(produced);
            } // The guard is dropped here, *before* the task is woken, so a
              // poll() that starts right away on another thread does not
              // have to wait on this mutex.

            // And ***this is the key***: notify that there is a new value to poll().
            task.notify();
        });
    }
}
impl Stream for TestStream {
    type Item = u64;
    type Error = ();

    /// Emits the pending value if there is one; otherwise schedules a new
    /// value and reports `NotReady`.
    ///
    /// A pending value greater than 44 ends the stream (`Ready(None)`).
    /// Note that every call that finds no pending value spawns another
    /// producer thread via `schedule_next`.
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        // The guard stays alive until the end of the function.
        let mut guard = self.state.lock().unwrap();

        // Move the pending value out, leaving `None` behind.
        let pending = guard.next.take();

        if let Some(value) = pending {
            // `None` signals the end of the stream, `Some` emits the value.
            let item = if value > 44 { None } else { Some(value) };
            return Ok(Async::Ready(item));
        }

        // Nothing to emit yet: hand the current task's handle to the
        // producer so it can wake this stream once a value exists.
        self.schedule_next(task::current());

        // Tell the poller there is no value yet.
        Ok(Async::NotReady)
    }
}
fn main() {
    // A simple runtime that drives futures on a single thread.
    // The alternative is tokio::run(), which creates a thread
    // pool to drive any futures spawned onto it.
    use tokio::runtime::current_thread::Runtime;

    let mut runtime = Runtime::new().expect("Failed to create tokio runtime");

    // Bootstrap future: when it runs on the runtime it builds the stream
    // and spawns the consumer.
    let bootstrap = future::ok(()).map(|_| {
        let initial = State {
            last: 41,
            next: None,
        };
        let stream = TestStream {
            state: Arc::new(Mutex::new(initial)),
        };

        // Drive the stream as a future, "consuming" each value as it drops in.
        // `for_each` yields a future that completes once the stream ends.
        let consumer = stream.for_each(|value| {
            println!("{}", value);
            Ok(())
        });
        tokio::spawn(consumer);
    });
    runtime.spawn(bootstrap);

    // Wait until the runtime becomes idle and shut it down.
    runtime.run().unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment