use arrow::array::ArrayRef;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use std::env;
use std::fs::File;
use std::path::Path;
use std::time::Instant;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // get command line args
    let args: Vec<String> = env::args().collect();
    // defaults match what the mock data script generates
    let default_input_path = "/tmp/shards/shard_num=0/data.parquet".to_string();
    let default_output_path = "/tmp/basic_process_data.parquet".to_string();
    let input_path = args.get(1).unwrap_or(&default_input_path);
    let output_path = args.get(2).unwrap_or(&default_output_path);
    let batch_size: usize = args.get(3).unwrap_or(&"1024".to_string()).parse()?;

    // reader: reuse the input file's schema for the output
    let file = File::open(Path::new(input_path))?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let output_schema = builder.schema().clone();
    let arrow_reader = builder.with_batch_size(batch_size).build()?;

    // writer with default properties
    let new_file = File::create(Path::new(output_path))?;
    let props = parquet::file::properties::WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(new_file, output_schema.clone(), Some(props))?;

    let start_time = Instant::now();
    let mut total_batches = 0;
    for maybe_batch in arrow_reader {
        let record_batch = maybe_batch?;
        // placeholder for processing columns
        let mut new_columns: Vec<ArrayRef> = vec![];
        for i in 0..record_batch.num_columns() {
            let column = record_batch.column(i).clone();
            // add any processing here
            new_columns.push(column);
        }
        let new_batch = RecordBatch::try_new(output_schema.clone(), new_columns)?;
        writer.write(&new_batch)?;
        total_batches += 1;
    }
    writer.close()?;

    let duration = start_time.elapsed();
    println!(
        "Processed {} batches in {}s",
        total_batches,
        duration.as_secs()
    );
    Ok(())
}
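The loop above copies every column through untouched. As a hedged sketch of what the "add any processing here" step might look like (the double_int64 helper and the assumption that a column holds Int64 values are mine, not the gist's), a column can be rebuilt with a plain downcast-and-collect:

use arrow::array::{Array, ArrayRef, Int64Array};
use std::sync::Arc;

// Hypothetical transform: double every value in an Int64 column, preserving nulls.
fn double_int64(column: &ArrayRef) -> ArrayRef {
    let ints = column
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("column is not Int64");
    let doubled: Int64Array = ints.iter().map(|v| v.map(|x| x * 2)).collect();
    Arc::new(doubled)
}

Wiring it in means replacing the clone in the inner loop with something like double_int64(record_batch.column(i)) for the columns you want to change. The second snippet below extends the same read-process-write skeleton to copy only a row range.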
use arrow::datatypes::Schema;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use std::collections::HashMap;
use std::env;
use std::fs::File;
use std::path::Path;
use std::time::Instant;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    let default_input_path = "/tmp/shards/shard_num=0/data.parquet".to_string();
    let default_output_path = "/tmp/process_range_data.parquet".to_string();
    let input_path = args.get(1).unwrap_or(&default_input_path);
    let output_path = args.get(2).unwrap_or(&default_output_path);
    let batch_size: usize = args.get(3).unwrap_or(&"1024".to_string()).parse()?;
    // half-open row range [start_row, end_row) to copy into the output
    let start_row: usize = args.get(4).unwrap_or(&"0".to_string()).parse()?;
    let end_row: usize = args.get(5).unwrap_or(&"3000".to_string()).parse()?;

    let file = File::open(Path::new(input_path))?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let schema = builder.schema().clone();
    println!("Read arrow schema is: {}", schema);
    let arrow_reader = builder.with_batch_size(batch_size).build()?;

    let new_file = File::create(Path::new(output_path))?;
    let props = parquet::file::properties::WriterProperties::builder().build();
    // FIXME: sliced batches don't retain this file-level metadata, and I don't
    // really care to keep it right now, so wipe it (e.g. pandas metadata) so
    // the writer schema matches the batch schemas
    let schema_with_no_metadata = Schema::new_with_metadata(
        schema.clone().fields().to_vec(),
        HashMap::new(),
    );
    let mut writer = ArrowWriter::try_new(new_file, schema_with_no_metadata.into(), Some(props))?;

    let mut current_row_index = 0;
    let start_time = Instant::now();
    let mut total_batches = 0;
    for maybe_batch in arrow_reader {
        let record_batch = maybe_batch?;
        let num_rows_in_batch = record_batch.num_rows();
        let batch_end_row = current_row_index + num_rows_in_batch;
        // check if the batch overlaps [start_row, end_row)
        if batch_end_row > start_row && current_row_index < end_row {
            // the subset of the batch to keep
            let start_in_batch = if current_row_index < start_row {
                start_row - current_row_index
            } else {
                0
            };
            let end_in_batch = if batch_end_row > end_row {
                end_row - current_row_index
            } else {
                num_rows_in_batch
            };
            if start_in_batch < end_in_batch {
                // slice the batch to only include the relevant rows
                let sliced_batch =
                    record_batch.slice(start_in_batch, end_in_batch - start_in_batch);
                writer.write(&sliced_batch)?;
                // count only batches that contributed rows to the output
                total_batches += 1;
            }
        }
        current_row_index += num_rows_in_batch;
        // everything after this batch is past the range, so stop reading
        if batch_end_row >= end_row {
            break;
        }
    }
    writer.close()?;

    let duration = start_time.elapsed();
    println!(
        "Processed {} batches in {}s, from rows {} to {}",
        total_batches,
        duration.as_secs(),
        start_row,
        end_row - 1
    );
    Ok(())
}
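If the goal is only the row range, recent releases of the parquet crate can push the selection into the reader itself through the builder's with_offset and with_limit methods, so rows outside the range are never decoded. A minimal sketch (whether these methods are available depends on your parquet crate version):

use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use std::fs::File;
use std::path::Path;

// Sketch: skip to start_row and stop after the range, instead of slicing batches by hand.
fn build_range_reader(
    input_path: &str,
    batch_size: usize,
    start_row: usize,
    end_row: usize,
) -> Result<ParquetRecordBatchReader, Box<dyn std::error::Error>> {
    let file = File::open(Path::new(input_path))?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(batch_size)
        .with_offset(start_row) // rows to skip before reading
        .with_limit(end_row - start_row) // maximum rows to read
        .build()?;
    Ok(reader)
}

The batches it yields can then go straight into the ArrowWriter loop above.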