Skip to content

Instantly share code, notes, and snippets.

@jeromer jeromer/E0525.rs
Created Mar 8, 2019

Embed
What would you like to do?
extern crate hyper;
extern crate tokio;
extern crate tokio_signal;
use futures::Stream;
use hyper::rt::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Request, Response, Server};
use futures::sink::Sink;
use futures::sync::{mpsc, oneshot};
use futures::{future, stream};
use std::time::Duration;
use tokio::timer::Interval;
fn main() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let (tx1, rx1) = oneshot::channel::<()>();
let (tx_queue, rx_queue) = mpsc::channel(10);
// ----
runtime.spawn(init_queue(rx_queue));
// ----
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(|| {
service_fn_ok(move |_: Request<Body>| {
tx_queue.send(111);
Response::new(Body::from("Hello World!"))
})
});
let graceful = http_server
.with_graceful_shutdown(rx1)
.map_err(|err| eprintln!("server error: {}", err))
.and_then(|_| {
dbg!("stopped");
// TODO: stop order queue listener
Ok(())
});
dbg!("HTTP server listening ...");
runtime.spawn(graceful);
// ----
dbg!("Setting up signal handler");
let signal_handler = tokio_signal::ctrl_c()
.flatten_stream()
.take(1)
.for_each(|_| {
dbg!("received SIGINT");
Ok(())
});
runtime
.block_on(signal_handler)
.and_then(|_| {
dbg!("asking http server to stop");
tx1.send(()).unwrap();
Ok(())
})
.unwrap();
dbg!("exited");
}
pub fn init_queue(rx: mpsc::Receiver<usize>) -> impl Future<Item = (), Error = ()> {
#[derive(Eq, PartialEq)]
enum Item {
Value(usize),
Tick,
Done,
}
let tick_dur = Duration::from_secs(30);
let interval = Interval::new_interval(tick_dur)
.map(|_| Item::Tick)
.map_err(|_| ());
let items = rx
.map(Item::Value)
.chain(stream::once(Ok(Item::Done)))
// Merge in the stream of intervals
.select(interval)
// Terminate the stream once `Done` is received. This is necessary
// because `Interval` is an infinite stream and `select` will keep
// selecting on it.
.take_while(|item| future::ok(*item != Item::Done));
// With the stream of `Item` values, start our logic.
//
// Using `fold` allows the state to be maintained across iterations.
// In this case, the state is the number of read bytes between tick.
items
.fold(0, |num, item| {
match item {
// Sum the number of bytes with the state.
Item::Value(v) => future::ok(num + v),
Item::Tick => {
println!("process queue here !");
// Reset the byte counter
future::ok(0)
}
_ => unreachable!(),
}
})
.map(|_| ())
}
@jeromer

This comment has been minimized.

Copy link
Owner Author

commented Mar 8, 2019

➜  sandbox git:(master) ✗ cargo run
   Compiling sandbox v0.1.0 (/home/jerome/work/rustexperimentations/sandbox)
error[E0525]: expected a closure that implements the `Fn` trait, but this closure only implements `FnOnce`
  --> src/main.rs:29:74
   |
29 |     let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(|| {
   |                                                                    ----- ^^ this closure implements `FnOnce`, not `Fn`
   |                                                                    |
   |                                                                    the requirement to implement `Fn` derives from here
30 |         service_fn_ok(move |_: Request<Body>| {
   |                       ----------------------- closure is `FnOnce` because it moves the variable `tx_queue` out of its environment
@jeromer

This comment has been minimized.

Copy link
Owner Author

commented Mar 8, 2019

[package]
name = "sandbox"
version = "0.1.0"
edition = "2018"

[dependencies]
futures = "0.1.25"
hyper = "0.12.24"
tokio-signal = "0.2.7"
tokio = "0.1.16"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.