Skip to content

Instantly share code, notes, and snippets.

@matsuu
Created June 14, 2022 12:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matsuu/4173646ad88afd2161e3b98d3b393f28 to your computer and use it in GitHub Desktop.
Save matsuu/4173646ad88afd2161e3b98d3b393f28 to your computer and use it in GitHub Desktop.
--- src/main.rs.orig 2022-06-14 21:26:10.771842719 +0900
+++ src/main.rs 2022-06-14 21:28:46.817699137 +0900
@@ -5,7 +5,12 @@
use chrono::{DateTime, NaiveDateTime};
use futures::StreamExt as _;
use futures::TryStreamExt as _;
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
+
+use std::fs::File;
+use std::io::prelude::*;
+
+use tokio::sync::mpsc;
const SESSION_NAME: &str = "isucondition_rust";
const CONDITION_LIMIT: usize = 20;
@@ -95,6 +100,7 @@
timestamp: DateTime<chrono::FixedOffset>,
is_sitting: bool,
condition: String,
+ condition_level: String,
message: String,
created_at: DateTime<chrono::FixedOffset>,
}
@@ -113,6 +119,7 @@
timestamp,
is_sitting: row.try_get("is_sitting")?,
condition: row.try_get("condition")?,
+ condition_level: row.try_get("condition_level")?,
message: row.try_get("message")?,
created_at,
})
@@ -196,7 +203,7 @@
timestamp: i64,
is_sitting: bool,
condition: String,
- condition_level: &'static str,
+ condition_level: String,
message: String,
}
@@ -223,6 +230,15 @@
timestamp: i64,
}
+#[derive(Debug, serde::Deserialize)]
+struct PostIsuCondition {
+ jia_isu_uuid: String,
+ is_sitting: bool,
+ condition: String,
+ message: String,
+ timestamp: i64,
+}
+
#[derive(Debug, serde::Serialize)]
struct JIAServiceRequest<'a> {
target_base_url: &'a str,
@@ -236,7 +252,7 @@
let mysql_connection_env = MySQLConnectionEnv::default();
let pool = sqlx::mysql::MySqlPoolOptions::new()
- .max_connections(10)
+ .max_connections(40)
.after_connect(|conn| {
Box::pin(async move {
use sqlx::Executor as _;
@@ -247,6 +263,8 @@
})
.connect_with(
sqlx::mysql::MySqlConnectOptions::new()
+ .ssl_mode(sqlx::mysql::MySqlSslMode::Disabled)
+ .statement_cache_capacity(100)
.host(&mysql_connection_env.host)
.port(mysql_connection_env.port)
.database(&mysql_connection_env.db_name)
@@ -262,6 +280,57 @@
if session_key.len() < 32 {
session_key.resize(32, 0);
}
+ let (mptx, mut mprx): (
+ mpsc::Sender<PostIsuCondition>,
+ mpsc::Receiver<PostIsuCondition>,
+ ) = mpsc::channel(100);
+
+ let mut thread_pool = pool.acquire().await.unwrap().leak();
+ tokio::spawn(async move {
+ let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
+ let mut conds = Vec::with_capacity(30000);
+ loop {
+ tokio::select! {
+ _ = interval.tick() => {
+ if conds.len() == 0 {
+ continue;
+ }
+ },
+ Some(cond) = mprx.recv() => {
+ conds.push(cond);
+ if conds.len() < 10000 {
+ continue;
+ }
+ },
+ }
+ log::info!("conds.len() is {}", conds.len());
+
+ let mut query_builder: sqlx::QueryBuilder<sqlx::MySql> = sqlx::QueryBuilder::new(
+ "INSERT INTO `isu_condition` (`jia_isu_uuid`, `timestamp`, `is_sitting`, `condition`, `condition_level`, `message`) "
+ );
+ query_builder.push_values(conds.iter(), |mut b, cond| {
+ let timestamp: DateTime<chrono::FixedOffset> = DateTime::from_utc(
+ NaiveDateTime::from_timestamp(cond.timestamp, 0),
+ JST_OFFSET.fix(),
+ );
+
+ if !is_valid_condition_format(&cond.condition) {
+ return;
+ }
+ let condition_level = calculate_condition_level(&cond.condition);
+ b.push_bind(&cond.jia_isu_uuid)
+ .push_bind(timestamp.naive_local())
+ .push_bind(cond.is_sitting)
+ .push_bind(&cond.condition)
+ .push_bind(condition_level)
+ .push_bind(&cond.message);
+ });
+
+ let _ = query_builder.build().execute(&mut thread_pool).await;
+ // query_builder.reset();
+ conds.clear();
+ }
+ });
let server = actix_web::HttpServer::new(move || {
actix_web::App::new()
@@ -273,7 +342,8 @@
}
}))
.app_data(web::Data::new(pool.clone()))
- .wrap(actix_web::middleware::Logger::default())
+ .app_data(web::Data::new(mptx.clone()))
+ // .wrap(actix_web::middleware::Logger::default())
.wrap(
actix_session::CookieSession::signed(&session_key)
.secure(false)
@@ -490,6 +560,38 @@
return Err(actix_web::error::ErrorInternalServerError(""));
}
+ let status = tokio::process::Command::new("rm")
+ .args(&["-rf", "/home/isucon/local/image"])
+ .status()
+ .await
+ .map_err(|e| {
+ log::error!("exec rm error: {}", e);
+ e
+ })?;
+ if !status.success() {
+ log::error!("exec rm failed with exit code {:?}", status.code());
+ return Err(actix_web::error::ErrorInternalServerError(""));
+ }
+
+ let status = tokio::process::Command::new("mkdir")
+ .args(&["/home/isucon/local/image"])
+ .status()
+ .await
+ .map_err(|e| {
+ log::error!("exec mkdir error: {}", e);
+ e
+ })?;
+ if !status.success() {
+ log::error!("exec mkdir failed with exit code {:?}", status.code());
+ return Err(actix_web::error::ErrorInternalServerError(""));
+ }
+
+ sqlx::query(
+ "ALTER TABLE isu_condition ADD COLUMN condition_level VARCHAR(8) NOT NULL DEFAULT '', ADD INDEX(jia_isu_uuid, timestamp)",
+ ).execute(pool.as_ref())
+ .await
+ .map_err(SqlxError)?;
+
sqlx::query(
"INSERT INTO `isu_association_config` (`name`, `url`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `url` = VALUES(`url`)",
)
@@ -498,6 +600,38 @@
.execute(pool.as_ref())
.await
.map_err(SqlxError)?;
+
+ let isu_list: Vec<Isu> = sqlx::query_as("SELECT * FROM isu")
+ .fetch_all(pool.as_ref())
+ .await
+ .map_err(SqlxError)?;
+
+ for isu in isu_list {
+ let imgdir = "/home/isucon/local/image/".to_string();
+ let imgpath = imgdir + &isu.jia_isu_uuid + ".png";
+ let mut img = File::create(imgpath).unwrap();
+ img.write_all(&isu.image).unwrap();
+ img.flush().unwrap();
+ }
+
+ sqlx::query("UPDATE isu SET image = ''")
+ .execute(pool.as_ref())
+ .await
+ .map_err(SqlxError)?;
+
+ let mut rows = sqlx::query_as("SELECT * FROM isu_condition").fetch(pool.as_ref());
+
+ while let Some(row) = rows.next().await {
+ let condition: IsuCondition = row.map_err(SqlxError)?;
+ let condition_level = calculate_condition_level(&condition.condition);
+ sqlx::query("UPDATE isu_condition SET condition_level = ? WHERE id = ?")
+ .bind(condition_level)
+ .bind(condition.id)
+ .execute(pool.as_ref())
+ .await
+ .map_err(SqlxError)?;
+ }
+
Ok(HttpResponse::Ok().json(InitializeResponse {
language: "rust".to_owned(),
}))
@@ -612,7 +746,7 @@
timestamp: last_condition.timestamp.timestamp(),
is_sitting: last_condition.is_sitting,
condition: last_condition.condition,
- condition_level,
+ condition_level: condition_level.to_string(),
message: last_condition.message,
})
} else {
@@ -682,14 +816,20 @@
let mut tx = pool.begin().await.map_err(SqlxError)?;
let result = sqlx::query(
- "INSERT INTO `isu` (`jia_isu_uuid`, `name`, `image`, `jia_user_id`) VALUES (?, ?, ?, ?)",
+ "INSERT INTO `isu` (`jia_isu_uuid`, `name`, `image`, `jia_user_id`) VALUES (?, ?, '', ?)",
)
.bind(&jia_isu_uuid)
.bind(&isu_name)
- .bind(image.as_ref())
.bind(&jia_user_id)
.execute(&mut tx)
.await;
+
+ let imgdir = "/home/isucon/local/image/".to_string();
+ let imgpath = imgdir + &jia_isu_uuid + ".png";
+ let mut img = File::create(imgpath).unwrap();
+ img.write_all(&image).unwrap();
+ img.flush().unwrap();
+
if let Err(sqlx::Error::Database(ref db_error)) = result {
if let Some(mysql_error) = db_error.try_downcast_ref::<sqlx::mysql::MySqlDatabaseError>() {
if mysql_error.number() == MYSQL_ERR_NUM_DUPLICATE_ENTRY {
@@ -793,17 +933,20 @@
) -> actix_web::Result<HttpResponse> {
let jia_user_id = require_signed_in(pool.as_ref(), session).await?;
- let image: Option<Vec<u8>> = sqlx::query_scalar(
- "SELECT `image` FROM `isu` WHERE `jia_user_id` = ? AND `jia_isu_uuid` = ?",
+ let tmp: Option<String> = sqlx::query_scalar(
+ "SELECT `jia_isu_uuid` FROM `isu` WHERE `jia_user_id` = ? AND `jia_isu_uuid` = ?",
)
.bind(&jia_user_id)
.bind(jia_isu_uuid.as_ref())
.fetch_optional(pool.as_ref())
.await
.map_err(SqlxError)?;
+ let redirect_path = "/image/".to_string() + &jia_isu_uuid + ".png";
- if let Some(image) = image {
- Ok(HttpResponse::Ok().body(image))
+ if let Some(_) = tmp {
+ Ok(HttpResponse::Ok()
+ .insert_header(("X-Accel-Redirect", redirect_path))
+ .finish())
} else {
Err(actix_web::error::ErrorNotFound("not found: isu"))
}
@@ -874,11 +1017,14 @@
let mut timestamps_in_this_hour = Vec::new();
let mut start_time_in_this_hour =
DateTime::from_utc(NaiveDateTime::from_timestamp(0, 0), JST_OFFSET.fix());
+ let end_time = graph_date + chrono::Duration::hours(24);
let mut rows = sqlx::query_as(
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY `timestamp` ASC",
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` BETWEEN ? AND ? ORDER BY `timestamp` ASC",
)
.bind(jia_isu_uuid)
+ .bind(start_time_in_this_hour.naive_local())
+ .bind(end_time.naive_local())
.fetch(tx);
while let Some(row) = rows.next().await {
@@ -917,7 +1063,6 @@
});
}
- let end_time = graph_date + chrono::Duration::hours(24);
let mut filtered_data_points = data_points
.into_iter()
.skip_while(|graph| graph.start_at < graph_date)
@@ -1057,9 +1202,9 @@
"missing: condition_level",
));
}
- let mut condition_level = HashSet::new();
+ let mut condition_level: Vec<&str> = vec![];
for level in query.condition_level.as_ref().unwrap().split(',') {
- condition_level.insert(level);
+ condition_level.push(level);
}
let start_time = match &query.start_time {
@@ -1083,7 +1228,7 @@
.await
.map_err(SqlxError)?;
if isu_name.is_none() {
- log::error!("isu not found");
+ // log::error!("isu not found");
return Err(actix_web::error::ErrorNotFound("not found: isu"));
}
let isu_name = isu_name.unwrap();
@@ -1107,43 +1252,85 @@
pool: &sqlx::MySqlPool,
jia_isu_uuid: &str,
end_time: DateTime<chrono::FixedOffset>,
- condition_level: &HashSet<&str>,
+ condition_level: &Vec<&str>,
start_time: &Option<DateTime<chrono::FixedOffset>>,
limit: usize,
isu_name: &str,
) -> sqlx::Result<Vec<GetIsuConditionResponse>> {
let conditions: Vec<IsuCondition> = if let Some(ref start_time) = start_time {
- sqlx::query_as(
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND ? <= `timestamp` ORDER BY `timestamp` DESC",
- )
+ if condition_level.len() == 3 {
+ sqlx::query_as(
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND ? <= `timestamp` ORDER BY `timestamp` DESC LIMIT ?",
+ )
+ .bind(jia_isu_uuid)
+ .bind(end_time.naive_local())
+ .bind(start_time.naive_local())
+ .bind(limit as u32)
+ .fetch_all(pool)
+ } else if condition_level.len() == 2 {
+ sqlx::query_as(
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND ? <= `timestamp` AND condition_level IN (?,?) ORDER BY `timestamp` DESC LIMIT ?",
+ )
+ .bind(jia_isu_uuid)
+ .bind(end_time.naive_local())
+ .bind(start_time.naive_local())
+ .bind(condition_level[0])
+ .bind(condition_level[1])
+ .bind(limit as u32)
+ .fetch_all(pool)
+ } else {
+ sqlx::query_as(
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND ? <= `timestamp` AND condition_level = ? ORDER BY `timestamp` DESC LIMIT ?",
+ )
+ .bind(jia_isu_uuid)
+ .bind(end_time.naive_local())
+ .bind(start_time.naive_local())
+ .bind(condition_level[0])
+ .bind(limit as u32)
+ .fetch_all(pool)
+ }
+ } else {
+ if condition_level.len() == 3 {
+ sqlx::query_as(
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? ORDER BY `timestamp` DESC LIMIT ?",
+ )
.bind(jia_isu_uuid)
.bind(end_time.naive_local())
- .bind(start_time.naive_local())
+ .bind(limit as u32)
.fetch_all(pool)
- } else {
- sqlx::query_as(
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? ORDER BY `timestamp` DESC",
- )
- .bind(jia_isu_uuid)
- .bind(end_time.naive_local())
- .fetch_all(pool)
+ } else if condition_level.len() == 2 {
+ sqlx::query_as(
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND condition_level IN (?, ?) ORDER BY `timestamp` DESC LIMIT ?",
+ )
+ .bind(jia_isu_uuid)
+ .bind(end_time.naive_local())
+ .bind(condition_level[0])
+ .bind(condition_level[1])
+ .bind(limit as u32)
+ .fetch_all(pool)
+ } else {
+ sqlx::query_as(
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND condition_level = ? ORDER BY `timestamp` DESC LIMIT ?",
+ )
+ .bind(jia_isu_uuid)
+ .bind(end_time.naive_local())
+ .bind(condition_level[0])
+ .bind(limit as u32)
+ .fetch_all(pool)
+ }
}.await?;
let mut conditions_response = Vec::new();
for c in conditions {
- if let Some(c_level) = calculate_condition_level(&c.condition) {
- if condition_level.contains(c_level) {
- conditions_response.push(GetIsuConditionResponse {
- jia_isu_uuid: c.jia_isu_uuid,
- isu_name: isu_name.to_owned(),
- timestamp: c.timestamp.timestamp(),
- is_sitting: c.is_sitting,
- condition: c.condition,
- condition_level: c_level,
- message: c.message,
- });
- }
- }
+ conditions_response.push(GetIsuConditionResponse {
+ jia_isu_uuid: c.jia_isu_uuid,
+ isu_name: isu_name.to_owned(),
+ timestamp: c.timestamp.timestamp(),
+ is_sitting: c.is_sitting,
+ condition: c.condition,
+ condition_level: c.condition_level.to_owned(),
+ message: c.message,
+ });
}
if conditions_response.len() > limit {
@@ -1187,7 +1374,7 @@
let mut character_critical_isu_conditions = Vec::new();
for isu in isu_list {
let conditions: Vec<IsuCondition> = sqlx::query_as(
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY timestamp DESC",
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY timestamp DESC LIMIT 1",
)
.bind(&isu.jia_isu_uuid)
.fetch_all(pool.as_ref())
@@ -1236,57 +1423,35 @@
#[actix_web::post("/api/condition/{jia_isu_uuid}")]
async fn post_isu_condition(
pool: web::Data<sqlx::MySqlPool>,
+ mptx: web::Data<mpsc::Sender<PostIsuCondition>>,
jia_isu_uuid: web::Path<String>,
req: web::Json<Vec<PostIsuConditionRequest>>,
) -> actix_web::Result<HttpResponse> {
- // TODO: 一定割合リクエストを落としてしのぐようにしたが、本来は全量さばけるようにすべき
- const DROP_PROBABILITY: f64 = 0.9;
- if rand::random::<f64>() <= DROP_PROBABILITY {
- log::warn!("drop post isu condition request");
- return Ok(HttpResponse::Accepted().finish());
- }
-
if req.is_empty() {
return Err(actix_web::error::ErrorBadRequest("bad request body"));
}
- let mut tx = pool.begin().await.map_err(SqlxError)?;
-
- let count: i64 = fetch_one_scalar(
- sqlx::query_scalar("SELECT COUNT(*) FROM `isu` WHERE `jia_isu_uuid` = ?")
- .bind(jia_isu_uuid.as_ref()),
- &mut tx,
- )
- .await
- .map_err(SqlxError)?;
- if count == 0 {
+ let tmp: Option<String> =
+ sqlx::query_scalar("SELECT jia_isu_uuid FROM `isu` WHERE `jia_isu_uuid` = ?")
+ .bind(jia_isu_uuid.as_ref())
+ .fetch_optional(pool.as_ref())
+ .await
+ .map_err(SqlxError)?;
+ if tmp == None {
return Err(actix_web::error::ErrorNotFound("not found: isu"));
}
- for cond in req.iter() {
- let timestamp: DateTime<chrono::FixedOffset> = DateTime::from_utc(
- NaiveDateTime::from_timestamp(cond.timestamp, 0),
- JST_OFFSET.fix(),
- );
-
- if !is_valid_condition_format(&cond.condition) {
- return Err(actix_web::error::ErrorBadRequest("bad request body"));
- }
-
- sqlx::query(
- "INSERT INTO `isu_condition` (`jia_isu_uuid`, `timestamp`, `is_sitting`, `condition`, `message`) VALUES (?, ?, ?, ?, ?)",
- )
- .bind(jia_isu_uuid.as_ref())
- .bind(&timestamp.naive_local())
- .bind(&cond.is_sitting)
- .bind(&cond.condition)
- .bind(&cond.message)
- .execute(&mut tx)
- .await.map_err(SqlxError)?;
+ for c in req.iter() {
+ let cond = PostIsuCondition {
+ jia_isu_uuid: jia_isu_uuid.to_string(),
+ is_sitting: c.is_sitting.clone(),
+ condition: c.condition.clone(),
+ message: c.message.clone(),
+ timestamp: c.timestamp,
+ };
+ let _ = mptx.send(cond).await;
}
- tx.commit().await.map_err(SqlxError)?;
-
Ok(HttpResponse::Accepted().finish())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment