Skip to content

Instantly share code, notes, and snippets.

@GuillaumeGomez
Last active February 21, 2017 21:12
Show Gist options
  • Save GuillaumeGomez/779704041faca2fa5036cf36b73ff1e0 to your computer and use it in GitHub Desktop.
Save GuillaumeGomez/779704041faca2fa5036cf36b73ff1e0 to your computer and use it in GitHub Desktop.
#![deny(missing_docs)]
#![deny(warnings)]
//! A simple HTTP server based on a REST API.
//!
//! You can perform the following actions:
//!
//! * `GET: /[key]`: returns the value corresponding to [key].
//! * `POST: /[key]`: create a new entry or update it with the given value passed in the body.
//! * `DELETE: /[key]`: remove the corresponding [key] from the server.
extern crate crc;
extern crate futures;
extern crate hyper;
#[cfg(target_os = "linux")]
extern crate libc;
extern crate net2;
extern crate num_cpus;
extern crate reader_writer;
extern crate tokio_core;
use crc::{crc32, Hasher32};
use futures::{task, Async, Future, Poll, Stream};
use futures::future::BoxFuture;
use hyper::{Delete, Get, Put, StatusCode};
use hyper::header::ContentLength;
use hyper::server::{Http, Service, Request, Response};
use tokio_core::reactor::Core;
use tokio_core::net::{TcpListener, TcpStream};
use std::env;
use std::thread;
use std::path::Path;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::io::{Read, Write};
use std::fs::File;
use std::os::unix::io::{IntoRawFd, FromRawFd};
fn splitter(s: &str) -> Vec<&str> {
s.split('/').filter(|x| !x.is_empty()).collect()
}
#[derive(Clone)]
struct AlphaBravo {
rw: reader_writer::ReaderWriter,
}
impl AlphaBravo {
pub fn new<P: AsRef<Path>>(path: &P) -> AlphaBravo {
AlphaBravo {
rw: reader_writer::ReaderWriter::new(path).expect("ReaderWriter::new failed"),
}
}
}
impl Service for AlphaBravo {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = BoxFuture<Self::Response, Self::Error>;
fn call(&self, req: Request) -> Self::Future {
let path = req.path().to_owned();
futures::future::ok(match *req.method() {
Get => {
let values = splitter(&path);
if values.len() < 2 || !self.rw.exists(&values[values.len() - 2]) {
Response::new().with_status(StatusCode::NotFound)
} else if let Some(mut file) = self.rw.get_file(&values[values.len() - 2], false) {
let mut out = Vec::new();
if file.read_to_end(&mut out).is_ok() {
Response::new().with_body(out)
} else {
Response::new().with_status(StatusCode::InternalServerError)
}
} else {
Response::new().with_status(StatusCode::NotFound)
}
}
Put => {
let values = splitter(&path);
if values.len() < 2 {
Response::new().with_status(StatusCode::NoContent)
} else if let Some(file) = self.rw.get_file(&"null"/*&values[values.len() - 2]*/, true) {
match req.headers().get::<ContentLength>() {
Some(&ContentLength(len)) => {
if len < 1 {
Response::new().with_status(StatusCode::NotModified)
} else {
let raw = file.into_raw_fd();
let digest = Arc::new(Mutex::new(crc32::Digest::new(crc32::IEEE)));
let digest_clone = digest.clone();
return req.body().for_each(move |chunk| {
digest_clone.lock().unwrap().write(&*chunk);
let mut file = unsafe { File::from_raw_fd(raw) };
if file.write(&*chunk).is_ok() {
file.into_raw_fd();
Ok(())
} else {
file.into_raw_fd();
Err(hyper::Error::Status)
}
}).then(move |r| {
let _file = unsafe { File::from_raw_fd(raw) };
let _digest = digest.lock().unwrap().sum32();
match r {
Ok(_) => {
/*if file.sync_data().is_ok() {
*/Ok(Response::new().with_status(StatusCode::Ok))
/*} else {
Ok(Response::new().with_status(StatusCode::InternalServerError))
}*/
}
Err(_) => Ok(Response::new().with_status(
StatusCode::InsufficientStorage)),
}
}).boxed();
}
}
None => Response::new().with_status(StatusCode::NotModified),
}
} else {
Response::new().with_status(StatusCode::InternalServerError)
}
}
Delete => {
let values = splitter(&path);
if values.len() >= 2 && self.rw.exists(&values[values.len() - 2]) {
match self.rw.remove(&values[values.len() - 2]) {
Ok(_) => Response::new().with_status(StatusCode::Ok),
Err(_) => Response::new().with_status(StatusCode::InternalServerError),
}
} else {
Response::new().with_status(StatusCode::NotFound)
}
}
_ => {
Response::new().with_status(StatusCode::NotFound)
}
}).boxed()
}
}
struct ThreadData {
entries: Vec<(TcpStream, SocketAddr)>,
task: Option<task::Task>,
}
impl ThreadData {
pub fn new() -> Arc<Mutex<ThreadData>> {
Arc::new(Mutex::new(ThreadData {
entries: Vec::new(),
task: None,
}))
}
}
struct Foo<F: Fn()> {
c: F,
}
impl<F: Fn()> Future for Foo<F> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
(self.c)();
Ok(Async::NotReady)
}
}
fn make_reader_threads<P: AsRef<Path> + Send + Sync>(path: P,
total: usize) -> Vec<Arc<Mutex<ThreadData>>> {
let mut thread_data = Vec::new();
for _ in 0..total {
let data = ThreadData::new();
thread_data.push(data.clone());
let rpath = path.as_ref();
let ppath = rpath.to_path_buf();
thread::spawn(move || {
let mut core = Core::new().unwrap();
let handle = core.handle();
let protocol = Http::new();
core.run(Foo { c: || {
let client = {
let mut data = data.lock().unwrap();
data.task = Some(task::park());
if data.entries.is_empty() {
return
}
data.entries.remove(0)
};
protocol.bind_connection(&handle, client.0, client.1, AlphaBravo::new(&ppath));
}}).unwrap();
});
}
thread_data
}
fn serve<P: AsRef<Path> + Send + Sync>(addr: &SocketAddr, path: P, total: usize) {
let thread_data = make_reader_threads(path, total);
let mut core = Core::new().unwrap();
let handle = core.handle();
let listener = net2::TcpBuilder::new_v4().unwrap()
.bind(addr).unwrap()
.listen(1000).unwrap();
let listener = TcpListener::from_listener(listener, addr, &handle).unwrap();
let mut counter = 0;
core.run(listener.incoming().for_each(move |(socket, addr)| {
let ref entry = thread_data[counter];
/*{
entry.entries.lock().unwrap().push((socket, addr));
entry.blocker.notify_one();
}*/
{
let mut entry = entry.lock().unwrap();
entry.entries.push((socket, addr));
if let Some(ref task) = entry.task {
task.unpark();
}
}
counter += 1;
if counter >= total {
counter = 0;
}
Ok(())
})).unwrap();
}
fn start_server<P: AsRef<Path> + Send + Sync>(nb_instances: usize, addr: &str, path: P) {
println!("===> Starting server on {} threads", nb_instances);
let addr = addr.parse().unwrap();
println!("=> Server listening on \"{}\"", addr);
println!("=> Saving files in \"{}\"", path.as_ref().display());
serve(&addr, path, nb_instances);
}
fn main() {
let num_cpus = num_cpus::get();
let num_cpus = if num_cpus > 4 { num_cpus - (num_cpus / 10) * 2 } else { num_cpus };
if let Some(path) = env::args().nth(1) {
start_server(num_cpus, "127.0.0.1:8080", path);
} else {
println!("No path received as argument so files will be stored in \"/tmp/data-rs/\".");
println!("If you want to specify a path, just start like this: \"./http-server [PATH]\"");
println!("");
start_server(num_cpus, "127.0.0.1:8080", "/tmp/data-rs/");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment