arrow2-read-write
use std::fs;
use std::fs::File;
use std::sync::mpsc::channel;
use std::time::SystemTime;

use arrow2::error::Error;
use arrow2::io::parquet::read::{self, FileReader};
use threadpool::ThreadPool;

// Setup:
//   wget https://repo.databend.rs/alapha/input.parquet -O /tmp/input.parquet
//   for i in `seq 1 128`; do
//       cp -rf /tmp/input.parquet /tmp/input_$i.parquet
//   done
//   cargo run --example arrow2_read --release
// cost -> 836 ms
//
// Compare with duckdb:
//   select max(url) from read_parquet('/tmp/*.parquet', binary_as_string=True);
//   ┌────────────────────────┐
//   │        max(url)        │
//   │         varchar        │
//   ├────────────────────────┤
//   │ http://zarplatia-nogin │
//   └────────────────────────┘
//   Run Time (s): real 0.430 user 4.347991 sys 4.099678
fn main() -> Result<(), Error> {
    let files: Vec<String> = fs::read_dir("/tmp/")
        .unwrap()
        .map(|f| f.unwrap().path().display().to_string())
        .filter(|f| f.ends_with(".parquet"))
        .collect();

    let start = SystemTime::now();

    // duckdb uses 16 workers as well
    let n_workers = 16;
    let n_jobs = files.len();
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    for file in files {
        let tx = tx.clone();
        pool.execute(move || {
            let chunk = builder_chunk(&file);
            read_chunk(chunk);
            tx.send(1).unwrap();
        });
    }

    assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), n_jobs);
    println!("cost -> {:?} ms", start.elapsed().unwrap().as_millis());
    Ok(())
}

fn builder_chunk(f: &str) -> FileReader<File> {
    let mut reader = File::open(f).unwrap();
    let metadata = read::read_metadata(&mut reader).unwrap();
    let schema = read::infer_schema(&metadata).unwrap();
    // keep only the `url` column
    let schema = schema.filter(|_index, field| field.name == "url");

    let row_groups = metadata.row_groups;

    // duckdb uses a 2048 batch size
    let chunk_size = Some(2048);

    // read the row groups into chunks
    read::FileReader::new(reader, row_groups, schema, chunk_size, None, None)
}

fn read_chunk(chunk: FileReader<File>) {
    let mut rows = 0;
    for maybe_chunk in chunk {
        let chunk = maybe_chunk.unwrap();
        rows += chunk.len();
    }
    assert_eq!(rows, 106742);
}
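The DuckDB query above also computes max(url), whereas read_chunk only counts rows. For a closer comparison, here is a minimal sketch (not part of the original gist) of how the same loop could track the maximum url value, assuming the column is decoded as a BinaryArray<i32>; swap in Utf8Array<i32> if your file stores it as a string column.

use arrow2::array::BinaryArray;

// Hypothetical variant of `read_chunk`: also track the maximum `url` value,
// mirroring the DuckDB query above. Assumes the column is read as BinaryArray<i32>.
fn read_chunk_max(reader: FileReader<File>) -> Option<Vec<u8>> {
    let mut max: Option<Vec<u8>> = None;
    for maybe_chunk in reader {
        let chunk = maybe_chunk.unwrap();
        let urls = chunk.arrays()[0]
            .as_any()
            .downcast_ref::<BinaryArray<i32>>()
            .expect("expected the `url` column as binary");
        // iterate the non-null values and keep the lexicographically largest one
        for value in urls.iter().flatten() {
            if max.as_deref().map_or(true, |m| value > m) {
                max = Some(value.to_vec());
            }
        }
    }
    max
}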
use std::fs::File;
use std::time::SystemTime;

use arrow2::error::Error;
use arrow2::io::parquet::read::{self, FileReader};

fn main() -> Result<(), Error> {
    let inputs = vec![
        // the original file
        ("/tmp", "input.parquet"),
        // the file generated by `COPY INTO` Databend
        ("/tmp", "output.parquet"),
    ];

    for (dir, f) in inputs {
        // degree of parallelism (number of concurrent readers)
        for parallel in &[1, 2, 4, 8, 16] {
            let chunks = (0..*parallel).map(|_| builder_chunk(dir, f)).collect();
            let start = SystemTime::now();
            parallel_read_chunks(chunks);
            println!(
                "{dir}/{f}, parallel {parallel} read took: {} ms",
                start.elapsed().unwrap().as_millis()
            );
        }
    }
    Ok(())
}

fn builder_chunk(prefix: &str, file_path: &str) -> FileReader<File> {
    let f = format!("{prefix}/{file_path}");
    let mut reader = File::open(&f).unwrap();

    // read the file metadata
    let metadata = read::read_metadata(&mut reader).unwrap();
    // and infer a [`Schema`] from the `metadata`
    let schema = read::infer_schema(&metadata).unwrap();
    // keep only the `l_quantity` column
    let schema = schema.filter(|_index, field| field.name == "l_quantity");

    // we could also read the statistics of all row groups (one per field):
    // for field in &schema.fields {
    //     let statistics = read::statistics::deserialize(field, &metadata.row_groups).unwrap();
    //     println!("{:#?}", statistics);
    // }

    let row_groups = metadata.row_groups;

    // read the row groups into chunks of up to 65536 rows
    read::FileReader::new(reader, row_groups, schema, Some(65536), None, None)
}

fn parallel_read_chunks(chunks: Vec<FileReader<File>>) {
    let mut handles = Vec::with_capacity(16);
    for chunk in chunks {
        let handle = std::thread::spawn(move || {
            for maybe_chunk in chunk {
                let chunk = maybe_chunk.unwrap();
                assert!(!chunk.is_empty());
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
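Each reader above scans the whole file, so the benchmark measures how many full-file scans complete concurrently. A related sketch (not in the original benchmark): the row groups of a single file could instead be distributed round-robin across the readers, each with its own file handle. This only helps if the file contains several row groups; the "l_quantity" column name matches the filter used above.

fn build_row_group_readers(prefix: &str, file_path: &str, n_readers: usize) -> Vec<FileReader<File>> {
    let path = format!("{prefix}/{file_path}");
    let mut reader = File::open(&path).unwrap();
    let metadata = read::read_metadata(&mut reader).unwrap();
    let schema = read::infer_schema(&metadata).unwrap();
    let schema = schema.filter(|_index, field| field.name == "l_quantity");

    // round-robin the row groups over the readers
    let mut per_reader: Vec<Vec<_>> = (0..n_readers).map(|_| Vec::new()).collect();
    for (idx, row_group) in metadata.row_groups.into_iter().enumerate() {
        per_reader[idx % n_readers].push(row_group);
    }

    per_reader
        .into_iter()
        .map(|row_groups| {
            let file = File::open(&path).unwrap();
            read::FileReader::new(file, row_groups, schema.clone(), Some(65536), None, None)
        })
        .collect()
}

The resulting Vec<FileReader<File>> can be passed straight to parallel_read_chunks above.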
use std::fs::File;

use arrow2::{
    array::Array,
    chunk::Chunk,
    datatypes::Schema,
    error::Result,
    io::parquet::{
        read,
        write::{
            transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version,
            WriteOptions,
        },
    },
};

fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
    let options = WriteOptions {
        write_statistics: false,
        compression: CompressionOptions::Snappy,
        version: Version::V2,
    };

    let iter = vec![Ok(chunk)];

    let encodings = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect();

    let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;

    // create a new empty file
    let file = File::create(path)?;
    let mut writer = FileWriter::try_new(file, schema, options)?;

    for group in row_groups {
        writer.write(group?)?;
    }
    let _size = writer.end(None)?;
    Ok(())
}

fn main() -> Result<()> {
    let mut reader = File::open("/tmp/input.parquet").unwrap();

    // read the file metadata
    let metadata = read::read_metadata(&mut reader).unwrap();
    // and infer a [`Schema`] from it
    let schema = read::infer_schema(&metadata).unwrap();

    let row_groups = metadata.row_groups;

    // read the row groups into chunks
    let mut chunks = read::FileReader::new(reader, row_groups, schema.clone(), None, None, None);

    let chunk = chunks.next().unwrap().unwrap();
    println!("read row size -> {:?}", chunk.len());

    // assume the file has only one row group
    assert!(chunks.next().is_none());

    write_chunk("/tmp/output.parquet", schema, chunk)
}
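As a follow-up sketch (not in the original gist), the written file can be read back and checked against the row count that was just printed. Note that chunk.len() has to be captured before chunk is moved into write_chunk.

// Hypothetical round-trip check: read the written file back and confirm the row count.
fn verify_roundtrip(path: &str, expected_rows: usize) -> Result<()> {
    let mut reader = File::open(path)?;
    let metadata = read::read_metadata(&mut reader)?;
    let schema = read::infer_schema(&metadata)?;
    let chunks = read::FileReader::new(reader, metadata.row_groups, schema, None, None, None);

    let mut rows = 0;
    for maybe_chunk in chunks {
        rows += maybe_chunk?.len();
    }
    assert_eq!(rows, expected_rows);
    Ok(())
}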