Skip to content

Instantly share code, notes, and snippets.

@kitsook
Created December 21, 2022 05:57
Embed
What would you like to do?
Testing HashMap with multi-thread operations under specific conditions
package net.clarenceho;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.logging.Logger;
import java.util.stream.Collectors;
/**
* This is a test on how far we can push HashMap in multi-thread environment.
* Trying to solve a real life problem and this is the scenario to simulate:
* - a HashMap to store "target" objects as values
* - all methods of the target object are synchronized (in real life, the "target" is from a 3rd party; nothing we
* can do)
* - the target object has a "destroy" method that marks it as no longer usable
* - there are multiple reader threads that get targets from the HashMap and call *one* of the non-destroy methods
* - there is a single writer thread that replaces target objects in the HashMap and calls the "destroy" method on
* the replaced objects
* - reader threads run far more frequently than the writer thread (in real life, readers are called millions of
* times every day while the writer runs maybe once a month)
* - during multi-thread execution, the HashMap is only used with "get" (by readers) and "putAll" (by the writer)
*
* Now, generally we should be using a thread-safe collection instead of HashMap. Or, use something like a StampedLock
* to control read / write operations. But reader performance is a top priority. And given these special
* characteristics, we may be able to get away with using just the HashMap:
* - all methods to the target object are synchronized, so reader and writer can't mutate the same object concurrently
* - there is no logic to iterate the HashMap
* - only 1 writer will update the HashMap by adding / replacing values
* - a reader, after getting the target from HashMap, only calls one method from target. And if it failed because
* the target object is already marked as destroyed by writer concurrently, reader can retrieve the new target again
* and retry
* - readers won't access new values added to the HashMap, only existing values that might be replaced by the writer
*
* However, the unknown is whether it is safe for readers to call HashMap's "get" while the writer is
* calling "putAll" (short answer: NO). This is implementation specific. Looking at the source code
* from OpenJDK 8...:
* https://github.com/frohoff/jdk8u-jdk/blob/master/src/share/classes/java/util/HashMap.java#L499
*
* ... of course the code doesn't use synchronization or volatile. But the only mutation performed by a replacement
* is the assignment that overwrites the old value. Even though the variable is not volatile, since the target's
* methods are synchronized and the reader is able to retry, it should be fine for the writer to replace existing
* values.
*
* But then, what about adding new values (even though readers won't access them)?
* The underlying hash table may be resized and new nodes added. So the reader should also retry the "get" from the
* HashMap, not just the call to the target's methods.
*
* DISCLAIMER: even if this test seems to work, it is like walking on thin ice. Use some kind of locking to be safe.
*/
public class Main {
    /** Number of reader tasks; reader {@code i} only ever looks up key {@code i}. */
    private static final int NUM_READER_THREADS = 8;
    /** Number of read rounds each reader has to complete to report success. */
    private static final int READER_LOOP_COUNT = 1_000_000;
    private static final Logger LOGGER = Logger.getGlobal();

    /**
     * Runs the experiment: fills a plain {@link HashMap}, starts the readers and the single writer
     * against it, waits for all readers and logs "Success!" or "Failed!".
     *
     * @param args unused
     * @throws Exception if waiting for the readers fails, e.g. because a reader task threw
     */
    public static void main(String[] args) throws Exception {
        // Deliberately NOT a thread-safe map: sharing it without locking is what is being probed.
        Map<Integer, Target> map = new HashMap<>();
        ExecutorService executorService = Executors.newFixedThreadPool(NUM_READER_THREADS + 2);
        Writer writer = new Writer(map);
        try {
            // populate the map with more entries than we need, before any other thread is started
            for (int i = 0; i < NUM_READER_THREADS * 2; i++) {
                map.put(i, new Target());
            }

            // submitting a reader starts it right away
            List<CompletableFuture<Boolean>> futures = new ArrayList<>();
            for (int i = 0; i < NUM_READER_THREADS; i++) {
                futures.add(CompletableFuture.supplyAsync(new Reader(i, map), executorService));
            }

            // combine the per-reader results into a single future
            CompletableFuture<List<Boolean>> results = CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture<?>[0]))
                    .thenApply(v -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList())
                    );

            // start the updater
            executorService.execute(writer);

            // wait for the readers and check the result
            if (results.get().stream().anyMatch(result -> !result)) {
                LOGGER.info("Failed!");
            } else {
                LOGGER.info("Success!");
            }
        } finally {
            // Always stop the writer and the pool. Without this, a reader that throws would leave
            // the writer looping forever on a non-daemon pool thread and the JVM would never exit.
            writer.shutdown();
            executorService.shutdown();
        }
    }

    /**
     * Reader task: repeatedly fetches the target stored under its own id and calls
     * {@link Target#doStuff()} on it, retrying when the target has been destroyed in the meantime.
     */
    static class Reader implements Supplier<Boolean> {
        // number of retry needed will depend on how frequent do we expect the optimistic read to fail...
        private static final int MAX_RETRY = 5;

        private final int id;
        private final Map<Integer, Target> map;
        private final Random randomNum;

        Reader(int id, Map<Integer, Target> map) {
            this.id = id;
            this.map = map;
            this.randomNum = new Random(42L + id);
        }

        /**
         * Performs {@code READER_LOOP_COUNT} read rounds.
         *
         * @return {@code true} if every round succeeded; {@code false} as soon as one round
         *         exhausts its retries or the thread is interrupted
         */
        @Override
        public Boolean get() {
            for (int i = 0; i < READER_LOOP_COUNT; i++) {
                // An interrupted reader would skip its simulated delay on every remaining round,
                // so stop instead of spinning through the rest of the loop.
                if (Thread.currentThread().isInterrupted()) {
                    LOGGER.warning("Reader " + this.id + " interrupted, aborting");
                    return false;
                }
                if (!readWithRetry()) {
                    LOGGER.severe("Max retry exhausted for " + this.id);
                    return false;
                }
                if (i % 100_000 == 0) {
                    LOGGER.info("#");
                }
            }
            return true;
        }

        /**
         * One read round: looks the target up and uses it, at most {@code MAX_RETRY} times.
         *
         * @return {@code true} once a fetched target reports itself valid, {@code false} if all
         *         attempts saw either no entry or a destroyed target
         */
        private boolean readWithRetry() {
            for (int attempt = 0; attempt < MAX_RETRY; attempt++) {
                // the lookup itself is part of what gets retried, not just the call on the target
                Target target = map.get(this.id);
                if (target == null) {
                    continue;
                }
                // simulate slow processing in reader threads that will cause the target we got destroyed by writer
                try {
                    Thread.sleep(randomNum.nextInt(2));
                } catch (InterruptedException e) {
                    // keep the flag set; get() checks it before the next round
                    Thread.currentThread().interrupt();
                }
                // see if the previously retrieved object is still valid
                if (target.doStuff()) {
                    return true;
                }
            }
            return false;
        }
    }

    /** Stand-in for the 3rd party object: every method is synchronized and destroy() is one-way. */
    static class Target {
        private boolean valid = true;

        /** @return {@code true} until {@link #destroy()} has been called */
        public synchronized boolean doStuff() {
            return valid;
        }

        /** Marks this target as unusable; later {@link #doStuff()} calls return {@code false}. */
        public synchronized void destroy() {
            valid = false;
        }
    }

    /**
     * Writer task: until stopped, randomly replaces existing entries via {@code putAll}, adds one
     * extra entry per round, and destroys the replaced targets afterwards.
     */
    static class Writer implements Runnable {
        private final Map<Integer, Target> map;
        private final Random randomNum = new Random(1337L);
        // written by the thread calling shutdown(), read by the thread executing run()
        private volatile boolean done;

        Writer(Map<Integer, Target> map) {
            this.map = map;
        }

        /** Asks {@link #run()} to stop before its next round. */
        public void shutdown() {
            this.done = true;
        }

        /** @return {@code true} for roughly one call in a hundred */
        private boolean shouldUpdate() {
            return randomNum.nextInt(100) == 0;
        }

        /** Runs update rounds until {@link #shutdown()} is called or the thread is interrupted. */
        @Override
        public void run() {
            int count = 0;
            while (!done && !Thread.currentThread().isInterrupted()) {
                Map<Integer, Target> newEntries = new HashMap<>();
                List<Target> replaced = new ArrayList<>();
                for (Map.Entry<Integer, Target> entry : map.entrySet()) {
                    // randomly replace existing entries that reader will access
                    if (shouldUpdate()) {
                        newEntries.put(entry.getKey(), new Target());
                        replaced.add(entry.getValue());
                    }
                }
                // !!! also add a new entry that readers won't access !!!
                newEntries.put(randomNum.nextInt(999999) + NUM_READER_THREADS, new Target());

                // update map first, only then mark old values as outdated
                map.putAll(newEntries);
                for (Target old : replaced) {
                    old.destroy();
                }

                count++;
                if (count % 10_000 == 0) {
                    LOGGER.info(".");
                }
                try {
                    // simulate infrequent update of objects in the map by sleeping longer than readers
                    Thread.sleep(randomNum.nextInt(5));
                } catch (InterruptedException e) {
                    // Interruption is a stop request: restore the flag and leave, otherwise every
                    // following sleep would throw immediately and this loop would busy-spin.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment