Skip to content

Instantly share code, notes, and snippets.

@JosiahParry
Created April 5, 2024 15:03
Show Gist options
  • Save JosiahParry/6df3560b1f93043b041354ec6038d1e0 to your computer and use it in GitHub Desktop.
Save JosiahParry/6df3560b1f93043b041354ec6038d1e0 to your computer and use it in GitHub Desktop.
One Billion Row Challenge (1BRC) aggregation using DataFusion, exposed to R through extendr
# Cargo manifest for the Rust half of the R package (functions exposed via extendr).
[package]
name = 'tst2'
publish = false
version = '0.1.0'
edition = '2021'
[lib]
# Built as a static library; presumably linked into the R package's shared
# library by the package build (rextendr-style layout) — confirm.
crate-type = [ 'staticlib' ]
# Must match the `mod tst2;` name given to `extendr_module!` in the Rust source.
name = 'tst2'
[dependencies]
# NOTE(review): `into_arrow_robj` is called on RecordBatches produced by datafusion,
# so arrow_extendr presumably has to target the same arrow major version that
# datafusion 36 re-exports — keep these two versions in lockstep.
arrow_extendr = "50.0.0"
datafusion = "36.0.0"
# NOTE(review): wildcard version. This presumably needs to resolve to the same
# extendr-api release that arrow_extendr depends on; otherwise two incompatible
# copies of extendr's types (e.g. Robj) could be linked. Confirm and pin explicitly.
extendr-api = "*"
# `Runtime::new()` is called in the Rust code; per the tokio docs it needs the
# multi-threaded runtime feature ("rt-multi-thread").
tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread"] }
use extendr_api::prelude::*;
use arrow_extendr::to::IntoArrowRobj;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::*;
use std::result::Result;
use tokio::runtime::Runtime;
#[extendr]
pub fn onebrc(path: &str) -> Result<Robj, Error> {
// create schema for the one billion row challenge
let schema = Schema::new(vec![
Field::new(String::from("station"), DataType::Utf8, false),
// the maximum temp value is quite small we can use a small number for it
Field::new(String::from("temperature"), DataType::Float32, false),
]);
// create a tokio runtime
let rt = Runtime::new().unwrap();
// create a datafusion context
let ctx = SessionContext::new();
let opts = CsvReadOptions::new()
.delimiter(b';')
.has_header(false)
.schema(&schema)
.file_extension("txt");
let df = rt.block_on(ctx.read_csv(path, opts)).unwrap();
let results_fut = df
.aggregate(
vec![col("station")],
vec![
min(col("temperature")),
avg(col("temperature")),
max(col("temperature")),
],
)
.unwrap()
.collect();
let results = rt.block_on(results_fut);
results.unwrap().into_arrow_robj()
}
// Registration of the Rust functions exposed to R. `mod tst2` matches the
// `[lib] name = 'tst2'` in the manifest; each `fn` entry names a function
// annotated with `#[extendr]` in this file.
extendr_module! {
mod tst2;
fn onebrc;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment