Skip to content

Instantly share code, notes, and snippets.

@thomasjungblut
Last active December 14, 2015 07:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thomasjungblut/5048046 to your computer and use it in GitHub Desktop.
Save thomasjungblut/5048046 to your computer and use it in GitHub Desktop.
On-disk MergeSort Numbers and code
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.io.IntWritable;
import de.jungblut.datastructure.Merger;
import de.jungblut.datastructure.SortedFile;
/**
 * Timing harness for the on-disk merge sort: every worker thread writes
 * random integers into its own {@link SortedFile}, and the per-thread output
 * files are then combined into a single file with {@link Merger}.
 *
 * <p>All files are written below {@code /tmp}; progress and timings are
 * printed to standard output.
 */
public class SpillSortTimer {

  /**
   * Runs the benchmark once with one sort task per available processor.
   *
   * @param args ignored.
   * @throws Exception if a sort task fails, the wait for a task is
   *           interrupted, or the final merge fails.
   */
  public static void main(String[] args) throws Exception {
    final int availableProcessors = Runtime.getRuntime().availableProcessors();
    final ExecutorService pool = Executors
        .newFixedThreadPool(availableProcessors);
    try {
      final ExecutorCompletionService<File> service = new ExecutorCompletionService<>(
          pool);
      // sort 1gb of data
      final long numBytes = 1000L * 1000L * 1000L;
      final int totalInts = (int) (numBytes / 4);
      // Split the work evenly; the first `remainder` tasks take one extra
      // integer so that the total really is `totalInts` even when the
      // processor count does not divide it.
      final int intsPerTask = totalInts / availableProcessors;
      final int remainder = totalInts % availableProcessors;
      System.out.println("Writing and sorting " + numBytes + " bytes as "
          + numBytes / 4 + " integers with " + availableProcessors
          + " threads.");
      final long start = System.currentTimeMillis();
      for (int i = 0; i < availableProcessors; i++) {
        final int count = intsPerTask + (i < remainder ? 1 : 0);
        service.submit(newSortTask(i, count));
      }
      // Collect the segment files in completion order; get() rethrows a
      // task failure wrapped in an ExecutionException.
      final List<File> files = new ArrayList<>(availableProcessors);
      for (int i = 0; i < availableProcessors; i++) {
        files.add(service.take().get());
      }
      System.out.println("Merging " + files.size() + " sorted segments after "
          + (System.currentTimeMillis() - start) / 1000 + "s.");
      // merging the files
      Merger.<IntWritable> merge(IntWritable.class, new File(
          "/tmp/final_merge_output.bin"), files);
      final long taken = System.currentTimeMillis() - start;
      System.out.println("Took " + (taken / 1000) + "s to mergesort overall "
          + numBytes + " bytes.");
    } finally {
      // Always stop the workers: the pool's default threads are non-daemon,
      // so leaving them alive after a failure would keep the JVM running.
      pool.shutdownNow();
    }
  }

  /**
   * Creates the task that feeds {@code count} random integers into a
   * {@link SortedFile} and reports the file it was told to write.
   *
   * @param index task number, used to give every task its own paths.
   * @param count how many random integers this task collects.
   * @return a callable yielding {@code /tmp/merged_<index>.bin}.
   */
  private static Callable<File> newSortTask(final int index, final int count) {
    return new Callable<File>() {
      @Override
      public File call() throws Exception {
        final String outPath = "/tmp/merged_" + index + ".bin";
        // NOTE(review): the arguments look like (working dir, output file,
        // buffer size in bytes, key class, flag) - confirm against the
        // SortedFile constructor.
        try (SortedFile<IntWritable> ints = new SortedFile<>(
            "/tmp/merge_sorted_" + index, outPath, 16 * 1000 * 1000,
            IntWritable.class, true)) {
          // A single writable instance is reused for every collected value.
          final IntWritable intWritable = new IntWritable();
          final Random r = new Random();
          for (int n = 0; n < count; n++) {
            intWritable.set(r.nextInt());
            ints.collect(intWritable);
          }
        }
        return new File(outPath);
      }
    };
  }
}
Writing and sorting 1000000000 bytes as 250000000 integers with 8 threads.
Starting intermediate merge of 9 files.
Starting intermediate merge of 9 files.
Starting intermediate merge of 9 files.
Starting intermediate merge of 9 files.
Starting intermediate merge of 9 files.
Starting intermediate merge of 9 files.
Starting intermediate merge of 9 files.
Starting intermediate merge of 9 files.
Merging 8 sorted segments after 74s.
Took 98s to mergesort overall 1000000000 bytes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment