Skip to content

Instantly share code, notes, and snippets.

@crakjie
Last active September 1, 2023 16:34
Show Gist options
  • Save crakjie/2d462eaf9e185f1e168c1192e171d886 to your computer and use it in GitHub Desktop.
Save crakjie/2d462eaf9e185f1e168c1192e171d886 to your computer and use it in GitHub Desktop.
arrow rust
# Cargo manifest for the `ckfunc` crate.
[package]
name = "ckfunc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Arrow arrays, record batches and the IPC stream reader/writer.
# The `ipc_compression` feature presumably enables compressed IPC buffers — confirm against the arrow crate docs.
arrow = { version = "45.0.0", features = ["ipc_compression"] }
# NOTE(review): `intmap` is not referenced by the Rust source shown with this manifest — confirm it is still needed.
intmap = "2.0.0"
# Supplies the `izip!` macro.
itertools = "0.11.0"
# Dev-profile builds are compiled with full optimization (opt-level 3) and full debug info (debug 2).
[profile.dev]
opt-level=3
debug=2
use std::collections::HashMap;
use std::io::{StdinLock, StdoutLock};
use std::io;
use std::io::prelude::*;
use arrow::error::ArrowError;
use arrow::ipc::{self};
use arrow::record_batch::RecordBatch;
use ipc::reader::StreamReader;
use ipc::writer::*;
use arrow::array::*;
use arrow::datatypes::*;
use alloc::sync::Arc;
use itertools::izip;
extern crate alloc;
/// Program entry point: delegates all work to `read_arrow_init`.
fn main() {
    read_arrow_init()
}
/// Pumps Arrow IPC streams from stdin to stdout until stdin is exhausted.
///
/// Every loop iteration hands the locked stdin/stdout to `read_arrow` and then
/// flushes stdout. The loop ends on end of input or at the first failure.
/// Failures are reported on stderr on a best-effort basis: stdout carries the
/// data stream, so diagnostics must never be written there, and a failed
/// diagnostic write is ignored rather than turned into a panic.
fn read_arrow_init() {
    let mut input = io::stdin().lock();
    let mut output = io::stdout().lock();

    // Several IPC streams may arrive back to back on the same pipe, so keep
    // going for as long as stdin still has bytes to offer.
    loop {
        match input.fill_buf() {
            // An empty buffer means end of input.
            Ok(pending) if pending.is_empty() => break,
            Ok(_) => {}
            // A signal interrupted the read; simply try again.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => {
                let _ = writeln!(io::stderr(), "ckfunc: failed to read stdin: {err}");
                break;
            }
        }

        if let Err(err) = read_arrow(&mut input, &mut output) {
            let _ = writeln!(io::stderr(), "ckfunc: failed to process Arrow stream: {err}");
            break;
        }

        // If the consumer is gone there is no point in reading further input.
        if let Err(err) = output.flush() {
            let _ = writeln!(io::stderr(), "ckfunc: failed to flush stdout: {err}");
            break;
        }
    }
}
fn read_arrow(inbuffer : &mut StdinLock<'static>, out : &mut StdoutLock<'static>) -> Result<(), ArrowError> {
//stdin is already buffered
let mut reader = StreamReader::try_new_unbuffered( inbuffer,None).unwrap();
let out_schema: Schema = Schema::new(vec![
reader.schema().field(3).clone().with_name("result")
]);
let mut writer = StreamWriter::try_new(
out ,
&out_schema
)?;
while let Some(Ok(mut batch)) = reader.next() {
writer.write(&to_adjusta_hash( &mut batch))?
}
//writer.finish() seems to not works with clickhouse execution_pool, maybe because it use MetadataVersion::V3 or lower
Ok(())
}
fn to_adjusta_hash(
batch : &mut RecordBatch
) -> RecordBatch {
let from_label_it = &mut batch.column_by_name("fromLabel").unwrap().as_any().downcast_ref::<ListArray>().unwrap().iter();
let from_value_it = &mut batch.column_by_name("fromValue").unwrap().as_any().downcast_ref::<ListArray>().unwrap().iter();
let to_label_it = &mut batch.column_by_name("toLabel").unwrap().as_any().downcast_ref::<ListArray>().unwrap().iter();
let to_value_col = batch.column_by_name("toValue").unwrap().as_any().downcast_ref::<ListArray>().unwrap();
let to_value_col_it = &mut to_value_col.iter();
// let values_builder = Float64Builder::with_capacity(to_value_col.len()*250);
// let mut builder = ListBuilder::new(values_builder);
for (from_label_array, from_value_array, to_label_array,to_value_array ) in izip!(from_label_it, from_value_it, to_label_it, to_value_col_it) {
let from_label_array_col = from_label_array.unwrap();
let from_value_array_col = from_value_array.unwrap();
let to_label_array_col = to_label_array.unwrap();
let to_value_array_col = to_value_array.unwrap();
let truc = &mut to_value_array_col.as_primitive::<Float64Type>();
let from_label = from_label_array_col.as_any().downcast_ref::<UInt64Array>().unwrap();
let from_value = from_value_array_col.as_any().downcast_ref::<Float64Array>().unwrap().values();
let to_label = to_label_array_col.as_any().downcast_ref::<UInt64Array>().unwrap();
let to_value = truc;
let to_value_values = to_value.values();
let index: HashMap<u64, usize> = from_label.iter().enumerate().map(|x: (usize, Option<u64>)|return (x.1.to_owned().unwrap(), x.0)).collect();
// for (i, _) in to_label.iter().enumerate() {
// match index.get(&to_label.value(i)){
// Some(search) => {
// builder.values().append_value(from_value[search.to_owned().to_owned()].to_owned());
// }
// _ => {
// builder.values().append_value(to_value[i].to_owned());
// }
// };
// }
let mut builder = to_value.into_builder().unwrap();
builder
.values_slice_mut()
.iter_mut().enumerate()
.for_each(|(i, v)| {
match index.get(&to_label.value(i)){
Some(search) => {
*v = from_value[search.to_owned().to_owned()].to_owned();
}
_ => {
*v = to_value_values[i].to_owned(); //should be useless
}
};
});
// builder.append(true)
}
let out_schema: Schema = Schema::new(vec![
batch.schema().field(3).clone().with_name("result")
]);
let out_batch: RecordBatch = RecordBatch::try_new(Arc::new(out_schema), vec!(Arc::new(to_value_col))).unwrap();
out_batch
}
/// Pass-through transform: returns a batch holding only the input column at index 1.
///
/// # Panics
///
/// Panics if the projection fails.
fn noop(batch: RecordBatch) -> RecordBatch {
    let kept_columns = [1];
    let projected = batch.project(&kept_columns);
    projected.unwrap()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment