Skip to content

Instantly share code, notes, and snippets.

@kardeiz
Created August 4, 2017 19:14
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kardeiz/1d0b50dacbeaefe9c37ecf3315134461 to your computer and use it in GitHub Desktop.
Save kardeiz/1d0b50dacbeaefe9c37ecf3315134461 to your computer and use it in GitHub Desktop.
hyper async + mysql async
extern crate hyper;
extern crate mysql_async as my;
extern crate net2;
extern crate tokio_core;
extern crate futures;
use std::net::SocketAddr;
use hyper::server::Service;
use hyper::server::Http;
use hyper::server::{Request, Response};
use hyper::header::{ContentType, ContentLength};
use hyper::StatusCode;
mod cored {
use std::cell::RefCell;
use tokio_core::reactor::{Core, Handle};
struct Container {
core: RefCell<Option<Core>>,
handle: RefCell<Handle>
}
thread_local!(static CONTAINER: Container = {
let core = Core::new().unwrap();
let handle = core.handle();
Container {
core: RefCell::new(Some(core)),
handle: RefCell::new(handle)
}
});
pub fn take_local_core() -> Option<Core> {
CONTAINER.with(|o| o.core.borrow_mut().take() )
}
pub fn with_local_handle<T, F: FnOnce(&Handle) -> T>(cls: F) -> T {
CONTAINER.with(|o| cls(&*o.handle.borrow()))
}
}
struct App;
impl App {
pub fn quick_serve(self, num_threads: usize, addr: SocketAddr) {
use std::sync::Arc;
use net2::unix::UnixTcpBuilderExt;
use futures::Stream;
fn inner(addr: &SocketAddr, protocol: Arc<Http>, router: Arc<App>) {
let mut core = cored::take_local_core().unwrap();
let hdl = core.handle();
let listener = net2::TcpBuilder::new_v4()
.unwrap()
.reuse_port(true)
.unwrap()
.bind(addr)
.unwrap()
.listen(128)
.unwrap();
let listener =
tokio_core::net::TcpListener::from_listener(listener, addr, &hdl).unwrap();
core.run(listener.incoming().for_each(|(socket, addr)| {
protocol.bind_connection(&hdl, socket, addr, router.clone());
Ok(())
})).unwrap();
}
let protocol = Arc::new(Http::new());
let router = Arc::new(self);
for _ in 0..(num_threads - 1) {
let protocol_c = protocol.clone();
let router_c = router.clone();
std::thread::spawn(move || inner(&addr, protocol_c, router_c));
}
inner(&addr, protocol, router);
}
}
impl Service for App {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = Box<futures::future::Future<Item=Response, Error=hyper::Error>>;
fn call(&self, req: Request) -> Self::Future {
use futures::Future;
use my::prelude::Queryable;
let db = ::std::env::var("DATABASE_URL").unwrap();
let pool = cored::with_local_handle(|hdl| my::Pool::new(&db, hdl));
let body = pool.get_conn()
.and_then(|conn| conn.prep_exec("SELECT id FROM clips", ()))
.and_then(|result| {
result.map_and_drop(|row| {
let (id,): (i32,) = my::from_row(row);
id.to_string()
})
})
.and_then(|(_, ids)| pool.disconnect().map(|_| ids))
.map(|ids| {
let body = ids.join(", ");
Response::new()
.with_header(ContentLength(body.len() as u64))
.with_body(body)
})
.map_err(|_| unimplemented!() );
Box::new(body)
}
}
fn main() {
App.quick_serve(2, "127.0.0.1:3000".parse().unwrap())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment