@sundy-li
Last active November 30, 2022 15:30
arrow2-read-write
use std::fs;
use std::fs::File;
use std::sync::mpsc::channel;
use std::time::SystemTime;
use arrow2::error::Error;
use arrow2::io::parquet::read::{self, FileReader};
use threadpool::ThreadPool;
// wget https://repo.databend.rs/alapha/input.parquet -O /tmp/input.parquet
// for i in `seq 1 128`; do
// cp -rf /tmp/input.parquet /tmp/input_$i.parquet
// done
// cargo run --example arrow2_read --release
// cost -> 836 ms
// compare with duckdb
// select max(url) from read_parquet('/tmp/*.parquet', binary_as_string=True);
// ┌────────────────────────┐
// │        max(url)        │
// │        varchar         │
// ├────────────────────────┤
// │ http://zarplatia-nogin │
// └────────────────────────┘
// Run Time (s): real 0.430 user 4.347991 sys 4.099678
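// The gist does not include a manifest; a minimal Cargo.toml sketch (crate versions
// are assumptions, any arrow2 release exposing io::parquet::read::FileReader works):
//
// [dependencies]
// arrow2 = { version = "0.14", features = ["io_parquet", "io_parquet_compression"] }
// threadpool = "1.8"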
fn main() -> Result<(), Error> {
    let files: Vec<String> = fs::read_dir("/tmp/")
        .unwrap()
        .map(|f| f.unwrap().path().display().to_string())
        .filter(|f| f.ends_with(".parquet"))
        .collect();
    let start = SystemTime::now();
    // 16 workers, to roughly match duckdb's parallelism
    let n_workers = 16;
    let n_jobs = files.len();
    let pool = ThreadPool::new(n_workers);
    let (tx, rx) = channel();
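    // each worker sends one message when its file is done; the receiver below
    // blocks until all n_jobs completions have arrived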
    for file in files {
        let tx = tx.clone();
        pool.execute(move || {
            let chunk = builder_chunk(&file);
            read_chunk(chunk);
            tx.send(1).unwrap();
        });
    }
    assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), n_jobs);
    println!("cost -> {:?} ms", start.elapsed().unwrap().as_millis());
    Ok(())
}
fn builder_chunk(f: &str) -> FileReader<File> {
    let mut reader = File::open(f).unwrap();
    let metadata = read::read_metadata(&mut reader).unwrap();
    let schema = read::infer_schema(&metadata).unwrap();
    // keep only the `url` column
    let schema = schema.filter(|_index, field| field.name == "url");
    let row_groups = metadata
        .row_groups
        .into_iter()
        .enumerate()
        .map(|(_, row_group)| row_group)
        .collect();
    // duckdb uses a batch size of 2048
    let chunk_size = Some(2048);
    // we can then read the row groups into chunks
    read::FileReader::new(reader, row_groups, schema, chunk_size, None, None)
}
fn read_chunk(chunk: FileReader<File>) {
    let mut rows = 0;
    for maybe_chunk in chunk {
        let chunk = maybe_chunk.unwrap();
        rows += chunk.len();
    }
    // each copy of input.parquet contains 106742 rows
    assert_eq!(rows, 106742);
}
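
// ---- second example (separate file): re-read the same file with 1, 2, 4, 8, 16 parallel readers ----
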
use std::fs::File;
use std::time::SystemTime;
use arrow2::error::Error;
use arrow2::io::parquet::read::{self, FileReader};
fn main() -> Result<(), Error> {
    let datas = vec![
        // the original file
        ("/tmp", "input.parquet"),
        // the file produced by `COPY INTO` into databend
        ("/tmp", "output.parquet"),
    ];
    for (dir, f) in datas {
        // number of parallel readers
        for parallel in &[1, 2, 4, 8, 16] {
            let chunks = (0..*parallel).map(|_| builder_chunk(dir, f)).collect();
            let start = SystemTime::now();
            parallel_read_chunks(chunks);
            println!(
                "{dir}/{f}, parallel {parallel} read took: {} ms",
                start.elapsed().unwrap().as_millis()
            );
        }
    }
    Ok(())
}
fn builder_chunk(prefix: &str, file_path: &str) -> FileReader<File> {
    let f = format!("{prefix}/{file_path}");
    let mut reader = File::open(&f).unwrap();
    // we can read its metadata:
    let metadata = read::read_metadata(&mut reader).unwrap();
    // and infer a [`Schema`] from the `metadata`
    let schema = read::infer_schema(&metadata).unwrap();
    // we can filter down to the columns we need (here only `l_quantity`)
    let schema = schema.filter(|_index, field| field.name == "l_quantity");
    // we could also read the statistics of each row group, per field:
    // for field in &schema.fields {
    //     let statistics = read::statistics::deserialize(field, &metadata.row_groups).unwrap();
    //     println!("{:#?}", statistics);
    // }
    let row_groups = metadata
        .row_groups
        .into_iter()
        .enumerate()
        .map(|(_, row_group)| row_group)
        .collect();
    // we can then read the row groups into chunks
    read::FileReader::new(reader, row_groups, schema, Some(65536), None, None)
}
fn parallel_read_chunks(chunks: Vec<FileReader<File>>) {
    let mut handles = Vec::with_capacity(16);
    for chunk in chunks {
        let handle = std::thread::spawn(move || {
            for maybe_chunk in chunk {
                let chunk = maybe_chunk.unwrap();
                assert!(!chunk.is_empty());
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
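
// ---- third example (separate file): read /tmp/input.parquet and rewrite it to /tmp/output.parquet ----
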
use std::fs::File;
use arrow2::{
    array::{Array, Int32Array},
    chunk::Chunk,
    datatypes::{Field, Schema},
    error::Result,
    io::parquet::{
        read,
        write::{
            transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version,
            WriteOptions,
        },
    },
};
fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
    let options = WriteOptions {
        write_statistics: false,
        compression: CompressionOptions::Snappy,
        version: Version::V2,
    };
    let iter = vec![Ok(chunk)];
    let encodings = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect();
    let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;
    // Create a new empty file
    let file = File::create(path)?;
    let mut writer = FileWriter::try_new(file, schema, options)?;
    for group in row_groups {
        writer.write(group?)?;
    }
    let _size = writer.end(None)?;
    Ok(())
}
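
// A minimal usage sketch, not part of the original gist: write a small in-memory
// Int32 column through write_chunk. The column name "c1" and the output path are
// illustrative assumptions.
#[allow(dead_code)]
fn write_example() -> Result<()> {
    // one Int32 column with five values
    let array = Int32Array::from_slice([1, 2, 3, 4, 5]);
    let field = Field::new("c1", array.data_type().clone(), false);
    let schema = Schema::from(vec![field]);
    // a chunk is a collection of equal-length arrays; here just one column
    let chunk = Chunk::new(vec![Box::new(array) as Box<dyn Array>]);
    write_chunk("/tmp/example_int32.parquet", schema, chunk)
}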
fn main() -> Result<()> {
    let mut reader = File::open("/tmp/input.parquet").unwrap();
    // we can read its metadata:
    let metadata = read::read_metadata(&mut reader).unwrap();
    let schema = read::infer_schema(&metadata).unwrap();
    let row_groups = metadata
        .row_groups
        .into_iter()
        .enumerate()
        .map(|(_, row_group)| row_group)
        .collect();
    // we can then read the row groups into chunks
    let mut chunks = read::FileReader::new(reader, row_groups, schema.clone(), None, None, None);
    let chunk = chunks.next().unwrap().unwrap();
    println!("read row size -> {:?}", chunk.len());
    // assume the file has only one row group
    assert!(chunks.next().is_none());
    write_chunk("/tmp/output.parquet", schema, chunk)
}