Skip to content

Instantly share code, notes, and snippets.

@Phaqui
Created September 26, 2023 07:15
Show Gist options
  • Save Phaqui/2e85b74a3320d78ec878bf78b200c1e1 to your computer and use it in GitHub Desktop.
Save Phaqui/2e85b74a3320d78ec878bf78b200c1e1 to your computer and use it in GitHub Desktop.
Axum pipeline server
use std::io::{Read, Write};
use tokio;
use cmd_lib::run_fun;
use tower::ServiceBuilder;
use tower_http::cors::CorsLayer;
use flate2::bufread::GzDecoder;
use docx_rs::read_docx;
use base64::{Engine as _, engine::general_purpose};
use serde::Deserialize;
use newline_converter::dos2unix;
use tempfile;
use http::StatusCode;
use axum::{
routing::{get, post},
Router,
extract::{Json, DefaultBodyLimit},
response::{IntoResponse, Html},
};
/// Short alias for HTTP 422; `upload_endpoint` returns it for every request-validation failure.
const UE: StatusCode = StatusCode::UNPROCESSABLE_ENTITY;
/// Short alias for HTTP 200; `upload_endpoint` returns it together with the pipeline output.
const OK: StatusCode = StatusCode::OK;
/// Extracts plain text from an in-memory docx document.
///
/// Only the document's top-level paragraph children are used; every other
/// kind of child is skipped. Each paragraph contributes its raw text, the
/// pieces are joined with `\n`, and the result is passed through `dos2unix`.
///
/// Returns `None` when `data` cannot be parsed as a docx file.
fn read_docx_text(data: Vec<u8>) -> Option<String> {
    let parsed = read_docx(&data).ok()?;

    let mut paragraphs = Vec::new();
    for child in parsed.document.children.iter() {
        if let docx_rs::DocumentChild::Paragraph(para) = child {
            paragraphs.push(para.raw_text());
        }
    }

    let joined = paragraphs.join("\n");
    Some(dos2unix(&joined).to_string())
}
/// Decompresses a gzip stream held in memory.
///
/// Returns the inflated bytes, or `None` if reading from the decoder fails.
///
/// NOTE(review): the whole stream is read with `read_to_end`, so there is no
/// cap on the decompressed size.
fn gunzip(data: Vec<u8>) -> Option<Vec<u8>> {
    let mut decoder = GzDecoder::new(data.as_slice());
    let mut inflated = Vec::new();
    decoder.read_to_end(&mut inflated).ok()?;
    Some(inflated)
}
/// Runs the external tokenise / disambiguate / lookup shell pipeline on
/// `input` and returns whatever the pipeline printed.
///
/// `input` is written to a temporary file that the pipeline reads by path.
/// `lang` is spliced into the dictionary file name (`nob{lang}-all.fst`).
///
/// The return value is always a `String`: on success it is the pipeline
/// output; if the temporary file cannot be created or written, or the
/// pipeline fails, it is an error message instead. Callers cannot tell the
/// two apart by type.
///
/// This function blocks (file I/O plus external processes); the async
/// `pipeline` wrapper runs it through `tokio::task::spawn_blocking`.
fn run_pipeline(input: String, lang: String) -> String {
// Server paths:
let tokdisamb = "/opt/smi/nob/bin/tokeniser-disamb-gt-desc.pmhfst";
let disambcg = "/opt/smi/nob/bin/disambiguator.cg3";
let dict = format!("/opt/smi/nob/bin/nob{}-all.fst", lang);
// My machine paths:
//let tokdisamb = "/usr/share/giella/nob/tokeniser-disamb-gt-desc.pmhfst";
//let disambcg = "/usr/share/giella/nob/disambiguator.bin";
//let dict = format!("/home/anders/giellalt/lang-nob/nob{}-all.fst", lang);
// Stage the input in a named temporary file so the pipeline can `cat` it by path.
// NOTE(review): per the tempfile docs a `NamedTempFile` is removed when dropped,
// so `temp_file` has to stay alive until the pipeline below has finished — confirm.
let Ok(temp_file) = tempfile::NamedTempFile::new() else {
return "Error: cannot create temporary file".into();
};
let path = temp_file.path();
let Ok(_) = temp_file.as_file().write_all(input.as_bytes()) else {
return "Error: could not write to temporary file".into();
};
// The `-d"` argument for `cut` (double quote as the field delimiter) is passed
// in through a variable. Presumably a bare `"` written inside `run_fun!` would
// be read as the start of a string literal — TODO confirm.
let cut_delim = "-d\"";
// Pipeline stages, in order:
//   cat | hfst-tokenise -cg | vislcg3 -g
//       tokenise the text and run the constraint-grammar disambiguator
//   grep -v "^[:\"]" | cut -d" -f2
//       drop lines starting with `:` or `"`, keep the 2nd `"`-delimited field
//   uniq | sort | uniq -c | sort -nr | cut -c9-
//       collapse adjacent duplicates, count occurrences, order by descending
//       count, then keep each line from character 9 onwards
//   grep -v "[0-9A-ZÆØÅ]" | grep "[a-zæøå]"
//       keep lines with no digit or uppercase letter and at least one
//       lowercase letter
//   lookup $dict | grep "?" | cut -f1
//       run the remaining lines through `lookup` with the dictionary, keep
//       lines containing `?`, and output their first field
// NOTE(review): presumably the `?` lines are the ones `lookup` could not
// analyse, i.e. words missing from the dictionary — confirm.
// NOTE(review): the leading `uniq` collapses runs of adjacent identical lines
// before counting, which lowers the counts — confirm this is intended.
match run_fun!(
cat $path |
hfst-tokenise -cg $tokdisamb |
vislcg3 -g $disambcg |
grep -v "^[:\"]" |
cut $cut_delim -f2 |
uniq |
sort |
uniq -c |
sort -nr |
cut -c9- |
grep -v "[0-9A-ZÆØÅ]" |
grep "[a-zæøå]" |
lookup $dict |
grep "?" |
cut -f1
) {
Ok(s) => s,
Err(e) => {
// Log the failure to stderr and hand the same text back as the result.
let msg = format!("{}", e);
eprintln!("Error running pipeline, msg:\n{}", msg);
msg
},
}
}
/// Async wrapper around `run_pipeline`.
///
/// The blocking pipeline is moved onto tokio's blocking thread pool. If that
/// task cannot be joined, the literal string `"failed"` is returned in place
/// of pipeline output.
async fn pipeline(input: String, lang: String) -> String {
    let handle = tokio::task::spawn_blocking(move || run_pipeline(input, lang));
    match handle.await {
        Ok(output) => output,
        Err(_) => "failed".to_string(),
    }
}
/// JSON request body deserialized by `upload_endpoint`.
#[derive(Deserialize)]
struct InputBody {
/// Payload kind; `upload_endpoint` accepts only `"text"` or `"docx"`.
typ: String,
/// Language code; must be one of the values accepted by `valid_lang`.
lang: String,
/// Base64-encoded payload. For `"text"` the decoded bytes are gunzipped and
/// must be UTF-8; for `"docx"` they are parsed as a docx file.
data: String,
}
/// Reports whether `lang` is one of the accepted language codes:
/// `sma`, `sme`, `fin` or `fkv`. The comparison is exact and case-sensitive.
fn valid_lang(lang: &str) -> bool {
    matches!(lang, "sma" | "sme" | "fin" | "fkv")
}
async fn upload_endpoint(Json(body): Json<InputBody>) -> impl IntoResponse {
let lang = match body.lang {
lang if valid_lang(&lang) => lang,
_ => return (UE, "lang must be fin, fkv, sme or sma".into()),
};
let typ = match body.typ {
typ if typ == "text" || typ == "docx" => typ.to_owned(),
_ => return (UE, "typ must be text or docx".into()),
};
let Ok(data) = general_purpose::STANDARD.decode(body.data) else {
return (UE, "could not base64 decode data".into());
};
let text = if typ == "text" {
let Some(data) = gunzip(data) else {
return (UE, "failed to gunzip data".into());
};
let Ok(text) = String::from_utf8(data) else {
return (UE, "text not valid utf-8".into());
};
text
} else {
let Some(text) = read_docx_text(data) else {
return (UE, "could not read docx file".into());
};
text
};
(OK, pipeline(text, lang).await)
}
/// Handler for `GET /`: serves the `index.html` page that was embedded into
/// the binary at compile time.
async fn index() -> Html<&'static str> {
    const PAGE: &str = include_str!("index.html");
    Html(PAGE)
}
/// Entry point: builds the router (`/` and `/upload`), applies the CORS and
/// body-limit layers, and serves on `0.0.0.0:3000`.
///
/// Panics if the bind address fails to parse or the server returns an error.
#[tokio::main]
async fn main() {
    let cors = ServiceBuilder::new().layer(CorsLayer::very_permissive());

    // No body limit here: the reverse proxy in front of this service is
    // expected to decide on and reject request bodies that are too big.
    let body_limit = DefaultBodyLimit::disable();

    let app = Router::new()
        .route("/", get(index))
        .route("/upload", post(upload_endpoint))
        .layer(cors)
        .layer(body_limit);

    eprintln!("starting webpipeline");

    let addr: std::net::SocketAddr = "0.0.0.0:3000".parse().unwrap();
    let server = axum::Server::bind(&addr).serve(app.into_make_service());
    server.await.unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment