Skip to content

Instantly share code, notes, and snippets.

@boxdot
Last active October 9, 2018 09:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save boxdot/1114dd64bde86c9b3afe231b045f50e5 to your computer and use it in GitHub Desktop.
Save boxdot/1114dd64bde86c9b3afe231b045f50e5 to your computer and use it in GitHub Desktop.
Example of a hyper service/server with rate limiting of requests
[package]
name = "ratelimiting-server"
version = "0.1.0"
[dependencies]
futures = "0.1"
tokio = "0.1"
hyper = "0.12"
// Example: hyper service with rate limiting
extern crate futures;
extern crate hyper;
extern crate tokio;
use futures::sync::{mpsc, oneshot};
use futures::*;
use hyper::service::service_fn;
use hyper::{Body, Request, Response, Server};
use std::time::{Duration, Instant};
use tokio::prelude::FutureExt;
use tokio::timer::{timeout, Delay};
type ReqWithRespChannel = (Request<Body>, oneshot::Sender<Response<Body>>);
fn main() {
let (tx_requests, rx_requests) =
mpsc::channel::<(Request<Body>, oneshot::Sender<Response<Body>>)>(1000);
let handle =
move |req: Request<Body>| -> Box<Future<Item = Response<Body>, Error = hyper::Error> + Send> {
println!("Incoming: {:?}", req);
let (p, c) = oneshot::channel::<Response<Body>>();
let fut_resp = tx_requests.clone()
.send((req, p))
.map_err(MyError::from)
.and_then(move |_| {
c.and_then(|resp| {
println!("got response {:?}", resp);
Ok(resp)
}).timeout(Duration::from_millis(1500)).map_err(MyError::from)
}).then(|resp: Result<Response<Body>, MyError>| {
let resp = match resp {
Ok(resp) => {
println!("successful response");
resp
}
Err(e) => {
println!("got error: {:?}", e);
Response::new(Body::from(format!("{:?}", e)))
}
};
Ok(resp)
});
Box::new(fut_resp)
};
// Note: Without buffer_unordered below for_each works as if we would have buffer_unordered(1).
let processing = rx_requests
.map(|(req, resp_channel)| {
println!("Request arrive in processing: {:?}", req);
let resp = Response::new(Body::from("pong"));
Delay::new(Instant::now() + Duration::from_millis(1000))
.and_then(move |_| {
println!("Sending response to oneshot");
let _ = resp_channel.send(resp);
Ok(())
}).map_err(|e| println!("Delay error: {:?}", e))
}).buffer_unordered(2)
.for_each(|_| Ok(()));
let addr = ([127, 0, 0, 1], 3000).into();
let server = Server::bind(&addr)
.serve(move || service_fn(handle.clone()))
.map_err(|e| eprintln!("server error: {}", e));
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(processing);
println!("Listening on http://{}", addr);
rt.block_on(server).unwrap();
}
#[derive(Debug)]
enum MyError {
SendError(Request<Body>),
Canceled,
}
impl From<mpsc::SendError<ReqWithRespChannel>> for MyError {
fn from(e: mpsc::SendError<ReqWithRespChannel>) -> Self {
MyError::SendError(e.into_inner().0)
}
}
impl From<timeout::Error<futures::Canceled>> for MyError {
fn from(_e: timeout::Error<futures::Canceled>) -> Self {
MyError::Canceled
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment