// [dependencies]
// serde = "1.0"
// serde_derive = "1.0"
// serde_json = { version = "1.0", features = ["raw_value"] }
// fnv = "1.0.5"
// memmap = "0.7"
// smallvec = { version = "0.6", features = ["serde"] }
// jemallocator = "0.3"
use std::borrow::Cow;
use std::sync::atomic::{AtomicUsize, Ordering};

use fnv::{FnvHashMap as HashMap, FnvHashSet as HashSet};
use jemallocator::Jemalloc;
use memmap::Mmap;
use serde_derive::Deserialize;
use serde_json::value::RawValue;
use smallvec::{smallvec, SmallVec};
#[global_allocator] | |
static ALLOC: Jemalloc = Jemalloc; | |
#[derive(Debug, Deserialize)] | |
struct Rec<'a> { | |
#[serde(borrow)] | |
company: &'a RawValue, | |
#[serde(borrow)] | |
debt: &'a RawValue, | |
#[serde(borrow)] | |
phones: &'a RawValue, | |
#[serde(borrow)] | |
phone: Option<&'a RawValue>, | |
} | |
//source data | |
struct DebtRec<'a> { | |
pub company: Cow<'a, str>, | |
pub phones: SmallVec<[Cow<'a, str>; 4]>, | |
pub debt: f64 | |
} | |
/// Result data: one aggregated debtor built up from all records that share a phone with it.
#[derive(Debug)]
struct Debtor {
    /// Distinct company names seen in the records merged into this debtor.
    companies: HashSet<String>,
    /// Distinct phones seen in the records merged into this debtor.
    phones: HashSet<String>,
    /// Sum of the `debt` values of the merged records.
    debt: f64,
}
struct Debtors { | |
all: Vec<Debtor>, | |
index_by_phone: HashMap<String, usize> | |
} | |
impl Debtor { | |
fn new() -> Debtor { | |
Debtor { | |
companies: HashSet::default(), | |
phones: HashSet::default(), | |
debt: 0.0 | |
} | |
} | |
} | |
impl Debtors { | |
fn new() -> Debtors { | |
Debtors { | |
all: Vec::new(), | |
index_by_phone: HashMap::default() | |
} | |
} | |
} | |
fn main() { | |
let mut res = Debtors::new(); | |
let mut fflag = 0; | |
for arg in std::env::args() { | |
if arg == "-f" { | |
fflag = 1; | |
} | |
else if fflag == 1 { | |
fflag = 2; | |
println!("{}:", &arg); | |
let start = std::time::Instant::now(); | |
let (count, err_count) = process_file(&arg, &mut res); | |
println!("PROCESSED: {} objects in {:?}, {} errors found", count, start.elapsed(), err_count); | |
} | |
} | |
for (di, d) in res.all.iter().enumerate() { | |
println!("-------------------------------"); | |
println!("#{}: debt: {}", di, &d.debt); | |
println!("companies: {:?}\nphones: {:?}", &d.companies, &d.phones); | |
} | |
if fflag < 2 { | |
println!("USAGE: fastpivot -f \"file 1\" -f \"file 2\" ..."); | |
} | |
} | |
fn process_file(path: &str, res: &mut Debtors) -> (i32, usize) { | |
let mut count = 0; | |
let err_count = AtomicUsize::new(0); | |
match std::fs::File::open(path) { | |
Ok(file) => { | |
let mmap = match unsafe { Mmap::map(&file) } { | |
Ok(mmap) => mmap, | |
Err(e) => { | |
println!("Cannot open '{}': {:?}", path, e); | |
return (0, 0); | |
} | |
}; | |
let mut braces = 0; | |
let mut quote = false; | |
let mut backslash = false; | |
let mut start_idx = 0; | |
for (idx, &b) in mmap.iter().enumerate() { | |
match b { | |
b'{' if !quote => { | |
if braces == 0 { | |
start_idx = idx; | |
} | |
braces += 1; | |
} | |
b'}' if !quote => { | |
braces -= 1; | |
if braces == 0 { //object formed ! | |
let obj = &mmap[start_idx ..= idx]; | |
process_object(obj, res, &err_count); | |
count += 1; | |
} | |
} | |
b'\\' if quote => { | |
backslash = true; | |
continue; | |
} | |
b'"' if !backslash => { | |
quote = !quote; | |
} | |
_ => {} | |
}; | |
backslash = false; | |
} | |
} | |
Err(e) => { | |
println!("ERROR: {}", e); | |
} | |
} | |
(count, err_count.load(Ordering::SeqCst)) | |
} | |
fn set_insert_cow<'a, R, T>(set: &mut HashSet<T>, r: Cow<'a, R>) | |
where | |
R: ?Sized, | |
R: std::hash::Hash + Eq + ToOwned<Owned = T>, | |
T: Clone + std::hash::Hash + Eq + std::borrow::Borrow<R>, | |
{ | |
if !set.contains(r.as_ref()) { | |
set.insert(r.into_owned()); | |
} | |
} | |
fn map_insert_cow<'a, R, K, V>(map: &mut HashMap<K, V>, k: Cow<'a, R>, v: V) | |
where | |
R: ?Sized, | |
R: std::hash::Hash + Eq + ToOwned<Owned = K>, | |
K: Clone + std::hash::Hash + Eq + std::borrow::Borrow<R>, | |
{ | |
if !map.contains_key(k.as_ref()) { | |
map.insert(k.into_owned(), v); | |
} | |
} | |
fn process_object(obj: &[u8], res: &mut Debtors, err_count: &AtomicUsize) { | |
let obj = unsafe { std::str::from_utf8_unchecked(obj) }; | |
let o = | |
match serde_json::from_str(obj) { | |
Ok(o) => { o } | |
Err(e) => { | |
println!("JSON ERROR: {}:\n{}", e, obj); | |
err_count.fetch_add(1, Ordering::Relaxed); | |
return; | |
} | |
}; | |
let dr = extract_data(o); | |
//println!("{} - {:?} - {}", &dr.company, &dr.phones, &dr.debt,); | |
let mut di = None; //debtor index search result | |
for p in &dr.phones { | |
if let Some(&i) = res.index_by_phone.get(p.as_ref()) { | |
di = Some(i); | |
break; | |
} | |
} | |
let d; | |
let idx; | |
match di { | |
Some(i) => { //existing debtor | |
idx = i; | |
d = &mut res.all[i]; | |
} | |
None => { //new debtor | |
idx = res.all.len(); | |
res.all.push(Debtor::new()); | |
d = res.all.last_mut().unwrap(); | |
d.debt = 0.; | |
} | |
}; | |
set_insert_cow(&mut d.companies, dr.company); | |
for p in dr.phones { | |
set_insert_cow(&mut d.phones, p.clone()); | |
map_insert_cow(&mut res.index_by_phone, p, idx); | |
} | |
d.debt += dr.debt; | |
} | |
#[derive(Debug, Deserialize)] | |
struct BorrowedStr<'a>( | |
#[serde(borrow)] | |
Cow<'a, str> | |
); | |
fn raw2str(raw: &RawValue) -> Cow<str> { | |
let payload = raw.get(); | |
if payload.starts_with('"') { | |
let b_str = serde_json::from_str(payload).unwrap_or_else(|_| BorrowedStr(Cow::default()) ); | |
b_str.0 | |
} else if payload.starts_with(|ch: char| ch.is_ascii_digit() ) { | |
payload.into() | |
} else { | |
"".into() | |
} | |
} | |
#[derive(Debug, Deserialize)] | |
struct CompanyObj<'a> { | |
#[serde(borrow)] | |
name: Cow<'a, str>, | |
} | |
fn extract_data(o: Rec) -> DebtRec { | |
use std::str::FromStr; | |
let company = { | |
let payload = o.company.get(); | |
if payload.starts_with('{') { | |
let company = serde_json::from_str(payload).unwrap_or_else(|_| CompanyObj{ name: "".into() }); | |
company.name | |
} else { | |
let company = serde_json::from_str(payload).unwrap_or_else(|_| BorrowedStr("".into())); | |
company.0 | |
} | |
}; | |
if let Cow::Owned(_) = &company { | |
println!("Warning: owned '{:?}'", company); | |
}; | |
let mut phones = smallvec![]; | |
let payload = o.phones.get(); | |
if payload.starts_with('[') { | |
let ps: SmallVec<[&RawValue; 4]> = serde_json::from_str(payload).unwrap_or_else(|_| smallvec![]); | |
phones.extend(ps.into_iter().map(raw2str)); | |
} else { | |
phones.push(raw2str(o.phones)); | |
}; | |
if let Some(p) = o.phone { | |
phones.push(raw2str(p)); | |
}; | |
let debt = f64::from_str(raw2str(o.debt).as_ref()).unwrap_or(0.0); | |
DebtRec { | |
company, phones, debt, | |
} | |
} |
// (Web-page footer captured along with the source; not part of the program:)
// Sign up for free to join this conversation on GitHub.
// Already have an account?
// Sign in to comment