Skip to content

Instantly share code, notes, and snippets.

@ggomagundan
Last active March 13, 2020 10:02
Show Gist options
  • Save ggomagundan/69d53635a5c6ed1cc108b6552c227181 to your computer and use it in GitHub Desktop.
Save ggomagundan/69d53635a5c6ed1cc108b6552c227181 to your computer and use it in GitHub Desktop.
Actix Web
[package]
name = "btoz_video"
version = "0.1.0"
authors = ["Kai <byeongsangp@tripbtoz.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-web = "2.0"
actix-multipart = "0.2.0"
actix-rt = "1.0"
listenfd = "0.3"
dotenv = "0.14.1"
dotenv_codegen = "0.14.1"
futures = "0.3.1"
serde = { version = "1.0.104", features=["derive"] }
serde_json = "1.0"
serde-value = "0.6.0"
chrono = { version = "0.4", features = ["serde"] }
rusoto_core = "0.43.0-beta.1"
rusoto_s3 = "0.43.0-beta.1"
use crate::utils::uploads::save_file;
use crate::utils::uploads::Tmpfile;
use actix_multipart::Field;
use std::io::Write;
#[macro_use]
extern crate dotenv_codegen;
use actix_multipart::Multipart;
use actix_web::{web, App, Error, HttpResponse, HttpServer, Responder};
use futures::StreamExt;
use listenfd::ListenFd;
use std::collections::HashMap;
use serde::Deserialize;
mod routes;
mod utils;
use utils::uploads::UplodFile;
/// Port the HTTP server reports on start-up and binds to when no listener is
/// inherited from the environment. The value comes from the `SERVER_PORT`
/// entry via the `dotenv!` macro (it initialises a `static`, so it is fixed
/// when the crate is built rather than read at run time).
static PORT: &str = dotenv!("SERVER_PORT");
/// Deserializable request payload with two string fields.
///
/// NOTE(review): nothing in the visible code constructs or reads this type;
/// the upload handler parses the multipart form into a `HashMap` instead.
#[derive(Deserialize)]
struct VideoInput {
/// Identifier of the uploading user.
user_id: String,
/// Presumably the video source description — TODO confirm the expected format.
sources: String,
}
async fn index(mut payload: Multipart) -> Result<HttpResponse, Error> {
let mut form_data: HashMap<String, String> = HashMap::new();
let mut tmp_file: Tmpfile = Tmpfile::new("", "");
while let Some(item) = payload.next().await {
let mut field: Field = item?;
let content_type = field.content_disposition().unwrap();
let name = content_type.get_name().unwrap();
if name == "file" {
match content_type.get_filename() {
Some(filename) => {
let timestamp = chrono::Utc::now().format("%Y%m%d_%s").to_string();
let saved_filename = format!("{}_{}", timestamp.clone(), filename);
let filepath = format!("./tmp/{}", saved_filename.clone());
// File::create is blocking operation, use threadpool
let mut f = web::block(|| std::fs::File::create(filepath))
.await
.unwrap();
// Field in turn is stream of *Bytes* objectv
while let Some(chunk) = field.next().await {
let data = chunk.unwrap();
// filesystem operations are blocking, we have to use threadpool
f = web::block(move || f.write_all(&data).map(|_| f))
.await
.unwrap();
}
let saved_filepath = format!("./tmp/{}", saved_filename.clone());
tmp_file = Tmpfile::new(&saved_filename, &saved_filepath);
}
None => {
println!("file none");
}
}
} else {
let data = field.next().await;
let wrapped_data = &data.unwrap().unwrap();
let parsed_data = std::str::from_utf8(&wrapped_data).unwrap();
form_data.insert(name.to_string(), format!("{}", parsed_data.clone()));
}
}
println!("data: {:?}", form_data);
println!("file Info: {:?}", tmp_file);
let s3_upload_key = format!("{}_{}/",form_data.get("user_id").unwrap(),chrono::Utc::now().format("%s").to_string() ).to_owned();
//create tmp file and upload s3 and remove tmp file
let upload_files: Vec<UplodFile> = save_file(tmp_file, s3_upload_key).await.unwrap();
println!("upload_files={:#?}", upload_files);
Ok(HttpResponse::Ok().into())
}
/// Responds to every request with `200 OK` and a fixed greeting body.
async fn index2() -> impl Responder {
    let mut ok = HttpResponse::Ok();
    ok.body("Hello world again!")
}
/// Starts the HTTP server.
///
/// Ensures the `./tmp` upload directory exists, registers the `/videos`
/// POST and GET handlers, and serves on the TCP listener passed in through
/// the environment (`listenfd`) when one is present; otherwise binds to
/// `127.0.0.1:PORT`.
///
/// # Errors
///
/// Returns the underlying I/O error if the upload directory cannot be
/// created, the inherited listener cannot be taken, or binding/listening
/// fails, instead of panicking.
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    println!("Server Running on {:} Port", PORT);
    std::fs::create_dir_all("./tmp")?;

    let mut listenfd = ListenFd::from_env();
    let mut server = HttpServer::new(|| {
        App::new()
            .route("/videos", web::post().to(routes::videos::index))
            .route("/videos", web::get().to(routes::videos::index2))
    });

    server = if let Some(l) = listenfd.take_tcp_listener(0)? {
        server.listen(l)?
    } else {
        server.bind(format!("127.0.0.1:{:}", PORT))?
    };

    // Development reload:
    // systemfd --no-pid -s http::8787 -- cargo watch -x run
    server.run().await
}
pub mod videos;
use crate::utils::uploads::save_file;
use crate::utils::uploads::UplodFile;
use actix_multipart::Field;
use crate::utils::uploads::Tmpfile;
use futures::StreamExt;
use std::io::Write;
use actix_web::{web, Error, HttpResponse, Responder};
use actix_multipart::Multipart;
use std::collections::HashMap;
pub async fn index(mut payload: Multipart) -> Result<HttpResponse, Error> {
let mut form_data: HashMap<String, String> = HashMap::new();
let mut tmp_file: Tmpfile = Tmpfile::new("", "");
while let Some(item) = payload.next().await {
let mut field: Field = item?;
let content_type = field.content_disposition().unwrap();
let name = content_type.get_name().unwrap();
if name == "file" {
match content_type.get_filename() {
Some(filename) => {
let timestamp = chrono::Utc::now().format("%Y%m%d%s").to_string();
let saved_filename = format!("{}_{}", timestamp.clone(), filename);
let filepath = format!("./tmp/{}", saved_filename.clone());
// File::create is blocking operation, use threadpool
let mut f = web::block(|| std::fs::File::create(filepath))
.await
.unwrap();
// Field in turn is stream of *Bytes* objectv
while let Some(chunk) = field.next().await {
let data = chunk.unwrap();
// filesystem operations are blocking, we have to use threadpool
f = web::block(move || f.write_all(&data).map(|_| f))
.await
.unwrap();
}
let saved_filepath = format!("./tmp/{}", saved_filename.clone());
tmp_file = Tmpfile::new(&saved_filename, &saved_filepath);
}
None => {
println!("file none");
}
}
} else {
let data = field.next().await;
let wrapped_data = &data.unwrap().unwrap();
let parsed_data = std::str::from_utf8(&wrapped_data).unwrap();
form_data.insert(name.to_string(), format!("{}", parsed_data.clone()));
}
}
println!("data: {:?}", form_data);
println!("file Info: {:?}", tmp_file);
let s3_upload_key = format!("{}/{}/{}/",
chrono::Utc::now().format("%Y%m%d").to_string(),
form_data.get("user_id").unwrap(),
chrono::Utc::now().format("%s").to_string()).to_owned();
//create tmp file and upload s3 and remove tmp file
let upload_files: Vec<UplodFile> = save_file(tmp_file, s3_upload_key).await.unwrap();
println!("upload_files={:#?}", upload_files);
Ok(HttpResponse::Ok().json(upload_files))
}
/// Responds to every request with `200 OK` and a fixed greeting body.
pub async fn index2() -> impl Responder {
    let mut ok = HttpResponse::Ok();
    ok.body("Hello world again!")
}
pub mod s3;
pub mod uploads;
use rusoto_core::{Region};
use rusoto_s3::{DeleteObjectRequest, PutObjectRequest, S3, S3Client};
use std::io::Read;
/// Thin wrapper around a rusoto `S3Client` bound to a single bucket.
pub struct Client {
/// Region the wrapped S3 client was created for.
region: Region,
/// Underlying rusoto S3 client used for put/delete requests.
s3: S3Client,
/// Target bucket name, taken from the `UPLOAD_BUCKET_NAME` dotenv entry.
bucket_name: String,
}
impl Client {
// construct S3 testing client
pub fn new() -> Client {
let env_region = dotenv!("AWS_REGION").to_owned();
let region = match env_region.as_ref() {
"ap-northeast-2" => Region::ApNortheast2,
_ => Region::ApNortheast2
};
Client {
region: region.to_owned(),
s3: S3Client::new(region),
bucket_name: dotenv!("UPLOAD_BUCKET_NAME").to_owned(),
}
}
pub fn url(&self, key: &str) -> String {
format!(
"https://{}.s3.{}.amazonaws.com/{}",
dotenv!("UPLOAD_BUCKET_NAME").to_owned(),
dotenv!("AWS_REGION").to_owned(),
key
)
}
pub async fn put_object(&self, localfilepath: &str, key: &str) -> String {
let mut file = std::fs::File::open(localfilepath).unwrap();
let mut contents: Vec<u8> = Vec::new();
file.read_to_end(&mut contents);
let put_request = PutObjectRequest {
bucket: self.bucket_name.to_owned(),
key: key.to_owned(),
body: Some(contents.into()),
..Default::default()
};
let _res = self
.s3
.put_object(put_request)
.await
.expect("Failed to put test object");
self.url(key)
}
pub async fn delete_object(&self, key: String) {
let delete_object_req = DeleteObjectRequest {
bucket: self.bucket_name.to_owned(),
key: key.to_owned(),
..Default::default()
};
let _res = self
.s3
.delete_object(delete_object_req)
.await
.expect("Couldn't delete object");
}
}
use crate::utils::s3::Client;
use actix_web::{Error};
use serde::{Deserialize, Serialize};
use std::convert::From;
/// Serializable description of one uploaded file, built from a `Tmpfile`
/// once its S3 key and URL have been filled in.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct UplodFile {
/// Name of the file (copied from `Tmpfile::name`).
pub filename: String,
/// S3 object key the file was stored under.
pub key: String,
/// URL of the uploaded object.
pub url: String,
}
impl From<Tmpfile> for UplodFile {
fn from(tmp_file: Tmpfile) -> Self {
UplodFile {
filename: tmp_file.name,
key: tmp_file.s3_key,
url: tmp_file.s3_url,
}
}
}
/// A file saved on local disk that is waiting to be (or has been) uploaded to S3.
#[derive(Debug, Clone)]
pub struct Tmpfile {
/// File name; also used as the last segment of the S3 key.
pub name: String,
/// Path of the local temporary copy.
pub tmp_path: String,
/// S3 object key; empty until the upload step sets it.
pub s3_key: String,
/// URL of the uploaded object; empty until the upload step sets it.
pub s3_url: String,
}
impl Tmpfile {
    /// Creates a record for a local file that has not been uploaded yet;
    /// the S3 key and URL start out empty.
    pub fn new(filename: &str, filepath: &str) -> Tmpfile {
        Tmpfile {
            name: filename.to_owned(),
            tmp_path: filepath.to_owned(),
            s3_key: String::new(),
            s3_url: String::new(),
        }
    }

    /// Uploads the file under the given key prefix, then deletes the local copy.
    async fn s3_upload_and_tmp_remove(&mut self, s3_upload_key: String) {
        self.s3_upload(s3_upload_key).await;
        self.tmp_remove();
    }

    /// Uploads the local file to S3 as `{s3_upload_key}{name}` and records
    /// the resulting key and URL on `self`.
    async fn s3_upload(&mut self, s3_upload_key: String) {
        self.s3_key = [s3_upload_key.as_str(), self.name.as_str()].concat();
        let uploaded_url = Client::new()
            .put_object(&self.tmp_path, &self.s3_key)
            .await;
        self.s3_url = uploaded_url;
    }

    /// Deletes the local temporary file; panics if the removal fails.
    fn tmp_remove(&self) {
        let removal = std::fs::remove_file(&self.tmp_path);
        removal.unwrap();
    }
}
/// Uploads `tmp_file` to S3 under the `s3_upload_key` prefix, removes the
/// local copy, and returns a one-element list describing the upload.
pub async fn save_file(
    tmp_file: Tmpfile,
    s3_upload_key: String,
) -> Result<Vec<UplodFile>, Error> {
    let mut pending = tmp_file;
    pending.s3_upload_and_tmp_remove(s3_upload_key).await;
    Ok(vec![UplodFile::from(pending)])
}
/// Deletes every S3 object whose key appears in `list`, one after another.
///
/// A single client is built up front and reused for all keys; nothing is
/// constructed when `list` is empty.
pub async fn delete_object(list: Vec<String>) {
    if list.is_empty() {
        return;
    }
    let client = Client::new();
    for key in list {
        client.delete_object(key).await;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment