Skip to content

Instantly share code, notes, and snippets.

@AaronM04
Created February 25, 2019 03:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AaronM04/d2bfb0d618732c2881fdfeace15c7646 to your computer and use it in GitHub Desktop.
Save AaronM04/d2bfb0d618732c2881fdfeace15c7646 to your computer and use it in GitHub Desktop.
[package]
name = "filefuture"
version = "0.1.0"
authors = ["you"]
edition = "2018"
[dependencies]
futures = "*"
/*
I created this just to learn about how Futures work.
Running:
create a new bin project with cargo.
Add this to [dependencies] section of Cargo.toml:
futures = "*"
*/
extern crate futures;
use futures::future::Future;
use futures::{executor, task, Async, Poll};
use std::env;
use std::fs::File;
use std::io::Read;
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::thread;
use std::time::Instant;
/// A future to read the contents of a file
enum ReadFileContents {
NotStarted(String),
// created on first call to poll()
Waiting {
ch_contents: Receiver<Result<String, String>>,
},
}
impl Future for ReadFileContents {
type Item = String;
type Error = String;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("{:?} DEBUG: called poll()!", Instant::now());
match self {
ReadFileContents::NotStarted(ref path) => {
println!(
"{:?} DEBUG: first poll, so storing current task",
Instant::now()
);
let task = task::current();
let (tx, rx) = channel();
let path = path.clone();
*self = ReadFileContents::Waiting { ch_contents: rx };
thread::spawn(move || {
println!("{:?} DEBUG: worker: about to File::open...", Instant::now());
let mut f = match File::open(path) {
Ok(_f) => _f,
Err(err) => {
tx.send(Err(err.to_string())).unwrap();
return;
}
};
let mut contents = String::new();
println!(
"{:?} DEBUG: worker: about to read_to_string...",
Instant::now()
);
if let Err(err) = f.read_to_string(&mut contents) {
println!("{:?} DEBUG: worker: read_to_string failed! About to send err on channel...", Instant::now());
tx.send(Err(err.to_string())).unwrap();
task.notify();
return;
};
if let Err(ch_err) = tx.send(Ok(contents)) {
println!("{:?} ERROR: worker: read_file_contents: failed to send Ok(contents) to ch: {}", Instant::now(), ch_err.to_string());
}
task.notify();
});
Ok(Async::NotReady)
}
ReadFileContents::Waiting { ref ch_contents } => match ch_contents.try_recv() {
Ok(channel_item) => {
let result = match channel_item {
Ok(contents) => Ok(Async::Ready(contents)),
Err(msg) => Err(msg),
};
result
}
Err(TryRecvError::Empty) => Ok(Async::NotReady),
Err(TryRecvError::Disconnected) => panic!("already polled"),
},
}
}
}
fn read_file_contents(path: &str) -> ReadFileContents {
ReadFileContents::NotStarted(path.to_owned())
}
fn main() {
let path = env::args().nth(1).expect("Expected file path argument.");
let fut = read_file_contents(&path);
let fut2 = fut.and_then(|contents| {
println!("the contents are {} bytes long", contents.len());
Ok(())
});
executor::spawn(fut2).wait_future().unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment