Skip to content

Instantly share code, notes, and snippets.

@pythoneer

pythoneer/lib.rs Secret

Created January 7, 2022 07:58
Show Gist options
  • Save pythoneer/cac57079d36902782cdcc3f1c051038d to your computer and use it in GitHub Desktop.
Save pythoneer/cac57079d36902782cdcc3f1c051038d to your computer and use it in GitHub Desktop.
meiliSearch Postgres
use pgx::*;
use serde_json::Value;
use std::collections::HashMap;
use reqwest::header::{HeaderMap, USER_AGENT, HeaderValue, CONTENT_TYPE};
pg_module_magic!();
// extension_sql!(r#"
//
// create table public.movies (
// movie_id BIGSERIAL primary key,
// movie_name text not null,
// short_summary text,
// long_description text
// );
//
// insert into public.movies(movie_name, short_summary, long_description) values
// ('Harry Potter', 'magic and stuff', 'There is a lot of magic stuff happening in this movie'),
// ('Terminator', 'action stuff', 'There is a cyborg sent back to the past to save cyborgkind from mankind'),
// ('Forrest Gump', 'Drama about a guy', 'A Drama about a guy that can do everything that he wants');
//
// "#);
// #[pg_extern]
// fn spi_test() {
// let query = &format!(r#"
// SELECT column_name
// FROM information_schema.columns
// WHERE table_name = '{}' AND table_schema = '{}'
// LIMIT 1
// "#, "movies", "public");
// info!("query: {}", query);
// let column_name: Option<String> = Spi::get_one(query);//.expect("could not find the name of the first column");
//
// info!("column_name: {:?}", column_name);
// }
// Row-level AFTER INSERT trigger that forwards the freshly inserted row, as JSON, to the
// `/indexes/<table>/documents` endpoint of the search server at `base_url`.
//
// Steps:
//   1. verify the call comes from the trigger manager and is AFTER INSERT ... FOR EACH ROW,
//   2. read the value of the first column of the new tuple (treated as the primary key),
//   3. look up that column's name and re-select the row through `row_to_json`,
//   4. POST the JSON document (wrapped in an array) to the index named after the table.
//
// Panics (which abort the surrounding statement) when not called as a matching trigger,
// when either SPI query yields no value, or when the HTTP request fails.
//
// NOTE(review): the doc comment below is kept verbatim on purpose — some pgx releases appear
// to use a ```sql block on a `#[pg_extern]` item as the SQL definition; confirm for the
// pgx version this crate pins before editing it.
/// ```sql
/// CREATE OR REPLACE FUNCTION meili.update_trigger() RETURNS trigger LANGUAGE c AS 'MODULE_PATHNAME', 'update_trigger_wrapper';
/// ```
#[pg_extern]
fn update_trigger(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
    info!("update trigger invoked");

    // we can only be called as a trigger
    if !called_as_trigger(fcinfo) {
        panic!("not called by trigger manager");
    }

    let trigdata: PgBox<pg_sys::TriggerData> = PgBox::from_pg(
        // SAFETY: a NULL `fcinfo` is rejected by the `expect`; the `called_as_trigger` check
        // above is what justifies reading `context` as trigger data.
        unsafe { fcinfo.as_ref() }.expect("fcinfo is NULL").context as *mut pg_sys::TriggerData,
    );

    // this function only operates as an AFTER INSERT ... FOR EACH ROW trigger
    let fired_as_expected = trigger_fired_after(trigdata.tg_event)
        && trigger_fired_by_insert(trigdata.tg_event)
        && trigger_fired_for_row(trigdata.tg_event);
    if !fired_as_expected {
        panic!("not fired in the ON after INSERT context");
    }

    // SAFETY: a NULL `tg_relation` is turned into a panic by `unwrap` instead of being
    // dereferenced.
    let tupdesc = PgTupleDesc::from_pg_copy(unsafe { trigdata.tg_relation.as_ref() }.unwrap().rd_att);
    let tuple = PgBox::from_pg(trigdata.tg_trigtuple);
    // SAFETY: same pointer that was successfully dereferenced (non-NULL) just above.
    let relation = unsafe { PgRelation::from_pg(trigdata.tg_relation) };
    let table_name = relation.name();
    let namespace = relation.namespace();
    let base_url = "http://localhost:7700"; //FIXME(dustin): get the real data

    // FIXME(dustin): assumes the primary key is the first column of the table and that it is
    // readable as an i64; needs an accurate way to query for the actual primary key. It also
    // assumes that the table has at least one column.
    let id = heap_getattr::<i64>(&tuple, 1, &tupdesc).expect("table has not a single column");

    // Name of the first column. Without ORDER BY the row picked by LIMIT 1 is arbitrary, so
    // order by ordinal_position to really get column number one. The names are compared as
    // string values here, hence literal quoting.
    let first_column_query = format!(
        r#"
SELECT column_name::Text
FROM information_schema.columns
WHERE table_name = {} AND table_schema = {}
ORDER BY ordinal_position
LIMIT 1
"#,
        sql_literal(table_name),
        sql_literal(namespace)
    );
    info!("query: {}", first_column_query);
    let id_name: String =
        Spi::get_one(&first_column_query).expect("could not find the name of the first column");

    // Try to get the json representation of the inserted row. Identifiers are quoted so that
    // mixed-case or otherwise unusual schema/table/column names resolve to the right object
    // and cannot alter the statement.
    let row_json_query = format!(
        r#"
SELECT row_to_json(t)
FROM {}.{} as t
WHERE t.{} = {}
"#,
        sql_ident(namespace),
        sql_ident(table_name),
        sql_ident(&id_name),
        id
    );
    let json_str: String = Spi::get_one(&row_json_query).expect("json transform of row failed");

    let client = reqwest::blocking::Client::new();
    let mut headers = HeaderMap::new();
    headers.insert(USER_AGENT, HeaderValue::from_static("meili_extension"));
    headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));

    // the body is a JSON array holding the single new row
    let resp = client
        .post(&format!("{}/indexes/{}/documents", base_url, table_name))
        .headers(headers)
        .body(format!("[{}]", json_str))
        .send()
        .expect("error while sending request")
        .text()
        .expect("error while parsing body");
    info!("resp: {}", resp);

    // return the inserting tuple, unchanged
    trigdata.tg_trigtuple as pg_sys::Datum
}

// Renders `ident` as a double-quoted SQL identifier, doubling any embedded double quote.
fn sql_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

// Renders `text` as a SQL string literal: single quotes are doubled, and when a backslash is
// present the escape-string form (E'...') with doubled backslashes is used so the result does
// not depend on the `standard_conforming_strings` setting.
fn sql_literal(text: &str) -> String {
    let escaped = text.replace('\'', "''");
    if escaped.contains('\\') {
        format!("E'{}'", escaped.replace('\\', "\\\\"))
    } else {
        format!("'{}'", escaped)
    }
}
/// Posts `{"q": query}` to `{base_url}/indexes/{table_name}/search` and returns the
/// `movie_id` of every element of the `hits` array in the JSON response.
///
/// Every yielded item is `Some(id)`.
///
/// # Panics
/// Panics if the request cannot be sent, the body cannot be read or parsed as JSON, the
/// response has no `hits` array, or a hit lacks an integer `movie_id`.
#[pg_extern]
fn query(base_url: String, table_name: String, query: String) -> impl Iterator<Item=Option<i64>> {
    // search request payload
    let mut payload = HashMap::new();
    payload.insert("q", query);

    let http = reqwest::blocking::Client::new();
    let endpoint = format!("{}/indexes/{}/search", base_url, table_name);
    let response = http
        .post(&endpoint)
        .json(&payload)
        .send()
        .expect("error while sending request");
    let body = response.text().expect("error while parsing body");

    let parsed: Value = serde_json::from_str(&body).expect("response could not be parsed");
    let hits: &Vec<Value> = parsed
        .get("hits")
        .expect("no hits entry found")
        .as_array()
        .expect("hits is not an array");

    // The ids are copied into an owned vector because `hits` borrows from `parsed`, which
    // does not outlive this function.
    let mut ids: Vec<Option<i64>> = Vec::with_capacity(hits.len());
    for hit in hits {
        let id = hit
            .get("movie_id")
            .expect("movie has no id")
            .as_i64()
            .expect("movie_id is no i64");
        ids.push(Some(id));
    }
    ids.into_iter()
}
#[cfg(any(test, feature = "pg_test"))]
mod tests {
    use pgx::*;

    // Checks the text returned by the crate-level `hello_meili_extension` function.
    //
    // NOTE(review): `hello_meili_extension` is not defined in the visible part of this file —
    // confirm it still exists in the crate, otherwise this module cannot compile.
    #[pg_test]
    fn test_hello_meili_extension() {
        let expected = "Hello, meili_extension";
        let actual = crate::hello_meili_extension();
        assert_eq!(expected, actual);
    }
}
#[cfg(test)]
pub mod pg_test {
    /// Hook for one-off initialization when the pg_test framework starts; currently does
    /// nothing with `_options`.
    pub fn setup(_options: Vec<&str>) {}

    /// Returns the postgresql.conf settings required by the tests; currently none.
    pub fn postgresql_conf_options() -> Vec<&'static str> {
        Vec::new()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment