-
-
Save pythoneer/cac57079d36902782cdcc3f1c051038d to your computer and use it in GitHub Desktop.
meiliSearch Postgres
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use pgx::*; | |
use serde_json::Value; | |
use std::collections::HashMap; | |
use reqwest::header::{HeaderMap, USER_AGENT, HeaderValue, CONTENT_TYPE}; | |
// pgx boilerplate: emits the module "magic" block that Postgres checks when it
// loads this extension's shared library (see the pgx docs for `pg_module_magic!`).
pg_module_magic!(); | |
// extension_sql!(r#" | |
// | |
// create table public.movies ( | |
// movie_id BIGSERIAL primary key, | |
// movie_name text not null, | |
// short_summary text, | |
// long_description text | |
// ); | |
// | |
// insert into public.movies(movie_name, short_summary, long_description) values | |
// ('Harry Potter', 'magic and stuff', 'There is a lot of magic stuff happening in this movie'), | |
// ('Terminator', 'action stuff', 'There is a cyborg sent back to the past to save cyborgkind from mankind'), | |
// ('Forrest Gump', 'Drama about a guy', 'A Drama about a guy that can do everything that he wants'); | |
// | |
// "#); | |
// #[pg_extern] | |
// fn spi_test() { | |
// let query = &format!(r#" | |
// SELECT column_name | |
// FROM information_schema.columns | |
// WHERE table_name = '{}' AND table_schema = '{}' | |
// LIMIT 1 | |
// "#, "movies", "public"); | |
// info!("query: {}", query); | |
// let column_name: Option<String> = Spi::get_one(query);//.expect("could not find the name of the first column"); | |
// | |
// info!("column_name: {:?}", column_name); | |
// } | |
/// ```sql | |
/// CREATE OR REPLACE FUNCTION meili.update_trigger() RETURNS trigger LANGUAGE c AS 'MODULE_PATHNAME', 'update_trigger_wrapper'; | |
/// ``` | |
#[pg_extern] | |
fn update_trigger(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum { | |
info!("update trigger invoked"); | |
// we can only be called as a trigger | |
if !called_as_trigger(fcinfo) { | |
panic!("not called by trigger manager"); | |
} | |
let trigdata: PgBox<pg_sys::TriggerData> = PgBox::from_pg( | |
unsafe { fcinfo.as_ref() }.expect("fcinfo is NULL").context as *mut pg_sys::TriggerData, | |
); | |
// info!("trigdata: {:#?}", trigdata); | |
// and for this example, we're only going to operate as an ON after INSERT FOR EACH ROW trigger | |
if trigger_fired_after(trigdata.tg_event) | |
&& trigger_fired_by_insert(trigdata.tg_event) | |
&& trigger_fired_for_row(trigdata.tg_event) | |
{ | |
let tupdesc = PgTupleDesc::from_pg_copy(unsafe { trigdata.tg_relation.as_ref() }.unwrap().rd_att); | |
let tuple = PgBox::from_pg(trigdata.tg_trigtuple); | |
let relation = unsafe {PgRelation::from_pg(trigdata.tg_relation)}; | |
let table_name = relation.name(); | |
let namespace = relation.namespace(); | |
let base_url = "http://localhost:7700"; //FIXME(dustin): get the real data | |
// info!("data: {}: {}", table_name, namespace); | |
// FIXME(dustin): assumes primary keys are in the first column of a table, needs accurate way to | |
// query for the actual primary key. It also assumes that that table has at least one column anyway | |
let id = heap_getattr::<i64>(&tuple, 1, &tupdesc).expect("table has not a single column"); | |
let query = &format!(r#" | |
SELECT column_name::Text | |
FROM information_schema.columns | |
WHERE table_name = '{}' AND table_schema = '{}' | |
LIMIT 1 | |
"#, table_name, namespace); | |
info!("query: {}", query); | |
let id_name: String = Spi::get_one(query).expect("could not find the name of the first column"); | |
// let id_name = "movie_id".to_owned(); //FIXME(dustin): get the real data | |
//try to get the json representation of the inserted row | |
let query = &format!(r#" | |
SELECT row_to_json(t) | |
FROM {}.{} as t | |
WHERE t.{} = {} | |
"#, namespace, table_name, id_name, id); | |
// info!("query: {}", query); | |
let json_str: String = Spi::get_one(query).expect("json transform of row failed"); | |
// info!("json: {}", json_str); | |
let client = reqwest::blocking::Client::new(); | |
let mut headers = HeaderMap::new(); | |
headers.insert(USER_AGENT, HeaderValue::from_static("meili_extension")); | |
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); | |
let resp = client.post(&format!("{}/indexes/{}/documents", base_url, table_name)) | |
.headers(headers) | |
.body(format!("[{}]",json_str)) | |
.send() | |
.expect("error while sending request") | |
.text() | |
.expect("error while parsing body"); | |
info!("resp: {}", resp); | |
// return the inserting tuple, unchanged | |
trigdata.tg_trigtuple as pg_sys::Datum | |
} else { | |
panic!("not fired in the ON after INSERT context"); | |
} | |
} | |
#[pg_extern] | |
fn query(base_url: String, table_name: String, query: String) -> impl Iterator<Item=Option<i64>> { | |
let mut map = HashMap::new(); | |
map.insert("q", query); | |
let client = reqwest::blocking::Client::new(); | |
let resp = client.post(&format!("{}/indexes/{}/search", base_url, table_name)) | |
.json(&map) | |
.send() | |
.expect("error while sending request") | |
.text() | |
.expect("error while parsing body"); | |
// info!("resp: {}", resp); | |
let hits: Value = serde_json::from_str(&resp).expect("response could not be parsed"); | |
let values: &Vec<Value> = hits | |
.get("hits") | |
.expect("no hits entry found") | |
.as_array() | |
.expect("hits is not an array"); | |
values | |
.iter() | |
.map(|value| { | |
Some(value | |
.get("movie_id") | |
.expect("movie has no id") | |
.as_i64() | |
.expect("movie_id is no i64")) | |
}) | |
.collect::<Vec<Option<i64>>>().into_iter() | |
} | |
// Tests run through the pgx `#[pg_test]` harness.
#[cfg(any(test, feature = "pg_test"))]
mod tests {
    use pgx::*;

    // Checks the greeting returned by `hello_meili_extension`.
    // NOTE(review): `hello_meili_extension` is not defined in the visible part
    // of this file — confirm it still exists, otherwise this test cannot build.
    #[pg_test]
    fn test_hello_meili_extension() {
        let greeting = crate::hello_meili_extension();
        assert_eq!("Hello, meili_extension", greeting);
    }
}
/// Hooks for the pg_test framework.
#[cfg(test)]
pub mod pg_test {
    /// One-off initialization performed when the pg_test framework starts.
    ///
    /// Nothing needs to be set up at the moment, so the options are ignored.
    pub fn setup(_options: Vec<&str>) {}

    /// Returns the `postgresql.conf` settings the tests require; currently none.
    pub fn postgresql_conf_options() -> Vec<&'static str> {
        Vec::new()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment