Skip to content

Instantly share code, notes, and snippets.

@max-itzpapalotl
Last active February 25, 2024 14:46
Show Gist options
  • Save max-itzpapalotl/7dcfbd37490f83e2e4ee591ad706c548 to your computer and use it in GitHub Desktop.
Save max-itzpapalotl/7dcfbd37490f83e2e4ee591ad706c548 to your computer and use it in GitHub Desktop.
26. An HTTP server using warp II

26. An HTTP server using warp II

Preparations

We need to add the following dependencies:

cargo add tokio --features full
cargo add warp
cargo add serde_json
cargo add serde --features derive
cargo add bytes

Using full as feature in tokio is a bit lazy, but we do it here for the sake of simplicity. Furthermore, we do not bother with TLS for now. We will, however, use asynchronous handler functions instead of synchronous closures, since this is more realistic for larger projects.

Multiple routes

use bytes::Bytes;
use serde::{Serialize, Deserialize};
use std::net::IpAddr;
use warp::{Filter, Rejection};
use warp::http::StatusCode;

/// A person record; serialized to and deserialized from JSON via serde.
#[derive(Debug, Serialize, Deserialize)]
struct Person {
    /// The person's name.
    name: String,
    /// The person's age.
    age: i32,
}

/// JSON error payload: a numeric code plus a human-readable message.
#[derive(Debug, Serialize, Deserialize)]
struct ErrorBody {
    /// Numeric error code.
    error_code: i32,
    /// Human-readable description of the error.
    error_message: String,
}

/// Handles `GET /api/person/<name>`.
///
/// Logs the requested name and replies with status 200 and a JSON `Person`
/// carrying that name and a hard-coded age of 54.
async fn get_handler(name: String) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    println!("Got GET request for name {}", name);
    let person = Person { name, age: 54 };
    let payload = warp::reply::json(&person);
    Ok(warp::reply::with_status(payload, StatusCode::OK))
}

/// Handles `POST /api/person`.
///
/// Parses the raw request body as a JSON `Person`, logs it and echoes it back
/// with status 200.
///
/// # Panics
///
/// Panics if the body is not a valid JSON `Person`, because the parse result
/// is unwrapped.
async fn post_handler(body: Bytes) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    let person = serde_json::from_slice::<Person>(&body).unwrap();
    println!("Got POST for Person: {:?}", person);
    let payload = warp::reply::json(&person);
    Ok(warp::reply::with_status(payload, StatusCode::OK))
}

/// Builds the GET and POST routes and serves them on 0.0.0.0:8000.
#[tokio::main]
async fn main() {
    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and_then(get_handler);
    // POST /api/person, with the raw body handed to the handler as bytes
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::body::bytes())
        .and_then(post_handler);
    let routes = get_route.or(post_route);

    // Listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0, 0, 0, 0]);
    warp::serve(routes).run((listen_ip, 8000)).await;
}

Error handling

use bytes::Bytes;
use serde::{Serialize, Deserialize};
use std::net::IpAddr;
use warp::{Filter, Rejection};
use warp::http::StatusCode;

/// A person record; serialized to and deserialized from JSON via serde.
#[derive(Debug, Serialize, Deserialize)]
struct Person {
    /// The person's name.
    name: String,
    /// The person's age.
    age: i32,
}

/// JSON error payload sent to the client when a request cannot be processed.
#[derive(Debug, Serialize, Deserialize)]
struct ErrorBody {
    /// Numeric error code.
    error_code: i32,
    /// Human-readable description of the error.
    error_message: String,
}

/// Handles `GET /api/person/<name>`.
///
/// Logs the requested name and replies with status 200 and a JSON `Person`
/// carrying that name and a hard-coded age of 54.
async fn get_handler(name: String) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    println!("Got GET request for name {}", name);
    let person = Person { name, age: 54 };
    let payload = warp::reply::json(&person);
    Ok(warp::reply::with_status(payload, StatusCode::OK))
}

/// Handles `POST /api/person` without panicking on bad input.
///
/// A body that parses as a JSON `Person` is logged and echoed back with
/// status 200. Otherwise the parse error is logged and reported to the client
/// as an `ErrorBody` (code 1) with status 400.
async fn post_handler(body: Bytes) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    let (payload, status) = match serde_json::from_slice::<Person>(&body) {
        Ok(person) => {
            println!("Got POST for Person: {:?}", person);
            (warp::reply::json(&person), StatusCode::OK)
        }
        Err(parse_err) => {
            println!("Got bad POST for Person: {:?}", parse_err);
            let failure = ErrorBody {
                error_code: 1,
                error_message: format!("{:?}", parse_err),
            };
            (warp::reply::json(&failure), StatusCode::BAD_REQUEST)
        }
    };
    Ok(warp::reply::with_status(payload, status))
}

/// Builds the GET and POST routes and serves them on 0.0.0.0:8000.
#[tokio::main]
async fn main() {
    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and_then(get_handler);
    // POST /api/person, with the raw body handed to the handler as bytes
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::body::bytes())
        .and_then(post_handler);
    let routes = get_route.or(post_route);

    // Listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0, 0, 0, 0]);
    warp::serve(routes).run((listen_ip, 8000)).await;
}

Rejections and centralized error handling

use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::net::IpAddr;
use warp::http::StatusCode;
use warp::{Filter, Rejection};

/// A person record; serialized to and deserialized from JSON via serde.
#[derive(Debug, Serialize, Deserialize)]
struct Person {
    /// The person's name.
    name: String,
    /// The person's age.
    age: i32,
}

/// JSON error payload sent to the client.
///
/// It doubles as a custom warp rejection (see the `Reject` impl below) and
/// derives `Clone` because `handle_errors` clones it out of the rejection.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ErrorBody {
    /// Numeric error code.
    error_code: u16,
    /// Human-readable description of the error.
    error_message: String,
}

// Marker impl that allows an `ErrorBody` to be passed to `warp::reject::custom`.
impl warp::reject::Reject for ErrorBody {}

/// Handles `GET /api/person/<name>`.
///
/// Logs the requested name and replies with status 200 and a JSON `Person`
/// carrying that name and a hard-coded age of 54.
async fn get_handler(
    name: String,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    println!("Got GET request for name {}", name);
    let person = Person { name, age: 54 };
    let payload = warp::reply::json(&person);
    Ok(warp::reply::with_status(payload, StatusCode::OK))
}

/// Handles `POST /api/person`.
///
/// A body that parses as a JSON `Person` is logged and echoed back with
/// status 200. A body that does not parse is logged and turned into a custom
/// `ErrorBody` rejection (code 1, message = debug rendering of the parse
/// error), leaving the HTTP response to whoever recovers the rejection.
async fn post_handler(
    body: Bytes,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    let person: Person = match serde_json::from_slice(&body) {
        Ok(person) => person,
        Err(parse_err) => {
            println!("Got bad POST for Person: {:?}", parse_err);
            let rejection = ErrorBody {
                error_code: 1,
                error_message: format!("{:?}", parse_err),
            };
            return Err(warp::reject::custom(rejection));
        }
    };

    println!("Got POST for Person: {:?}", person);
    let payload = warp::reply::json(&person);
    Ok(warp::reply::with_status(payload, StatusCode::OK))
}

/// Turns every rejection coming out of the route filters into a JSON reply.
///
/// The reply body is always an `ErrorBody`; the HTTP status is chosen as
/// follows:
///
/// * no route matched: 404 Not Found
/// * `MethodNotAllowed`: 405 Method Not Allowed
/// * custom `ErrorBody` rejection: 400 Bad Request, with the rejected
///   `ErrorBody` sent back unchanged
/// * anything else: 500 Internal Server Error
///
/// The error type is `Infallible`: this function always produces a reply.
pub async fn handle_errors(err: Rejection) -> Result<impl warp::Reply, Infallible> {
    let (code, eb) = if err.is_not_found() {
        (
            StatusCode::NOT_FOUND,
            ErrorBody {
                error_code: 404,
                error_message: "NOT_FOUND".to_string(),
            },
        )
    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
        let code = StatusCode::METHOD_NOT_ALLOWED;
        (
            code,
            ErrorBody {
                error_code: code.as_u16(),
                error_message: "METHOD_NOT_ALLOWED".to_string(),
            },
        )
    } else if let Some(e) = err.find::<ErrorBody>() {
        // The only `ErrorBody` rejection in this program is raised by
        // `post_handler` for a request body that is not a valid `Person`.
        // That is a client error (400), not an authentication failure (401).
        (StatusCode::BAD_REQUEST, e.clone())
    } else {
        let code = StatusCode::INTERNAL_SERVER_ERROR;
        (
            code,
            ErrorBody {
                error_code: code.as_u16(),
                error_message: "INTERNAL_SERVER_ERROR".to_string(),
            },
        )
    };
    Ok(warp::reply::with_status(warp::reply::json(&eb), code))
}

/// Builds the GET and POST routes, attaches the central rejection handler and
/// serves everything on 0.0.0.0:8000.
#[tokio::main]
async fn main() {
    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and_then(get_handler);
    // POST /api/person, with the raw body handed to the handler as bytes
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::body::bytes())
        .and_then(post_handler);
    // Rejections from either route are converted to replies by `handle_errors`.
    let routes = get_route.or(post_route).recover(handle_errors);

    // Listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0, 0, 0, 0]);
    warp::serve(routes).run((listen_ip, 8000)).await;
}

Data sharing between handlers

use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::IpAddr;
use std::sync::{Arc, RwLock};
use warp::http::StatusCode;
use warp::{Filter, Rejection};

/// A person record; serialized to and deserialized from JSON via serde.
///
/// `Clone` is derived because `post_handler` stores a copy in the person base
/// and still uses the original to build the reply.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Person {
    /// The person's name; also used as the key in `PersonBase::map`.
    name: String,
    /// The person's age.
    age: i32,
}

/// JSON error payload sent to the client.
///
/// It doubles as a custom warp rejection (see the `Reject` impl below) and
/// derives `Clone` because `handle_errors` clones it out of the rejection.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ErrorBody {
    /// Numeric error code.
    error_code: u16,
    /// Human-readable description of the error.
    error_message: String,
}

// Marker impl that allows an `ErrorBody` to be passed to `warp::reject::custom`.
impl warp::reject::Reject for ErrorBody {}

/// In-memory store of persons, shared between the handlers.
struct PersonBase {
    /// Persons keyed by their name.
    map: HashMap<String, Person>,
}

/// Builds a filter that matches every request and extracts a fresh clone of
/// the shared person base handle, so it can be passed on to a handler.
fn with_person_base(
    pb: Arc<RwLock<PersonBase>>,
) -> impl Filter<Extract = (Arc<RwLock<PersonBase>>,), Error = Infallible> + Clone {
    // Each request gets its own `Arc` pointing at the same store.
    let hand_out = move || Arc::clone(&pb);
    warp::any().map(hand_out)
}

/// Handles `GET /api/person/<name>`.
///
/// Looks `name` up in the shared person base under a read lock. Replies with
/// status 200 and the stored `Person`, or with status 404 and an `ErrorBody`
/// (code 2) if no such person exists.
///
/// # Panics
///
/// Panics if the lock is poisoned, because the lock result is unwrapped.
async fn get_handler(
    name: String,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    println!("Got GET request for name {}", name);
    let people = pb.read().unwrap();
    let reply = if let Some(found) = people.map.get(&name) {
        warp::reply::with_status(warp::reply::json(found), StatusCode::OK)
    } else {
        let missing = ErrorBody {
            error_code: 2,
            error_message: format!("No person with name {name} in database!"),
        };
        warp::reply::with_status(warp::reply::json(&missing), StatusCode::NOT_FOUND)
    };
    Ok(reply)
}

/// Handles `POST /api/person`.
///
/// * Body is not a valid JSON `Person`: logs the parse error and rejects with
///   a custom `ErrorBody` (code 1).
/// * A person with the same name is already stored: replies with status 409
///   and an `ErrorBody` (code 3).
/// * Otherwise: stores the person under its name and echoes it back with
///   status 200.
///
/// # Panics
///
/// Panics if the lock is poisoned, because the lock result is unwrapped.
async fn post_handler(
    body: Bytes,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    let person: Person = match serde_json::from_slice(&body) {
        Ok(person) => person,
        Err(parse_err) => {
            println!("Got bad POST for Person: {:?}", parse_err);
            return Err(warp::reject::custom(ErrorBody {
                error_code: 1,
                error_message: format!("{:?}", parse_err),
            }));
        }
    };
    println!("Got POST for Person: {:?}", person);

    // The write lock covers both the existence check and the insert.
    let mut people = pb.write().unwrap();
    if people.map.contains_key(&person.name) {
        let duplicate = ErrorBody {
            error_code: 3,
            error_message: format!("Person with name {} already in database!", person.name),
        };
        return Ok(warp::reply::with_status(
            warp::reply::json(&duplicate),
            StatusCode::CONFLICT,
        ));
    }

    people.map.insert(person.name.clone(), person.clone());
    Ok(warp::reply::with_status(
        warp::reply::json(&person),
        StatusCode::OK,
    ))
}

/// Turns every rejection coming out of the route filters into a JSON reply.
///
/// The reply body is always an `ErrorBody`; the HTTP status is chosen as
/// follows:
///
/// * no route matched: 404 Not Found
/// * `MethodNotAllowed`: 405 Method Not Allowed
/// * custom `ErrorBody` rejection: 400 Bad Request, with the rejected
///   `ErrorBody` sent back unchanged
/// * anything else: 500 Internal Server Error
///
/// The error type is `Infallible`: this function always produces a reply.
pub async fn handle_errors(err: Rejection) -> Result<impl warp::Reply, Infallible> {
    let (status, payload) = if err.is_not_found() {
        (
            StatusCode::NOT_FOUND,
            ErrorBody {
                error_code: 404,
                error_message: "NOT_FOUND".to_string(),
            },
        )
    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
        let status = StatusCode::METHOD_NOT_ALLOWED;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "METHOD_NOT_ALLOWED".to_string(),
            },
        )
    } else if let Some(custom) = err.find::<ErrorBody>() {
        (StatusCode::BAD_REQUEST, custom.clone())
    } else {
        let status = StatusCode::INTERNAL_SERVER_ERROR;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "INTERNAL_SERVER_ERROR".to_string(),
            },
        )
    };
    Ok(warp::reply::with_status(warp::reply::json(&payload), status))
}

/// Creates the shared person base, builds the GET and POST routes around it,
/// attaches the central rejection handler and serves on 0.0.0.0:8000.
#[tokio::main]
async fn main() {
    // Starts out empty; both routes receive a handle to the same store.
    let store = Arc::new(RwLock::new(PersonBase {
        map: HashMap::new(),
    }));

    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and(with_person_base(Arc::clone(&store)))
        .and_then(get_handler);
    // POST /api/person, with the raw body handed to the handler as bytes
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::body::bytes())
        .and(with_person_base(Arc::clone(&store)))
        .and_then(post_handler);
    let routes = get_route.or(post_route).recover(handle_errors);

    // Listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0, 0, 0, 0]);
    warp::serve(routes).run((listen_ip, 8000)).await;
}

Authentication

use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::IpAddr;
use std::sync::{Arc, RwLock};
use warp::http::StatusCode;
use warp::{Filter, Rejection};

/// A person record; serialized to and deserialized from JSON via serde.
///
/// `Clone` is derived because `post_handler` stores a copy in the person base
/// and still uses the original to build the reply.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Person {
    /// The person's name; also used as the key in `PersonBase::map`.
    name: String,
    /// The person's age.
    age: i32,
}

/// JSON error payload sent to the client.
///
/// It doubles as a custom warp rejection (see the `Reject` impl below) and
/// derives `Clone` because `handle_errors` clones it out of the rejection.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ErrorBody {
    /// Numeric error code.
    error_code: u16,
    /// Human-readable description of the error.
    error_message: String,
}

// Marker impl that allows an `ErrorBody` to be passed to `warp::reject::custom`.
impl warp::reject::Reject for ErrorBody {}

/// Custom rejection raised by `authorize` when a request is not authorized.
#[derive(Debug)]
struct Unauthorized {
    /// Reason for the rejection; `handle_errors` copies it into the response.
    error: String,
}

// Marker impl that allows an `Unauthorized` to be passed to `warp::reject::custom`.
impl warp::reject::Reject for Unauthorized {}

/// In-memory store of persons, shared between the handlers.
struct PersonBase {
    /// Persons keyed by their name.
    map: HashMap<String, Person>,
}

/// Builds a filter that matches every request and extracts a fresh clone of
/// the shared person base handle, so it can be passed on to a handler.
fn with_person_base(
    pb: Arc<RwLock<PersonBase>>,
) -> impl Filter<Extract = (Arc<RwLock<PersonBase>>,), Error = Infallible> + Clone {
    // Each request gets its own `Arc` pointing at the same store.
    let hand_out = move || Arc::clone(&pb);
    warp::any().map(hand_out)
}

/// Placeholder authorization check on the optional `authorization` header.
///
/// Returns an `Unauthorized` rejection when the header is missing
/// ("Missing auth header!") or when its value is exactly `"xxx"`
/// ("Bad header!"). Every other header value is accepted. The header value is
/// logged to stdout.
///
/// NOTE(review): the `"xxx"` comparison is a stand-in; real credential
/// checking is expected to replace it.
fn authorize(auth: &Option<String>) -> Result<(), Rejection> {
    let header = auth.as_ref().ok_or_else(|| {
        warp::reject::custom(Unauthorized {
            error: "Missing auth header!".to_string(),
        })
    })?;

    println!("Auth header:{}<<<", header);
    // Put your authentication here...
    if header.as_str() == "xxx" {
        Err(warp::reject::custom(Unauthorized {
            error: "Bad header!".to_string(),
        }))
    } else {
        Ok(())
    }
}

/// Handles `GET /api/person/<name>`.
///
/// The request is first checked with `authorize`; its rejection is passed on
/// unchanged. Then `name` is looked up in the shared person base under a read
/// lock. Replies with status 200 and the stored `Person`, or with status 404
/// and an `ErrorBody` (code 2) if no such person exists.
///
/// # Panics
///
/// Panics if the lock is poisoned, because the lock result is unwrapped.
async fn get_handler(
    name: String,
    auth: Option<String>,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    authorize(&auth)?;
    println!("Got GET request for name {}", name);
    let people = pb.read().unwrap();
    let reply = if let Some(found) = people.map.get(&name) {
        warp::reply::with_status(warp::reply::json(found), StatusCode::OK)
    } else {
        let missing = ErrorBody {
            error_code: 2,
            error_message: format!("No person with name {name} in database!"),
        };
        warp::reply::with_status(warp::reply::json(&missing), StatusCode::NOT_FOUND)
    };
    Ok(reply)
}

/// Handles `POST /api/person`.
///
/// The request is first checked with `authorize`; its rejection is passed on
/// unchanged. After that:
///
/// * Body is not a valid JSON `Person`: logs the parse error and rejects with
///   a custom `ErrorBody` (code 1).
/// * A person with the same name is already stored: replies with status 409
///   and an `ErrorBody` (code 3).
/// * Otherwise: stores the person under its name and echoes it back with
///   status 200.
///
/// # Panics
///
/// Panics if the lock is poisoned, because the lock result is unwrapped.
async fn post_handler(
    auth: Option<String>,
    body: Bytes,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    authorize(&auth)?;
    let person: Person = match serde_json::from_slice(&body) {
        Ok(person) => person,
        Err(parse_err) => {
            println!("Got bad POST for Person: {:?}", parse_err);
            return Err(warp::reject::custom(ErrorBody {
                error_code: 1,
                error_message: format!("{:?}", parse_err),
            }));
        }
    };
    println!("Got POST for Person: {:?}", person);

    // The write lock covers both the existence check and the insert.
    let mut people = pb.write().unwrap();
    if people.map.contains_key(&person.name) {
        let duplicate = ErrorBody {
            error_code: 3,
            error_message: format!("Person with name {} already in database!", person.name),
        };
        return Ok(warp::reply::with_status(
            warp::reply::json(&duplicate),
            StatusCode::CONFLICT,
        ));
    }

    people.map.insert(person.name.clone(), person.clone());
    Ok(warp::reply::with_status(
        warp::reply::json(&person),
        StatusCode::OK,
    ))
}

/// Turns every rejection coming out of the route filters into a JSON reply.
///
/// The reply body is always an `ErrorBody`; the HTTP status is chosen as
/// follows:
///
/// * no route matched: 404 Not Found
/// * `MethodNotAllowed`: 405 Method Not Allowed
/// * `Unauthorized`: 401 Unauthorized, with the rejection's message
/// * custom `ErrorBody` rejection: 400 Bad Request, with the rejected
///   `ErrorBody` sent back unchanged
/// * anything else: 500 Internal Server Error
///
/// The error type is `Infallible`: this function always produces a reply.
pub async fn handle_errors(err: Rejection) -> Result<impl warp::Reply, Infallible> {
    let (status, payload) = if err.is_not_found() {
        (
            StatusCode::NOT_FOUND,
            ErrorBody {
                error_code: 404,
                error_message: "NOT_FOUND".to_string(),
            },
        )
    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
        let status = StatusCode::METHOD_NOT_ALLOWED;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "METHOD_NOT_ALLOWED".to_string(),
            },
        )
    } else if let Some(denied) = err.find::<Unauthorized>() {
        let status = StatusCode::UNAUTHORIZED;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: denied.error.clone(),
            },
        )
    } else if let Some(custom) = err.find::<ErrorBody>() {
        (StatusCode::BAD_REQUEST, custom.clone())
    } else {
        let status = StatusCode::INTERNAL_SERVER_ERROR;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "INTERNAL_SERVER_ERROR".to_string(),
            },
        )
    };
    Ok(warp::reply::with_status(warp::reply::json(&payload), status))
}

/// Creates the shared person base, builds the GET and POST routes (each also
/// extracting the optional `authorization` header), attaches the central
/// rejection handler and serves on 0.0.0.0:8000.
#[tokio::main]
async fn main() {
    // Starts out empty; both routes receive a handle to the same store.
    let store = Arc::new(RwLock::new(PersonBase {
        map: HashMap::new(),
    }));

    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and(warp::filters::header::optional::<String>("authorization"))
        .and(with_person_base(Arc::clone(&store)))
        .and_then(get_handler);
    // POST /api/person, with the raw body handed to the handler as bytes
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::filters::header::optional::<String>("authorization"))
        .and(warp::body::bytes())
        .and(with_person_base(Arc::clone(&store)))
        .and_then(post_handler);
    let routes = get_route.or(post_route).recover(handle_errors);

    // Listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0, 0, 0, 0]);
    warp::serve(routes).run((listen_ip, 8000)).await;
}

Clean shutdown

use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::IpAddr;
use std::sync::{Arc, Mutex, RwLock};
use tokio::sync::oneshot;
use warp::http::StatusCode;
use warp::{Filter, Rejection};

/// A person record; serialized to and deserialized from JSON via serde.
///
/// `Clone` is derived because `post_handler` stores a copy in the person base
/// and still uses the original to build the reply.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Person {
    /// The person's name; also used as the key in `PersonBase::map`.
    name: String,
    /// The person's age.
    age: i32,
}

/// JSON error payload sent to the client.
///
/// It doubles as a custom warp rejection (see the `Reject` impl below) and
/// derives `Clone` because `handle_errors` clones it out of the rejection.
/// `shutdown_handler` also sends it, with code 0 and an empty message, as its
/// success reply.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ErrorBody {
    /// Numeric error code.
    error_code: u16,
    /// Human-readable description of the error.
    error_message: String,
}

// Marker impl that allows an `ErrorBody` to be passed to `warp::reject::custom`.
impl warp::reject::Reject for ErrorBody {}

/// Custom rejection raised by `authorize` when a request is not authorized.
#[derive(Debug)]
struct Unauthorized {
    /// Reason for the rejection; `handle_errors` copies it into the response.
    error: String,
}

// Marker impl that allows an `Unauthorized` to be passed to `warp::reject::custom`.
impl warp::reject::Reject for Unauthorized {}

/// In-memory store of persons, shared between the handlers.
struct PersonBase {
    /// Persons keyed by their name.
    map: HashMap<String, Person>,
}

/// Builds a filter that matches every request and extracts a fresh clone of
/// the shared person base handle, so it can be passed on to a handler.
fn with_person_base(
    pb: Arc<RwLock<PersonBase>>,
) -> impl Filter<Extract = (Arc<RwLock<PersonBase>>,), Error = Infallible> + Clone {
    // Each request gets its own `Arc` pointing at the same store.
    let hand_out = move || Arc::clone(&pb);
    warp::any().map(hand_out)
}

/// Placeholder authorization check on the optional `authorization` header.
///
/// Returns an `Unauthorized` rejection when the header is missing
/// ("Missing auth header!") or when its value is exactly `"xxx"`
/// ("Bad header!"). Every other header value is accepted. The header value is
/// logged to stdout.
///
/// NOTE(review): the `"xxx"` comparison is a stand-in; real credential
/// checking is expected to replace it.
fn authorize(auth: &Option<String>) -> Result<(), Rejection> {
    let header = auth.as_ref().ok_or_else(|| {
        warp::reject::custom(Unauthorized {
            error: "Missing auth header!".to_string(),
        })
    })?;

    println!("Auth header:{}<<<", header);
    // Put your authentication here...
    if header.as_str() == "xxx" {
        Err(warp::reject::custom(Unauthorized {
            error: "Bad header!".to_string(),
        }))
    } else {
        Ok(())
    }
}

/// Handles `DELETE /api/shutdown`: asks the server to shut down.
///
/// The request is first checked with `authorize`; its rejection is passed on
/// unchanged. The one-shot sender is then taken out of the shared slot and
/// fired. Because the slot is emptied, only the first authorized request
/// sends the signal; later ones find `None` and do nothing.
///
/// Always replies with status 200 and an `ErrorBody` with code 0 and an empty
/// message once authorization has passed.
async fn shutdown_handler(
    auth: Option<String>,
    tx_arc: Arc<Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    authorize(&auth)?;

    // Take the sender while holding the lock only for this statement. The
    // slot holds plain data, so a poisoned lock is recovered, not propagated.
    let sender = tx_arc
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .take();

    if let Some(sender) = sender {
        // `send` fails only when the receiving half has been dropped, i.e.
        // nobody is waiting for the signal any more. That is not a reason to
        // panic inside a request handler, so it is just logged.
        if sender.send(()).is_err() {
            println!("Shutdown requested, but the shutdown receiver is already gone");
        }
    }

    Ok(warp::reply::with_status(
        warp::reply::json(&ErrorBody {
            error_code: 0,
            error_message: "".to_string(),
        }),
        StatusCode::OK,
    ))
}

/// Handles `GET /api/person/<name>`.
///
/// The request is first checked with `authorize`; its rejection is passed on
/// unchanged. Then `name` is looked up in the shared person base under a read
/// lock. Replies with status 200 and the stored `Person`, or with status 404
/// and an `ErrorBody` (code 2) if no such person exists.
///
/// # Panics
///
/// Panics if the lock is poisoned, because the lock result is unwrapped.
async fn get_handler(
    name: String,
    auth: Option<String>,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    authorize(&auth)?;
    println!("Got GET request for name {}", name);
    let people = pb.read().unwrap();
    let reply = if let Some(found) = people.map.get(&name) {
        warp::reply::with_status(warp::reply::json(found), StatusCode::OK)
    } else {
        let missing = ErrorBody {
            error_code: 2,
            error_message: format!("No person with name {name} in database!"),
        };
        warp::reply::with_status(warp::reply::json(&missing), StatusCode::NOT_FOUND)
    };
    Ok(reply)
}

/// Handles `POST /api/person`.
///
/// The request is first checked with `authorize`; its rejection is passed on
/// unchanged. After that:
///
/// * Body is not a valid JSON `Person`: logs the parse error and rejects with
///   a custom `ErrorBody` (code 1).
/// * A person with the same name is already stored: replies with status 409
///   and an `ErrorBody` (code 3).
/// * Otherwise: stores the person under its name and echoes it back with
///   status 200.
///
/// # Panics
///
/// Panics if the lock is poisoned, because the lock result is unwrapped.
async fn post_handler(
    auth: Option<String>,
    body: Bytes,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    authorize(&auth)?;
    let person: Person = match serde_json::from_slice(&body) {
        Ok(person) => person,
        Err(parse_err) => {
            println!("Got bad POST for Person: {:?}", parse_err);
            return Err(warp::reject::custom(ErrorBody {
                error_code: 1,
                error_message: format!("{:?}", parse_err),
            }));
        }
    };
    println!("Got POST for Person: {:?}", person);

    // The write lock covers both the existence check and the insert.
    let mut people = pb.write().unwrap();
    if people.map.contains_key(&person.name) {
        let duplicate = ErrorBody {
            error_code: 3,
            error_message: format!("Person with name {} already in database!", person.name),
        };
        return Ok(warp::reply::with_status(
            warp::reply::json(&duplicate),
            StatusCode::CONFLICT,
        ));
    }

    people.map.insert(person.name.clone(), person.clone());
    Ok(warp::reply::with_status(
        warp::reply::json(&person),
        StatusCode::OK,
    ))
}

/// Turns every rejection coming out of the route filters into a JSON reply.
///
/// The reply body is always an `ErrorBody`; the HTTP status is chosen as
/// follows:
///
/// * no route matched: 404 Not Found
/// * `MethodNotAllowed`: 405 Method Not Allowed
/// * `Unauthorized`: 401 Unauthorized, with the rejection's message
/// * custom `ErrorBody` rejection: 400 Bad Request, with the rejected
///   `ErrorBody` sent back unchanged
/// * anything else: 500 Internal Server Error
///
/// The error type is `Infallible`: this function always produces a reply.
pub async fn handle_errors(err: Rejection) -> Result<impl warp::Reply, Infallible> {
    let (status, payload) = if err.is_not_found() {
        (
            StatusCode::NOT_FOUND,
            ErrorBody {
                error_code: 404,
                error_message: "NOT_FOUND".to_string(),
            },
        )
    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
        let status = StatusCode::METHOD_NOT_ALLOWED;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "METHOD_NOT_ALLOWED".to_string(),
            },
        )
    } else if let Some(denied) = err.find::<Unauthorized>() {
        let status = StatusCode::UNAUTHORIZED;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: denied.error.clone(),
            },
        )
    } else if let Some(custom) = err.find::<ErrorBody>() {
        (StatusCode::BAD_REQUEST, custom.clone())
    } else {
        let status = StatusCode::INTERNAL_SERVER_ERROR;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "INTERNAL_SERVER_ERROR".to_string(),
            },
        )
    };
    Ok(warp::reply::with_status(warp::reply::json(&payload), status))
}

/// Builds the shutdown, GET and POST routes, binds them to 0.0.0.0:8000 with
/// graceful shutdown, and waits for the spawned server task to finish.
#[tokio::main]
async fn main() {
    // Starts out empty; the person routes receive a handle to the same store.
    let store = Arc::new(RwLock::new(PersonBase {
        map: HashMap::new(),
    }));

    // The sender sits in an `Option` behind a mutex so the shutdown handler
    // can take it out of the shared slot and fire it.
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let shared_tx = Arc::new(Mutex::new(Some(shutdown_tx)));

    // DELETE /api/shutdown
    let shutdown_route = warp::path!("api" / "shutdown")
        .and(warp::delete())
        .and(warp::filters::header::optional::<String>("authorization"))
        .and(warp::any().map(move || Arc::clone(&shared_tx)))
        .and_then(shutdown_handler);
    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and(warp::filters::header::optional::<String>("authorization"))
        .and(with_person_base(Arc::clone(&store)))
        .and_then(get_handler);
    // POST /api/person, with the raw body handed to the handler as bytes
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::filters::header::optional::<String>("authorization"))
        .and(warp::body::bytes())
        .and(with_person_base(Arc::clone(&store)))
        .and_then(post_handler);
    let routes = shutdown_route
        .or(get_route)
        .or(post_route)
        .recover(handle_errors);

    // Listen on all IPv4 interfaces; the server stops once the receiver
    // resolves (a received signal and a dropped sender are treated alike).
    let listen_ip = IpAddr::from([0, 0, 0, 0]);
    let (_addr, server) =
        warp::serve(routes).bind_with_graceful_shutdown((listen_ip, 8000), async move {
            shutdown_rx.await.ok();
        });
    let server_task = tokio::task::spawn(server);
    server_task.await.unwrap();
}

References

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment