Skip to content

Instantly share code, notes, and snippets.

@bbakerman
Created September 24, 2019 06:51
Show Gist options
  • Save bbakerman/45ac8750e8a0c90484697a5f331c9d7f to your computer and use it in GitHub Desktop.
Save bbakerman/45ac8750e8a0c90484697a5f331c9d7f to your computer and use it in GitHub Desktop.
import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
import graphql.StarWarsData;
import graphql.StarWarsSchema;
import graphql.schema.DataFetcher;
import graphql.schema.GraphQLSchema;
import graphql.schema.TypeResolver;
import graphql.schema.idl.RuntimeWiring;
import graphql.schema.idl.SchemaGenerator;
import graphql.schema.idl.SchemaParser;
import graphql.schema.idl.SchemaPrinter;
import graphql.schema.idl.TypeDefinitionRegistry;
import org.dataloader.BatchLoaderWithContext;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderOptions;
import org.dataloader.DataLoaderRegistry;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring;
@SuppressWarnings("UnnecessaryLocalVariable")
public class RequestConcurrencyPOC {
/**
 * Runs queued units of asynchronous work on an {@link ExecutorService} while limiting how many
 * units run at the same time on behalf of any single {@link PerRequestWorkPermits} holder.
 *
 * <p>Every call to {@link #execute} enqueues one job and submits one servicing task. A servicing
 * task dequeues one job and tries to take a single permit from that job's
 * {@link PerRequestWorkPermits}. If no permit is free, the job is put back on the queue and a
 * fresh servicing task is submitted to retry it; otherwise the job's code is run and the permit is
 * held until the future produced by that code completes.
 */
static class ConcurrentWorkManager {

    /** One queued unit of work together with the future that was handed back to the caller. */
    static final class WorkJob {
        final PerRequestWorkPermits workPermits;
        final Supplier<CompletableFuture<?>> codeToRun;
        final CompletableFuture<?> cfToComplete;

        public WorkJob(PerRequestWorkPermits workPermits, CompletableFuture<?> cfToComplete, Supplier<CompletableFuture<?>> codeToRun) {
            this.workPermits = workPermits;
            this.codeToRun = codeToRun;
            this.cfToComplete = cfToComplete;
        }
    }

    final ExecutorService executorService;
    final Deque<WorkJob> workJobs;

    /**
     * @param executorService the executor on which servicing tasks (and therefore the supplied
     *                        code) are run
     */
    public ConcurrentWorkManager(ExecutorService executorService) {
        this.executorService = executorService;
        this.workJobs = new ConcurrentLinkedDeque<>();
    }

    /**
     * Enqueues {@code codeToRun} and schedules a servicing task for it.
     *
     * @param requestWorkPermits the permit budget the work is charged against
     * @param codeToRun          supplies the future whose outcome is mirrored into the returned future
     * @param <T>                the value type the caller expects; this is not checked, the returned
     *                           future simply receives whatever value {@code codeToRun}'s future yields
     * @return a future completed with the value or failure produced by {@code codeToRun}
     */
    public <T> CompletableFuture<T> execute(PerRequestWorkPermits requestWorkPermits, Supplier<CompletableFuture<?>> codeToRun) {
        CompletableFuture<T> cfToBeDone = new CompletableFuture<>();
        this.workJobs.offer(new WorkJob(requestWorkPermits, cfToBeDone, codeToRun));
        executorService.submit(serviceQ());
        return cfToBeDone;
    }

    /**
     * Builds a task that services at most one job from the queue.
     *
     * @return a runnable that dequeues one job and either runs it under a permit or re-queues it
     */
    public Runnable serviceQ() {
        return () -> {
            WorkJob workJob = workJobs.poll();
            if (workJob == null) {
                return;
            }
            System.out.printf("Dequeued work for %s\n", workJob.workPermits);
            if (!workJob.workPermits.tryAcquire(1)) {
                // we could not get a permit to proceed - that is their per request concurrency was exceeded
                // so enqueue the job again and try later
                System.out.printf("\t===== No permits available for %s\n", workJob.workPermits);
                workJobs.offer(workJob);
                executorService.submit(serviceQ());
                // Must stop here: running the job without a permit (and then releasing a permit
                // that was never acquired) would defeat the limit entirely.
                // NOTE(review): the retry is immediate, so a starved job is re-polled in a tight loop.
                return;
            }
            runHoldingPermit(workJob);
        };
    }

    /**
     * Runs the job's code while one permit is held, and gives the permit back exactly once:
     * either when the future returned by the code completes, or straight away if the code fails
     * before producing a future.
     */
    @SuppressWarnings("unchecked")
    private void runHoldingPermit(WorkJob workJob) {
        // The caller created this future in execute(); its type argument was never checked, so
        // completing it with an arbitrary value is what the original contract already does.
        CompletableFuture<Object> cfToComplete = (CompletableFuture<Object>) workJob.cfToComplete;
        CompletableFuture<?> resultCF;
        try {
            System.out.printf("\t-- Running code for %s\n", workJob.workPermits);
            resultCF = workJob.codeToRun.get();
            if (resultCF == null) {
                throw new NullPointerException("codeToRun returned a null CompletableFuture");
            }
        } catch (Throwable t) {
            // Boundary catch: without it the caller's future would never complete and anything
            // joined on it would wait forever.
            workJob.workPermits.release(1);
            cfToComplete.completeExceptionally(t);
            return;
        }
        resultCF.whenComplete((data, throwable) -> {
            // Release first so work triggered by completing the caller's future can get a permit.
            workJob.workPermits.release(1);
            if (throwable != null) {
                cfToComplete.completeExceptionally(throwable);
            } else {
                cfToComplete.complete(data);
            }
        });
    }
}
/**
 * A named budget of work permits backed by a fair {@link Semaphore}.
 *
 * <p>All operations delegate directly to the semaphore; the name is only used by
 * {@link #toString()}.
 */
static class PerRequestWorkPermits {
    private final Semaphore permitPool;
    private final String label;

    /**
     * @param name    label shown in {@link #toString()}
     * @param permits initial number of permits in the budget
     */
    public PerRequestWorkPermits(String name, int permits) {
        this.permitPool = new Semaphore(permits, true);
        this.label = name;
    }

    /**
     * Attempts to take {@code permits} permits without waiting.
     *
     * @return {@code true} if the permits were taken, {@code false} if not enough were free
     */
    public boolean tryAcquire(int permits) {
        boolean granted = permitPool.tryAcquire(permits);
        return granted;
    }

    /** Hands {@code permits} permits back to the budget. */
    public void release(int permits) {
        permitPool.release(permits);
    }

    /** @return the number of permits that are free right now */
    public int availablePermits() {
        int free = permitPool.availablePermits();
        return free;
    }

    /** @return the name and the currently free permit count, separated by a colon */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(label);
        text.append(":");
        text.append(permitPool.availablePermits());
        return text.toString();
    }
}
/**
 * Demo entry point: fires many GraphQL queries for several simulated clients, where each client
 * shares one {@link PerRequestWorkPermits} budget that the batch loader's work is charged against.
 *
 * <p>The thread pool is shut down in a {@code finally} block so that a failure while building the
 * schema or waiting for results does not leave its threads keeping the JVM alive.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    ExecutorService oneHundredThreads = Executors.newFixedThreadPool(100);
    try {
        ConcurrentWorkManager workManager = new ConcurrentWorkManager(oneHundredThreads);
        // the batch loader is async
        BatchLoaderWithContext<String, Object> characterBatchLoader = (keys, loadingEnv) -> {
            // the permits travel as the key context supplied to DataLoader.load(key, context)
            String firstKey = keys.get(0);
            PerRequestWorkPermits workPermits = (PerRequestWorkPermits) loadingEnv.getKeyContexts().get(firstKey);
            return workManager.execute(workPermits, () -> {
                //
                // this batch load is going to simulate some delay - e.g. work being done
                System.out.println("\t\tworking on behalf of " + workPermits.toString());
                randomSnoozeMs(200, 5000);
                List<Object> characters = keys.stream().map(StarWarsData::getCharacter).collect(Collectors.toList());
                return CompletableFuture.completedFuture(characters);
            });
        };
        DataFetcher<Object> humanDF = env -> {
            DataLoader<String, Object> dl = env.getDataLoader("characters");
            PerRequestWorkPermits perRequestWorkPermits = env.getContext();
            return dl.load(env.getArgument("id"), perRequestWorkPermits);
        };
        TypeResolver characterTR = env -> env.getSchema().getObjectType("Human");
        RuntimeWiring runtimeWiring = RuntimeWiring.newRuntimeWiring()
                .type(newTypeWiring("QueryType").dataFetcher("human", humanDF))
                .type(newTypeWiring("Character").typeResolver(characterTR))
                .build();
        // print the programmatic Star Wars schema to SDL and re-parse it so that the wiring
        // above is the one attached to the executable schema
        String sdl = new SchemaPrinter().print(StarWarsSchema.starWarsSchema);
        TypeDefinitionRegistry typeRegistry = new SchemaParser().parse(sdl);
        GraphQLSchema graphQLSchema = new SchemaGenerator().makeExecutableSchema(typeRegistry, runtimeWiring);
        GraphQL graphQL = GraphQL.newGraphQL(graphQLSchema).build();
        //
        // now execute
        System.out.print("====== LETS GO LIMIT SOME CLIENTS\n");
        final int numClients = 10;
        final int queriesPerClient = 100;
        final String query = "{ human(id:\"1000\") { name } }"; // luke
        List<CompletableFuture<?>> allResults = new ArrayList<>();
        for (int client = 0; client < numClients; client++) {
            // each client here gets 3 permits for work even though we are going to do N lots of work for them
            PerRequestWorkPermits perRequestWorkPermits = new PerRequestWorkPermits("client" + client, 3);
            // and now we do N bits of work for them, where N > permits
            for (int i = 0; i < queriesPerClient; i++) {
                // effectively-final copies for use inside the completion callback
                int clientNum = client;
                int iter = i;
                // we don't cache to ensure this DataLoader / BatchLoader gets some real work!
                DataLoaderOptions noCaching = DataLoaderOptions.newOptions().setCachingEnabled(false);
                DataLoader<String, Object> characterDL = DataLoader.newDataLoader(characterBatchLoader, noCaching);
                DataLoaderRegistry dataLoaderRegistry = new DataLoaderRegistry();
                dataLoaderRegistry.register("characters", characterDL);
                ExecutionInput ei = ExecutionInput.newExecutionInput()
                        .context(perRequestWorkPermits)
                        .dataLoaderRegistry(dataLoaderRegistry)
                        .query(query)
                        .build();
                CompletableFuture<ExecutionResult> cfResult = graphQL.executeAsync(ei);
                cfResult.whenComplete((data, throwable) -> {
                    if (throwable != null) {
                        // data is null on exceptional completion, so it must not be dereferenced
                        System.out.printf("\t\tFailed client %s-%d - %s\n", clientNum, iter, throwable);
                        return;
                    }
                    System.out.printf("\t\tCompleted client %s-%d - data %s - errors %s\n", clientNum, iter, data.getData(), data.getErrors());
                });
                allResults.add(cfResult);
            }
        }
        final CompletableFuture<Void> allOf = CompletableFuture.allOf(allResults.toArray(new CompletableFuture[0]));
        // wait for all results to complete
        allOf.join();
        System.out.println("All futures completed");
    } finally {
        // shutdown the threadpool, even when something above failed
        System.out.println("Shutting down the thread pool");
        oneHundredThreads.shutdown();
    }
    System.out.println("Done");
}
/**
 * Puts the calling thread to sleep for a random number of milliseconds chosen by
 * {@code getRandomNumberInRange(minMs, maxMs)}.
 *
 * <p>If the thread is interrupted while sleeping, the snooze ends early and the thread's
 * interrupt flag is set again so that callers (for example an executor shutting down) can
 * still observe the interruption.
 *
 * @param minMs lower bound of the sleep time in milliseconds
 * @param maxMs upper bound of the sleep time in milliseconds; must be greater than {@code minMs}
 * @throws IllegalArgumentException if {@code minMs >= maxMs}
 */
public static void randomSnoozeMs(int minMs, int maxMs) {
    long snoozeMs = getRandomNumberInRange(minMs, maxMs);
    try {
        Thread.sleep(snoozeMs);
    } catch (InterruptedException e) {
        // Do not swallow the interrupt: restore the flag for whoever owns this thread.
        Thread.currentThread().interrupt();
    }
}
/**
 * Picks a uniformly distributed random integer between {@code min} and {@code max}, both inclusive.
 *
 * @param min smallest value that may be returned
 * @param max largest value that may be returned; must be strictly greater than {@code min}
 * @return a random value {@code v} with {@code min <= v <= max}
 * @throws IllegalArgumentException if {@code min >= max}
 */
private static int getRandomNumberInRange(int min, int max) {
    if (min >= max) {
        throw new IllegalArgumentException("max must be greater than min");
    }
    // Compute the range size as a long: (max - min) + 1 overflows int for wide ranges such as
    // [-1, Integer.MAX_VALUE], which would hand a non-positive bound to the generator.
    long span = (long) max - min + 1;
    // ThreadLocalRandom avoids allocating and seeding a new generator on every call.
    long offset = ThreadLocalRandom.current().nextLong(span);
    return (int) (min + offset);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment