Last active
June 10, 2019 11:42
-
-
Save red75prime/7b2f5cb45fae81a44ac9451ae9dece00 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Cargo.toml dependencies needed to build this file:
//
// [dependencies]
// serde = "1.0"
// serde_derive = "1.0"
// serde_json = { version = "1.0", features = ["raw_value"] }
// fnv = "1.0.5"
// memmap = "0.7"
// smallvec = { version = "0.6", features = ["serde"] }
// jemallocator = "0.3"
use fnv::{FnvHashSet as HashSet, FnvHashMap as HashMap}; | |
use memmap::Mmap; | |
use serde_derive::{Deserialize}; | |
use serde_json::{value::RawValue}; | |
use smallvec::{SmallVec, smallvec}; | |
use std::borrow::Cow; | |
use std::sync::atomic::{AtomicUsize, Ordering}; | |
use jemallocator::Jemalloc; | |
// Route every heap allocation in this program through jemalloc instead of the
// default system allocator.
#[global_allocator] | |
static ALLOC: Jemalloc = Jemalloc; | |
/// One input JSON object, deserialized shallowly.
///
/// Every field is kept as a `RawValue` (unparsed JSON text) borrowed from the
/// input; `extract_data` inspects the first character of each payload to decide
/// how to interpret it.
#[derive(Debug, Deserialize)] | |
struct Rec<'a> { | |
/// Company: `extract_data` accepts either an object with a `name` field or a
/// bare JSON string.
#[serde(borrow)] | |
company: &'a RawValue, | |
/// Debt amount: read through `raw2str`, so a JSON string or a number that
/// starts with a digit.
#[serde(borrow)] | |
debt: &'a RawValue, | |
/// Phones: either a JSON array of values or a single value.
#[serde(borrow)] | |
phones: &'a RawValue, | |
/// Optional extra phone; appended after the entries of `phones`.
#[serde(borrow)] | |
phone: Option<&'a RawValue>, | |
} | |
/// Source data: one input record after normalization by `extract_data`.
///
/// Strings are `Cow`s so they can keep borrowing from the input text when no
/// owned copy had to be produced.
struct DebtRec<'a> { | |
/// Company name; empty when the `company` payload could not be parsed.
pub company: Cow<'a, str>, | |
/// Every phone taken from `phones`, followed by `phone` when present.
pub phones: SmallVec<[Cow<'a, str>; 4]>, | |
/// Debt amount; 0.0 when the payload does not parse as `f64`.
pub debt: f64 | |
} | |
/// Result data: one aggregated debtor, built up by `process_object` from all
/// records that were attributed to it.
#[derive(Debug)] | |
struct Debtor { | |
/// Distinct company names seen for this debtor.
companies: HashSet<String>, | |
/// Distinct phones seen for this debtor.
phones: HashSet<String>, | |
/// Sum of the `debt` values of all records attributed to this debtor.
debt: f64 | |
} | |
/// All debtors found so far, plus a lookup from phone to debtor.
struct Debtors { | |
/// Debtors in order of first appearance; positions are used as debtor ids.
all: Vec<Debtor>, | |
/// Maps a phone to the index in `all` of the debtor it was first recorded for.
index_by_phone: HashMap<String, usize> | |
} | |
impl Debtor { | |
fn new() -> Debtor { | |
Debtor { | |
companies: HashSet::default(), | |
phones: HashSet::default(), | |
debt: 0.0 | |
} | |
} | |
} | |
impl Debtors { | |
fn new() -> Debtors { | |
Debtors { | |
all: Vec::new(), | |
index_by_phone: HashMap::default() | |
} | |
} | |
} | |
fn main() { | |
let mut res = Debtors::new(); | |
let mut fflag = 0; | |
for arg in std::env::args() { | |
if arg == "-f" { | |
fflag = 1; | |
} | |
else if fflag == 1 { | |
fflag = 2; | |
println!("{}:", &arg); | |
let start = std::time::Instant::now(); | |
let (count, err_count) = process_file(&arg, &mut res); | |
println!("PROCESSED: {} objects in {:?}, {} errors found", count, start.elapsed(), err_count); | |
} | |
} | |
for (di, d) in res.all.iter().enumerate() { | |
println!("-------------------------------"); | |
println!("#{}: debt: {}", di, &d.debt); | |
println!("companies: {:?}\nphones: {:?}", &d.companies, &d.phones); | |
} | |
if fflag < 2 { | |
println!("USAGE: fastpivot -f \"file 1\" -f \"file 2\" ..."); | |
} | |
} | |
fn process_file(path: &str, res: &mut Debtors) -> (i32, usize) { | |
let mut count = 0; | |
let err_count = AtomicUsize::new(0); | |
match std::fs::File::open(path) { | |
Ok(file) => { | |
let mmap = match unsafe { Mmap::map(&file) } { | |
Ok(mmap) => mmap, | |
Err(e) => { | |
println!("Cannot open '{}': {:?}", path, e); | |
return (0, 0); | |
} | |
}; | |
let mut braces = 0; | |
let mut quote = false; | |
let mut backslash = false; | |
let mut start_idx = 0; | |
for (idx, &b) in mmap.iter().enumerate() { | |
match b { | |
b'{' if !quote => { | |
if braces == 0 { | |
start_idx = idx; | |
} | |
braces += 1; | |
} | |
b'}' if !quote => { | |
braces -= 1; | |
if braces == 0 { //object formed ! | |
let obj = &mmap[start_idx ..= idx]; | |
process_object(obj, res, &err_count); | |
count += 1; | |
} | |
} | |
b'\\' if quote => { | |
backslash = true; | |
continue; | |
} | |
b'"' if !backslash => { | |
quote = !quote; | |
} | |
_ => {} | |
}; | |
backslash = false; | |
} | |
} | |
Err(e) => { | |
println!("ERROR: {}", e); | |
} | |
} | |
(count, err_count.load(Ordering::SeqCst)) | |
} | |
/// Inserts `r` into `set` unless an equal value is already present.
///
/// The lookup is done on the borrowed form, so an owned value is only produced
/// (via `Cow::into_owned`) when an insertion actually happens. The owned type
/// does not need to be `Clone`.
fn set_insert_cow<'a, R, T>(set: &mut HashSet<T>, r: Cow<'a, R>)
where
    R: ?Sized + std::hash::Hash + Eq + ToOwned<Owned = T>,
    T: std::hash::Hash + Eq + std::borrow::Borrow<R>,
{
    // `&*r` derefs the Cow to `&R`, which is what the set is probed with.
    if !set.contains(&*r) {
        set.insert(r.into_owned());
    }
}
/// Inserts `k -> v` into `map` unless the key is already present; an existing
/// entry is left untouched and `v` is dropped.
///
/// The lookup is done on the borrowed form of the key, so an owned key is only
/// produced (via `Cow::into_owned`) when an insertion actually happens. The key
/// type does not need to be `Clone`.
fn map_insert_cow<'a, R, K, V>(map: &mut HashMap<K, V>, k: Cow<'a, R>, v: V)
where
    R: ?Sized + std::hash::Hash + Eq + ToOwned<Owned = K>,
    K: std::hash::Hash + Eq + std::borrow::Borrow<R>,
{
    // `&*k` derefs the Cow to `&R`, which is what the map is probed with.
    if !map.contains_key(&*k) {
        map.insert(k.into_owned(), v);
    }
}
fn process_object(obj: &[u8], res: &mut Debtors, err_count: &AtomicUsize) { | |
let obj = unsafe { std::str::from_utf8_unchecked(obj) }; | |
let o = | |
match serde_json::from_str(obj) { | |
Ok(o) => { o } | |
Err(e) => { | |
println!("JSON ERROR: {}:\n{}", e, obj); | |
err_count.fetch_add(1, Ordering::Relaxed); | |
return; | |
} | |
}; | |
let dr = extract_data(o); | |
//println!("{} - {:?} - {}", &dr.company, &dr.phones, &dr.debt,); | |
let mut di = None; //debtor index search result | |
for p in &dr.phones { | |
if let Some(&i) = res.index_by_phone.get(p.as_ref()) { | |
di = Some(i); | |
break; | |
} | |
} | |
let d; | |
let idx; | |
match di { | |
Some(i) => { //existing debtor | |
idx = i; | |
d = &mut res.all[i]; | |
} | |
None => { //new debtor | |
idx = res.all.len(); | |
res.all.push(Debtor::new()); | |
d = res.all.last_mut().unwrap(); | |
d.debt = 0.; | |
} | |
}; | |
set_insert_cow(&mut d.companies, dr.company); | |
for p in dr.phones { | |
set_insert_cow(&mut d.phones, p.clone()); | |
map_insert_cow(&mut res.index_by_phone, p, idx); | |
} | |
d.debt += dr.debt; | |
} | |
/// Newtype used to deserialize a JSON string into a `Cow<str>` with
/// `#[serde(borrow)]`, so the result may borrow from the input text instead of
/// always allocating.
#[derive(Debug, Deserialize)] | |
struct BorrowedStr<'a>( | |
#[serde(borrow)] | |
Cow<'a, str> | |
); | |
fn raw2str(raw: &RawValue) -> Cow<str> { | |
let payload = raw.get(); | |
if payload.starts_with('"') { | |
let b_str = serde_json::from_str(payload).unwrap_or_else(|_| BorrowedStr(Cow::default()) ); | |
b_str.0 | |
} else if payload.starts_with(|ch: char| ch.is_ascii_digit() ) { | |
payload.into() | |
} else { | |
"".into() | |
} | |
} | |
/// Object form of the `company` field: only its `name` is read.
#[derive(Debug, Deserialize)] | |
struct CompanyObj<'a> { | |
/// Company name; may borrow from the input text.
#[serde(borrow)] | |
name: Cow<'a, str>, | |
} | |
fn extract_data(o: Rec) -> DebtRec { | |
use std::str::FromStr; | |
let company = { | |
let payload = o.company.get(); | |
if payload.starts_with('{') { | |
let company = serde_json::from_str(payload).unwrap_or_else(|_| CompanyObj{ name: "".into() }); | |
company.name | |
} else { | |
let company = serde_json::from_str(payload).unwrap_or_else(|_| BorrowedStr("".into())); | |
company.0 | |
} | |
}; | |
if let Cow::Owned(_) = &company { | |
println!("Warning: owned '{:?}'", company); | |
}; | |
let mut phones = smallvec![]; | |
let payload = o.phones.get(); | |
if payload.starts_with('[') { | |
let ps: SmallVec<[&RawValue; 4]> = serde_json::from_str(payload).unwrap_or_else(|_| smallvec![]); | |
phones.extend(ps.into_iter().map(raw2str)); | |
} else { | |
phones.push(raw2str(o.phones)); | |
}; | |
if let Some(p) = o.phone { | |
phones.push(raw2str(p)); | |
}; | |
let debt = f64::from_str(raw2str(o.debt).as_ref()).unwrap_or(0.0); | |
DebtRec { | |
company, phones, debt, | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment