Skip to content

Instantly share code, notes, and snippets.

@itarato
Created March 29, 2024 21:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save itarato/9628c84f064d4c0b3a102ca204dada4c to your computer and use it in GitHub Desktop.
Save itarato/9628c84f064d4c0b3a102ca204dada4c to your computer and use it in GitHub Desktop.
1 Billion Record Challenge
/*
* The task is to write a ~Java~ Rust program which reads the file (one
* `<station>,<temperature>` record per line), calculates the
* min, mean, and max temperature value per weather station, and emits the results
* on stdout like this (i.e. sorted alphabetically by station name, and the result
* values per station in the format <min>/<mean>/<max>, rounded to one fractional digit):
*
* {
* Abha=-23.0/18.0/59.2,
* Abidjan=-16.2/26.0/67.3,
* Abéché=-10.0/29.4/69.0,
* Accra=-10.1/26.4/66.4,
* Addis Ababa=-23.7/16.0/67.0,
* Adelaide=-27.8/17.3/58.5, ...
* }
*/
extern crate num_cpus;
use std::{
collections::BTreeMap,
fs::File,
io::{self, BufRead},
path::Path,
};
/// Running temperature statistics (count, min, max, sum) for one weather station.
#[derive(Debug, Clone, PartialEq)]
struct CityData {
    /// Number of temperatures recorded so far.
    count: usize,
    /// Lowest temperature recorded; `f32::INFINITY` while nothing was recorded.
    min: f32,
    /// Highest temperature recorded; `f32::NEG_INFINITY` while nothing was recorded.
    max: f32,
    /// Sum of all recorded temperatures.
    ///
    /// Kept in `f64`: an `f32` sum stops absorbing small addends once it grows
    /// past 2^24, which skews the mean for stations with millions of readings.
    total: f64,
}

impl Default for CityData {
    /// Returns an empty accumulator whose `min`/`max` are the identities of
    /// the respective comparisons, so the first recorded value replaces both.
    fn default() -> Self {
        CityData {
            count: 0,
            min: f32::INFINITY,
            max: f32::NEG_INFINITY,
            total: 0.0,
        }
    }
}

impl CityData {
    /// Mean of the recorded temperatures in full `f64` precision.
    ///
    /// Returns NaN when nothing has been recorded (`0.0 / 0.0`).
    fn mean_f64(&self) -> f64 {
        self.total / self.count as f64
    }

    /// Mean of the recorded temperatures, narrowed to `f32`.
    ///
    /// Returns NaN when nothing has been recorded.
    // Kept for callers that want the `f32` value; the printed output is
    // derived from `mean_f64` so that it is rounded only once.
    #[allow(dead_code)]
    fn mean(&self) -> f32 {
        self.mean_f64() as f32
    }

    /// Folds a single temperature reading into the statistics.
    fn record_temp(&mut self, temp: f32) {
        self.count += 1;
        self.total += f64::from(temp);
        if temp < self.min {
            self.min = temp;
        }
        if temp > self.max {
            self.max = temp;
        }
    }

    /// Folds the statistics of `other` (e.g. a per-thread partial result)
    /// into `self`.
    fn merge(&mut self, other: &CityData) {
        self.count += other.count;
        self.total += other.total;
        if other.min < self.min {
            self.min = other.min;
        }
        if other.max > self.max {
            self.max = other.max;
        }
    }

    /// Renders one output line: a tab, then `<name>=<min>/<mean>/<max>,` with
    /// every value rounded to one fractional digit.
    fn format_entry(&self, name: &str) -> String {
        format!(
            "\t{}={:.1}/{:.1}/{:.1},",
            name,
            self.min,
            self.mean_f64(),
            self.max
        )
    }

    /// Prints the line produced by `format_entry` to stdout.
    fn dump(&self, name: &str) {
        println!("{}", self.format_entry(name));
    }
}
/// Slow path for `str_to_f32`: trims surrounding whitespace and defers to the
/// standard library parser, yielding NaN when the text is not a number.
#[cold]
fn parse_general(s: &str) -> f32 {
    s.trim().parse().unwrap_or(f32::NAN)
}

/// Parses a decimal temperature such as `-12.3` into an `f32`.
///
/// The common case (only digits, `-` and `.`) is handled by a hand-rolled
/// loop that accumulates the digits as an integer and divides once by the
/// power of ten implied by the fractional part. An empty string yields `0.0`.
///
/// Anything the fast path cannot represent — an unexpected byte (whitespace,
/// `+`, an exponent, ...) or more digits than fit in a `u32` — is handed to
/// the standard library parser instead of underflowing/overflowing the
/// integer arithmetic. Text that is not a number at all yields NaN.
fn str_to_f32(s: &str) -> f32 {
    let mut neg = false;
    let mut raw = 0u32;
    let mut scale = 1.0f32;
    let mut in_fraction = false;
    for c in s.bytes() {
        // Every byte after the decimal point moves the point one place left.
        if in_fraction {
            scale *= 10.0;
        }
        match c {
            b'-' => neg = true,
            b'.' => in_fraction = true,
            b'0'..=b'9' => {
                let digit = u32::from(c - b'0');
                match raw.checked_mul(10).and_then(|r| r.checked_add(digit)) {
                    Some(next) => raw = next,
                    None => return parse_general(s),
                }
            }
            _ => return parse_general(s),
        }
    }
    // Intentional lossy cast: the accumulated digits become the mantissa.
    let magnitude = raw as f32 / scale;
    if neg {
        -magnitude
    } else {
        magnitude
    }
}
/// Aggregates one batch of `<city>,<temperature>` lines into per-city
/// statistics.
///
/// # Panics
///
/// Panics if a line contains no `,` separator.
fn aggregate_batch(batch: Vec<String>) -> BTreeMap<String, CityData> {
    let mut city_counter: BTreeMap<String, CityData> = BTreeMap::new();
    for line in batch {
        // Split on the LAST comma: the temperature never contains one, while
        // a city name may.
        let comma_pos = line.rfind(',').unwrap_or_else(|| {
            panic!("Malformed line (expected `<city>,<temperature>`): {:?}", line)
        });
        let temp = str_to_f32(&line[comma_pos + 1..]);
        // Reuse the line's allocation for the key instead of copying the name.
        let mut city = line;
        city.truncate(comma_pos);
        city_counter.entry(city).or_default().record_temp(temp);
    }
    city_counter
}

/// Entry point: `<executable> <DATA_FILE> <LINE-HINT>`.
///
/// Reads `DATA_FILE` line by line on the main thread, hands the lines to
/// worker threads in batches sized from `LINE-HINT` (the expected number of
/// lines) and the CPU count, merges the per-batch results and prints one
/// entry per city in key order, wrapped in `{` / `}`.
///
/// # Panics
///
/// Panics on a wrong argument count, a non-numeric hint, a file that cannot
/// be opened or read, or a malformed input line.
fn main() {
    let args = std::env::args().collect::<Vec<_>>();
    if args.len() != 3 {
        panic!("Bad call. Usage: <executable> <DATA_FILE> <LINE-HINT>");
    }
    let filename = &args[1];
    let hint: usize = args[2]
        .parse()
        .unwrap_or_else(|err| panic!("Invalid <LINE-HINT> {:?}: {}", args[2], err));
    let cores = num_cpus::get();
    let batch_size = ((hint / cores) + 1) * 2;
    let mut lines = read_lines(filename)
        .unwrap_or_else(|err| panic!("Cannot open {}: {}", filename, err));

    let mut threads = Vec::new();
    let mut work_done = false;
    while !work_done {
        let mut batch = Vec::with_capacity(batch_size);
        while batch.len() < batch_size {
            match lines.next() {
                Some(Ok(line)) => batch.push(line),
                // Lines that are not valid UTF-8 are skipped (this is what
                // the previous `flatten()` did for every error).
                Some(Err(err)) if err.kind() == io::ErrorKind::InvalidData => {}
                // Any other read error is fatal: ignoring it could retry the
                // same failing read forever or silently drop data.
                Some(Err(err)) => panic!("Failed to read {}: {}", filename, err),
                None => {
                    work_done = true;
                    break;
                }
            }
        }
        // The final batch is empty when the input ends on a batch boundary.
        if !batch.is_empty() {
            threads.push(std::thread::spawn(move || aggregate_batch(batch)));
        }
    }

    let mut city_counter: BTreeMap<String, CityData> = BTreeMap::new();
    for thread in threads {
        let batch_result = thread.join().expect("worker thread panicked");
        for (k, v) in batch_result {
            city_counter.entry(k).or_default().merge(&v);
        }
    }

    println!("{{");
    for (k, v) in city_counter {
        v.dump(&k);
    }
    println!("}}");
}
/// Opens `filename` and returns an iterator over its lines, read through a
/// `BufReader`.
///
/// # Errors
///
/// Returns the `io::Error` from `File::open` if the file cannot be opened.
/// Errors that occur later, while reading, surface through the items of the
/// returned iterator.
fn read_lines<P>(filename: P) -> io::Result<io::Lines<io::BufReader<File>>>
where
    P: AsRef<Path>,
{
    File::open(filename).map(|handle| io::BufReader::new(handle).lines())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment