Skip to content

Instantly share code, notes, and snippets.

@malkia
Created April 28, 2020 23:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save malkia/fd841c3fbcbb0185828758d4fa4bc744 to your computer and use it in GitHub Desktop.
Save malkia/fd841c3fbcbb0185828758d4fa4bc744 to your computer and use it in GitHub Desktop.
md5sum implementation in dart, doing recursive scan and using isolates (set to be half the number of Logical CPUs)
import 'dart:isolate';
import 'dart:io';
import 'dart:math';
import 'package:crypto/crypto.dart';
class Stop {}
final _stop = Stop();
class Args {
final int fileIndex;
final String fileName;
Args(this.fileIndex, this.fileName);
}
class Result {
final int fileIndex;
final Digest digest;
final int size;
Result(this.fileIndex, this.digest, this.size);
}
class Results {
final int isolateIndex;
final List<Result> results;
final DateTime startTime;
final DateTime endTime;
Results(this.isolateIndex, this.results, this.startTime, this.endTime);
}
class IsolatePort {
final int isolateIndex;
final SendPort sendPort;
IsolatePort(this.isolateIndex, this.sendPort);
}
void _runTask(IsolatePort isolatePort) async {
var startTime = DateTime.now();
int count = 0;
var results = List<Result>();
var receivePort = ReceivePort();
isolatePort.sendPort
.send(IsolatePort(isolatePort.isolateIndex, receivePort.sendPort));
await for (var msg in receivePort) {
if (msg is Args) {
var args = msg as Args;
var fileName = args.fileName;
var file = File(fileName);
var size = -1;
Digest digest = null;
try {
var bytes = await file.readAsBytes();
digest = md5.convert(bytes);
size = bytes.length;
} catch (e) {}
count = count + 1;
results.add(Result(args.fileIndex, digest, size));
continue;
}
if (msg is Stop) {
isolatePort.sendPort.send(Results(isolatePort.isolateIndex, results, startTime, DateTime.now()));
continue;
}
}
}
Future<void> main1(String path) async {
List<String> fileIndex = List<String>();
var stopwatch = Stopwatch()..start();
var finishedCount = 0;
var sendPorts = List<SendPort>.filled(max(1,Platform.numberOfProcessors~/2), null);
var indexedResults = List<Results>.filled(sendPorts.length, null);
var jobSizes = List<int>.filled(sendPorts.length, 0);
var receivePort = ReceivePort();
receivePort.listen((msg) {
if (msg is IsolatePort) {
var isolatePort = msg as IsolatePort;
sendPorts[isolatePort.isolateIndex] = isolatePort.sendPort;
return;
}
if (msg is Results) {
var results = msg as Results;
indexedResults[results.isolateIndex] = results;
finishedCount = finishedCount + 1;
if (finishedCount < sendPorts.length)
return;
receivePort.close();
var elapsedTime = stopwatch.elapsedMicroseconds/1e6;
print("${elapsedTime} seconds.");
for (var results in indexedResults)
for (var result in results.results)
if (result.size != -1)
print("${result.digest} ${result.size} ${fileIndex[result.fileIndex]} ");
var count = 0;
var size = 0;
var index = 0;
print("");
print("isolate stats:");
for (var results in indexedResults)
{
count = count + results.results.length;
var localSize = 0;
for( var result in results.results )
localSize += result.size;
size += localSize;
var duration = results.endTime.difference(results.startTime).inMicroseconds / 1e6;
print(" isolate #${index} files: ${results.results.length}, totalSize: ${localSize}, time: ${duration.toStringAsFixed(3)} (s)");
index++;
}
print("DONE $count out of ${fileIndex.length}, totalSize=${(size/(1024.0*1024.0)).toStringAsFixed(3)} MB, speed=${((size/elapsedTime)/(1024*1024)).toStringAsFixed(3)} MB/s");
}
});
for (var i = 0; i < sendPorts.length; i++)
await Isolate.spawn(_runTask, IsolatePort(i, receivePort.sendPort));
Directory(path).list( recursive: true, followLinks: false )
..listen(
(FileSystemEntity entity) async {
if( entity is File ) {
var file = entity as File;
var fileName = file.path;
var jobIndex = fileIndex.length % sendPorts.length;
var bestJobSize = -1;
for( var i=0; i<jobSizes.length; i++ )
{
if( bestJobSize == -1 || bestJobSize > jobSizes[i] )
{
bestJobSize = jobSizes[i];
jobIndex = i;
}
}
// TODO: Maybe each isolate can inform us about it's state, so we update this with better info.
// an "iso-balancer" :)
sendPorts[jobIndex].send(Args(fileIndex.length, fileName));
jobSizes[jobIndex] += (file.statSync().size + 65535) ~/ 65536;
fileIndex.add(fileName);
}
},
onDone: (() {
for (var sendPort in sendPorts)
sendPort.send(_stop);
}));
}
Future<int> main(List<String> args) async {
if( args.isEmpty ) {
print("Provide directory to be recursively scanned and print MD5's");
return 1;
}
await main1(args[0]);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment