Skip to content

Instantly share code, notes, and snippets.

@seanmonstar
Forked from GuillaumeGomez/server
Last active February 22, 2017 17:58
Show Gist options
  • Save seanmonstar/08b4192ff69120d8042914e66d0c0f9f to your computer and use it in GitHub Desktop.
Save seanmonstar/08b4192ff69120d8042914e66d0c0f9f to your computer and use it in GitHub Desktop.
#![deny(missing_docs)]
#![deny(warnings)]
//! A simple HTTP server based on a REST API.
//!
//! You can perform the following actions:
//!
//! * `GET: /[key]`: returns the value corresponding to [key].
//! * `POST: /[key]`: create a new entry or update it with the given value passed in the body.
//! * `DELETE: /[key]`: remove the corresponding [key] from the server.
extern crate crc;
extern crate futures;
extern crate hyper;
#[cfg(target_os = "linux")]
extern crate libc;
extern crate net2;
extern crate num_cpus;
extern crate reader_writer;
extern crate tokio_core;
use crc::{crc32, Hasher32};
use futures::{task, Async, Future, Poll, Stream};
use futures::future::BoxFuture;
use futures::sync::mpsc;
use hyper::{Delete, Get, Put, StatusCode};
use hyper::header::ContentLength;
use hyper::server::{Http, Service, Request, Response};
use tokio_core::reactor::Core;
use tokio_core::net::{TcpListener, TcpStream};
use std::env;
use std::thread;
use std::path::Path;
use std::net::SocketAddr;
use std::io::{Read, Write};
use std::fs::File;
use std::os::unix::io::{IntoRawFd, FromRawFd};
fn splitter(s: &str) -> Vec<&str> {
s.split('/').filter(|x| !x.is_empty()).collect()
}
#[derive(Clone)]
struct AlphaBravo {
rw: reader_writer::ReaderWriter,
}
impl AlphaBravo {
pub fn new<P: AsRef<Path>>(path: &P) -> AlphaBravo {
AlphaBravo {
rw: reader_writer::ReaderWriter::new(path).expect("ReaderWriter::new failed"),
}
}
}
impl Service for AlphaBravo {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = BoxFuture<Self::Response, Self::Error>;
fn call(&self, req: Request) -> Self::Future {
let path = req.path().to_owned();
futures::future::ok(match *req.method() {
Get => {
let values = splitter(&path);
if values.len() < 2 || !self.rw.exists(&values[values.len() - 2]) {
Response::new().with_status(StatusCode::NotFound)
} else if let Some(mut file) = self.rw.get_file(&values[values.len() - 2], false) {
let mut out = Vec::new();
if file.read_to_end(&mut out).is_ok() {
Response::new().with_body(out)
} else {
Response::new().with_status(StatusCode::InternalServerError)
}
} else {
Response::new().with_status(StatusCode::NotFound)
}
}
Put => {
let values = splitter(&path);
if values.len() < 2 {
Response::new().with_status(StatusCode::NoContent)
} else if let Some(file) = self.rw.get_file(&"null"/*&values[values.len() - 2]*/, true) {
match req.headers().get::<ContentLength>() {
Some(&ContentLength(len)) => {
if len < 1 {
Response::new().with_status(StatusCode::NotModified)
} else {
let digest = crc32::Digest::new(crc32::IEEE)
return req.body().fold((file, digent), move |(mut file, mut digest), chunk| {
digest.write(&*chunk);
if file.write(&*chunk).is_ok() {
Ok((file, digest))
} else {
file.into_raw_fd();
Err(hyper::Error::Status)
}
}).then(move |r| {
match r {
Ok((_file, digest) => {
digest.sum32();
/*if file.sync_data().is_ok() {
*/Ok(Response::new().with_status(StatusCode::Ok))
/*} else {
Ok(Response::new().with_status(StatusCode::InternalServerError))
}*/
}
Err(_) => Ok(Response::new().with_status(
StatusCode::InsufficientStorage)),
}
}).boxed();
}
}
None => Response::new().with_status(StatusCode::NotModified),
}
} else {
Response::new().with_status(StatusCode::InternalServerError)
}
}
Delete => {
let values = splitter(&path);
if values.len() >= 2 && self.rw.exists(&values[values.len() - 2]) {
match self.rw.remove(&values[values.len() - 2]) {
Ok(_) => Response::new().with_status(StatusCode::Ok),
Err(_) => Response::new().with_status(StatusCode::InternalServerError),
}
} else {
Response::new().with_status(StatusCode::NotFound)
}
}
_ => {
Response::new().with_status(StatusCode::NotFound)
}
}).boxed()
}
}
type ThreadData = mpsc::UnboundedSender<(TcpStream, SocketAddr)>;
fn make_reader_threads<P: AsRef<Path> + Send + Sync>(path: P,
total: usize) -> Vec<ThreadData> {
let mut thread_data = Vec::new();
for _ in 0..total {
let (tx, rx) = mpsc::unbounded();
thread_data.push(tx);
let rpath = path.as_ref();
let ppath = rpath.to_path_buf();
thread::spawn(move || {
let mut core = Core::new().unwrap();
let handle = core.handle();
let mut protocol = Http::new();
protocol.keep_alive(false);
core.run(rx.for_each(move |(socket, addr)| {
protocol.bind_connection(&handle, socket, addr, AlphaBravo::new(&ppath));
}).unwrap();
});
}
thread_data
}
fn serve<P: AsRef<Path> + Send + Sync>(addr: &SocketAddr, path: P, total: usize) {
let thread_data = make_reader_threads(path, total);
let mut core = Core::new().unwrap();
let handle = core.handle();
let listener = net2::TcpBuilder::new_v4().unwrap()
.bind(addr).unwrap()
.listen(1000).unwrap();
let listener = TcpListener::from_listener(listener, addr, &handle).unwrap();
let mut counter = 0;
core.run(listener.incoming().for_each(move |(socket, addr)| {
thread_data[counter].send((socket, addr)).unwrap();
counter += 1;
if counter >= total {
counter = 0;
}
Ok(())
})).unwrap();
}
fn start_server<P: AsRef<Path> + Send + Sync>(nb_instances: usize, addr: &str, path: P) {
println!("===> Starting server on {} threads", nb_instances);
let addr = addr.parse().unwrap();
println!("=> Server listening on \"{}\"", addr);
println!("=> Saving files in \"{}\"", path.as_ref().display());
serve(&addr, path, nb_instances);
}
fn main() {
let num_cpus = num_cpus::get();
let num_cpus = if num_cpus > 4 { num_cpus - (num_cpus / 10) * 2 } else { num_cpus };
if let Some(path) = env::args().nth(1) {
start_server(num_cpus, "127.0.0.1:8080", path);
} else {
println!("No path received as argument so files will be stored in \"/tmp/data-rs/\".");
println!("If you want to specify a path, just start like this: \"./http-server [PATH]\"");
println!("");
start_server(num_cpus, "127.0.0.1:8080", "/tmp/data-rs/");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment