Created May 13, 2019 12:45
-
-
Save red75prime/b19c5cb369ee12a1ccfe8b6649b06647 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// [dependencies] | |
// serde = "1.0" | |
// serde_derive = "1.0" | |
// serde_json = { version = "1.0", features = ["raw_value"] } | |
// fnv = "1.0.5" | |
// memmap = "0.7" | |
// smallvec = { version = "0.6", features = ["serde"] } | |
// crossbeam = "0.7" | |
use crossbeam::{self, channel}; | |
use fnv::{FnvHashSet as HashSet, FnvHashMap as HashMap}; | |
use memmap::Mmap; | |
use serde_derive::{Deserialize}; | |
use serde_json::{value::RawValue}; | |
use smallvec::{SmallVec, smallvec}; | |
use std::borrow::Cow; | |
use std::sync::atomic::{AtomicUsize, Ordering}; | |
// Cow is required to deserialize | |
// strings like "OOO \"Night and brambles\"" | |
#[derive(Debug, Deserialize)] | |
#[serde(untagged)] | |
enum Company<'a> { | |
Name(Cow<'a, str>), | |
NameRec{ name: Cow<'a, str>}, | |
} | |
#[derive(Debug, Deserialize)] | |
struct Rec<'a> { | |
#[serde(borrow)] | |
company: Company<'a>, | |
debt: &'a RawValue, | |
phones: &'a RawValue, | |
phone: Option<&'a RawValue>, | |
} | |
//source data | |
struct DebtRec<'a> { | |
pub company: Cow<'a, str>, | |
pub phones: SmallVec<[&'a str; 4]>, | |
pub debt: f64 | |
} | |
// Result data.

/// One merged debtor: every company name and phone number seen for it, plus the total debt.
#[derive(Debug, Default)]
struct Debtor {
    companies: HashSet<String>,
    phones: HashSet<String>,
    debt: f64,
}

/// The pivot result: all debtors, plus an index from phone number to the debtor's
/// position in `all`.
#[derive(Debug, Default)]
struct Debtors {
    all: Vec<Debtor>,
    index_by_phone: HashMap<String, usize>,
}

impl Debtor {
    /// Creates a debtor with no companies, no phones and zero debt.
    fn new() -> Debtor {
        Debtor::default()
    }
}

impl Debtors {
    /// Creates an empty result set.
    fn new() -> Debtors {
        Debtors::default()
    }
}
fn main() { | |
let mut res = Debtors::new(); | |
let mut fflag = 0; | |
for arg in std::env::args() { | |
if arg == "-f" { | |
fflag = 1; | |
} | |
else if fflag == 1 { | |
fflag = 2; | |
println!("{}:", &arg); | |
let start = std::time::Instant::now(); | |
let (count, err_count) = process_file(&arg, &mut res); | |
println!("PROCESSED: {} objects in {:?}, {} errors found", count, start.elapsed(), err_count); | |
} | |
} | |
for (di, d) in res.all.iter().enumerate() { | |
println!("-------------------------------"); | |
println!("#{}: debt: {}", di, &d.debt); | |
println!("companies: {:?}\nphones: {:?}", &d.companies, &d.phones); | |
} | |
if fflag < 2 { | |
println!("USAGE: fastpivot -f \"file 1\" -f \"file 2\" ..."); | |
} | |
} | |
fn process_file(path: &str, res: &mut Debtors) -> (i32, usize) { | |
let mut count = 0; | |
let err_count = AtomicUsize::new(0); | |
match std::fs::File::open(path) { | |
Ok(file) => { | |
let mmap = match unsafe { Mmap::map(&file) } { | |
Ok(mmap) => mmap, | |
Err(e) => { | |
println!("Cannot open '{}': {:?}", path, e); | |
return (0, 0); | |
} | |
}; | |
let mut results = vec![Debtors::new(), Debtors::new(), Debtors::new(), Debtors::new()]; | |
crossbeam::scope(|scope| { | |
let (s, r) = channel::bounded(20); | |
for debtor in &mut results { | |
let r = r.clone(); | |
let err_count = &err_count; | |
scope.spawn(move |_| { | |
while let Ok(obj) = r.recv() { | |
match serde_json::from_slice(obj) { | |
Ok(o) => { | |
process_object(o, debtor); | |
} | |
Err(e) => { | |
println!("JSON ERROR: {}:\n{}", e, String::from_utf8_lossy(obj)); | |
err_count.fetch_add(1, Ordering::Relaxed); | |
} | |
} | |
} | |
}); | |
} | |
let mut braces = 0; | |
let mut quote = false; | |
let mut backslash = false; | |
let mut start_idx = 0; | |
for (idx, &b) in mmap.iter().enumerate() { | |
match b { | |
b'{' if !quote => { | |
if braces == 0 { | |
start_idx = idx; | |
} | |
braces += 1; | |
} | |
b'}' if !quote => { | |
braces -= 1; | |
if braces == 0 { //object formed ! | |
let obj = &mmap[start_idx ..= idx]; | |
s.send(obj).unwrap(); | |
count += 1; | |
} | |
} | |
b'\\' if quote => { | |
backslash = true; | |
continue; | |
} | |
b'"' if !backslash => { | |
quote = !quote; | |
} | |
_ => {} | |
}; | |
backslash = false; | |
} | |
}).unwrap(); | |
for debtor in results { | |
merge_result(debtor, res); | |
}; | |
} | |
Err(e) => { | |
println!("ERROR: {}", e); | |
} | |
} | |
(count, err_count.load(Ordering::SeqCst)) | |
} | |
fn merge_result(part: Debtors, result: &mut Debtors) { | |
for dr in part.all { | |
let di = match dr.phones.iter().filter_map(|p| result.index_by_phone.get(p)).next() { | |
Some(i) => *i, | |
None => { | |
result.all.push(Debtor::new()); | |
result.all.len()-1 | |
} | |
}; | |
let d = &mut result.all[di]; | |
d.companies.extend(dr.companies.into_iter()); | |
for p in &dr.phones { | |
d.phones.insert(p.to_owned()); | |
result.index_by_phone.insert(p.to_owned(), di); | |
} | |
d.debt += dr.debt; | |
} | |
} | |
/// Adds an owned copy of `r` to `set` unless an equal element is already present.
///
/// Callers can pass a borrowed form (e.g. `&str` for a set of `String`), and the
/// `to_owned()` allocation only happens when the value is actually new.
fn set_insert_ref<'a, R, T>(set: &mut HashSet<T>, r: &'a R)
where
    R: ?Sized,
    R: std::hash::Hash + Eq + ToOwned<Owned = T>,
    T: std::hash::Hash + Eq + std::borrow::Borrow<R>,
{
    if set.contains(r) {
        return;
    }
    let owned = r.to_owned();
    set.insert(owned);
}
/// Maps an owned copy of `k` to `v`, but only if `map` has no entry for that key yet.
///
/// An existing entry is left untouched (its value is not replaced). Callers can pass a
/// borrowed key (e.g. `&str` for `String` keys), so `to_owned()` runs only for new keys.
fn map_insert_ref<'a, R, K, V>(map: &mut HashMap<K, V>, k: &'a R, v: V)
where
    R: ?Sized,
    R: std::hash::Hash + Eq + ToOwned<Owned = K>,
    K: std::hash::Hash + Eq + std::borrow::Borrow<R>,
{
    if map.contains_key(k) {
        return;
    }
    let owned_key = k.to_owned();
    map.insert(owned_key, v);
}
fn process_object(o: Rec, res: &mut Debtors) { | |
let dr = extract_data(o); | |
//println!("{} - {:?} - {}", &dr.company, &dr.phones, &dr.debt,); | |
let mut di = None; //debtor index search result | |
for &p in &dr.phones { | |
if let Some(&i) = res.index_by_phone.get(p) { | |
di = Some(i); | |
break; | |
} | |
} | |
let d; | |
let idx; | |
match di { | |
Some(i) => { //existing debtor | |
idx = i; | |
d = &mut res.all[i]; | |
} | |
None => { //new debtor | |
idx = res.all.len(); | |
res.all.push(Debtor::new()); | |
d = res.all.last_mut().unwrap(); | |
d.debt = 0.; | |
} | |
}; | |
set_insert_ref(&mut d.companies, dr.company.as_ref()); | |
for &p in &dr.phones { | |
set_insert_ref(&mut d.phones, p); | |
map_insert_ref(&mut res.index_by_phone, p, idx); | |
} | |
d.debt += dr.debt; | |
} | |
fn raw2str(raw: &RawValue) -> &str { | |
let payload = raw.get(); | |
if payload.starts_with('"') { | |
&payload[1 .. payload.len() -1] | |
} else if payload.starts_with(|ch| char::is_digit(ch, 10) ) { | |
payload | |
} else { | |
"" | |
} | |
} | |
fn extract_data(o: Rec) -> DebtRec { | |
use std::str::FromStr; | |
let company = match o.company { | |
Company::Name(c) | Company::NameRec{ name: c} => c, | |
}; | |
let mut phones = smallvec![]; | |
let payload = o.phones.get(); | |
if payload.starts_with('[') { | |
let ps: SmallVec<[&RawValue; 4]> = serde_json::from_str(payload).unwrap_or_else(|_| smallvec![]); | |
phones.extend(ps.into_iter().map(raw2str)); | |
} else { | |
phones.push(raw2str(o.phones)); | |
}; | |
if let Some(p) = o.phone { | |
phones.push(raw2str(p)); | |
}; | |
let debt = f64::from_str(raw2str(o.debt)).unwrap_or(0.0); | |
DebtRec { | |
company, phones, debt, | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment