Skip to content

Instantly share code, notes, and snippets.

@sma
Last active April 25, 2024 10:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sma/c21d6706dcc2a361abd97f657c465b33 to your computer and use it in GitHub Desktop.
Save sma/c21d6706dcc2a361abd97f657c465b33 to your computer and use it in GitHub Desktop.
One billion (or at least a couple of million) rows challenge
import 'dart:io';
import './data.dart';
/// Baseline benchmark: loads the whole file as decoded lines and aggregates
/// one [Measurement] per station name.
///
/// Prints the elapsed time after reading, then the number of distinct
/// stations, then the total elapsed time.
void base() {
  final stopwatch = Stopwatch()..start();
  final rows = File(path).readAsLinesSync();
  print(stopwatch.elapsed);

  final byStation = <String, Measurement>{};
  for (final row in rows) {
    // Split at the first ';': station name on the left, value on the right.
    final separator = row.indexOf(';');
    final station = row.substring(0, separator);
    final reading = double.parse(row.substring(separator + 1));
    (byStation[station] ??= Measurement()).add(reading);
  }

  print(byStation.length);
  print(stopwatch.elapsed);
}
/// Running statistics (minimum, maximum, sum and count) over added values.
class Measurement {
  /// Smallest value added so far; positive infinity before the first [add].
  var min = double.infinity;

  /// Largest value added so far; negative infinity before the first [add].
  var max = double.negativeInfinity;

  /// Total of all added values.
  var sum = 0.0;

  /// Number of values added.
  var count = 0;

  /// Arithmetic mean of the added values, computed as `sum / count`.
  double get avg {
    return sum / count;
  }

  /// Folds [value] into the running statistics.
  void add(double value) {
    if (value < min) min = value;
    if (value > max) max = value;
    count++;
    sum += value;
  }
}
/// Entry point: runs the [base] benchmark.
void main() => base();
import 'dart:io';
import './data.dart';
/// Byte-level benchmark: reads the file as raw bytes and parses each
/// `<name>;<value>\n` record by hand, without decoding lines first.
///
/// A value is accumulated as an integer from its digits (the decimal point is
/// skipped) and then divided by 10000, so results are only correct when every
/// value carries exactly four fractional digits. Names are built with
/// [String.fromCharCodes], i.e. one code unit per byte.
///
/// The final record may or may not be terminated by a newline.
///
/// Prints the elapsed time after reading, then the number of distinct
/// stations, then the total elapsed time.
void bytes() {
  final sw = Stopwatch()..start();
  final bytes = File(path).readAsBytesSync();
  print(sw.elapsed);
  final measurements = <String, Measurement>{};
  final length = bytes.length;
  for (var i = 0; i < length;) {
    final j = bytes.indexOf(59, i); // 59 == ';'
    final name = String.fromCharCodes(bytes, i, j);
    i = j + 1;
    final negative = bytes[i] == 45; // 45 == '-'
    if (negative) i++;
    var r = 0;
    // Consume digits up to '\n' (10) or the end of the input. Bounding the
    // scan by `length` keeps a file without a trailing newline from raising
    // a RangeError on its last record.
    while (i < length) {
      final c = bytes[i++];
      if (c == 10) break;
      if (c != 46) {
        // Anything but '.' (46) is treated as a decimal digit.
        r = r * 10 + c - 48;
      }
    }
    if (negative) r = -r;
    measurements.putIfAbsent(name, Measurement.new).add(r / 10000);
  }
  print(measurements.length);
  print(sw.elapsed);
}
/// Accumulates the minimum, maximum, sum and count of a series of values.
class Measurement {
  /// Lowest value seen; starts at positive infinity.
  double min = double.infinity;

  /// Highest value seen; starts at negative infinity.
  double max = double.negativeInfinity;

  /// Sum of every value passed to [add].
  double sum = 0.0;

  /// How many values have been passed to [add].
  int count = 0;

  /// Mean of the recorded values (`sum / count`).
  double get avg => sum / count;

  /// Records [value], updating [min], [max], [sum] and [count].
  void add(double value) {
    min = value < min ? value : min;
    max = value > max ? value : max;
    sum = sum + value;
    count += 1;
  }
}
/// Entry point: runs the [bytes] benchmark.
void main() => bytes();
/// Path of the measurements file read by the benchmarks that import this
/// library.
const path = 'data/100m';
# Station list used as the source of names; lines starting with '#' are
# stripped before use.
url=https://raw.githubusercontent.com/gunnarmorling/1brc/main/data/weather_stations.csv

.PHONY: prepare clean

# Download the station list and generate the benchmark inputs.
# Everything goes to data/, which is where bin/prepare.dart looks for
# weather.csv, where the benchmarks look for their input, and what `clean`
# removes.
prepare:
	mkdir -p data
	curl -s ${url} | grep -v '^#' >data/weather.csv
	dart run bin/prepare.dart 1_000_000 >data/1m
	dart run bin/prepare.dart 10_000_000 >data/10m
	dart run bin/prepare.dart 100_000_000 >data/100m
	dart run bin/prepare.dart 1_000_000_000 >data/1g

# Remove all generated data; -f keeps this from failing when data/ is empty.
clean:
	rm -f data/*
import 'dart:io';
import 'dart:isolate';
import './data.dart';
/// Multi-isolate benchmark: reads the file once, splits it into newline-
/// aligned chunks and parses each chunk with [Isolate.run], then merges the
/// per-chunk results.
///
/// Values are parsed as integers from their digits and divided by 10000, so
/// results are only correct when every value carries exactly four fractional
/// digits.
///
/// Returns a future that completes when all output has been printed: the
/// elapsed time after reading, the elapsed time after all isolates finished,
/// the number of distinct stations, and the total elapsed time.
///
/// An empty file, or one whose last record lacks a trailing newline, is
/// handled without running past the end of the buffer.
Future<void> multi() async {
  final sw = Stopwatch()..start();
  final bytes = File(path).readAsBytesSync();
  print(sw.elapsed);
  final n = 4; // Platform.numberOfProcessors;
  final length = bytes.length;
  final chunks = List.generate(n, (_) => Chunk());
  for (var i = 1; i < n; i++) {
    // Start from an even split and advance to just past the next '\n' so
    // that no record straddles two chunks. The scan is bounded by `length`
    // so that it cannot overrun when no further newline exists.
    var j = (i * length) ~/ n;
    while (j < length && bytes[j++] != 10) {}
    chunks[i - 1].end = j;
    chunks[i].start = j;
  }
  chunks[n - 1].end = length;
  final measurementsFromChunks = await Future.wait(chunks.map((chunk) {
    return Isolate.run(() {
      final measurements = <String, Measurement>{};
      final end = chunk.end;
      for (var i = chunk.start; i < end;) {
        final j = bytes.indexOf(59, i); // 59 == ';'
        final name = String.fromCharCodes(bytes, i, j);
        i = j + 1;
        final negative = bytes[i] == 45; // 45 == '-'
        if (negative) i++;
        var r = 0;
        // Consume digits up to '\n' (10) or the end of this chunk.
        while (i < end) {
          final c = bytes[i++];
          if (c == 10) break;
          if (c != 46) {
            // Anything but '.' (46) is treated as a decimal digit.
            r = r * 10 + c - 48;
          }
        }
        if (negative) r = -r;
        measurements.putIfAbsent(name, Measurement.new).add(r / 10000);
      }
      return measurements;
    });
  }));
  print(sw.elapsed);
  // Combine the per-chunk statistics into one map keyed by station name.
  final measurements = <String, Measurement>{};
  for (final measurementsFromChunk in measurementsFromChunks) {
    for (final e in measurementsFromChunk.entries) {
      final m = measurements[e.key];
      if (m != null) {
        m.min = m.min < e.value.min ? m.min : e.value.min;
        m.max = m.max > e.value.max ? m.max : e.value.max;
        m.sum += e.value.sum;
        m.count += e.value.count;
      } else {
        measurements[e.key] = e.value;
      }
    }
  }
  print(measurements.length);
  print(sw.elapsed);
}
/// A byte range of the input: [start] is inclusive, [end] is exclusive.
class Chunk {
  int start = 0, end = 0;
}
/// Per-station aggregate: tracks the extremes, total and number of values.
class Measurement {
  /// Minimum and maximum of the values added so far. They start at the
  /// opposite infinities so that the first added value replaces both.
  double min = double.infinity, max = double.negativeInfinity;

  /// Sum of the values added so far.
  double sum = 0;

  /// Number of values added so far.
  int count = 0;

  /// Average of the values added so far (`sum / count`).
  double get avg => sum / count;

  /// Adds [value] to the aggregate.
  void add(double value) {
    count += 1;
    sum += value;
    if (value > max) {
      max = value;
    }
    if (value < min) {
      min = value;
    }
  }
}
/// Entry point: starts the [multi] benchmark.
void main() => multi();
import 'dart:io';
import 'dart:math';
/// Writes randomly generated `<station>;<value>` rows to stdout.
///
/// Expects exactly one argument, the number of rows to generate; underscores
/// in it are ignored (e.g. `1_000_000`). Station names are the first
/// `;`-separated field of each line in `data/weather.csv`. Values are drawn
/// from `random.nextDouble() * 200 - 100` and printed with four fractional
/// digits. The generator is seeded with a fixed value.
void main(List<String> args) {
  final rng = Random(13);
  final total = int.parse(args.single.replaceAll('_', ''));
  final stations = File('data/weather.csv')
      .readAsLinesSync()
      .map((line) => line.split(';').first)
      .toSet()
      .toList()
    ..shuffle(rng);

  // Rows are emitted in batches so that the buffer stays bounded.
  const batchSize = 1000000;
  final out = StringBuffer();
  for (var done = 0; done < total; done += batchSize) {
    final rowsInBatch = min(total - done, batchSize);
    for (var row = 0; row < rowsInBatch; row++) {
      // Draw the value before the station index to keep the generator's
      // call order (and therefore the output) stable.
      final reading = rng.nextDouble() * 200 - 100;
      final station = stations[rng.nextInt(stations.length)];
      out.write('$station;${reading.toStringAsFixed(4)}\n');
    }
    stdout.write(out);
    out.clear();
  }
}
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';
import './data.dart';
/// Isolate entry point: reads the byte range described by the [Chunk] in
/// [data] from the file at `path`, aggregates one [Measurement] per station
/// name, and hands the resulting map to the [SendPort] in [data] via
/// [Isolate.exit].
///
/// Values are parsed as integers from their digits and divided by 10000, so
/// results are only correct when every value carries exactly four fractional
/// digits. The last record of the range may or may not end with a newline.
Future<void> compute((Chunk chunk, SendPort p) data) async {
  final (chunk, p) = data;
  // Collect the streamed blocks of this range into one contiguous buffer.
  final buffer = BytesBuilder();
  final stream = File(path).openRead(chunk.start, chunk.end);
  await for (final block in stream) {
    buffer.add(block);
  }
  final bytes = buffer.takeBytes().asUnmodifiableView();
  final length = bytes.length;
  final measurements = <String, Measurement>{};
  for (var i = 0; i < length;) {
    final j = bytes.indexOf(59, i); // 59 == ';'
    final name = String.fromCharCodes(bytes, i, j);
    i = j + 1;
    final negative = bytes[i] == 45; // 45 == '-'
    if (negative) i++;
    var r = 0;
    // Consume digits up to '\n' (10) or the end of the buffer. Bounding the
    // scan keeps an unterminated final record from raising a RangeError,
    // which would end this isolate without ever replying on `p`.
    while (i < length) {
      final c = bytes[i++];
      if (c == 10) break;
      if (c != 46) {
        // Anything but '.' (46) is treated as a decimal digit.
        r = r * 10 + c - 48;
      }
    }
    if (negative) r = -r;
    measurements.putIfAbsent(name, Measurement.new).add(r / 10000);
  }
  Isolate.exit(p, measurements);
}
/// Multi-isolate benchmark: determines newline-aligned chunk boundaries,
/// spawns one isolate per chunk running [compute], and merges the per-chunk
/// results.
///
/// The file is read once here only to locate the boundaries; the buffer is
/// released before the workers are spawned, and each worker receives just
/// its [Chunk].
///
/// Returns a future that completes when all output has been printed: the
/// elapsed time after reading, the elapsed time after all workers finished,
/// the number of distinct stations, and the total elapsed time. The future
/// completes with a [StateError] if a worker reports an uncaught error
/// instead of a result.
Future<void> multi() async {
  final sw = Stopwatch()..start();
  Uint8List? bytes = File(path).readAsBytesSync();
  print(sw.elapsed);
  final n = Platform.numberOfProcessors;
  final length = bytes.length;
  final chunks = List.generate(n, (_) => Chunk());
  for (var i = 1; i < n; i++) {
    // Start from an even split and advance to just past the next '\n' so
    // that no record straddles two chunks. The scan is bounded by `length`
    // so that it cannot overrun on an empty file or when no further newline
    // exists.
    var j = (i * length) ~/ n;
    while (j < length && bytes[j++] != 10) {}
    chunks[i - 1].end = j;
    chunks[i].start = j;
  }
  chunks[n - 1].end = length;
  // Drop the reference so the buffer can be reclaimed while workers run.
  bytes = null;
  final measurementsFromChunks = await Future.wait(chunks.map((chunk) async {
    final p = ReceivePort();
    // Route uncaught worker errors to the same port; otherwise a failing
    // worker would never reply and the wait below would never complete.
    await Isolate.spawn(compute, (chunk, p.sendPort), onError: p.sendPort);
    final value = await p.first;
    if (value is Map<String, Measurement>) return value;
    throw StateError('Worker isolate failed: $value');
  }));
  print(sw.elapsed);
  // Combine the per-chunk statistics into one map keyed by station name.
  final measurements = <String, Measurement>{};
  for (final measurementsFromChunk in measurementsFromChunks) {
    for (final e in measurementsFromChunk.entries) {
      final m = measurements[e.key];
      if (m != null) {
        m.min = m.min < e.value.min ? m.min : e.value.min;
        m.max = m.max > e.value.max ? m.max : e.value.max;
        m.sum += e.value.sum;
        m.count += e.value.count;
      } else {
        measurements[e.key] = e.value;
      }
    }
  }
  print(measurements.length);
  print(sw.elapsed);
}
/// Describes one slice of the input file as a pair of byte offsets.
class Chunk {
  /// Offset of the first byte of the slice.
  int start = 0;

  /// Offset just past the last byte of the slice.
  int end = 0;
}
/// Mutable summary of a stream of values: extremes, total and count.
class Measurement {
  /// Smallest value recorded; positive infinity while nothing was recorded.
  var min = double.infinity;

  /// Largest value recorded; negative infinity while nothing was recorded.
  var max = double.negativeInfinity;

  /// Running total of the recorded values.
  double sum = 0;

  /// Number of recorded values.
  var count = 0;

  /// Mean of the recorded values, i.e. `sum / count`.
  double get avg => sum / count;

  /// Records [value] in the summary.
  void add(double value) {
    final isNewLow = value < min;
    final isNewHigh = value > max;
    if (isNewLow) min = value;
    if (isNewHigh) max = value;
    sum += value;
    ++count;
  }
}
/// Entry point: kicks off the [multi] benchmark.
void main() => multi();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment