Skip to content

Instantly share code, notes, and snippets.

@jonathangarelick
Created July 9, 2024 19:19
Show Gist options
  • Save jonathangarelick/f890b14baad62190d195717e590a1541 to your computer and use it in GitHub Desktop.
Save jonathangarelick/f890b14baad62190d195717e590a1541 to your computer and use it in GitHub Desktop.
Implements a simple single-machine MapReduce word count with no external dependencies.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
// Tiny Shakespeare: https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
/**
 * Word-count MapReduce that runs on a single machine using only the JDK.
 *
 * <p>The input lines are split into contiguous chunks, each chunk is counted by a
 * {@link Mapper} on a fixed-size thread pool (the "map" phase), and the per-chunk
 * counts are merged into a single map (the "reduce" phase).
 */
public class SimpleMapReduce {

    /** Input file read by {@link #main} when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH = "src/input.txt";

    /** Number of chunks and worker threads used by {@link #main}. */
    private static final int DEFAULT_NUM_THREADS = 10;

    /** Seconds to wait for running tasks before forcing the executor down. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;

    /** Word separator: one or more non-word characters. Compiled once instead of once per line. */
    private static final Pattern NON_WORD = Pattern.compile("\\W+");

    /**
     * Map task: counts the occurrences of each lower-cased word in one chunk of lines.
     */
    static class Mapper implements Callable<ConcurrentHashMap<String, Integer>> {

        /** The lines this task counts; never reassigned after construction. */
        final List<String> input;

        /**
         * @param input the lines to count words in
         */
        Mapper(List<String> input) {
            this.input = input;
        }

        /**
         * Splits every line on runs of non-word characters and tallies the resulting words.
         *
         * @return a map from lower-cased word to its number of occurrences in this chunk
         */
        @Override
        public ConcurrentHashMap<String, Integer> call() {
            return input.stream()
                    // Locale.ROOT keeps case folding independent of the JVM's default locale
                    // (e.g. under a Turkish locale "I" would otherwise become a dotless "ı").
                    .flatMap(line -> Arrays.stream(NON_WORD.split(line.toLowerCase(Locale.ROOT))))
                    // A line that starts with a separator yields a leading empty token.
                    .filter(word -> !word.isEmpty())
                    .collect(Collectors.toConcurrentMap(
                            word -> word,
                            word -> 1,
                            Integer::sum,
                            ConcurrentHashMap::new
                    ));
        }
    }

    /**
     * Counts the words of a text file and prints one {@code word: count} line per word.
     *
     * @param args optional; {@code args[0]} is the file to read, defaulting to
     *             {@code src/input.txt} when absent
     */
    public static void main(String[] args) {
        String path = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        int numThreads = DEFAULT_NUM_THREADS;
        try {
            List<String> lines = Files.readAllLines(Paths.get(path));
            List<List<String>> chunks = createChunks(lines, numThreads);
            processChunks(chunks, numThreads)
                    .forEach((key, value) -> System.out.println(key + ": " + value));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Splits {@code lines} into exactly {@code numChunks} contiguous chunks of (at most)
     * {@code ceil(lines.size() / numChunks)} lines each.
     *
     * <p>When there are fewer lines than the chunks can hold, the trailing chunks are
     * empty. The chunks are {@link List#subList} views of {@code lines}, not copies.
     *
     * @param lines     the lines to partition
     * @param numChunks the number of chunks to produce; must be positive
     * @return a list of {@code numChunks} sub-lists that together cover {@code lines} in order
     * @throws IllegalArgumentException if {@code numChunks} is not positive
     */
    static List<List<String>> createChunks(List<String> lines, int numChunks) {
        if (numChunks <= 0) {
            throw new IllegalArgumentException("numChunks must be positive: " + numChunks);
        }
        int total = lines.size();
        int chunkSize = (int) Math.ceil((double) total / numChunks);
        return IntStream.range(0, numChunks)
                .mapToObj(i -> {
                    // Clamp BOTH bounds: because chunkSize is rounded up, late chunks can start
                    // past the end of the list (e.g. 11 lines in 10 chunks), which would make
                    // subList throw. The long arithmetic avoids int overflow on huge inputs.
                    int from = (int) Math.min((long) i * chunkSize, total);
                    int to = (int) Math.min((long) (i + 1) * chunkSize, total);
                    return lines.subList(from, to);
                })
                .collect(Collectors.toList());
    }

    /**
     * Runs one {@link Mapper} per chunk on a fixed thread pool and merges the per-chunk
     * counts into a single map.
     *
     * <p>A chunk whose task fails is reported on standard error and skipped. If the calling
     * thread is interrupted, its interrupt status is restored and the counts merged so far
     * are returned. The pool is always shut down before this method returns.
     *
     * @param chunks     the chunks of lines to count
     * @param numThreads the size of the worker pool; must be positive
     * @return a map from lower-cased word to its total number of occurrences
     * @throws IllegalArgumentException if {@code numThreads} is not positive
     */
    static ConcurrentHashMap<String, Integer> processChunks(List<List<String>> chunks, int numThreads) {
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        ConcurrentHashMap<String, Integer> result = new ConcurrentHashMap<>();
        try {
            List<Mapper> mappers = chunks.stream().map(Mapper::new).collect(Collectors.toList());
            // invokeAll blocks until every mapper has finished (or the caller is interrupted).
            List<Future<ConcurrentHashMap<String, Integer>>> futures = executorService.invokeAll(mappers);
            for (Future<ConcurrentHashMap<String, Integer>> future : futures) {
                try {
                    future.get().forEach((word, count) -> result.merge(word, count, Integer::sum));
                } catch (ExecutionException e) {
                    // Best effort: keep reducing the remaining chunks.
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while counting words; returning partial counts");
        } finally {
            shutdownExecutor(executorService);
        }
        return result;
    }

    /**
     * Shuts the executor down, waiting up to ten seconds for running tasks to finish before
     * cancelling them.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining tasks are
     * cancelled immediately and the thread's interrupt status is restored.
     *
     * @param executorService the executor to shut down
     */
    static void shutdownExecutor(ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment