Skip to content

Instantly share code, notes, and snippets.

@red75prime red75prime/main.rs
Last active Jun 10, 2019

Embed
What would you like to do?
// [dependencies]
// serde = "1.0"
// serde_derive = "1.0"
// serde_json = { version = "1.0", features = ["raw_value"] }
// fnv = "1.0.5"
// memmap = "0.7"
// smallvec = { version = "0.6", features = ["serde"] }
// jemallocator = "0.3"
use fnv::{FnvHashSet as HashSet, FnvHashMap as HashMap};
use memmap::Mmap;
use serde_derive::{Deserialize};
use serde_json::{value::RawValue};
use smallvec::{SmallVec, smallvec};
use std::borrow::Cow;
use std::sync::atomic::{AtomicUsize, Ordering};
use jemallocator::Jemalloc;
#[global_allocator]
static ALLOC: Jemalloc = Jemalloc;
#[derive(Debug, Deserialize)]
struct Rec<'a> {
#[serde(borrow)]
company: &'a RawValue,
#[serde(borrow)]
debt: &'a RawValue,
#[serde(borrow)]
phones: &'a RawValue,
#[serde(borrow)]
phone: Option<&'a RawValue>,
}
//source data
struct DebtRec<'a> {
pub company: Cow<'a, str>,
pub phones: SmallVec<[Cow<'a, str>; 4]>,
pub debt: f64
}
//result data
#[derive(Debug)]
struct Debtor {
companies: HashSet<String>,
phones: HashSet<String>,
debt: f64
}
struct Debtors {
all: Vec<Debtor>,
index_by_phone: HashMap<String, usize>
}
impl Debtor {
fn new() -> Debtor {
Debtor {
companies: HashSet::default(),
phones: HashSet::default(),
debt: 0.0
}
}
}
impl Debtors {
fn new() -> Debtors {
Debtors {
all: Vec::new(),
index_by_phone: HashMap::default()
}
}
}
fn main() {
let mut res = Debtors::new();
let mut fflag = 0;
for arg in std::env::args() {
if arg == "-f" {
fflag = 1;
}
else if fflag == 1 {
fflag = 2;
println!("{}:", &arg);
let start = std::time::Instant::now();
let (count, err_count) = process_file(&arg, &mut res);
println!("PROCESSED: {} objects in {:?}, {} errors found", count, start.elapsed(), err_count);
}
}
for (di, d) in res.all.iter().enumerate() {
println!("-------------------------------");
println!("#{}: debt: {}", di, &d.debt);
println!("companies: {:?}\nphones: {:?}", &d.companies, &d.phones);
}
if fflag < 2 {
println!("USAGE: fastpivot -f \"file 1\" -f \"file 2\" ...");
}
}
fn process_file(path: &str, res: &mut Debtors) -> (i32, usize) {
let mut count = 0;
let err_count = AtomicUsize::new(0);
match std::fs::File::open(path) {
Ok(file) => {
let mmap = match unsafe { Mmap::map(&file) } {
Ok(mmap) => mmap,
Err(e) => {
println!("Cannot open '{}': {:?}", path, e);
return (0, 0);
}
};
let mut braces = 0;
let mut quote = false;
let mut backslash = false;
let mut start_idx = 0;
for (idx, &b) in mmap.iter().enumerate() {
match b {
b'{' if !quote => {
if braces == 0 {
start_idx = idx;
}
braces += 1;
}
b'}' if !quote => {
braces -= 1;
if braces == 0 { //object formed !
let obj = &mmap[start_idx ..= idx];
process_object(obj, res, &err_count);
count += 1;
}
}
b'\\' if quote => {
backslash = true;
continue;
}
b'"' if !backslash => {
quote = !quote;
}
_ => {}
};
backslash = false;
}
}
Err(e) => {
println!("ERROR: {}", e);
}
}
(count, err_count.load(Ordering::SeqCst))
}
fn set_insert_cow<'a, R, T>(set: &mut HashSet<T>, r: Cow<'a, R>)
where
R: ?Sized,
R: std::hash::Hash + Eq + ToOwned<Owned = T>,
T: Clone + std::hash::Hash + Eq + std::borrow::Borrow<R>,
{
if !set.contains(r.as_ref()) {
set.insert(r.into_owned());
}
}
fn map_insert_cow<'a, R, K, V>(map: &mut HashMap<K, V>, k: Cow<'a, R>, v: V)
where
R: ?Sized,
R: std::hash::Hash + Eq + ToOwned<Owned = K>,
K: Clone + std::hash::Hash + Eq + std::borrow::Borrow<R>,
{
if !map.contains_key(k.as_ref()) {
map.insert(k.into_owned(), v);
}
}
fn process_object(obj: &[u8], res: &mut Debtors, err_count: &AtomicUsize) {
let obj = unsafe { std::str::from_utf8_unchecked(obj) };
let o =
match serde_json::from_str(obj) {
Ok(o) => { o }
Err(e) => {
println!("JSON ERROR: {}:\n{}", e, obj);
err_count.fetch_add(1, Ordering::Relaxed);
return;
}
};
let dr = extract_data(o);
//println!("{} - {:?} - {}", &dr.company, &dr.phones, &dr.debt,);
let mut di = None; //debtor index search result
for p in &dr.phones {
if let Some(&i) = res.index_by_phone.get(p.as_ref()) {
di = Some(i);
break;
}
}
let d;
let idx;
match di {
Some(i) => { //existing debtor
idx = i;
d = &mut res.all[i];
}
None => { //new debtor
idx = res.all.len();
res.all.push(Debtor::new());
d = res.all.last_mut().unwrap();
d.debt = 0.;
}
};
set_insert_cow(&mut d.companies, dr.company);
for p in dr.phones {
set_insert_cow(&mut d.phones, p.clone());
map_insert_cow(&mut res.index_by_phone, p, idx);
}
d.debt += dr.debt;
}
#[derive(Debug, Deserialize)]
struct BorrowedStr<'a>(
#[serde(borrow)]
Cow<'a, str>
);
fn raw2str(raw: &RawValue) -> Cow<str> {
let payload = raw.get();
if payload.starts_with('"') {
let b_str = serde_json::from_str(payload).unwrap_or_else(|_| BorrowedStr(Cow::default()) );
b_str.0
} else if payload.starts_with(|ch: char| ch.is_ascii_digit() ) {
payload.into()
} else {
"".into()
}
}
#[derive(Debug, Deserialize)]
struct CompanyObj<'a> {
#[serde(borrow)]
name: Cow<'a, str>,
}
fn extract_data(o: Rec) -> DebtRec {
use std::str::FromStr;
let company = {
let payload = o.company.get();
if payload.starts_with('{') {
let company = serde_json::from_str(payload).unwrap_or_else(|_| CompanyObj{ name: "".into() });
company.name
} else {
let company = serde_json::from_str(payload).unwrap_or_else(|_| BorrowedStr("".into()));
company.0
}
};
if let Cow::Owned(_) = &company {
println!("Warning: owned '{:?}'", company);
};
let mut phones = smallvec![];
let payload = o.phones.get();
if payload.starts_with('[') {
let ps: SmallVec<[&RawValue; 4]> = serde_json::from_str(payload).unwrap_or_else(|_| smallvec![]);
phones.extend(ps.into_iter().map(raw2str));
} else {
phones.push(raw2str(o.phones));
};
if let Some(p) = o.phone {
phones.push(raw2str(p));
};
let debt = f64::from_str(raw2str(o.debt).as_ref()).unwrap_or(0.0);
DebtRec {
company, phones, debt,
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.