-
-
Save JakkuSakura/4bb9678501dbabf56c1b6d95269740aa to your computer and use it in GitHub Desktop.
Price table with GlueSQL benchmark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// gluesql-derive = "0.1.7" | |
use crate::db::gluesql::utils::decimal_to_ast_expr; | |
use eyre::bail; | |
use gluesql::core::ast::Statement; | |
use gluesql::core::ast_builder::{expr, num, table, text, Build, ExprNode}; | |
use gluesql::core::executor::Payload; | |
use gluesql::core::store::{GStore, GStoreMut}; | |
use gluesql::prelude::Glue; | |
use gluesql_derive::FromGlueSqlRow; | |
pub type TimeStampMs = i64; | |
use rust_decimal::Decimal; | |
use std::sync::atomic::AtomicU64; | |
/// Process-wide source of row ids; starts at 0 every time the process starts.
static INDEX: AtomicU64 = AtomicU64::new(0);

/// Return the next row id and advance the shared counter by one.
///
/// `Relaxed` ordering is sufficient here: only the atomicity of the increment
/// matters, no other memory is published through this counter.
///
/// NOTE(review): the counter is not persisted, so ids restart from 0 after a
/// process restart — confirm that is acceptable for tables kept in persistent storage.
#[must_use]
fn counter() -> u64 {
    INDEX.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
/// turn Option<Decimal> into either ast_builder::expr or ast_builder::none | |
pub fn decimal_to_ast_expr(d: Decimal) -> ExprNode<'static> { | |
expr(d.to_string()) | |
} | |
/// create price table | |
pub async fn create_price_table<T: GStore + GStoreMut>( | |
glue: &mut Glue<T>, | |
table_name: &str, | |
with_index: bool, | |
) -> eyre::Result<()> { | |
// hyper, binance and diff are nullable, so we don't store things when it is not available | |
// always select before insert, and when data are missing, either: | |
// - whole row is not present (no binance/hyper price) | |
// - one row is missing (data missing on either side) | |
// only when we have difference data, we are certain (select difference data) | |
// for the moment, use string for index for simplicity | |
let mut sql = format!( | |
" CREATE TABLE IF NOT EXISTS {table_name} ( | |
id UINT64 NOT NULL, | |
symbol TEXT NOT NULL, | |
datetime INT NOT NULL, | |
hyper_bid DECIMAL, | |
hyper_oracle DECIMAL, | |
hyper_mark DECIMAL, | |
binance DECIMAL, | |
difference DECIMAL, | |
bp DECIMAL, | |
); | |
" | |
); | |
if with_index { | |
sql.push_str( | |
format!( | |
" CREATE INDEX IF NOT EXISTS index_{table_name}_datetime ON {table_name} (datetime); | |
", | |
) | |
.as_str(), | |
); | |
} | |
let res = glue.execute(sql.as_str()).await; | |
match res { | |
Err(e) => Err(e.into()), | |
_ => Ok(()), | |
} | |
} | |
#[derive(Debug, Clone, FromGlueSqlRow, Default)] | |
/// row representation of the difference market table | |
pub struct PriceRow { | |
pub id: u64, | |
pub symbol: String, | |
pub binance: Decimal, | |
pub hyper_bid: Decimal, | |
pub hyper_oracle: Decimal, | |
pub hyper_mark: Decimal, | |
pub difference: Decimal, | |
pub bp: Decimal, | |
pub datetime: TimeStampMs, | |
} | |
impl PriceRow { | |
pub fn columns() -> Vec<&'static str> { | |
vec![ | |
"id", | |
"symbol", | |
"datetime", | |
"hyper_bid", | |
"hyper_oracle", | |
"hyper_mark", | |
"binance", | |
"difference", | |
"bp", | |
] | |
} | |
pub fn to_expr(self, index: u64) -> Vec<ExprNode<'static>> { | |
vec![ | |
num(index), | |
text(self.symbol), | |
num(self.datetime), | |
decimal_to_ast_expr(self.hyper_bid), | |
decimal_to_ast_expr(self.hyper_oracle), | |
decimal_to_ast_expr(self.hyper_mark), | |
decimal_to_ast_expr(self.binance), | |
decimal_to_ast_expr(self.difference), | |
decimal_to_ast_expr(self.bp), | |
] | |
} | |
} | |
/// insert price into price table | |
pub async fn insert_price_table<T: GStore + GStoreMut>( | |
glue: &mut Glue<T>, | |
table_name: &str, | |
price_row: PriceRow, | |
) -> eyre::Result<u64> { | |
let index = counter(); | |
let sql = table(table_name) | |
.insert() | |
.columns(PriceRow::columns()) | |
.values(vec![price_row.to_expr(index)]) | |
.build()?; | |
let res = glue.execute_stmt(&sql).await; | |
match res { | |
Ok(Payload::Insert(_)) => Ok(index), | |
Err(e) => bail!("unexpected result {e}"), | |
_ => unreachable!(), | |
} | |
} | |
/// insert price into price table | |
pub async fn insert_iter_price_table<T: GStore + GStoreMut>( | |
glue: &mut Glue<T>, | |
table_name: &str, | |
price_rows: impl IntoIterator<Item = PriceRow>, | |
) -> eyre::Result<u64> { | |
let mut batch = vec![]; | |
for price_row in price_rows { | |
let index = counter(); | |
let row = price_row.to_expr(index); | |
batch.push(row); | |
} | |
let sql = table(table_name) | |
.insert() | |
.columns(PriceRow::columns()) | |
.values(batch) | |
.build()?; | |
let res = glue.execute_stmt(&sql).await; | |
match res { | |
Ok(Payload::Insert(size)) => Ok(size as u64), | |
Err(e) => bail!("unexpected result {e}"), | |
_ => unreachable!(), | |
} | |
} | |
/// set up range filter | |
fn filter_range_expr(from_ms: Option<i64>, until_ms: Option<i64>) -> ExprNode<'static> { | |
match (from_ms, until_ms) { | |
(Some(from), Some(until)) => expr("datetime").gte(num(from)).and(expr("datetime").lte(num(until))), | |
(Some(from), None) => expr("datetime").gte(num(from)), | |
(None, Some(until)) => expr("datetime").lte(num(until)), | |
(None, None) => expr("true"), | |
} | |
} | |
fn select_price_all_stmt( | |
table_name: &str, | |
datetime_from_ms: Option<i64>, | |
datetime_until_ms: Option<i64>, | |
) -> gluesql::prelude::Result<Statement> { | |
let filter = filter_range_expr(datetime_from_ms, datetime_until_ms); | |
table(table_name) | |
.select() | |
.filter(filter) | |
.project("id, symbol, binance, hyper_bid, hyper_oracle, hyper_mark, difference, bp, datetime") | |
.order_by("symbol DESC, datetime DESC") | |
.build() | |
} | |
/// select all symbol prices | |
pub async fn select_price_all<T: GStore + GStoreMut>( | |
glue: &mut Glue<T>, | |
table_name: &str, | |
datetime_from_ms: Option<i64>, | |
datetime_until_ms: Option<i64>, | |
) -> eyre::Result<Vec<PriceRow>> { | |
let sql = select_price_all_stmt(table_name, datetime_from_ms, datetime_until_ms)?; | |
// line below takes 1350ms | |
let res = glue.execute_stmt(&sql).await; | |
match res { | |
Ok(Payload::Select { labels, rows }) => { | |
// line below takes 1350ms | |
let results = PriceRow::from_gluesql_rows(&labels, rows)?; | |
Ok(results) | |
} | |
_ => bail!("wrong format, check selection AST [{res:?}]"), | |
} | |
} | |
fn select_price_by_symbol_stmt( | |
table_name: &str, | |
symbol: &str, | |
datetime_from_ms: Option<i64>, | |
datetime_until_ms: Option<i64>, | |
) -> gluesql::prelude::Result<Statement> { | |
let filter = | |
filter_range_expr(datetime_from_ms, datetime_until_ms).and(expr("symbol").eq(text(symbol.to_string()))); | |
table(table_name) | |
.select() | |
.filter(filter) | |
.project("id, symbol, binance, hyper_bid, hyper_oracle, hyper_mark, difference, bp, datetime") | |
.order_by("datetime DESC") | |
.build() | |
} | |
/// select single symbol prices | |
pub async fn select_price_by_symbol<T: GStore + GStoreMut>( | |
glue: &mut Glue<T>, | |
table_name: &str, | |
symbol: &str, | |
datetime_from_ms: Option<i64>, | |
datetime_until_ms: Option<i64>, | |
) -> eyre::Result<Vec<PriceRow>> { | |
let sql = select_price_by_symbol_stmt(table_name, symbol, datetime_from_ms, datetime_until_ms)?; | |
let res = glue.execute_stmt(&sql).await; | |
match res { | |
Ok(Payload::Select { labels, rows }) => { | |
let results = PriceRow::from_gluesql_rows(&labels, rows)?; | |
Ok(results) | |
} | |
_ => bail!("wrong format, check selection AST [{res:?}]"), | |
} | |
} | |
/// delete price by time range | |
pub async fn delete_price_by_time<T: GStore + GStoreMut>( | |
glue: &mut Glue<T>, | |
table_name: &str, | |
datetime_from_ms: Option<i64>, | |
datetime_until_ms: Option<i64>, | |
) -> eyre::Result<usize> { | |
let sql = table(table_name) | |
.delete() | |
.filter(filter_range_expr(datetime_from_ms, datetime_until_ms)) | |
.build()?; | |
let res = glue.execute_stmt(&sql).await; | |
match res { | |
Ok(Payload::Delete(count)) => Ok(count), | |
_ => bail!("wrong format, check selection AST [{res:?}]"), | |
} | |
} | |
#[cfg(test)]
mod tests {
    use super::*;
    use gluesql::core::parse_sql::parse;
    use gluesql::core::translate::translate;
    use gluesql::prelude::{SharedMemoryStorage, SledStorage};
    use gluesql::sled_storage::sled::{Config as SledConfig, Mode};
    use tempfile::TempDir;

    /// Parse `sql` and translate its first statement into the executor AST.
    fn translated(sql: &str) -> gluesql::prelude::Result<Statement> {
        translate(&parse(sql).expect(sql)[0])
    }

    /// Build a row for symbol `coin_{m}` whose binance, hyper_bid, difference and bp are all `m + 1`.
    fn coin_row(m: i32, datetime: TimeStampMs) -> PriceRow {
        let price = Decimal::from(m + 1);
        PriceRow {
            symbol: format!("coin_{m}"),
            datetime,
            binance: price,
            hyper_bid: price,
            difference: price,
            bp: price,
            ..Default::default()
        }
    }

    #[test]
    fn test_ast_select_by_symbol() {
        let actual = select_price_by_symbol_stmt("test_ast_select_by_symbol", "BTC", Some(0), Some(10)).unwrap();
        let expected = "SELECT id, symbol, binance, hyper_bid, hyper_oracle, hyper_mark, difference, bp, datetime FROM test_ast_select_by_symbol WHERE datetime >= 0 AND datetime <= 10 AND symbol = 'BTC' ORDER BY datetime DESC";
        assert_eq!(actual, translated(expected).unwrap());
    }

    #[test]
    fn test_ast_select_all() {
        let actual = select_price_all_stmt("test_ast_select_all", Some(0), Some(10)).unwrap();
        let expected = "SELECT id, symbol, binance, hyper_bid, hyper_oracle, hyper_mark, difference, bp, datetime FROM test_ast_select_all WHERE datetime >= 0 AND datetime <= 10 ORDER BY symbol DESC, datetime DESC";
        assert_eq!(actual, translated(expected).unwrap());

        let actual = select_price_all_stmt("test_ast_select_all", None, None);
        let expected = "SELECT id, symbol, binance, hyper_bid, hyper_oracle, hyper_mark, difference, bp, datetime FROM test_ast_select_all WHERE true ORDER BY symbol DESC, datetime DESC";
        assert_eq!(actual, translated(expected));
    }

    #[test]
    fn test_ast_multi_filter() {
        let actual = table("test_ast_multi_filter")
            .select()
            .filter("id IS NULL")
            .filter("id > 10")
            .filter("id < 20")
            .build();
        let expected = "SELECT * FROM test_ast_multi_filter WHERE id IS NULL AND id > 10 AND id < 20";
        assert_eq!(actual, translated(expected));
    }

    #[test]
    fn test_ast_multi_order() {
        // we cannot add multiple order_by() here
        let actual = table("test_ast_multi_order")
            .select()
            .order_by("age ASC, name DESC")
            .build();
        let expected = "SELECT * FROM test_ast_multi_order ORDER BY age ASC, name DESC";
        assert_eq!(actual, translated(expected));
    }

    #[test]
    fn test_ast_multi_filter_order() {
        let actual = table("test_ast_filter_order")
            .select()
            .filter("id IS NULL")
            .filter("age > 20")
            .order_by("name DESC, age ASC")
            .build();
        let expected = "SELECT * FROM test_ast_filter_order WHERE id IS NULL AND age > 20 ORDER BY name DESC, age ASC";
        assert_eq!(actual, translated(expected));
    }

    #[tokio::test]
    async fn test_price_table_create() {
        let table_name = "test_price_table_create";
        let mut glue = Glue::new(SharedMemoryStorage::new());
        let res = create_price_table(&mut glue, table_name, false).await;
        assert!(res.is_ok());
    }

    #[tokio::test]
    async fn test_price_table_insert_select() {
        let table_name = "test_price_table_insert_select";
        let datetime = 123;
        let symbol = "BTC";
        let price_binance: Decimal = "99.5".parse().unwrap();
        let price_hyper: Decimal = "100".parse().unwrap();
        let mut glue = Glue::new(SharedMemoryStorage::new());
        let res = create_price_table(&mut glue, table_name, false).await;
        assert!(res.is_ok());

        // insertion
        let row = PriceRow {
            symbol: symbol.to_string(),
            datetime,
            binance: price_binance,
            hyper_bid: price_hyper,
            ..Default::default()
        };
        insert_price_table(&mut glue, table_name, row).await.unwrap();

        // selection
        let res = select_price_by_symbol(&mut glue, table_name, symbol, Some(datetime), None)
            .await
            .unwrap();
        let first = res.first().expect("no valid data");
        assert_eq!(first.binance, price_binance);
        assert_eq!(first.hyper_bid, price_hyper);
        assert_eq!(first.difference, Decimal::default());
        assert_eq!(first.bp, Decimal::default());
    }

    #[tokio::test]
    async fn test_price_table_insert_select_all() {
        let table_name = "test_price_table_insert_select_all";
        let datetime = 123;
        let symbol = "BTC";
        let price_binance: Decimal = "99.5".parse().unwrap();
        let price_hyper: Decimal = "100".parse().unwrap();
        let mut glue = Glue::new(SharedMemoryStorage::new());
        let res = create_price_table(&mut glue, table_name, false).await;
        assert!(res.is_ok());

        // insertion
        let row = PriceRow {
            symbol: symbol.to_string(),
            datetime,
            binance: price_binance,
            hyper_bid: price_hyper,
            ..Default::default()
        };
        insert_price_table(&mut glue, table_name, row).await.unwrap();

        // selection
        let res = select_price_all(&mut glue, table_name, Some(datetime), None)
            .await
            .unwrap();
        let first = res.first().expect("no valid data");
        assert_eq!(first.binance, price_binance);
        assert_eq!(first.hyper_bid, price_hyper);
        assert_eq!(first.difference, Decimal::default());
        assert_eq!(first.bp, Decimal::default());
    }

    #[tokio::test]
    async fn test_price_table_insert_delete() {
        let table_name = "test_price_table_insert_delete";
        let mut glue = Glue::new(SharedMemoryStorage::new());
        let res = create_price_table(&mut glue, table_name, false).await;
        assert!(res.is_ok());

        let total_duration = 100;
        let offset = 30;
        let symbol = "coin";
        for d in 0..total_duration {
            let row = PriceRow {
                symbol: symbol.to_string(),
                // datetimes run from 1 to total_duration inclusive
                datetime: d + 1,
                binance: d.into(),
                hyper_bid: d.into(),
                ..Default::default()
            };
            insert_price_table(&mut glue, table_name, row).await.unwrap();
        }

        // the upper bound is inclusive, so datetimes 1..=offset are removed
        let res = delete_price_by_time(&mut glue, table_name, None, Some(offset)).await;
        assert!(res.is_ok());
        assert_eq!(res.unwrap(), offset as usize);

        let res = select_price_by_symbol(&mut glue, table_name, symbol, None, None)
            .await
            .unwrap();
        assert_eq!(res.len(), (total_duration - offset) as usize);
    }

    #[tokio::test]
    // result is about 1ms
    async fn test_insert_one_sec_volatile() {
        let table_name = "test_insert_one_sec_volatile";
        let mut glue = Glue::new(SharedMemoryStorage::new());
        assert!(create_price_table(&mut glue, table_name, false).await.is_ok());

        let message_rate = 100;
        let insertion_limit_ms: u128 = 3000;
        let mut max_taken_ms: u128 = 0;
        for m in 0..message_rate {
            // a monotonic clock cannot jump backwards while measuring
            let started = std::time::Instant::now();
            insert_price_table(&mut glue, table_name, coin_row(m, 0)).await.unwrap();
            max_taken_ms = max_taken_ms.max(started.elapsed().as_millis());
        }
        assert!(max_taken_ms <= insertion_limit_ms, "insertion took {max_taken_ms}ms");
        println!("volatile database insertion for 1s data: {max_taken_ms}ms");
    }

    #[tokio::test]
    // result is about 1ms
    async fn test_insert_one_sec_persistent() {
        let table_name = "test_insert_one_sec_persistent";
        // keep the guard alive until the end of the test so the directory is not removed early
        let tmp_dir = tempfile::tempdir().unwrap();
        let db_path = tmp_dir.path().join(format!("db_{table_name}"));
        let storage_persistent = SledStorage::new(db_path.to_str().unwrap()).unwrap();
        let mut glue = Glue::new(storage_persistent);
        assert!(create_price_table(&mut glue, table_name, true).await.is_ok());

        let message_rate = 100;
        let insertion_limit_ms: u128 = 3000;
        let mut max_taken_ms: u128 = 0;
        for m in 0..message_rate {
            let started = std::time::Instant::now();
            insert_price_table(&mut glue, table_name, coin_row(m, 0)).await.unwrap();
            max_taken_ms = max_taken_ms.max(started.elapsed().as_millis());
        }
        assert!(max_taken_ms <= insertion_limit_ms, "insertion took {max_taken_ms}ms");
        println!("persistent database insertion for 1s data: {max_taken_ms}ms");
    }

    /// Benchmark driver: insert `hrs` hours of one-row-per-second data, then time one select.
    ///
    /// NOTE: everything below exists for benchmarking. The tests that call this
    /// take a long time, so they are `#[ignore]`d and only run on request
    /// (`cargo test -- --ignored`).
    async fn test_price_table_insert_and_select(
        store: impl GStore + GStoreMut,
        name: &str,
        hrs: i32,
        with_index: bool,
        only_one_hour: bool,
    ) {
        let hr_in_sec = 3600;
        let count_symbol = 1;
        let table_name = "test_select_one_hour_volatile_shared";
        let mut glue = Glue::new(store);
        create_price_table(&mut glue, table_name, with_index)
            .await
            .expect("Failed to create table");

        for hr in 0..hrs {
            let mut rows = Vec::new();
            for i in 0..hr_in_sec {
                // offset every hour by a full hour so the hours do not overlap
                let datetime = TimeStampMs::from(hr * hr_in_sec + i);
                rows.extend((0..count_symbol).map(|m| coin_row(m, datetime)));
            }
            insert_iter_price_table(&mut glue, table_name, rows).await.unwrap();
        }

        // time the selection over either the first hour or the whole data set
        let window = if only_one_hour { hr_in_sec } else { hrs * hr_in_sec };
        let time_begin = Some(0);
        let time_end = Some(TimeStampMs::from(window));
        let started = std::time::Instant::now();
        select_price_by_symbol(&mut glue, table_name, "coin_0", time_begin, time_end)
            .await
            .unwrap();
        let taken = started.elapsed().as_millis();
        println!("{name} with {count_symbol} symbols {hrs}hr data: {taken}ms, 1hr={only_one_hour}");
    }

    #[tokio::test]
    #[ignore = "benchmark, takes a long time"]
    // test runs about 10 sec for 1hr data
    // 1hr data took 700ms, 24hr data took 23000ms
    async fn test_select_one_hour_volatile_shared() {
        let memory = SharedMemoryStorage::new();
        test_price_table_insert_and_select(memory, "volatile shared database selection", 1, false, false).await;
    }

    #[tokio::test]
    #[ignore = "benchmark, takes a long time"]
    // test runs about 10 sec for 1hr data
    // 1hr data took 700ms, 24hr data took 23000ms
    async fn test_select_one_day_volatile_shared() {
        let memory = SharedMemoryStorage::new();
        test_price_table_insert_and_select(memory, "volatile shared database selection", 24, false, false).await;
    }

    #[tokio::test]
    #[ignore = "benchmark, takes a long time"]
    // test runs about 10 sec for 1hr data
    // 1hr data took 700ms, 24hr data took 23000ms
    async fn test_select_one_hour_from_one_day_volatile_shared() {
        let memory = SharedMemoryStorage::new();
        test_price_table_insert_and_select(memory, "volatile shared database selection", 24, false, true).await;
    }

    /// Open a sled-backed storage inside a fresh temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the storage is used.
    fn create_temp_db(table_name: &str) -> (TempDir, SledStorage) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let db_filename = tmp_dir.path().join(format!("db_{table_name}"));
        let sled_config = SledConfig::default()
            .path(db_filename)
            .mode(Mode::HighThroughput)
            .use_compression(false)
            .flush_every_ms(None)
            // 10 GiB
            .cache_capacity(10 * 1024 * 1024 * 1024_u64);
        let storage = SledStorage::try_from(sled_config).unwrap();
        (tmp_dir, storage)
    }

    #[tokio::test]
    #[ignore = "benchmark, takes a long time"]
    async fn test_select_one_hour_persistent() {
        let (_dir, db) = create_temp_db("persistent_database_selection_1");
        test_price_table_insert_and_select(db, "persistent database selection", 1, true, false).await;
    }

    #[tokio::test]
    #[ignore = "benchmark, takes a long time"]
    async fn test_select_one_day_persistent() {
        let (_dir, db) = create_temp_db("persistent_database_selection_2");
        test_price_table_insert_and_select(db, "persistent database selection", 24, true, false).await;
    }

    #[tokio::test]
    #[ignore = "benchmark, takes a long time"]
    async fn test_select_one_hour_from_one_day_persistent() {
        let (_dir, db) = create_temp_db("persistent_database_selection_3");
        test_price_table_insert_and_select(db, "persistent database selection", 24, true, true).await;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment