Skip to content

Instantly share code, notes, and snippets.

@Jubiko31
Created July 24, 2023 13:04
Show Gist options
  • Save Jubiko31/89e2188d4ed86a43e1089e3570a19db3 to your computer and use it in GitHub Desktop.
Save Jubiko31/89e2188d4ed86a43e1089e3570a19db3 to your computer and use it in GitHub Desktop.
πŸ¦€ Rust (Rocket) microservice to clean up postgres database within specified interval. Connection to Django app with RabbitMQ
use tokio_postgres::{NoTls, Error};
pub async fn pg_delete_old_records() -> Result<(), Error> {
let (client, connection) = tokio_postgres::connect("postgres://postgres:postgres@localhost:5432/test_db", NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
let _result = client
.query("DELETE FROM records WHERE created_at < now() - INTERVAL '90 DAY'", &[])
.await?;
Ok(())
}
#[macro_use]
extern crate rocket;
mod db;
use db::pg_delete_old_records;
use deadpool_lapin::{Manager, Pool, PoolError};
use futures::{join, StreamExt};
use lapin::{options::*, types::FieldTable, BasicProperties, ConnectionProperties};
use std::convert::Infallible;
use std::result::Result as StdResult;
use std::time::Duration;
use thiserror::Error as ThisError;
use tokio_amqp::*;
use warp::{Filter, Rejection, Reply};
use chrono::{Duration as ChDuration, Utc};
type WebResult<T> = StdResult<T, Rejection>;
type RMQResult<T> = StdResult<T, PoolError>;
type Result<T> = StdResult<T, Error>;
type Connection = deadpool::managed::Object<deadpool_lapin::Manager>;
#[derive(ThisError, Debug)]
enum Error {
#[error("rmq error: {0}")]
RMQError(#[from] lapin::Error),
#[error("rmq pool error: {0}")]
RMQPoolError(#[from] PoolError),
}
impl warp::reject::Reject for Error {}
#[tokio::main]
async fn main() -> Result<()> {
let addr =
std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://guest:guest@localhost:5672/%2f".into());
let manager = Manager::new(addr, ConnectionProperties::default().with_tokio());
let pool: Pool = deadpool::managed::Pool::builder(manager)
.max_size(10)
.build()
.expect("can create pool");
let health_route = warp::path!("health").and_then(health_handler);
let add_msg_route = warp::path!("msg")
.and(warp::post())
.and(with_rmq(pool.clone()))
.and_then(add_msg_handler);
let hello_route = warp::path::end().map(hello);
let routes = health_route.or(add_msg_route).or(hello_route);
println!("Started server at http://localhost:9000 πŸ¦€");
let _ = join!(
warp::serve(routes).run(([0, 0, 0, 0], 9000)),
rmq_listen(pool.clone())
);
Ok(())
}
fn with_rmq(pool: Pool) -> impl Filter<Extract = (Pool,), Error = Infallible> + Clone {
warp::any().map(move || pool.clone())
}
async fn add_msg_handler(pool: Pool) -> WebResult<impl Reply> {
let payload = b"Hello world!";
let rmq_con = get_rmq_con(pool).await.map_err(|e| {
eprintln!("can't connect to rmq, {}", e);
warp::reject::custom(Error::RMQPoolError(e))
})?;
let channel = rmq_con.create_channel().await.map_err(|e| {
eprintln!("can't create channel, {}", e);
warp::reject::custom(Error::RMQError(e))
})?;
channel
.basic_publish(
"",
"django_main_app_response_queue",
BasicPublishOptions::default(),
payload.to_vec(),
BasicProperties::default(),
)
.await
.map_err(|e| {
eprintln!("can't publish: {}", e);
warp::reject::custom(Error::RMQError(e))
})?
.await
.map_err(|e| {
eprintln!("can't publish: {}", e);
warp::reject::custom(Error::RMQError(e))
})?;
Ok("OK")
}
async fn health_handler() -> WebResult<impl Reply> {
Ok("OK")
}
async fn get_rmq_con(pool: Pool) -> RMQResult<Connection> {
let connection = pool.get().await?;
Ok(connection)
}
async fn rmq_listen(pool: Pool) -> Result<()> {
let mut retry_interval = tokio::time::interval(Duration::from_secs(5));
loop {
retry_interval.tick().await;
println!("connecting rmq consumer...");
match init_rmq_listen(pool.clone()).await {
Ok(_) => println!("rmq listen returned"),
Err(e) => eprintln!("rmq listen had an error: {}", e),
};
}
}
async fn init_rmq_listen(pool: Pool) -> Result<()> {
let rmq_con = get_rmq_con(pool).await.map_err(|e| {
eprintln!("could not get rmq connectoin: {}", e);
e
})?;
let channel = rmq_con.create_channel().await?;
let queue = channel
.queue_declare(
"rust_microservice_queue",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
println!("Declared queue {:?}", queue);
let mut consumer = channel
.basic_consume(
"rust_microservice_queue",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
println!("rmq consumer connected, waiting for messages");
while let Some(delivery) = consumer.next().await {
if let Ok((channel, delivery)) = delivery {
let trigger_message = String::from_utf8_lossy(&delivery.data).to_string();
println!("==========================================\nReceived message from Django app: {}", trigger_message);
match trigger_message.as_str() {
"delete_records" => {
let interval = Utc::now() - ChDuration::days(90);
let interval_str = interval.to_rfc3339();
let _ = pg_delete_old_records().await;
let message = format!("πŸ¦€[RUST_SERVICE] Deleted records older than: {}", interval_str);
channel
.basic_publish(
"",
"django_main_app_response_queue",
BasicPublishOptions::default(),
message.as_bytes().to_vec(),
BasicProperties::default(),
)
.await?;
},
_ => {
println!("[Ign] Unknown message received: {}.", trigger_message)
}
}
channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
.await?
}
}
Ok(())
}
#[get("/hello")]
fn hello() -> &'static str {
"Hello from Rocket! πŸš€"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment