-
-
Save chisade/d40a723d6f544b3d3a714befde935c5e to your computer and use it in GitHub Desktop.
Rust database pool using (hyper,) tokio and postgres
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "test"
version = "0.1.0"
authors = ["someone"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3.1"
tokio = { version = "0.2", features = ["full"] }
hyper = "0.13.1"
pretty_env_logger = "0.3.1"
log = "^0.4"
postgres = "0.17.0"
tokio-postgres = "0.5.1"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate pretty_env_logger; | |
extern crate tokio_postgres; | |
use tokio_postgres::tls::NoTls; | |
use tokio_postgres::row::Row; | |
use postgres::types::ToSql; | |
use std::sync::{RwLock, Mutex}; | |
use std::collections::LinkedList; | |
struct Connection { | |
client : tokio_postgres::Client, | |
} | |
impl Connection { | |
fn new(client :tokio_postgres::Client) -> Connection { | |
Connection { | |
client, | |
} | |
} | |
} | |
#[derive(Default)] | |
pub struct Database { | |
pool : Mutex<RwLock<LinkedList<Connection>>>, | |
} | |
impl Database { | |
pub fn new() -> Database { | |
Database { | |
pool : Mutex::new(RwLock::new(LinkedList::new())), | |
} | |
} | |
async fn open_connection(&self) -> Result<Connection, Box<dyn std::error::Error>> { | |
let (client, connection) = tokio_postgres::connect("host=localhost user=rust password=some_password", NoTls).await?; | |
tokio::spawn(async move { | |
if let Err(e) = connection.await { | |
error!("The connection was not set up properly: {}", e); | |
} | |
}); | |
Ok(Connection::new(client)) | |
} | |
async fn get_connection(&self) -> Option<Connection> { | |
let connection : Option<Connection>; | |
{ | |
let pool = self.pool.lock().unwrap(); | |
let mut pool = pool.write().unwrap(); | |
debug!("Database: pool[{}]", pool.len()); | |
connection = pool.pop_front(); | |
} | |
match connection { | |
Some(connection) => Some(connection), | |
None => { | |
match self.open_connection().await { | |
Ok(connection) => Some(connection), | |
Err(_) => None, | |
} | |
}, | |
} | |
} | |
fn return_connection(&self, connection :Connection) { | |
let pool = self.pool.lock().unwrap(); | |
let mut pool = pool.write().unwrap(); | |
pool.push_front(connection); | |
} | |
pub async fn query<'a>(&self, query: &'a str, params: &'a[&'a(dyn ToSql + Sync)]) -> Option<Vec<Row>> { | |
debug!("in"); | |
match self.get_connection().await { | |
Some(connection) => { | |
match connection.client.query(query, params).await { | |
Ok(rows) => { | |
self.return_connection(connection); | |
Some(rows) | |
}, | |
Err(e) => { | |
self.return_connection(connection); | |
error!("{}", e); | |
None | |
}, | |
} | |
}, | |
None => { | |
error!("Failed to execute query {}", query); | |
None | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod database; | |
extern crate pretty_env_logger; | |
#[macro_use] extern crate log; | |
extern crate tokio_postgres; | |
use tokio_postgres::{Error}; | |
use std::sync::{Mutex, Arc}; | |
use hyper::service::{make_service_fn, service_fn}; | |
use hyper::{Body, Response, Server}; | |
use database::Database; | |
#[tokio::main] | |
pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { | |
pretty_env_logger::init(); | |
let test : Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new())); | |
let make_svc = make_service_fn(move |_conn| { | |
let test = test.clone(); | |
async move { | |
Ok::<_, hyper::Error>(service_fn(move |_req| { | |
let mut c = test.lock().unwrap(); | |
c.push(String::from("test")); | |
let part = format!("Hello World: {:?}", c); | |
async move { | |
Ok::<_, Error>(Response::new(Body::from(part))) | |
} | |
})) | |
} | |
}); | |
let addr = ([127, 0, 0, 1], 8080).into(); | |
let database = Arc::new(Database::new()); | |
let sql = "select 1"; | |
let params = &[]; | |
let database = database.clone(); | |
tokio::spawn(async move { | |
database.query(sql, params) | |
}); | |
match Server::try_bind(&addr) { | |
Ok(builder) => { | |
let server = builder.serve(make_svc); | |
info!("Listening on http://{}", addr); | |
if let Err(e) = server.await { | |
error!("Server error encountered: {}", e); | |
} | |
}, | |
Err(_) => error!("Failed to bind to: {}", addr), | |
}; | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment