Created
July 24, 2023 13:04
-
-
Save Jubiko31/89e2188d4ed86a43e1089e3570a19db3 to your computer and use it in GitHub Desktop.
🦀 Rust (Rocket) microservice that cleans up a PostgreSQL database at a specified interval. Connects to a Django app via RabbitMQ.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use tokio_postgres::{NoTls, Error}; | |
pub async fn pg_delete_old_records() -> Result<(), Error> { | |
let (client, connection) = tokio_postgres::connect("postgres://postgres:postgres@localhost:5432/test_db", NoTls).await?; | |
tokio::spawn(async move { | |
if let Err(e) = connection.await { | |
eprintln!("connection error: {}", e); | |
} | |
}); | |
let _result = client | |
.query("DELETE FROM records WHERE created_at < now() - INTERVAL '90 DAY'", &[]) | |
.await?; | |
Ok(()) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[macro_use] | |
extern crate rocket; | |
mod db; | |
use db::pg_delete_old_records; | |
use deadpool_lapin::{Manager, Pool, PoolError}; | |
use futures::{join, StreamExt}; | |
use lapin::{options::*, types::FieldTable, BasicProperties, ConnectionProperties}; | |
use std::convert::Infallible; | |
use std::result::Result as StdResult; | |
use std::time::Duration; | |
use thiserror::Error as ThisError; | |
use tokio_amqp::*; | |
use warp::{Filter, Rejection, Reply}; | |
use chrono::{Duration as ChDuration, Utc}; | |
/// Result returned by warp handlers; the error side is a warp `Rejection`.
type WebResult<T> = StdResult<T, Rejection>; | |
/// Result of obtaining a connection from the RabbitMQ pool.
type RMQResult<T> = StdResult<T, PoolError>; | |
/// Crate-wide result using this file's `Error` enum.
type Result<T> = StdResult<T, Error>; | |
/// A pooled connection object handed out by the `deadpool_lapin` manager.
type Connection = deadpool::managed::Object<deadpool_lapin::Manager>; | |
/// Errors raised while talking to RabbitMQ.
#[derive(ThisError, Debug)] | |
enum Error { | |
/// A `lapin::Error`; `#[from]` lets `?` convert it automatically.
#[error("rmq error: {0}")] | |
RMQError(#[from] lapin::Error), | |
/// A `PoolError` from the connection pool; `#[from]` lets `?` convert it automatically.
#[error("rmq pool error: {0}")] | |
RMQPoolError(#[from] PoolError), | |
} | |
// Allows `Error` values to be passed to `warp::reject::custom`.
impl warp::reject::Reject for Error {} | |
#[tokio::main] | |
async fn main() -> Result<()> { | |
let addr = | |
std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://guest:guest@localhost:5672/%2f".into()); | |
let manager = Manager::new(addr, ConnectionProperties::default().with_tokio()); | |
let pool: Pool = deadpool::managed::Pool::builder(manager) | |
.max_size(10) | |
.build() | |
.expect("can create pool"); | |
let health_route = warp::path!("health").and_then(health_handler); | |
let add_msg_route = warp::path!("msg") | |
.and(warp::post()) | |
.and(with_rmq(pool.clone())) | |
.and_then(add_msg_handler); | |
let hello_route = warp::path::end().map(hello); | |
let routes = health_route.or(add_msg_route).or(hello_route); | |
println!("Started server at http://localhost:9000 π¦"); | |
let _ = join!( | |
warp::serve(routes).run(([0, 0, 0, 0], 9000)), | |
rmq_listen(pool.clone()) | |
); | |
Ok(()) | |
} | |
fn with_rmq(pool: Pool) -> impl Filter<Extract = (Pool,), Error = Infallible> + Clone { | |
warp::any().map(move || pool.clone()) | |
} | |
async fn add_msg_handler(pool: Pool) -> WebResult<impl Reply> { | |
let payload = b"Hello world!"; | |
let rmq_con = get_rmq_con(pool).await.map_err(|e| { | |
eprintln!("can't connect to rmq, {}", e); | |
warp::reject::custom(Error::RMQPoolError(e)) | |
})?; | |
let channel = rmq_con.create_channel().await.map_err(|e| { | |
eprintln!("can't create channel, {}", e); | |
warp::reject::custom(Error::RMQError(e)) | |
})?; | |
channel | |
.basic_publish( | |
"", | |
"django_main_app_response_queue", | |
BasicPublishOptions::default(), | |
payload.to_vec(), | |
BasicProperties::default(), | |
) | |
.await | |
.map_err(|e| { | |
eprintln!("can't publish: {}", e); | |
warp::reject::custom(Error::RMQError(e)) | |
})? | |
.await | |
.map_err(|e| { | |
eprintln!("can't publish: {}", e); | |
warp::reject::custom(Error::RMQError(e)) | |
})?; | |
Ok("OK") | |
} | |
async fn health_handler() -> WebResult<impl Reply> { | |
Ok("OK") | |
} | |
async fn get_rmq_con(pool: Pool) -> RMQResult<Connection> { | |
let connection = pool.get().await?; | |
Ok(connection) | |
} | |
async fn rmq_listen(pool: Pool) -> Result<()> { | |
let mut retry_interval = tokio::time::interval(Duration::from_secs(5)); | |
loop { | |
retry_interval.tick().await; | |
println!("connecting rmq consumer..."); | |
match init_rmq_listen(pool.clone()).await { | |
Ok(_) => println!("rmq listen returned"), | |
Err(e) => eprintln!("rmq listen had an error: {}", e), | |
}; | |
} | |
} | |
async fn init_rmq_listen(pool: Pool) -> Result<()> { | |
let rmq_con = get_rmq_con(pool).await.map_err(|e| { | |
eprintln!("could not get rmq connectoin: {}", e); | |
e | |
})?; | |
let channel = rmq_con.create_channel().await?; | |
let queue = channel | |
.queue_declare( | |
"rust_microservice_queue", | |
QueueDeclareOptions::default(), | |
FieldTable::default(), | |
) | |
.await?; | |
println!("Declared queue {:?}", queue); | |
let mut consumer = channel | |
.basic_consume( | |
"rust_microservice_queue", | |
"my_consumer", | |
BasicConsumeOptions::default(), | |
FieldTable::default(), | |
) | |
.await?; | |
println!("rmq consumer connected, waiting for messages"); | |
while let Some(delivery) = consumer.next().await { | |
if let Ok((channel, delivery)) = delivery { | |
let trigger_message = String::from_utf8_lossy(&delivery.data).to_string(); | |
println!("==========================================\nReceived message from Django app: {}", trigger_message); | |
match trigger_message.as_str() { | |
"delete_records" => { | |
let interval = Utc::now() - ChDuration::days(90); | |
let interval_str = interval.to_rfc3339(); | |
let _ = pg_delete_old_records().await; | |
let message = format!("π¦[RUST_SERVICE] Deleted records older than: {}", interval_str); | |
channel | |
.basic_publish( | |
"", | |
"django_main_app_response_queue", | |
BasicPublishOptions::default(), | |
message.as_bytes().to_vec(), | |
BasicProperties::default(), | |
) | |
.await?; | |
}, | |
_ => { | |
println!("[Ign] Unknown message received: {}.", trigger_message) | |
} | |
} | |
channel | |
.basic_ack(delivery.delivery_tag, BasicAckOptions::default()) | |
.await? | |
} | |
} | |
Ok(()) | |
} | |
#[get("/hello")] | |
fn hello() -> &'static str { | |
"Hello from Rocket! π" | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment