Skip to content

Instantly share code, notes, and snippets.

@JakkuSakura
Created February 4, 2024 03:55
Show Gist options
  • Save JakkuSakura/4bb9678501dbabf56c1b6d95269740aa to your computer and use it in GitHub Desktop.
Save JakkuSakura/4bb9678501dbabf56c1b6d95269740aa to your computer and use it in GitHub Desktop.
Price table with GlueSQL benchmark
// gluesql-derive = "0.1.7"
use crate::db::gluesql::utils::decimal_to_ast_expr;
use eyre::bail;
use gluesql::core::ast::Statement;
use gluesql::core::ast_builder::{expr, num, table, text, Build, ExprNode};
use gluesql::core::executor::Payload;
use gluesql::core::store::{GStore, GStoreMut};
use gluesql::prelude::Glue;
use gluesql_derive::FromGlueSqlRow;
/// Integer timestamp type used for the `datetime` field of `PriceRow`.
/// NOTE(review): the name implies Unix milliseconds — confirm against whatever produces these values.
pub type TimeStampMs = i64;
use rust_decimal::Decimal;
use std::sync::atomic::AtomicU64;
/// Shared source of row ids; starts at zero and is only ever incremented.
static INDEX: AtomicU64 = AtomicU64::new(0);

/// Returns the current value of [`INDEX`] and advances it by one.
#[must_use]
fn counter() -> u64 {
    use std::sync::atomic::Ordering;

    INDEX.fetch_add(1, Ordering::Relaxed)
}
/// Converts a [`Decimal`] into an AST expression node by rendering it to its string form
/// and passing that string to `expr`.
///
/// NOTE(review): this file also imports `decimal_to_ast_expr` from
/// `crate::db::gluesql::utils`; the import and this definition share a name, so one of
/// the two has to go for the file to compile.
pub fn decimal_to_ast_expr(d: Decimal) -> ExprNode<'static> {
    let rendered = d.to_string();
    expr(rendered)
}
/// Creates the price table `table_name` if it does not exist yet and, when `with_index`
/// is set, an index on its `datetime` column as well.
///
/// Both statements are sent to `glue` as one SQL script. An execution error is converted
/// into an `eyre` error; the payloads of a successful run are discarded.
pub async fn create_price_table<T: GStore + GStoreMut>(
    glue: &mut Glue<T>,
    table_name: &str,
    with_index: bool,
) -> eyre::Result<()> {
    // Design notes carried over from the author:
    // - the hyper, binance and difference columns are nullable, so nothing is stored for
    //   a price that is not available;
    // - always select before insert; when data are missing, either the whole row is
    //   absent (no binance/hyper price) or one side of the row is missing;
    // - only the difference data tells us for certain that both sides were present;
    // - for the moment, a string is used for the index for simplicity.
    let create_table = format!(
" CREATE TABLE IF NOT EXISTS {table_name} (
id UINT64 NOT NULL,
symbol TEXT NOT NULL,
datetime INT NOT NULL,
hyper_bid DECIMAL,
hyper_oracle DECIMAL,
hyper_mark DECIMAL,
binance DECIMAL,
difference DECIMAL,
bp DECIMAL,
);
"
    );
    let script = if with_index {
        let create_index = format!(
            " CREATE INDEX IF NOT EXISTS index_{table_name}_datetime ON {table_name} (datetime);\n"
        );
        create_table + &create_index
    } else {
        create_table
    };
    glue.execute(script.as_str()).await?;
    Ok(())
}
#[derive(Debug, Clone, FromGlueSqlRow, Default)]
/// Row representation of the price table built by `create_price_table`.
///
/// NOTE(review): the table declares the price columns without `NOT NULL`, while the
/// fields here are plain `Decimal` — confirm how NULL cells are decoded by the derive.
pub struct PriceRow {
    /// Row id. `to_expr` does not write this field; it writes the index it is given.
    pub id: u64,
    /// Symbol the prices belong to (the tests use values such as "BTC").
    pub symbol: String,
    /// Value of the `binance` column.
    pub binance: Decimal,
    /// Value of the `hyper_bid` column.
    pub hyper_bid: Decimal,
    /// Value of the `hyper_oracle` column.
    pub hyper_oracle: Decimal,
    /// Value of the `hyper_mark` column.
    pub hyper_mark: Decimal,
    /// Value of the `difference` column.
    pub difference: Decimal,
    /// Value of the `bp` column (presumably basis points — TODO confirm).
    pub bp: Decimal,
    /// Timestamp of the row; this is the column the range filters compare against.
    pub datetime: TimeStampMs,
}
impl PriceRow {
    /// Column names in the order used for INSERT statements; [`Self::to_expr`] yields
    /// its values in the same order.
    pub fn columns() -> Vec<&'static str> {
        const NAMES: [&str; 9] = [
            "id",
            "symbol",
            "datetime",
            "hyper_bid",
            "hyper_oracle",
            "hyper_mark",
            "binance",
            "difference",
            "bp",
        ];
        NAMES.to_vec()
    }

    /// Consumes the row and turns it into one expression per column, ordered like
    /// [`Self::columns`]. The `id` column receives `index`; the row's own `id` is ignored.
    pub fn to_expr(self, index: u64) -> Vec<ExprNode<'static>> {
        let Self {
            id: _,
            symbol,
            binance,
            hyper_bid,
            hyper_oracle,
            hyper_mark,
            difference,
            bp,
            datetime,
        } = self;
        Vec::from([
            num(index),
            text(symbol),
            num(datetime),
            decimal_to_ast_expr(hyper_bid),
            decimal_to_ast_expr(hyper_oracle),
            decimal_to_ast_expr(hyper_mark),
            decimal_to_ast_expr(binance),
            decimal_to_ast_expr(difference),
            decimal_to_ast_expr(bp),
        ])
    }
}
/// Inserts a single `price_row` into `table_name`.
///
/// The row id is taken from `counter()`, not from `price_row.id`.
///
/// # Returns
/// The id that was assigned to the inserted row.
///
/// # Errors
/// Fails when the statement cannot be built, when execution fails, or when execution
/// yields anything other than an insert payload.
pub async fn insert_price_table<T: GStore + GStoreMut>(
    glue: &mut Glue<T>,
    table_name: &str,
    price_row: PriceRow,
) -> eyre::Result<u64> {
    let index = counter();
    let stmt = table(table_name)
        .insert()
        .columns(PriceRow::columns())
        .values(vec![price_row.to_expr(index)])
        .build()?;
    match glue.execute_stmt(&stmt).await {
        Ok(Payload::Insert(_)) => Ok(index),
        // Report an unexpected payload as an error instead of panicking, in line with
        // the select and delete helpers in this file.
        Ok(other) => bail!("unexpected payload for insert: {other:?}"),
        Err(e) => bail!("unexpected result {e}"),
    }
}
/// insert price into price table
pub async fn insert_iter_price_table<T: GStore + GStoreMut>(
glue: &mut Glue<T>,
table_name: &str,
price_rows: impl IntoIterator<Item = PriceRow>,
) -> eyre::Result<u64> {
let mut batch = vec![];
for price_row in price_rows {
let index = counter();
let row = price_row.to_expr(index);
batch.push(row);
}
let sql = table(table_name)
.insert()
.columns(PriceRow::columns())
.values(batch)
.build()?;
let res = glue.execute_stmt(&sql).await;
match res {
Ok(Payload::Insert(size)) => Ok(size as u64),
Err(e) => bail!("unexpected result {e}"),
_ => unreachable!(),
}
}
/// Builds the filter over the `datetime` column for an optional inclusive range.
///
/// `from_ms` becomes `datetime >= from`, `until_ms` becomes `datetime <= until`; when
/// both are given they are joined with AND, and when neither is given the filter is the
/// expression `true`.
fn filter_range_expr(from_ms: Option<i64>, until_ms: Option<i64>) -> ExprNode<'static> {
    let lower = from_ms.map(|from| expr("datetime").gte(num(from)));
    let upper = until_ms.map(|until| expr("datetime").lte(num(until)));
    match (lower, upper) {
        (Some(lo), Some(hi)) => lo.and(hi),
        (Some(bound), None) | (None, Some(bound)) => bound,
        (None, None) => expr("true"),
    }
}
/// Builds the SELECT statement that reads every symbol's rows within the optional
/// `datetime` range, ordered by symbol and then datetime, both descending.
fn select_price_all_stmt(
    table_name: &str,
    datetime_from_ms: Option<i64>,
    datetime_until_ms: Option<i64>,
) -> gluesql::prelude::Result<Statement> {
    let time_window = filter_range_expr(datetime_from_ms, datetime_until_ms);
    let query = table(table_name)
        .select()
        .filter(time_window)
        .project("id, symbol, binance, hyper_bid, hyper_oracle, hyper_mark, difference, bp, datetime")
        .order_by("symbol DESC, datetime DESC");
    query.build()
}
/// Reads the rows of every symbol within the optional `datetime` range and decodes them
/// into [`PriceRow`]s.
///
/// Any outcome other than a select payload — an execution error included — is reported
/// as a "wrong format" error carrying the debug form of the result.
pub async fn select_price_all<T: GStore + GStoreMut>(
    glue: &mut Glue<T>,
    table_name: &str,
    datetime_from_ms: Option<i64>,
    datetime_until_ms: Option<i64>,
) -> eyre::Result<Vec<PriceRow>> {
    let stmt = select_price_all_stmt(table_name, datetime_from_ms, datetime_until_ms)?;
    // Author's measurement notes attribute roughly 1350ms to the execution below and the
    // same figure to the row decoding that follows it.
    match glue.execute_stmt(&stmt).await {
        Ok(Payload::Select { labels, rows }) => Ok(PriceRow::from_gluesql_rows(&labels, rows)?),
        res => bail!("wrong format, check selection AST [{res:?}]"),
    }
}
/// Builds the SELECT statement that reads one symbol's rows within the optional
/// `datetime` range, newest first.
fn select_price_by_symbol_stmt(
    table_name: &str,
    symbol: &str,
    datetime_from_ms: Option<i64>,
    datetime_until_ms: Option<i64>,
) -> gluesql::prelude::Result<Statement> {
    let time_window = filter_range_expr(datetime_from_ms, datetime_until_ms);
    let symbol_matches = expr("symbol").eq(text(symbol.to_owned()));
    let query = table(table_name)
        .select()
        .filter(time_window.and(symbol_matches))
        .project("id, symbol, binance, hyper_bid, hyper_oracle, hyper_mark, difference, bp, datetime")
        .order_by("datetime DESC");
    query.build()
}
/// Reads the rows of a single `symbol` within the optional `datetime` range and decodes
/// them into [`PriceRow`]s.
///
/// Any outcome other than a select payload — an execution error included — is reported
/// as a "wrong format" error carrying the debug form of the result.
pub async fn select_price_by_symbol<T: GStore + GStoreMut>(
    glue: &mut Glue<T>,
    table_name: &str,
    symbol: &str,
    datetime_from_ms: Option<i64>,
    datetime_until_ms: Option<i64>,
) -> eyre::Result<Vec<PriceRow>> {
    let stmt = select_price_by_symbol_stmt(table_name, symbol, datetime_from_ms, datetime_until_ms)?;
    match glue.execute_stmt(&stmt).await {
        Ok(Payload::Select { labels, rows }) => Ok(PriceRow::from_gluesql_rows(&labels, rows)?),
        res => bail!("wrong format, check selection AST [{res:?}]"),
    }
}
/// Deletes every row whose `datetime` falls inside the optional inclusive range and
/// returns the count carried by the delete payload.
///
/// Any outcome other than a delete payload — an execution error included — is reported
/// as a "wrong format" error carrying the debug form of the result.
pub async fn delete_price_by_time<T: GStore + GStoreMut>(
    glue: &mut Glue<T>,
    table_name: &str,
    datetime_from_ms: Option<i64>,
    datetime_until_ms: Option<i64>,
) -> eyre::Result<usize> {
    let time_window = filter_range_expr(datetime_from_ms, datetime_until_ms);
    let stmt = table(table_name).delete().filter(time_window).build()?;
    match glue.execute_stmt(&stmt).await {
        Ok(Payload::Delete(count)) => Ok(count),
        res => bail!("wrong format, check selection AST [{res:?}]"),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use gluesql::core::parse_sql::parse;
use gluesql::core::translate::translate;
use gluesql::prelude::{SharedMemoryStorage, SledStorage};
use gluesql::sled_storage::sled::{Config as SledConfig, Mode};
use tempfile::TempDir;
// The builder output for a single-symbol query must equal the AST parsed from raw SQL.
#[tokio::test]
async fn test_ast_select_by_symbol() {
    let sql = "SELECT id, symbol, binance, hyper_bid, hyper_oracle, hyper_mark, difference, bp, datetime FROM test_ast_select_by_symbol WHERE datetime >= 0 AND datetime <= 10 AND symbol = 'BTC' ORDER BY datetime DESC";
    let parsed = parse(sql).unwrap();
    let expected = translate(&parsed[0]).unwrap();
    let actual = select_price_by_symbol_stmt("test_ast_select_by_symbol", "BTC", Some(0), Some(10)).unwrap();
    assert_eq!(actual, expected);
}
// The all-symbols query must match raw SQL both with a bounded range and with no range.
#[tokio::test]
async fn test_ast_select_all() {
    // Bounded range: compare the unwrapped statements.
    let bounded_sql = "SELECT id, symbol, binance, hyper_bid, hyper_oracle, hyper_mark, difference, bp, datetime FROM test_ast_select_all WHERE datetime >= 0 AND datetime <= 10 ORDER BY symbol DESC, datetime DESC";
    let bounded_expected = translate(&parse(bounded_sql).unwrap()[0]).unwrap();
    let bounded_actual = select_price_all_stmt("test_ast_select_all", Some(0), Some(10)).unwrap();
    assert_eq!(bounded_actual, bounded_expected);

    // No range: compare the results as returned, without unwrapping either side.
    let open_sql = "SELECT id, symbol, binance, hyper_bid, hyper_oracle, hyper_mark, difference, bp, datetime FROM test_ast_select_all WHERE true ORDER BY symbol DESC, datetime DESC";
    let open_expected = translate(&parse(open_sql).unwrap()[0]);
    let open_actual = select_price_all_stmt("test_ast_select_all", None, None);
    assert_eq!(open_actual, open_expected);
}
// Chained filter() calls are expected to equal a single WHERE clause joined with AND.
#[tokio::test]
async fn test_ast_multi_filter() {
    let sql = "SELECT * FROM test_ast_multi_filter WHERE id IS NULL AND id > 10 AND id < 20";
    let parsed = parse(sql).expect(sql);
    let expected = translate(&parsed[0]);
    let query = table("test_ast_multi_filter")
        .select()
        .filter("id IS NULL")
        .filter("id > 10")
        .filter("id < 20");
    let actual = query.build();
    assert_eq!(actual, expected);
}
// A multi-column ordering passed as one order_by() string must match the raw SQL form.
#[tokio::test]
async fn test_ast_multi_order() {
    let sql = "SELECT * FROM test_ast_multi_order ORDER BY age ASC, name DESC";
    let parsed = parse(sql).expect(sql);
    let expected = translate(&parsed[0]);
    // Author's note: multiple order_by() calls cannot be chained here, hence one string.
    let query = table("test_ast_multi_order")
        .select()
        .order_by("age ASC, name DESC");
    let actual = query.build();
    assert_eq!(actual, expected);
}
// Chained filters followed by a multi-column ordering must match the raw SQL form.
#[tokio::test]
async fn test_ast_multi_filter_order() {
    let sql = "SELECT * FROM test_ast_filter_order WHERE id IS NULL AND age > 20 ORDER BY name DESC, age ASC";
    let parsed = parse(sql).expect(sql);
    let expected = translate(&parsed[0]);
    let query = table("test_ast_filter_order")
        .select()
        .filter("id IS NULL")
        .filter("age > 20")
        .order_by("name DESC, age ASC");
    let actual = query.build();
    assert_eq!(actual, expected);
}
// Creating the table without an index on an in-memory store must succeed.
#[tokio::test]
async fn test_price_table_create() {
    let mut glue = Glue::new(SharedMemoryStorage::new());
    let created = create_price_table(&mut glue, "test_price_table_create", false).await;
    assert!(created.is_ok());
}
#[tokio::test]
async fn test_price_table_insert_select() {
let table_name = "test_price_table_insert_select";
let datetime = 123;
let symbol = "BTC";
let price_binance = <Decimal as std::str::FromStr>::from_str("99.5").unwrap();
let price_hyper = <Decimal as std::str::FromStr>::from_str("100").unwrap();
let shared = SharedMemoryStorage::new();
let mut glue = Glue::new(shared);
let res = create_price_table(&mut glue, table_name, false).await;
assert!(res.is_ok());
// insertion
let mut row = PriceRow::default();
row.binance = price_binance;
row.hyper_bid = price_hyper;
row.hyper_bid = price_hyper;
row.datetime = datetime;
row.symbol = symbol.to_string();
insert_price_table(&mut glue, table_name, row).await.unwrap();
// selection
let res = select_price_by_symbol(&mut glue, table_name, symbol, Some(datetime), None)
.await
.unwrap();
assert!(res.first().is_some(), "no valid data");
let res = res.first().unwrap();
assert_eq!(res.binance, price_binance);
assert_eq!(res.hyper_bid, price_hyper);
assert_eq!(res.difference, Decimal::default());
assert_eq!(res.bp, Decimal::default());
}
#[tokio::test]
async fn test_price_table_insert_select_all() {
let table_name = "test_price_table_insert_select_all";
let datetime = 123;
let symbol = "BTC";
let price_binance = <Decimal as std::str::FromStr>::from_str("99.5").unwrap();
let price_hyper = <Decimal as std::str::FromStr>::from_str("100").unwrap();
let shared = SharedMemoryStorage::new();
let mut glue = Glue::new(shared);
let res = create_price_table(&mut glue, table_name, false).await;
assert!(res.is_ok());
// insertion
let mut row = PriceRow::default();
row.binance = price_binance;
row.hyper_bid = price_hyper;
row.hyper_bid = price_hyper;
row.datetime = datetime;
row.symbol = symbol.to_string();
insert_price_table(&mut glue, table_name, row).await.unwrap();
// selection
let res = select_price_all(&mut glue, table_name, Some(datetime), None)
.await
.unwrap();
assert!(res.first().is_some(), "no valid data");
let res = res.first().unwrap();
assert_eq!(res.binance, price_binance);
assert_eq!(res.hyper_bid, price_hyper);
assert_eq!(res.difference, Decimal::default());
assert_eq!(res.bp, Decimal::default());
}
#[tokio::test]
async fn test_price_table_insert_delete() {
let table_name = "test_price_table_insert_delete";
let shared = SharedMemoryStorage::new();
let mut glue = Glue::new(shared);
let res = create_price_table(&mut glue, table_name, false).await;
assert!(res.is_ok());
let total_duration = 100;
let offset = 30;
let symbol = "coin";
for d in 0..total_duration {
let mut row = PriceRow::default();
row.symbol = symbol.to_string();
row.datetime = d + 1;
row.binance = d.into();
row.hyper_bid = d.into();
insert_price_table(&mut glue, table_name, row).await.unwrap();
}
let res = delete_price_by_time(&mut glue, table_name, None, Some(offset)).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), offset as usize);
let res = select_price_by_symbol(&mut glue, table_name, symbol, None, None)
.await
.unwrap();
assert_eq!(res.len(), (total_duration - offset) as usize);
}
#[tokio::test]
// result is about 1ms
async fn test_insert_one_sec_volatile() {
let table_name = "test_insert_one_sec_volatile";
let shared = SharedMemoryStorage::new();
let mut glue = Glue::new(shared.clone());
assert!(create_price_table(&mut glue, table_name, false).await.is_ok());
let message_rate = 100;
let selection_limit = 3000;
let mut max_taken = 0;
for m in 0..message_rate {
let time_start_ms = lib::utils::get_time_milliseconds();
let mut row = PriceRow::default();
row.symbol = format!("coin_{m}");
row.datetime = 0;
row.binance = Decimal::from(m + 1);
row.hyper_bid = Decimal::from(m + 1);
row.difference = Decimal::from(m + 1);
row.bp = Decimal::from(m + 1);
insert_price_table(&mut glue, table_name, row).await.unwrap();
let taken = lib::utils::get_time_milliseconds() - time_start_ms;
max_taken = max_taken.max(taken);
}
assert!(max_taken <= selection_limit, "selection took {max_taken}ms");
println!("volatile database insertion for 1s data: {max_taken}ms");
}
#[tokio::test]
// result is about 1ms
async fn test_insert_one_sec_persistent() {
let table_name = "test_insert_one_sec_persistent";
let tmp_dir = tempfile::tempdir().unwrap();
let tmp_dir = tmp_dir.path().to_str().unwrap();
let db_filename = format!("{tmp_dir}/db_{table_name}");
let storage_persistent = SledStorage::new(&db_filename).unwrap();
let mut glue = Glue::new(storage_persistent);
assert!(create_price_table(&mut glue, table_name, true).await.is_ok());
let message_rate = 100;
let selection_limit = 3000;
let mut max_taken = 0;
for m in 0..message_rate {
let time_start_ms = lib::utils::get_time_milliseconds();
let mut row = PriceRow::default();
row.symbol = format!("coin_{m}");
row.datetime = 0;
row.binance = Decimal::from(m + 1);
row.hyper_bid = Decimal::from(m + 1);
row.difference = Decimal::from(m + 1);
row.bp = Decimal::from(m + 1);
insert_price_table(&mut glue, table_name, row).await.unwrap();
let taken = lib::utils::get_time_milliseconds() - time_start_ms;
max_taken = max_taken.max(taken);
}
assert!(max_taken <= selection_limit, "selection took {max_taken}ms");
println!("persistent database insertion for 1s data: {max_taken}ms");
}
/// NOTE below are for the purpose of benchmark.
/// Since they take a long time, just disable them for normal unit tests
async fn test_price_table_insert_and_select(
store: impl GStore + GStoreMut,
name: &str,
hrs: i32,
with_index: bool,
only_one_hour: bool,
) {
let hr_in_sec = 3600;
let count_symbol = 1;
let table_name = "test_select_one_hour_volatile_shared";
let mut glue = Glue::new(store);
create_price_table(&mut glue, table_name, with_index)
.await
.expect("Failed to create table");
for hr in 0..hrs {
let mut rows = vec![];
for i in 0..hr_in_sec {
// check if insertion is less than 10ms
for m in 0..count_symbol {
let mut row = PriceRow::default();
row.symbol = format!("coin_{m}");
row.datetime = (hr + i) as TimeStampMs;
row.binance = Decimal::from(m + 1);
row.hyper_bid = Decimal::from(m + 1);
row.difference = Decimal::from(m + 1);
row.bp = Decimal::from(m + 1);
rows.push(row);
}
}
// println!("v: {hr}");
insert_iter_price_table(&mut glue, table_name, rows).await.unwrap();
}
// check if selection time is less than 3s
let time_start_select_ms = lib::utils::get_time_milliseconds();
let time_begin;
let time_end;
if only_one_hour {
time_begin = Some(0);
time_end = Some(hr_in_sec as TimeStampMs);
} else {
time_begin = Some(0);
time_end = Some((hrs * hr_in_sec) as TimeStampMs);
}
select_price_by_symbol(&mut glue, table_name, "coin_0", time_begin, time_end)
.await
.unwrap();
let taken = lib::utils::get_time_milliseconds() - time_start_select_ms;
println!("{name} with {count_symbol} symbols {hrs}hr data: {taken}ms, 1hr={only_one_hour}");
}
#[allow(dead_code)]
#[tokio::test]
// Benchmark: one hour of data in the in-memory store, selecting the whole span.
// Author's recorded timings: the test runs about 10 sec for 1hr data;
// 1hr data took 700ms, 24hr data took 23000ms.
async fn test_select_one_hour_volatile_shared() {
    let (hours, with_index, only_one_hour) = (1, false, false);
    test_price_table_insert_and_select(
        SharedMemoryStorage::new(),
        "volatile shared database selection",
        hours,
        with_index,
        only_one_hour,
    )
    .await;
}
#[allow(dead_code)]
#[tokio::test]
// Benchmark: 24 hours of data in the in-memory store, selecting the whole span.
// Author's recorded timings: the test runs about 10 sec for 1hr data;
// 1hr data took 700ms, 24hr data took 23000ms.
async fn test_select_one_day_volatile_shared() {
    let (hours, with_index, only_one_hour) = (24, false, false);
    test_price_table_insert_and_select(
        SharedMemoryStorage::new(),
        "volatile shared database selection",
        hours,
        with_index,
        only_one_hour,
    )
    .await;
}
#[allow(dead_code)]
#[tokio::test]
// Benchmark: 24 hours of data in the in-memory store, selecting only the first hour.
// Author's recorded timings: the test runs about 10 sec for 1hr data;
// 1hr data took 700ms, 24hr data took 23000ms.
async fn test_select_one_hour_from_one_day_volatile_shared() {
    let (hours, with_index, only_one_hour) = (24, false, true);
    test_price_table_insert_and_select(
        SharedMemoryStorage::new(),
        "volatile shared database selection",
        hours,
        with_index,
        only_one_hour,
    )
    .await;
}
/// Opens a sled-backed storage under a fresh temporary directory, in a file named
/// `db_{table_name}`, and returns it together with the directory handle.
///
/// Panics if the directory or the storage cannot be created.
fn create_temp_db(table_name: &str) -> (TempDir, SledStorage) {
    // 10 GiB, the same value as `1024 * 1024 * 1024 * 10`.
    const CACHE_CAPACITY_BYTES: u64 = 10 * 1024 * 1024 * 1024;

    let dir = tempfile::tempdir().unwrap();
    let db_path = dir.path().join(format!("db_{table_name}"));
    let config = SledConfig::default()
        .path(db_path)
        .mode(Mode::HighThroughput)
        .use_compression(false)
        .flush_every_ms(None)
        .cache_capacity(CACHE_CAPACITY_BYTES);
    let storage = SledStorage::try_from(config).unwrap();
    (dir, storage)
}
#[allow(dead_code)]
#[tokio::test]
// Benchmark: one hour of data in a sled-backed store with the datetime index, selecting
// the whole span.
async fn test_select_one_hour_persistent() {
    // The directory handle stays bound for the whole run (presumably so the temporary
    // directory is not removed early — confirm against tempfile's drop behaviour).
    let (_dir, storage) = create_temp_db("persistent_database_selection_1");
    let (hours, with_index, only_one_hour) = (1, true, false);
    test_price_table_insert_and_select(
        storage,
        "persistent database selection",
        hours,
        with_index,
        only_one_hour,
    )
    .await;
}
#[allow(dead_code)]
#[tokio::test]
// Benchmark: 24 hours of data in a sled-backed store with the datetime index, selecting
// the whole span.
async fn test_select_one_day_persistent() {
    // The directory handle stays bound for the whole run (presumably so the temporary
    // directory is not removed early — confirm against tempfile's drop behaviour).
    let (_dir, storage) = create_temp_db("persistent_database_selection_2");
    let (hours, with_index, only_one_hour) = (24, true, false);
    test_price_table_insert_and_select(
        storage,
        "persistent database selection",
        hours,
        with_index,
        only_one_hour,
    )
    .await;
}
#[allow(dead_code)]
#[tokio::test]
// Benchmark: 24 hours of data in a sled-backed store with the datetime index, selecting
// only the first hour.
async fn test_select_one_hour_from_one_day_persistent() {
    // The directory handle stays bound for the whole run (presumably so the temporary
    // directory is not removed early — confirm against tempfile's drop behaviour).
    let (_dir, storage) = create_temp_db("persistent_database_selection_2");
    let (hours, with_index, only_one_hour) = (24, true, true);
    test_price_table_insert_and_select(
        storage,
        "persistent database selection",
        hours,
        with_index,
        only_one_hour,
    )
    .await;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment