Created
June 14, 2022 12:39
-
-
Save matsuu/4173646ad88afd2161e3b98d3b393f28 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- src/main.rs.orig 2022-06-14 21:26:10.771842719 +0900 | |
+++ src/main.rs 2022-06-14 21:28:46.817699137 +0900 | |
@@ -5,7 +5,12 @@ | |
use chrono::{DateTime, NaiveDateTime}; | |
use futures::StreamExt as _; | |
use futures::TryStreamExt as _; | |
-use std::collections::{HashMap, HashSet}; | |
+use std::collections::HashMap; | |
+ | |
+use std::fs::File; | |
+use std::io::prelude::*; | |
+ | |
+use tokio::sync::mpsc; | |
const SESSION_NAME: &str = "isucondition_rust"; | |
const CONDITION_LIMIT: usize = 20; | |
@@ -95,6 +100,7 @@ | |
timestamp: DateTime<chrono::FixedOffset>, | |
is_sitting: bool, | |
condition: String, | |
+ condition_level: String, | |
message: String, | |
created_at: DateTime<chrono::FixedOffset>, | |
} | |
@@ -113,6 +119,7 @@ | |
timestamp, | |
is_sitting: row.try_get("is_sitting")?, | |
condition: row.try_get("condition")?, | |
+ condition_level: row.try_get("condition_level")?, | |
message: row.try_get("message")?, | |
created_at, | |
}) | |
@@ -196,7 +203,7 @@ | |
timestamp: i64, | |
is_sitting: bool, | |
condition: String, | |
- condition_level: &'static str, | |
+ condition_level: String, | |
message: String, | |
} | |
@@ -223,6 +230,15 @@ | |
timestamp: i64, | |
} | |
+/// One condition reading queued for the background batch writer. | |
+/// | |
+/// `post_isu_condition` builds one of these per request entry and sends it over the | |
+/// `tokio::sync::mpsc` channel; the spawned writer task bulk-inserts them into `isu_condition`. | |
+#[derive(Debug, serde::Deserialize)] | |
+struct PostIsuCondition { | |
+ // ISU identifier; `post_isu_condition` fills it from the request's URL path. | |
+ jia_isu_uuid: String, | |
+ // Copied unchanged from the request entry. | |
+ is_sitting: bool, | |
+ // Raw condition string from the request; the writer derives `condition_level` from it. | |
+ condition: String, | |
+ // Copied unchanged from the request entry. | |
+ message: String, | |
+ // The writer passes this to `NaiveDateTime::from_timestamp(_, 0)`, i.e. whole seconds. | |
+ timestamp: i64, | |
+} | |
+ | |
#[derive(Debug, serde::Serialize)] | |
struct JIAServiceRequest<'a> { | |
target_base_url: &'a str, | |
@@ -236,7 +252,7 @@ | |
let mysql_connection_env = MySQLConnectionEnv::default(); | |
let pool = sqlx::mysql::MySqlPoolOptions::new() | |
- .max_connections(10) | |
+ .max_connections(40) | |
.after_connect(|conn| { | |
Box::pin(async move { | |
use sqlx::Executor as _; | |
@@ -247,6 +263,8 @@ | |
}) | |
.connect_with( | |
sqlx::mysql::MySqlConnectOptions::new() | |
+ .ssl_mode(sqlx::mysql::MySqlSslMode::Disabled) | |
+ .statement_cache_capacity(100) | |
.host(&mysql_connection_env.host) | |
.port(mysql_connection_env.port) | |
.database(&mysql_connection_env.db_name) | |
@@ -262,6 +280,57 @@ | |
if session_key.len() < 32 { | |
session_key.resize(32, 0); | |
} | |
+ let (mptx, mut mprx): ( | |
+ mpsc::Sender<PostIsuCondition>, | |
+ mpsc::Receiver<PostIsuCondition>, | |
+ ) = mpsc::channel(100); | |
+ | |
+ let mut thread_pool = pool.acquire().await.unwrap().leak(); | |
+     // Background batch writer: buffers conditions received on `mprx` and writes them with | |
+     // one multi-row INSERT, either on the 500 ms tick or once 10000 rows are buffered. | |
+     tokio::spawn(async move { | |
+         let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500)); | |
+         let mut conds: Vec<PostIsuCondition> = Vec::with_capacity(30000); | |
+         loop { | |
+             tokio::select! { | |
+                 _ = interval.tick() => { | |
+                     if conds.is_empty() { | |
+                         continue; | |
+                     } | |
+                 }, | |
+                 Some(cond) = mprx.recv() => { | |
+                     conds.push(cond); | |
+                     if conds.len() < 10000 { | |
+                         continue; | |
+                     } | |
+                 }, | |
+             } | |
+             log::info!("conds.len() is {}", conds.len()); | |
+ | |
+             // Discard unusable rows BEFORE building the statement. Returning early from the | |
+             // `push_values` closure still emits an empty `()` tuple, which makes the whole | |
+             // INSERT invalid and loses every row of the batch. The timestamp is checked too, | |
+             // because `NaiveDateTime::from_timestamp` panics on out-of-range input and a | |
+             // panic here would stop this task for good. | |
+             conds.retain(|cond| { | |
+                 is_valid_condition_format(&cond.condition) | |
+                     && NaiveDateTime::from_timestamp_opt(cond.timestamp, 0).is_some() | |
+             }); | |
+             if conds.is_empty() { | |
+                 continue; | |
+             } | |
+ | |
+             let mut query_builder: sqlx::QueryBuilder<sqlx::MySql> = sqlx::QueryBuilder::new( | |
+                 "INSERT INTO `isu_condition` (`jia_isu_uuid`, `timestamp`, `is_sitting`, `condition`, `condition_level`, `message`) " | |
+             ); | |
+             query_builder.push_values(conds.iter(), |mut b, cond| { | |
+                 // Cannot panic: the timestamp range was verified by `retain` above. | |
+                 let timestamp: DateTime<chrono::FixedOffset> = DateTime::from_utc( | |
+                     NaiveDateTime::from_timestamp(cond.timestamp, 0), | |
+                     JST_OFFSET.fix(), | |
+                 ); | |
+                 let condition_level = calculate_condition_level(&cond.condition); | |
+                 b.push_bind(&cond.jia_isu_uuid) | |
+                     .push_bind(timestamp.naive_local()) | |
+                     .push_bind(cond.is_sitting) | |
+                     .push_bind(&cond.condition) | |
+                     .push_bind(condition_level) | |
+                     .push_bind(&cond.message); | |
+             }); | |
+ | |
+             // The HTTP handler has already answered 202, so a failed write can only be | |
+             // logged; make it visible instead of discarding the error. | |
+             let result = query_builder.build().execute(&mut thread_pool).await; | |
+             if let Err(e) = result { | |
+                 log::error!("bulk insert into isu_condition failed: {}", e); | |
+             } | |
+             conds.clear(); | |
+         } | |
+     }); | |
let server = actix_web::HttpServer::new(move || { | |
actix_web::App::new() | |
@@ -273,7 +342,8 @@ | |
} | |
})) | |
.app_data(web::Data::new(pool.clone())) | |
- .wrap(actix_web::middleware::Logger::default()) | |
+ .app_data(web::Data::new(mptx.clone())) | |
+ // .wrap(actix_web::middleware::Logger::default()) | |
.wrap( | |
actix_session::CookieSession::signed(&session_key) | |
.secure(false) | |
@@ -490,6 +560,38 @@ | |
return Err(actix_web::error::ErrorInternalServerError("")); | |
} | |
+ let status = tokio::process::Command::new("rm") | |
+ .args(&["-rf", "/home/isucon/local/image"]) | |
+ .status() | |
+ .await | |
+ .map_err(|e| { | |
+ log::error!("exec rm error: {}", e); | |
+ e | |
+ })?; | |
+ if !status.success() { | |
+ log::error!("exec rm failed with exit code {:?}", status.code()); | |
+ return Err(actix_web::error::ErrorInternalServerError("")); | |
+ } | |
+ | |
+ let status = tokio::process::Command::new("mkdir") | |
+ .args(&["/home/isucon/local/image"]) | |
+ .status() | |
+ .await | |
+ .map_err(|e| { | |
+ log::error!("exec mkdir error: {}", e); | |
+ e | |
+ })?; | |
+ if !status.success() { | |
+ log::error!("exec mkdir failed with exit code {:?}", status.code()); | |
+ return Err(actix_web::error::ErrorInternalServerError("")); | |
+ } | |
+ | |
+ sqlx::query( | |
+ "ALTER TABLE isu_condition ADD COLUMN condition_level VARCHAR(8) NOT NULL DEFAULT '', ADD INDEX(jia_isu_uuid, timestamp)", | |
+ ).execute(pool.as_ref()) | |
+ .await | |
+ .map_err(SqlxError)?; | |
+ | |
sqlx::query( | |
"INSERT INTO `isu_association_config` (`name`, `url`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `url` = VALUES(`url`)", | |
) | |
@@ -498,6 +600,38 @@ | |
.execute(pool.as_ref()) | |
.await | |
.map_err(SqlxError)?; | |
+ | |
+ let isu_list: Vec<Isu> = sqlx::query_as("SELECT * FROM isu") | |
+ .fetch_all(pool.as_ref()) | |
+ .await | |
+ .map_err(SqlxError)?; | |
+ | |
+ for isu in isu_list { | |
+ let imgdir = "/home/isucon/local/image/".to_string(); | |
+ let imgpath = imgdir + &isu.jia_isu_uuid + ".png"; | |
+ let mut img = File::create(imgpath).unwrap(); | |
+ img.write_all(&isu.image).unwrap(); | |
+ img.flush().unwrap(); | |
+ } | |
+ | |
+ sqlx::query("UPDATE isu SET image = ''") | |
+ .execute(pool.as_ref()) | |
+ .await | |
+ .map_err(SqlxError)?; | |
+ | |
+ let mut rows = sqlx::query_as("SELECT * FROM isu_condition").fetch(pool.as_ref()); | |
+ | |
+ while let Some(row) = rows.next().await { | |
+ let condition: IsuCondition = row.map_err(SqlxError)?; | |
+ let condition_level = calculate_condition_level(&condition.condition); | |
+ sqlx::query("UPDATE isu_condition SET condition_level = ? WHERE id = ?") | |
+ .bind(condition_level) | |
+ .bind(condition.id) | |
+ .execute(pool.as_ref()) | |
+ .await | |
+ .map_err(SqlxError)?; | |
+ } | |
+ | |
Ok(HttpResponse::Ok().json(InitializeResponse { | |
language: "rust".to_owned(), | |
})) | |
@@ -612,7 +746,7 @@ | |
timestamp: last_condition.timestamp.timestamp(), | |
is_sitting: last_condition.is_sitting, | |
condition: last_condition.condition, | |
- condition_level, | |
+ condition_level: condition_level.to_string(), | |
message: last_condition.message, | |
}) | |
} else { | |
@@ -682,14 +816,20 @@ | |
let mut tx = pool.begin().await.map_err(SqlxError)?; | |
let result = sqlx::query( | |
- "INSERT INTO `isu` (`jia_isu_uuid`, `name`, `image`, `jia_user_id`) VALUES (?, ?, ?, ?)", | |
+ "INSERT INTO `isu` (`jia_isu_uuid`, `name`, `image`, `jia_user_id`) VALUES (?, ?, '', ?)", | |
) | |
.bind(&jia_isu_uuid) | |
.bind(&isu_name) | |
- .bind(image.as_ref()) | |
.bind(&jia_user_id) | |
.execute(&mut tx) | |
.await; | |
+ | |
+ let imgdir = "/home/isucon/local/image/".to_string(); | |
+ let imgpath = imgdir + &jia_isu_uuid + ".png"; | |
+ let mut img = File::create(imgpath).unwrap(); | |
+ img.write_all(&image).unwrap(); | |
+ img.flush().unwrap(); | |
+ | |
if let Err(sqlx::Error::Database(ref db_error)) = result { | |
if let Some(mysql_error) = db_error.try_downcast_ref::<sqlx::mysql::MySqlDatabaseError>() { | |
if mysql_error.number() == MYSQL_ERR_NUM_DUPLICATE_ENTRY { | |
@@ -793,17 +933,20 @@ | |
) -> actix_web::Result<HttpResponse> { | |
let jia_user_id = require_signed_in(pool.as_ref(), session).await?; | |
- let image: Option<Vec<u8>> = sqlx::query_scalar( | |
- "SELECT `image` FROM `isu` WHERE `jia_user_id` = ? AND `jia_isu_uuid` = ?", | |
+ let tmp: Option<String> = sqlx::query_scalar( | |
+ "SELECT `jia_isu_uuid` FROM `isu` WHERE `jia_user_id` = ? AND `jia_isu_uuid` = ?", | |
) | |
.bind(&jia_user_id) | |
.bind(jia_isu_uuid.as_ref()) | |
.fetch_optional(pool.as_ref()) | |
.await | |
.map_err(SqlxError)?; | |
+ let redirect_path = "/image/".to_string() + &jia_isu_uuid + ".png"; | |
- if let Some(image) = image { | |
- Ok(HttpResponse::Ok().body(image)) | |
+ if let Some(_) = tmp { | |
+ Ok(HttpResponse::Ok() | |
+ .insert_header(("X-Accel-Redirect", redirect_path)) | |
+ .finish()) | |
} else { | |
Err(actix_web::error::ErrorNotFound("not found: isu")) | |
} | |
@@ -874,11 +1017,14 @@ | |
let mut timestamps_in_this_hour = Vec::new(); | |
let mut start_time_in_this_hour = | |
DateTime::from_utc(NaiveDateTime::from_timestamp(0, 0), JST_OFFSET.fix()); | |
+ let end_time = graph_date + chrono::Duration::hours(24); | |
let mut rows = sqlx::query_as( | |
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY `timestamp` ASC", | |
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` BETWEEN ? AND ? ORDER BY `timestamp` ASC", | |
) | |
.bind(jia_isu_uuid) | |
+ .bind(start_time_in_this_hour.naive_local()) | |
+ .bind(end_time.naive_local()) | |
.fetch(tx); | |
while let Some(row) = rows.next().await { | |
@@ -917,7 +1063,6 @@ | |
}); | |
} | |
- let end_time = graph_date + chrono::Duration::hours(24); | |
let mut filtered_data_points = data_points | |
.into_iter() | |
.skip_while(|graph| graph.start_at < graph_date) | |
@@ -1057,9 +1202,9 @@ | |
"missing: condition_level", | |
)); | |
} | |
- let mut condition_level = HashSet::new(); | |
+ let mut condition_level: Vec<&str> = vec![]; | |
for level in query.condition_level.as_ref().unwrap().split(',') { | |
- condition_level.insert(level); | |
+ condition_level.push(level); | |
} | |
let start_time = match &query.start_time { | |
@@ -1083,7 +1228,7 @@ | |
.await | |
.map_err(SqlxError)?; | |
if isu_name.is_none() { | |
- log::error!("isu not found"); | |
+ // log::error!("isu not found"); | |
return Err(actix_web::error::ErrorNotFound("not found: isu")); | |
} | |
let isu_name = isu_name.unwrap(); | |
@@ -1107,43 +1252,85 @@ | |
pool: &sqlx::MySqlPool, | |
jia_isu_uuid: &str, | |
end_time: DateTime<chrono::FixedOffset>, | |
- condition_level: &HashSet<&str>, | |
+ condition_level: &Vec<&str>, | |
start_time: &Option<DateTime<chrono::FixedOffset>>, | |
limit: usize, | |
isu_name: &str, | |
) -> sqlx::Result<Vec<GetIsuConditionResponse>> { | |
let conditions: Vec<IsuCondition> = if let Some(ref start_time) = start_time { | |
- sqlx::query_as( | |
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND ? <= `timestamp` ORDER BY `timestamp` DESC", | |
- ) | |
+ if condition_level.len() == 3 { | |
+ sqlx::query_as( | |
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND ? <= `timestamp` ORDER BY `timestamp` DESC LIMIT ?", | |
+ ) | |
+ .bind(jia_isu_uuid) | |
+ .bind(end_time.naive_local()) | |
+ .bind(start_time.naive_local()) | |
+ .bind(limit as u32) | |
+ .fetch_all(pool) | |
+ } else if condition_level.len() == 2 { | |
+ sqlx::query_as( | |
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND ? <= `timestamp` AND condition_level IN (?,?) ORDER BY `timestamp` DESC LIMIT ?", | |
+ ) | |
+ .bind(jia_isu_uuid) | |
+ .bind(end_time.naive_local()) | |
+ .bind(start_time.naive_local()) | |
+ .bind(condition_level[0]) | |
+ .bind(condition_level[1]) | |
+ .bind(limit as u32) | |
+ .fetch_all(pool) | |
+ } else { | |
+ sqlx::query_as( | |
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND ? <= `timestamp` AND condition_level = ? ORDER BY `timestamp` DESC LIMIT ?", | |
+ ) | |
+ .bind(jia_isu_uuid) | |
+ .bind(end_time.naive_local()) | |
+ .bind(start_time.naive_local()) | |
+ .bind(condition_level[0]) | |
+ .bind(limit as u32) | |
+ .fetch_all(pool) | |
+ } | |
+ } else { | |
+ if condition_level.len() == 3 { | |
+ sqlx::query_as( | |
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? ORDER BY `timestamp` DESC LIMIT ?", | |
+ ) | |
.bind(jia_isu_uuid) | |
.bind(end_time.naive_local()) | |
- .bind(start_time.naive_local()) | |
+ .bind(limit as u32) | |
.fetch_all(pool) | |
- } else { | |
- sqlx::query_as( | |
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? ORDER BY `timestamp` DESC", | |
- ) | |
- .bind(jia_isu_uuid) | |
- .bind(end_time.naive_local()) | |
- .fetch_all(pool) | |
+ } else if condition_level.len() == 2 { | |
+ sqlx::query_as( | |
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND condition_level IN (?, ?) ORDER BY `timestamp` DESC LIMIT ?", | |
+ ) | |
+ .bind(jia_isu_uuid) | |
+ .bind(end_time.naive_local()) | |
+ .bind(condition_level[0]) | |
+ .bind(condition_level[1]) | |
+ .bind(limit as u32) | |
+ .fetch_all(pool) | |
+ } else { | |
+ sqlx::query_as( | |
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND condition_level = ? ORDER BY `timestamp` DESC LIMIT ?", | |
+ ) | |
+ .bind(jia_isu_uuid) | |
+ .bind(end_time.naive_local()) | |
+ .bind(condition_level[0]) | |
+ .bind(limit as u32) | |
+ .fetch_all(pool) | |
+ } | |
}.await?; | |
let mut conditions_response = Vec::new(); | |
for c in conditions { | |
- if let Some(c_level) = calculate_condition_level(&c.condition) { | |
- if condition_level.contains(c_level) { | |
- conditions_response.push(GetIsuConditionResponse { | |
- jia_isu_uuid: c.jia_isu_uuid, | |
- isu_name: isu_name.to_owned(), | |
- timestamp: c.timestamp.timestamp(), | |
- is_sitting: c.is_sitting, | |
- condition: c.condition, | |
- condition_level: c_level, | |
- message: c.message, | |
- }); | |
- } | |
- } | |
+ conditions_response.push(GetIsuConditionResponse { | |
+ jia_isu_uuid: c.jia_isu_uuid, | |
+ isu_name: isu_name.to_owned(), | |
+ timestamp: c.timestamp.timestamp(), | |
+ is_sitting: c.is_sitting, | |
+ condition: c.condition, | |
+ condition_level: c.condition_level.to_owned(), | |
+ message: c.message, | |
+ }); | |
} | |
if conditions_response.len() > limit { | |
@@ -1187,7 +1374,7 @@ | |
let mut character_critical_isu_conditions = Vec::new(); | |
for isu in isu_list { | |
let conditions: Vec<IsuCondition> = sqlx::query_as( | |
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY timestamp DESC", | |
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY timestamp DESC LIMIT 1", | |
) | |
.bind(&isu.jia_isu_uuid) | |
.fetch_all(pool.as_ref()) | |
@@ -1236,57 +1423,35 @@ | |
+ /// Accepts a batch of condition reports for one ISU and queues them for bulk insertion. | |
+ /// | |
+ /// Responds 400 for an empty body or a malformed `condition` string, 404 when the ISU is | |
+ /// unknown, 500 when a row cannot be queued, and 202 once every row has been queued. | |
#[actix_web::post("/api/condition/{jia_isu_uuid}")] | |
async fn post_isu_condition( | |
    pool: web::Data<sqlx::MySqlPool>, | |
+     mptx: web::Data<mpsc::Sender<PostIsuCondition>>, | |
    jia_isu_uuid: web::Path<String>, | |
    req: web::Json<Vec<PostIsuConditionRequest>>, | |
) -> actix_web::Result<HttpResponse> { | |
-     // TODO: 一定割合リクエストを落としてしのぐようにしたが、本来は全量さばけるようにすべき | |
-     const DROP_PROBABILITY: f64 = 0.9; | |
-     if rand::random::<f64>() <= DROP_PROBABILITY { | |
-         log::warn!("drop post isu condition request"); | |
-         return Ok(HttpResponse::Accepted().finish()); | |
-     } | |
- | |
    if req.is_empty() { | |
        return Err(actix_web::error::ErrorBadRequest("bad request body")); | |
    } | |
-     let mut tx = pool.begin().await.map_err(SqlxError)?; | |
- | |
-     let count: i64 = fetch_one_scalar( | |
-         sqlx::query_scalar("SELECT COUNT(*) FROM `isu` WHERE `jia_isu_uuid` = ?") | |
-             .bind(jia_isu_uuid.as_ref()), | |
-         &mut tx, | |
-     ) | |
-     .await | |
-     .map_err(SqlxError)?; | |
-     if count == 0 { | |
+     let existing: Option<String> = | |
+         sqlx::query_scalar("SELECT jia_isu_uuid FROM `isu` WHERE `jia_isu_uuid` = ?") | |
+             .bind(jia_isu_uuid.as_ref()) | |
+             .fetch_optional(pool.as_ref()) | |
+             .await | |
+             .map_err(SqlxError)?; | |
+     if existing.is_none() { | |
        return Err(actix_web::error::ErrorNotFound("not found: isu")); | |
    } | |
-     for cond in req.iter() { | |
-         let timestamp: DateTime<chrono::FixedOffset> = DateTime::from_utc( | |
-             NaiveDateTime::from_timestamp(cond.timestamp, 0), | |
-             JST_OFFSET.fix(), | |
-         ); | |
- | |
-         if !is_valid_condition_format(&cond.condition) { | |
-             return Err(actix_web::error::ErrorBadRequest("bad request body")); | |
-         } | |
- | |
-         sqlx::query( | |
-             "INSERT INTO `isu_condition` (`jia_isu_uuid`, `timestamp`, `is_sitting`, `condition`, `message`) VALUES (?, ?, ?, ?, ?)", | |
-         ) | |
-         .bind(jia_isu_uuid.as_ref()) | |
-         .bind(&timestamp.naive_local()) | |
-         .bind(&cond.is_sitting) | |
-         .bind(&cond.condition) | |
-         .bind(&cond.message) | |
-         .execute(&mut tx) | |
-         .await.map_err(SqlxError)?; | |
+     // Validate the whole request before queueing anything, so a malformed entry is still | |
+     // answered with 400 and nothing from the request is stored, as with the old transaction. | |
+     if req.iter().any(|c| !is_valid_condition_format(&c.condition)) { | |
+         return Err(actix_web::error::ErrorBadRequest("bad request body")); | |
+     } | |
+ | |
+     for c in req.iter() { | |
+         let cond = PostIsuCondition { | |
+             jia_isu_uuid: jia_isu_uuid.to_string(), | |
+             is_sitting: c.is_sitting, | |
+             condition: c.condition.clone(), | |
+             message: c.message.clone(), | |
+             timestamp: c.timestamp, | |
+         }; | |
+         // `send` fails only when the receiving side of the channel is gone; report that | |
+         // instead of acknowledging data that will never be written. | |
+         mptx.send(cond).await.map_err(|e| { | |
+             log::error!("failed to queue isu condition: {}", e); | |
+             actix_web::error::ErrorInternalServerError("") | |
+         })?; | |
    } | |
-     tx.commit().await.map_err(SqlxError)?; | |
- | |
    Ok(HttpResponse::Accepted().finish()) | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment