Skip to content

Instantly share code, notes, and snippets.

@simolus3
Created January 10, 2024 16:56
Show Gist options
  • Save simolus3/0ae5a63d6bf499c53aeb7b75701d8f5e to your computer and use it in GitHub Desktop.
Save simolus3/0ae5a63d6bf499c53aeb7b75701d8f5e to your computer and use it in GitHub Desktop.
One Billion Row Challenge (1BRC) solution adapted from https://github.com/osaxma/1brc_dart to read its input through mmap.
import 'dart:convert';
import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';
import 'dart:math';
import 'dart:typed_data';

import 'package:ffi/ffi.dart';
/// Binding for POSIX `mmap(2)`.
///
/// Returns the start of the new mapping, or `MAP_FAILED` (`(void *)-1`) on
/// error. It does not return `NULL` on failure. [offset] is an `off_t`, which
/// POSIX defines as a signed integer type; `long` matches it on 64-bit Linux
/// and macOS.
@Native<Pointer<Void> Function(Pointer<Void>, Size, Int, Int, Int, Long)>()
external Pointer<Void> mmap(
    Pointer<Void> addr, int length, int prot, int flags, int fd, int offset);
/// Binding for POSIX `open(2)` that declares only its two fixed parameters.
///
/// In C, `open` is variadic. Its optional third `mode` argument is only read
/// when a file is being created, so it is left out here. [flags] takes the
/// `O_*` access flags; `0` is `O_RDONLY` on Linux and macOS. Returns a file
/// descriptor, or `-1` on error.
@Native<Int Function(Pointer<Utf8>, Int)>()
external int open(Pointer<Utf8> path, int flags);
/// Row count of the full challenge input: one billion.
const rows = 1000000000;

/// Upper bound, in bytes, on the length of one input row.
///
/// The derivation is in the upstream README.
const maxBytesPerRow = 107;

/// Code unit of `;`, which separates a station name from its temperature.
const semiColonCodeUnit = 0x3B;

/// Code unit of `\n`, which terminates each row.
const newLineCodeUnit = 0x0A;
/// Running temperature statistics for a single weather station.
class Data {
  /// The station name; [merge] expects both sides to share it.
  final String name;

  /// Sum of all recorded temperatures.
  double sum = 0;

  /// Number of recorded temperatures (kept as a `double`).
  double count = 0;

  /// Lowest recorded temperature; `double.infinity` until one is recorded.
  double minimum = double.infinity;

  /// Highest recorded temperature; `double.negativeInfinity` until one is
  /// recorded.
  double maximum = double.negativeInfinity;

  /// Mean of the recorded temperatures; `NaN` while [count] is zero.
  double get average => sum / count;

  Data(this.name);

  /// Folds the statistics of [data] into this instance.
  ///
  /// [data] must describe the same station, which is checked by an assert.
  void merge(Data data) {
    assert(data.name == name);
    sum = data.sum + sum;
    maximum = max(data.maximum, maximum);
    minimum = min(data.minimum, minimum);
    count = data.count + count;
  }

  /// Formats as `name=min/mean/max`, with the values left unrounded.
  @override
  String toString() {
    return '$name=$minimum/$average/$maximum';
  }

  /// Returns the [toString] of each element of [data], each followed by a
  /// newline.
  static String dataToString(Iterable<Data> data) {
    final buff = StringBuffer();
    for (final d in data) {
      buff.writeln(d);
    }
    return buff.toString();
  }
}
/// Prints `name=min/mean/max` for every station in the file named by the
/// single command-line argument, then the elapsed time.
///
/// The file is memory-mapped once and split into byte ranges that are
/// aggregated in parallel isolates. `-Disolates=N` overrides the default of
/// 24 isolates. Throws a [FileSystemException] if the file cannot be opened
/// or mapped.
void main(List<String> args) async {
  const isolates = int.fromEnvironment('isolates', defaultValue: 24);
  final String filePath = args.single;
  final sw = Stopwatch()..start();
  final totalBytes = File(filePath).lengthSync();
  // mmap rejects zero-length mappings, and an empty file has no stations.
  final partials = totalBytes == 0
      ? <Map<String, Data>>[]
      : await _computeMapped(filePath, totalBytes, isolates);
  final output = Data.dataToString(mergeData(partials).values);
  sw.stop();
  print(output);
  print('took ${sw.elapsed}');
}

/// Memory-maps the non-empty file at [filePath] ([totalBytes] long) and
/// aggregates it in up to [isolates] isolates, returning one partial result
/// per isolate.
Future<List<Map<String, Data>>> _computeMapped(
    String filePath, int totalBytes, int isolates) async {
  // Use at least one isolate, and never more than there are bytes, so every
  // chunk covers at least one byte (and `~/` never divides by zero).
  final workers = max(1, min(isolates, totalBytes));
  final bytesPerIsolate = totalBytes ~/ workers;
  final remainder = totalBytes % workers;
  final chunks = List.generate(workers, (i) {
    final start = i * bytesPerIsolate;
    final isLast = i == workers - 1;
    // Inclusive end offset; the last chunk also absorbs the remainder.
    final end = start + bytesPerIsolate - 1 + (isLast ? remainder : 0);
    return (start, end);
  });

  final path = filePath.toNativeUtf8();
  final fd = open(path, 0 /* O_RDONLY */);
  malloc.free(path);
  if (fd < 0) {
    throw FileSystemException('open() failed', filePath);
  }
  final ptr =
      mmap(nullptr, totalBytes, 1 /* PROT_READ */, 2 /* MAP_PRIVATE */, fd, 0);
  // mmap reports failure with MAP_FAILED, i.e. (void *)-1, not NULL.
  if (ptr == nullptr || ptr == Pointer<Void>.fromAddress(-1)) {
    throw FileSystemException('mmap() failed', filePath);
  }
  // The descriptor and mapping are never released explicitly. Only plain
  // ints (never the Pointer) are captured by the isolate closures below.
  final address = ptr.address;
  final lastIndex = totalBytes - 1;
  return Future.wait([
    for (final (start, end) in chunks)
      Isolate.run<Map<String, Data>>(
          () => computeChunk(start, end, lastIndex, address)),
  ]);
}
/// Aggregates per-station statistics for the rows owned by one chunk of the
/// memory-mapped input file.
///
/// [baseAddress] is the start of a read-only mapping of the whole file.
/// [startByte] and [endByte] are inclusive byte offsets of the chunk.
/// Despite its name, [fileLength] is the offset of the *last* byte of the
/// mapping (`main` passes `totalBytes - 1`).
///
/// A row belongs to the chunk whose range contains the row's first byte, so
/// contiguous chunks visit every row exactly once. Each row is
/// `<name>;<temperature>` followed by `\n`; the file's final row may omit the
/// newline. Station names are decoded as UTF-8, so an invalid name throws a
/// [FormatException]. Throws a [StateError] if a row starting in the chunk
/// has no newline within [maxBytesPerRow] bytes of [endByte] before the end
/// of the file.
Future<Map<String, Data>> computeChunk(
    int startByte, int endByte, int fileLength, int baseAddress) async {
  final cities = <String, Data>{};
  if (startByte < 0 || startByte > endByte || endByte > fileLength) {
    return cities;
  }

  // The view starts one byte before the chunk, to tell whether startByte
  // begins a row. It ends maxBytesPerRow bytes after the chunk, to finish the
  // row that straddles endByte, and is clamped to the mapping. Exposing only
  // this window keeps each view about the size of one chunk.
  final viewStart = startByte == 0 ? 0 : startByte - 1;
  final viewEnd = min(endByte + maxBytesPerRow, fileLength);
  final bytes = Pointer<Uint8>.fromAddress(baseAddress + viewStart)
      .asTypedList(viewEnd - viewStart + 1);
  // Rows starting after this local index belong to the next chunk.
  final lastRowStart = endByte - viewStart;

  var rowStart = 0;
  if (startByte != 0) {
    // A row starts right after a newline. The first newline at or after
    // startByte - 1 therefore precedes the first row this chunk owns.
    final newline = bytes.indexOf(newLineCodeUnit);
    if (newline < 0) return cities;
    rowStart = newline + 1;
  }

  while (rowStart <= lastRowStart && rowStart < bytes.length) {
    var rowEnd = bytes.indexOf(newLineCodeUnit, rowStart);
    if (rowEnd < 0) {
      if (viewEnd != fileLength) {
        throw StateError('Row at byte ${viewStart + rowStart} is longer than '
            '$maxBytesPerRow bytes');
      }
      // Final row of a file that lacks a trailing newline.
      rowEnd = bytes.length;
    }
    final semicolon = bytes.indexOf(semiColonCodeUnit, rowStart);
    if (semicolon >= 0 && semicolon < rowEnd) {
      final name =
          utf8.decode(Uint8List.sublistView(bytes, rowStart, semicolon));
      final temp =
          double.parse(String.fromCharCodes(bytes, semicolon + 1, rowEnd));
      final data = cities.putIfAbsent(name, () => Data(name));
      data
        ..sum = data.sum + temp
        ..maximum = max(data.maximum, temp)
        ..minimum = min(data.minimum, temp)
        ..count = data.count + 1;
    }
    rowStart = rowEnd + 1;
  }
  return cities;
}
/// Combines per-isolate partial results into a single map keyed by station
/// name.
///
/// Each station seen in any map of [data] gets a fresh [Data] in the result.
/// Every partial entry for that station is merged into it, in list order.
Map<String, Data> mergeData(List<Map<String, Data>> data) {
  final combined = <String, Data>{};
  for (final partial in data) {
    for (final MapEntry(key: station, value: stats) in partial.entries) {
      combined.putIfAbsent(station, () => Data(station)).merge(stats);
    }
  }
  return combined;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment