|
use datafusion::datasource::MemTable; |
|
use datafusion::prelude::*; |
|
use datafusion::logical_plan::JoinType; |
|
use std::sync::Arc; |
|
use std::time::Instant; |
|
use datafusion::arrow::record_batch::RecordBatch; |
|
|
|
/// Directory containing the parquet datasets loaded by the benchmarks.
const DATASET_DIR: &str = "datasets/100M-dataset/";

/// Batch size used both when loading the parquet data and when executing the
/// benchmark queries: 2^19 = 524_288 rows.
///
/// A value of 0 makes the callers skip `with_batch_size` and keep DataFusion's
/// default. A value above the total row count (101_000_000 was tried here)
/// presumably yields one batch per source — confirm against the dataset size.
const BATCH_SIZE: usize = 1 << 19;
|
|
|
async fn tests() -> datafusion::error::Result<()> { |
|
let mut cve_parquet = DATASET_DIR.to_owned(); |
|
let mut computer_names_parquet = DATASET_DIR.to_owned(); |
|
let mut compliance_findings_parquet = DATASET_DIR.to_owned(); |
|
|
|
cve_parquet.push_str("/cvss.parquet"); |
|
computer_names_parquet.push_str("/compnames.parquet"); |
|
|
|
|
|
|
|
let cve_findings_memtable = Arc::new(read_parquet_to_memtable(&cve_parquet, BATCH_SIZE).await); |
|
let computer_names_memtable = Arc::new(read_parquet_to_memtable(&computer_names_parquet, BATCH_SIZE).await); |
|
|
|
let new_ctx = || { |
|
if BATCH_SIZE > 0 { |
|
ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(BATCH_SIZE)) |
|
} else { |
|
ExecutionContext::with_config(ExecutionConfig::new()) |
|
} |
|
}; |
|
|
|
//===================Left Join Benchmark================= |
|
|
|
let mut ctx = new_ctx(); |
|
|
|
ctx.register_table("computer_names", computer_names_memtable.clone())?; |
|
ctx.register_table("cve_findings", cve_findings_memtable.clone())?; |
|
|
|
let computer_names_df = ctx.table("computer_names")?.clone(); |
|
let cve_findings_df = ctx.table("cve_findings")?.clone(); |
|
|
|
let now = Instant::now(); |
|
let cves_and_computer_names = computer_names_df.join( |
|
cve_findings_df, |
|
JoinType::Left, |
|
&["eid"], &["eid"])? |
|
.select(vec![col("eid"), col("compname"), col("cvss")])?; |
|
let _result = cves_and_computer_names.collect().await?; |
|
println!("Execute Time for Left Join: {:.2?}", now.elapsed()); |
|
|
|
|
|
//===================Aggregate Benchmark================= |
|
let mut ctx = new_ctx(); |
|
|
|
ctx.register_table("cve_findings", cve_findings_memtable.clone())?; |
|
let cve_findings_df = ctx.table("cve_findings")?; |
|
|
|
let now = Instant::now(); |
|
let cvss_summed_per_eid_df = cve_findings_df.aggregate(vec![col("eid")], |
|
vec![sum(col("cvss").alias("Sum of CVSS scores"))])?; |
|
let _result = cvss_summed_per_eid_df.collect().await?; |
|
println!("Execute Time for Aggregate: {:.2?}", now.elapsed()); |
|
|
|
|
|
//===================Transform Benchmark=================== |
|
let mut ctx = new_ctx(); |
|
ctx.register_table("cve_findings", cve_findings_memtable.clone())?; |
|
let cve_findings_df = ctx.table("cve_findings")?; |
|
|
|
let now = Instant::now(); |
|
let cvss_times_ten_df = cve_findings_df.select(vec![(col("cvss") * lit(10)).alias("cvss times ten")])?; |
|
let _result = cvss_times_ten_df.collect().await?; |
|
println!("Execute Time for Transform: {:.2?}", now.elapsed()); |
|
|
|
//==================Filtration Benchmark==================== |
|
let mut ctx = new_ctx(); |
|
ctx.register_table("cve_findings", cve_findings_memtable.clone())?; |
|
let cve_findings_df = ctx.table("cve_findings")?; |
|
|
|
let now = Instant::now(); |
|
let cvss_lt_p5 = cve_findings_df.filter(col("cvss").lt(lit(0.5)))?; |
|
let _result = cvss_lt_p5.collect().await?; |
|
println!("Execute Time for Filtration: {:.2?}", now.elapsed()); |
|
Ok(()) |
|
} |
|
|
|
#[tokio::main] |
|
async fn main() -> datafusion::error::Result<()> { |
|
let _ = main2().await?; |
|
Ok(()) |
|
} |
|
|
|
async fn read_parquet_to_memtable(path: &str, batch_size: usize) -> MemTable |
|
{ |
|
// let datafusion do the heavy lifting of grabbing all the individual parquet files |
|
let mut ctx = if batch_size > 0 { |
|
ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(batch_size)) |
|
} else { |
|
ExecutionContext::with_config(ExecutionConfig::new()) |
|
}; |
|
|
|
let records = ctx.read_parquet(path).unwrap().collect().await.unwrap(); |
|
let schema = records[0].schema(); |
|
return MemTable::try_new(schema, vec![records]).expect("try_new memtable"); |
|
} |