Skip to content

Instantly share code, notes, and snippets.

@red75prime red75prime/main.rs
Created May 13, 2019

Embed
What would you like to do?
// [dependencies]
// serde = "1.0"
// serde_derive = "1.0"
// serde_json = { version = "1.0", features = ["raw_value"] }
// fnv = "1.0.5"
// memmap = "0.7"
// smallvec = { version = "0.6", features = ["serde"] }
// crossbeam = "0.7"
use crossbeam::{self, channel};
use fnv::{FnvHashSet as HashSet, FnvHashMap as HashMap};
use memmap::Mmap;
use serde_derive::{Deserialize};
use serde_json::{value::RawValue};
use smallvec::{SmallVec, smallvec};
use std::borrow::Cow;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Company name as it appears in an input record.
///
/// `Cow` is required to deserialize strings like
/// "OOO \"Night and brambles\"".
///
/// The enum is `untagged`, so the value may be given either as a bare JSON
/// string or as an object carrying a `name` field.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum Company<'a> {
    /// The name given directly as a JSON string.
    Name(Cow<'a, str>),
    /// The name wrapped in an object: `{"name": "..."}`.
    NameRec { name: Cow<'a, str> },
}
/// One input record exactly as deserialized from JSON.
///
/// `debt`, `phones` and `phone` are kept as raw, unparsed JSON fragments;
/// `extract_data` interprets them afterwards.
#[derive(Debug, Deserialize)]
struct Rec<'a> {
    /// Company name, either a string or a `{"name": ...}` object.
    #[serde(borrow)]
    company: Company<'a>,
    /// Raw JSON value of the debt.
    debt: &'a RawValue,
    /// Raw JSON value of the phones (a single value or an array).
    phones: &'a RawValue,
    /// Optional additional phone, raw JSON value.
    phone: Option<&'a RawValue>,
}
/// Source data: one record after its raw JSON fields have been interpreted.
struct DebtRec<'a> {
    /// Company name taken from the record.
    pub company: Cow<'a, str>,
    /// Every phone mentioned by the record, as text slices of the input.
    pub phones: SmallVec<[&'a str; 4]>,
    /// Debt amount of the record.
    pub debt: f64,
}
/// Result data: one aggregated debtor.
#[derive(Debug)]
struct Debtor {
    /// Names of all companies attributed to this debtor.
    companies: HashSet<String>,
    /// All phones attributed to this debtor.
    phones: HashSet<String>,
    /// Sum of the debts of every record merged into this debtor.
    debt: f64,
}
/// Collection of aggregated debtors together with a phone lookup index.
struct Debtors {
    /// All debtors collected so far.
    all: Vec<Debtor>,
    /// Maps a phone to the position of its debtor inside `all`.
    index_by_phone: HashMap<String, usize>,
}
impl Debtor {
fn new() -> Debtor {
Debtor {
companies: HashSet::default(),
phones: HashSet::default(),
debt: 0.0
}
}
}
impl Debtors {
fn new() -> Debtors {
Debtors {
all: Vec::new(),
index_by_phone: HashMap::default()
}
}
}
/// Entry point.
///
/// Every argument that directly follows a `-f` flag is treated as a file
/// path and processed into one shared result set. Afterwards all collected
/// debtors are printed. The usage line is printed when the argument scan
/// did not end in the "file processed" state.
fn main() {
    /// Progress of the command-line scan.
    #[derive(Clone, Copy, PartialEq)]
    enum ArgState {
        /// No `-f` has been seen yet.
        NoFlag,
        /// The previous argument was `-f`; the next one is a path.
        ExpectPath,
        /// A path has been processed since the last `-f`.
        FileHandled,
    }

    let mut debtors = Debtors::new();
    let mut state = ArgState::NoFlag;

    for arg in std::env::args() {
        match (arg.as_str(), state) {
            ("-f", _) => state = ArgState::ExpectPath,
            (path, ArgState::ExpectPath) => {
                state = ArgState::FileHandled;
                println!("{}:", path);
                let started = std::time::Instant::now();
                let (count, err_count) = process_file(path, &mut debtors);
                println!(
                    "PROCESSED: {} objects in {:?}, {} errors found",
                    count,
                    started.elapsed(),
                    err_count
                );
            }
            // Anything not preceded by `-f` is ignored.
            _ => {}
        }
    }

    for (index, debtor) in debtors.all.iter().enumerate() {
        println!("-------------------------------");
        println!("#{}: debt: {}", index, debtor.debt);
        println!(
            "companies: {:?}\nphones: {:?}",
            debtor.companies, debtor.phones
        );
    }

    if state != ArgState::FileHandled {
        println!("USAGE: fastpivot -f \"file 1\" -f \"file 2\" ...");
    }
}
/// Calls `emit` once for every top-level `{ ... }` object found in `data`.
///
/// The scan only tracks brace nesting and JSON string state; it does not
/// validate the JSON. Braces inside string literals are ignored, and the byte
/// following a backslash inside a string is always treated as literal, so
/// both `\"` and `\\` are handled (a string ending in `\\` is closed by the
/// quote that follows it). A closing brace with no matching opening brace is
/// skipped instead of corrupting the nesting depth.
fn for_each_json_object<'d>(data: &'d [u8], mut emit: impl FnMut(&'d [u8])) {
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;
    let mut start = 0;
    for (idx, &byte) in data.iter().enumerate() {
        if escaped {
            // This byte is the target of an escape sequence: never structural.
            escaped = false;
            continue;
        }
        match byte {
            b'\\' if in_string => escaped = true,
            b'"' => in_string = !in_string,
            b'{' if !in_string => {
                if depth == 0 {
                    start = idx;
                }
                depth += 1;
            }
            // `depth > 0` guards against stray closing braces.
            b'}' if !in_string && depth > 0 => {
                depth -= 1;
                if depth == 0 {
                    emit(&data[start..=idx]);
                }
            }
            _ => {}
        }
    }
}

/// Processes one input file and merges its debtors into `res`.
///
/// The file is memory-mapped and split into top-level JSON objects on the
/// calling thread. The objects are handed over a bounded channel to worker
/// threads, each of which parses them and aggregates into its own `Debtors`.
/// Once all workers have finished, their partial results are merged into
/// `res`.
///
/// Returns `(objects_found, json_errors)`. If the file cannot be opened or
/// mapped, a message is printed and `(0, 0)` is returned.
///
/// # Panics
///
/// Panics if an object cannot be sent to the workers or if a worker thread
/// panicked.
fn process_file(path: &str, res: &mut Debtors) -> (i32, usize) {
    const WORKER_COUNT: usize = 4;
    const QUEUE_CAPACITY: usize = 20;

    let file = match std::fs::File::open(path) {
        Ok(file) => file,
        Err(e) => {
            println!("ERROR: {}", e);
            return (0, 0);
        }
    };
    // SAFETY: the mapping is only read. This assumes the file is not
    // modified or truncated by another process while it is mapped.
    let mmap = match unsafe { Mmap::map(&file) } {
        Ok(mmap) => mmap,
        Err(e) => {
            println!("Cannot open '{}': {:?}", path, e);
            return (0, 0);
        }
    };

    let mut count = 0;
    let err_count = AtomicUsize::new(0);
    let mut results: Vec<Debtors> = (0..WORKER_COUNT).map(|_| Debtors::new()).collect();

    crossbeam::scope(|scope| {
        let (s, r) = channel::bounded::<&[u8]>(QUEUE_CAPACITY);

        // One worker per partial result; each owns its `Debtors` exclusively.
        for debtor in &mut results {
            let r = r.clone();
            let errors = &err_count;
            scope.spawn(move |_| {
                while let Ok(obj) = r.recv() {
                    match serde_json::from_slice(obj) {
                        Ok(o) => {
                            process_object(o, debtor);
                        }
                        Err(e) => {
                            println!("JSON ERROR: {}:\n{}", e, String::from_utf8_lossy(obj));
                            errors.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                }
            });
        }

        for_each_json_object(&mmap[..], |obj| {
            s.send(obj).unwrap();
            count += 1;
        });

        // Closing the channel lets the workers leave their receive loops
        // before the scope joins them.
        drop(s);
    })
    .unwrap();

    for debtor in results {
        merge_result(debtor, res);
    }

    (count, err_count.load(Ordering::SeqCst))
}
fn merge_result(part: Debtors, result: &mut Debtors) {
for dr in part.all {
let di = match dr.phones.iter().filter_map(|p| result.index_by_phone.get(p)).next() {
Some(i) => *i,
None => {
result.all.push(Debtor::new());
result.all.len()-1
}
};
let d = &mut result.all[di];
d.companies.extend(dr.companies.into_iter());
for p in &dr.phones {
d.phones.insert(p.to_owned());
result.index_by_phone.insert(p.to_owned(), di);
}
d.debt += dr.debt;
}
}
/// Inserts an owned copy of `r` into `set` unless an equal value is already
/// present.
///
/// The owned value is only created (via `to_owned`) when it is actually
/// inserted.
fn set_insert_ref<'a, R, T>(set: &mut HashSet<T>, r: &'a R)
where
    R: ?Sized + std::hash::Hash + Eq + ToOwned<Owned = T>,
    T: std::hash::Hash + Eq + std::borrow::Borrow<R>,
{
    let already_present = set.contains(r);
    if already_present {
        return;
    }
    set.insert(r.to_owned());
}
/// Associates an owned copy of `k` with `v` in `map` unless the key is
/// already present.
///
/// An existing entry is left untouched and `v` is dropped. The owned key is
/// only created (via `to_owned`) when it is actually inserted.
fn map_insert_ref<'a, R, K, V>(map: &mut HashMap<K, V>, k: &'a R, v: V)
where
    R: ?Sized + std::hash::Hash + Eq + ToOwned<Owned = K>,
    K: std::hash::Hash + Eq + std::borrow::Borrow<R>,
{
    let already_present = map.contains_key(k);
    if already_present {
        return;
    }
    map.insert(k.to_owned(), v);
}
/// Aggregates one parsed record into `res`.
///
/// The record is attributed to the debtor that owns the first of its phones
/// found in `res.index_by_phone`; if none of its phones is known, a new
/// debtor is created. The record's company and phones are added to that
/// debtor, phones that are not indexed yet are indexed to it (existing index
/// entries are left as they are), and the record's debt is added to the
/// debtor's total.
fn process_object(o: Rec, res: &mut Debtors) {
    let record = extract_data(o);

    // Index of an existing debtor sharing a phone with this record, if any.
    let known = record
        .phones
        .iter()
        .find_map(|&phone| res.index_by_phone.get(phone).cloned());
    let idx = match known {
        Some(existing) => existing,
        None => {
            // `Debtor::new()` starts with a zero debt.
            res.all.push(Debtor::new());
            res.all.len() - 1
        }
    };

    let debtor = &mut res.all[idx];
    set_insert_ref(&mut debtor.companies, record.company.as_ref());
    for &phone in &record.phones {
        set_insert_ref(&mut debtor.phones, phone);
        map_insert_ref(&mut res.index_by_phone, phone, idx);
    }
    debtor.debt += record.debt;
}
/// Returns the textual content of a raw JSON scalar.
///
/// * A JSON string yields the text between its surrounding quotes, with
///   escape sequences left as written.
/// * A value whose first character is a decimal digit is returned unchanged.
/// * Anything else yields the empty string.
fn raw2str(raw: &RawValue) -> &str {
    let text = raw.get();
    match text.chars().next() {
        Some('"') => &text[1..text.len() - 1],
        Some(first) if first.is_digit(10) => text,
        _ => "",
    }
}
fn extract_data(o: Rec) -> DebtRec {
use std::str::FromStr;
let company = match o.company {
Company::Name(c) | Company::NameRec{ name: c} => c,
};
let mut phones = smallvec![];
let payload = o.phones.get();
if payload.starts_with('[') {
let ps: SmallVec<[&RawValue; 4]> = serde_json::from_str(payload).unwrap_or_else(|_| smallvec![]);
phones.extend(ps.into_iter().map(raw2str));
} else {
phones.push(raw2str(o.phones));
};
if let Some(p) = o.phone {
phones.push(raw2str(p));
};
let debt = f64::from_str(raw2str(o.debt)).unwrap_or(0.0);
DebtRec {
company, phones, debt,
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.