@mooreniemi
Created May 5, 2024 20:29
// File 1: copy a Parquet file batch by batch, with a placeholder for per-column
// processing. Uses the `arrow` and `parquet` crates (the parquet crate's Arrow API).
use arrow::array::ArrayRef;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use std::env;
use std::fs::File;
use std::path::Path;
use std::time::Instant;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // get command line args
    let args: Vec<String> = env::args().collect();
    // defaults match what the mock data script generates
    let default_input_path = "/tmp/shards/shard_num=0/data.parquet".to_string();
    let default_output_path = "/tmp/basic_process_data.parquet".to_string();
    let input_path = args.get(1).unwrap_or(&default_input_path);
    let output_path = args.get(2).unwrap_or(&default_output_path);
    let batch_size: usize = args.get(3).unwrap_or(&"1024".to_string()).parse()?;

    // build a record-batch reader over the input Parquet file
    let file = File::open(Path::new(input_path))?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let output_schema = builder.schema().clone();
    let mut arrow_reader = builder.with_batch_size(batch_size).build()?;

    // write the output with the same schema and default writer properties
    let new_file = File::create(Path::new(output_path))?;
    let props = parquet::file::properties::WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(new_file, output_schema.clone(), Some(props))?;

    let start_time = Instant::now();
    let mut total_batches = 0;
    while let Some(maybe_batch) = arrow_reader.next() {
        let record_batch = maybe_batch?;
        // placeholder for processing columns
        let mut new_columns: Vec<ArrayRef> = vec![];
        for i in 0..record_batch.num_columns() {
            let column = record_batch.column(i).clone();
            // add any processing here
            new_columns.push(column);
        }
        let new_batch = RecordBatch::try_new(output_schema.clone(), new_columns)?;
        writer.write(&new_batch)?;
        total_batches += 1;
    }
    writer.close()?;

    let duration = start_time.elapsed();
    println!(
        "Processed {} batches in {}s",
        total_batches,
        duration.as_secs()
    );
    Ok(())
}
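
// A minimal sketch of what the "add any processing here" step above could look
// like, assuming (hypothetically) the file contains an Int64 column to transform;
// the function name and the doubling transform are illustrative, not part of the
// original gist. It could be called per column inside the loop in place of
// `column.clone()`.
use arrow::array::{ArrayRef as ColumnRef, Int64Array};
use arrow::datatypes::DataType;
use std::sync::Arc;

fn double_if_int64(column: &ColumnRef) -> ColumnRef {
    if column.data_type() == &DataType::Int64 {
        // downcast to the concrete array type so we can read the values
        let ints = column.as_any().downcast_ref::<Int64Array>().unwrap();
        // rebuild the column with every value doubled, preserving nulls
        let doubled: Int64Array = ints.iter().map(|v| v.map(|x| x * 2)).collect();
        Arc::new(doubled)
    } else {
        // pass other column types through unchanged
        column.clone()
    }
}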
// File 2: read a Parquet file and write out only the rows in [start_row, end_row),
// slicing each record batch that overlaps the requested range.
use arrow::datatypes::Schema;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use std::collections::HashMap;
use std::env;
use std::fs::File;
use std::path::Path;
use std::time::Instant;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    let default_input_path = "/tmp/shards/shard_num=0/data.parquet".to_string();
    let default_output_path = "/tmp/process_range_data.parquet".to_string();
    let input_path = args.get(1).unwrap_or(&default_input_path);
    let output_path = args.get(2).unwrap_or(&default_output_path);
    let batch_size: usize = args.get(3).unwrap_or(&"1024".to_string()).parse()?;
    let start_row: usize = args.get(4).unwrap_or(&"0".to_string()).parse()?;
    let end_row: usize = args.get(5).unwrap_or(&"3000".to_string()).parse()?;

    let file = File::open(Path::new(input_path))?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let schema = builder.schema().clone();
    println!("Read arrow schema is: {}", schema);
    let mut arrow_reader = builder.with_batch_size(batch_size).build()?;

    let new_file = File::create(Path::new(output_path))?;
    let props = parquet::file::properties::WriterProperties::builder().build();
    // FIXME: slicing below did not retain this metadata, and we don't need to keep it right now
    let schema_with_no_metadata = Schema::new_with_metadata(
        schema.fields().to_vec(),
        HashMap::new(), // wipe pandas metadata
    );
    let mut writer = ArrowWriter::try_new(new_file, schema_with_no_metadata.into(), Some(props))?;
    //println!("Writer Schema: {:?}", &schema);

    let mut current_row_index = 0;
    let start_time = Instant::now();
    let mut total_batches = 0;
    while let Some(maybe_batch) = arrow_reader.next() {
        let record_batch = maybe_batch?;
        let num_rows_in_batch = record_batch.num_rows();
        let batch_end_row = current_row_index + num_rows_in_batch;
        // check if the batch overlaps the [start_row, end_row) range
        if batch_end_row > start_row && current_row_index < end_row {
            // the subset of the batch to keep
            let start_in_batch = if current_row_index < start_row {
                start_row - current_row_index
            } else {
                0
            };
            let end_in_batch = if batch_end_row > end_row {
                end_row - current_row_index
            } else {
                num_rows_in_batch
            };
            if start_in_batch < end_in_batch {
                // slice the batch to only include the relevant rows
                let sliced_batch =
                    record_batch.slice(start_in_batch, end_in_batch - start_in_batch);
                //println!("RecordBatch Schema: {:?}", sliced_batch.schema());
                writer.write(&sliced_batch)?;
            }
        }
        current_row_index += num_rows_in_batch;
        total_batches += 1;
        // stop once we've read past the end of the requested range
        if batch_end_row >= end_row {
            break;
        }
    }
    writer.close()?;

    let duration = start_time.elapsed();
    println!(
        "Processed {} batches in {}s, from rows {} to {}",
        total_batches,
        duration.as_secs(),
        start_row,
        end_row - 1
    );
    Ok(())
}
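
// Worked example of the range arithmetic above (values illustrative): with
// batch_size = 1024, start_row = 1500, end_row = 3000, the first batch covers rows
// [0, 1024) and is skipped; the second covers [1024, 2048), so start_in_batch =
// 1500 - 1024 = 476 and end_in_batch = 1024, a slice of 548 rows; the third covers
// [2048, 3072), so start_in_batch = 0 and end_in_batch = 3000 - 2048 = 952, after
// which batch_end_row (3072) >= end_row (3000) and the loop breaks, for 1500 rows written in total.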