@amustafa
Last active May 19, 2021 18:26
DataFusion Performance Example

Benchmarks comparing Apache Arrow DataFusion against Julia's DataFrames.jl on join, transform, filter, and group-aggregate workloads over 100M- and 1B-row datasets.

Cargo.toml
[package]
name = "datafusion_playground"
version = "0.1.0"
authors = [""]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow = { git = "https://github.com/apache/arrow-rs", branch = "master" }
tokio = "^1.5.0"
rand = "^0.8.3"
csv = "^1.1.6"
glob = "^0.3.0"
# DataFusion has unreleased join bug fixes that are required here; it also needs a matching git version of arrow to build.
datafusion = { git = "https://github.com/apache/arrow-datafusion", branch = "master" }
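# Both dependencies track moving branches, so results may drift between runs. If
# reproducibility matters, Cargo's `rev` key can pin each git dependency to a specific
# commit. The SHAs below are placeholders, not the actual commits used for these numbers:
# arrow = { git = "https://github.com/apache/arrow-rs", rev = "<commit-sha>" }
# datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "<commit-sha>" }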

100 Million Rows

Operation timings (Join, Transform, Filter, Aggregate/Groupby) are in seconds.

| Query Engine                    | Dataset | Join | Transform | Filter | Aggregate/Groupby | Max Mem | Rough CPU | time output                        |
|---------------------------------|---------|------|-----------|--------|-------------------|---------|-----------|------------------------------------|
| DataFusion                      | 100M    | 1    | <1        | <1     | 32                | 8.8 GB  | 1100%     |                                    |
| DataFusion (batch = 4194304)    | 100M    | 2.6  | 0.27      | 1.5    | 10.5              | 10.6 GB | 700%      | 36.41 real, 255.69 user, 35.41 sys |
| Julia/DataFrames (mean[stddev]) | 100M    | 0.77 | 0.57      | 0.49   | 6.10              | 9.2 GB  | ~100%     |                                    |

1 Billion Rows

| Query Engine                    | Dataset | Join | Transform | Filter | Aggregate/Groupby | Max Mem | Rough CPU | time output                           |
|---------------------------------|---------|------|-----------|--------|-------------------|---------|-----------|---------------------------------------|
| DataFusion (batch = 4194304)    | 1B      | 87   | 29        | 26     | 266               | 25.8 GB | 1100%     | 783.84 real, 5439.11 user, 998.59 sys |
| Julia/DataFrames (mean[stddev]) | 1B      | 16   | 22        | 15     | 298               | 25.4 GB | ~100%     |                                       |
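The "(batch = 4194304)" rows pin DataFusion's record-batch size to 2^22 = 4194304 rows. Note that the code below ships with BATCH_SIZE set to 2^19, so reproducing those rows means bumping the constant:

// hypothetical: the setting presumably used for the "(batch = 4194304)" table rows
const BATCH_SIZE: usize = usize::pow(2, 22); // 4194304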
Rust benchmark:

use std::sync::Arc;
use std::time::Instant;

use datafusion::datasource::MemTable;
use datafusion::logical_plan::JoinType;
use datafusion::prelude::*;

const DATASET_DIR: &str = "datasets/100M-dataset/";
const BATCH_SIZE: usize = usize::pow(2, 19);
// const BATCH_SIZE: usize = 101000000;
// const BATCH_SIZE: usize = 0;
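// 2^19 = 524288 rows per record batch. The commented-out alternatives appear to cover
// a single batch holding the entire 100M-row dataset (101000000) and, via the
// `BATCH_SIZE > 0` checks below, DataFusion's default batch size (0).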
async fn tests() -> datafusion::error::Result<()> {
    // DATASET_DIR already ends in "/", so append the bare file names.
    let mut cve_parquet = DATASET_DIR.to_owned();
    let mut computer_names_parquet = DATASET_DIR.to_owned();
    cve_parquet.push_str("cvss.parquet");
    computer_names_parquet.push_str("compnames.parquet");

    let cve_findings_memtable =
        Arc::new(read_parquet_to_memtable(&cve_parquet, BATCH_SIZE).await);
    let computer_names_memtable =
        Arc::new(read_parquet_to_memtable(&computer_names_parquet, BATCH_SIZE).await);

    // Build a fresh context for each benchmark below.
    let new_ctx = || {
        if BATCH_SIZE > 0 {
            ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(BATCH_SIZE))
        } else {
            ExecutionContext::with_config(ExecutionConfig::new())
        }
    };
    //=================== Left Join Benchmark ===================
    let mut ctx = new_ctx();
    ctx.register_table("computer_names", computer_names_memtable.clone())?;
    ctx.register_table("cve_findings", cve_findings_memtable.clone())?;
    let computer_names_df = ctx.table("computer_names")?;
    let cve_findings_df = ctx.table("cve_findings")?;

    let now = Instant::now();
    let cves_and_computer_names = computer_names_df
        .join(cve_findings_df, JoinType::Left, &["eid"], &["eid"])?
        .select(vec![col("eid"), col("compname"), col("cvss")])?;
    let _result = cves_and_computer_names.collect().await?;
    println!("Execute Time for Left Join: {:.2?}", now.elapsed());
    //=================== Aggregate Benchmark ===================
    let mut ctx = new_ctx();
    ctx.register_table("cve_findings", cve_findings_memtable.clone())?;
    let cve_findings_df = ctx.table("cve_findings")?;

    let now = Instant::now();
    // Alias the aggregate expression itself so the output column is the sum.
    let cvss_summed_per_eid_df = cve_findings_df.aggregate(
        vec![col("eid")],
        vec![sum(col("cvss")).alias("Sum of CVSS scores")],
    )?;
    let _result = cvss_summed_per_eid_df.collect().await?;
    println!("Execute Time for Aggregate: {:.2?}", now.elapsed());
    //=================== Transform Benchmark ===================
    let mut ctx = new_ctx();
    ctx.register_table("cve_findings", cve_findings_memtable.clone())?;
    let cve_findings_df = ctx.table("cve_findings")?;

    let now = Instant::now();
    let cvss_times_ten_df =
        cve_findings_df.select(vec![(col("cvss") * lit(10)).alias("cvss times ten")])?;
    let _result = cvss_times_ten_df.collect().await?;
    println!("Execute Time for Transform: {:.2?}", now.elapsed());
    //================== Filtration Benchmark ====================
    let mut ctx = new_ctx();
    ctx.register_table("cve_findings", cve_findings_memtable.clone())?;
    let cve_findings_df = ctx.table("cve_findings")?;

    let now = Instant::now();
    let cvss_lt_p5 = cve_findings_df.filter(col("cvss").lt(lit(0.5)))?;
    let _result = cvss_lt_p5.collect().await?;
    println!("Execute Time for Filtration: {:.2?}", now.elapsed());

    Ok(())
}
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    tests().await?;
    Ok(())
}
async fn read_parquet_to_memtable(path: &str, batch_size: usize) -> MemTable {
    // Let DataFusion do the heavy lifting of gathering all the individual parquet files.
    let mut ctx = if batch_size > 0 {
        ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(batch_size))
    } else {
        ExecutionContext::with_config(ExecutionConfig::new())
    };
    let records = ctx.read_parquet(path).unwrap().collect().await.unwrap();
    let schema = records[0].schema();
    MemTable::try_new(schema, vec![records]).expect("try_new memtable")
}
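The manifest pulls in rand, csv, and glob, but the gist doesn't include the dataset generator. Below is a minimal sketch of what it might look like, assuming the column layout implied by the queries above (eid, cvss, compname); the file names, row counts, and value ranges are guesses, and the parquet copies read by the Rust benchmark would still have to be produced separately.

use rand::Rng;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    const ROWS: u64 = 1_000_000; // scale to 100M / 1B for the real runs
    let mut rng = rand::thread_rng();

    // eid,cvss pairs; scores drawn from the usual CVSS 0..10 range (assumed).
    let mut cvss = csv::Writer::from_path("datasets/100m-dataset/csvs/cvss.csv")?;
    cvss.write_record(&["eid", "cvss"])?;
    for eid in 0..ROWS {
        cvss.write_record(&[eid.to_string(), format!("{:.2}", rng.gen_range(0.0..10.0))])?;
    }
    cvss.flush()?;

    // eid,compname pairs; one synthetic host name per eid.
    let mut names = csv::Writer::from_path("datasets/100m-dataset/csvs/compnames.csv")?;
    names.write_record(&["eid", "compname"])?;
    for eid in 0..ROWS {
        names.write_record(&[eid.to_string(), format!("host-{}", eid)])?;
    }
    names.flush()?;
    Ok(())
}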
println("Start me up!")
using DataFrames
using CSV
using Profile
wd = pwd()
println(wd)
inputPath = "datasets/100m-dataset/csvs"
nameFile = joinpath(inputPath, "compnames.csv");
cvssFile = joinpath(inputPath, "cvss.csv");
correctnessCSVOut = joinpath(inputPath, "correctness-output.csv");
# Everything below here is a bit messy / was adjusted for running benchmarks ... at the expense of
# readability.
# Run it `numIters` times (+1 to get JAOT compilation; ignore first run).
numIters = 5
function main(mode)
    println("running in ", mode, " mode")
    if "leftjoin" == mode
        println("Loading data ...")
        (findingsData, nameData) = (DataFrame(CSV.File(cvssFile)), DataFrame(CSV.File(nameFile)))
        println("Done.")
        println("Benchmarking ...")
        for _ = 1:numIters + 1
            @time leftjoin(findingsData, nameData, on = :eid)
            GC.gc(true)
        end
        println("Done.")
        # Free the RAMs!
        findingsData = nothing
        nameData = nothing
elseif "transform" == mode
println("Loading data ...")
cvssData = DataFrame(CSV.File(cvssFile))
println("Done.")
println("Benchmarking ...")
for _ = 1:numIters + 1
@time DataFrame(
eid = cvssData.eid,
cvss = cvssData.cvss * 10
)
GC.gc(true)
end
println("Done.")
# Free the RAMs!
cvssData = nothing
elseif "transform-in-place" == mode
println("Loading data ...")
cvssData = DataFrame(CSV.File(cvssFile))
println("Done.")
println("Benchmarking ...")
for _ = 1:numIters + 1
f = copy(cvssData)
@time f.cvss *= 10
f = nothing
GC.gc(true)
end
println("Done.")
# Free the RAMs!
cvssData = nothing
elseif "filter" == mode
println("Loading data ...")
cvssData = DataFrame(CSV.File(cvssFile))
println("Done.")
println("Benchmarking ...")
for _ = 1:numIters + 1
@time filter(:cvss => x -> x > 0.5, cvssData)
GC.gc(true)
end
println("Done.")
# Free the RAMs!
cvssData = nothing
elseif "filter2" == mode
println("Loading data ...")
println("Done.")
println("Benchmarking ...")
for _ = 1:numIters + 1
cvssData = DataFrame(CSV.File(cvssFile))
@time filter(:cvss => x -> x > 0.5, cvssData)
cvssData = nothing
GC.gc(true)
end
println("Done.")
# Free the RAMs!
cvssData = nothing
elseif "group-aggregate" == mode
println("Loading data ...")
cvssData = DataFrame(CSV.File(cvssFile))
println("Done.")
println("Benchmarking ...")
for _ = 1:numIters + 1
@time combine(
groupby(cvssData, :eid),
:cvss => (d -> cvssWeight * sum(d)) => :scoresSum
)
GC.gc(true)
end
println("Done.")
# Free the RAMs!
cvssData = nothing
end
GC.gc(true)
end
# Select which benchmark(s) to run here!
main("leftjoin")
main("transform-in-place")
main("filter")
main("filter2")
main("group-aggregate")