Created
April 5, 2024 15:03
-
-
Save JosiahParry/6df3560b1f93043b041354ec6038d1e0 to your computer and use it in GitHub Desktop.
1brc using DataFusion and extendr for R API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Cargo manifest for the `tst2` crate; `publish = false` keeps it off the registry.
[package] | |
name = 'tst2' | |
publish = false | |
version = '0.1.0' | |
edition = '2021' | |
# Built as a static library (presumably so the R package can link it — confirm).
[lib] | |
crate-type = [ 'staticlib' ] | |
name = 'tst2' | |
[dependencies] | |
arrow_extendr = "50.0.0" | |
datafusion = "36.0.0" | |
# NOTE(review): `*` accepts any published version; consider pinning a version
# known to work with arrow_extendr 50.0.0 for reproducible builds.
extendr-api = "*" | |
# Only the tokio runtime features are enabled (`rt`, `rt-multi-thread`).
tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread"] } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use extendr_api::prelude::*; | |
use arrow_extendr::to::IntoArrowRobj; | |
use datafusion::arrow::datatypes::{DataType, Field, Schema}; | |
use datafusion::prelude::*; | |
use std::result::Result; | |
use tokio::runtime::Runtime; | |
#[extendr] | |
pub fn onebrc(path: &str) -> Result<Robj, Error> { | |
// create schema for the one billion row challenge | |
let schema = Schema::new(vec![ | |
Field::new(String::from("station"), DataType::Utf8, false), | |
// the maximum temp value is quite small we can use a small number for it | |
Field::new(String::from("temperature"), DataType::Float32, false), | |
]); | |
// create a tokio runtime | |
let rt = Runtime::new().unwrap(); | |
// create a datafusion context | |
let ctx = SessionContext::new(); | |
let opts = CsvReadOptions::new() | |
.delimiter(b';') | |
.has_header(false) | |
.schema(&schema) | |
.file_extension("txt"); | |
let df = rt.block_on(ctx.read_csv(path, opts)).unwrap(); | |
let results_fut = df | |
.aggregate( | |
vec![col("station")], | |
vec![ | |
min(col("temperature")), | |
avg(col("temperature")), | |
max(col("temperature")), | |
], | |
) | |
.unwrap() | |
.collect(); | |
let results = rt.block_on(results_fut); | |
results.unwrap().into_arrow_robj() | |
} | |
// Declares the extendr module `tst2` (the same name as the library in the
// Cargo manifest) and lists `onebrc` as a function of that module.
extendr_module! { | |
mod tst2; | |
fn onebrc; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment