Created
September 24, 2019 13:22
-
-
Save bbakerman/d8a98e2b8b6f71e50fe2ff4e07ae41c6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import graphql.ExecutionInput; | |
import graphql.ExecutionResult; | |
import graphql.GraphQL; | |
import graphql.StarWarsData; | |
import graphql.StarWarsSchema; | |
import graphql.schema.DataFetcher; | |
import graphql.schema.GraphQLSchema; | |
import graphql.schema.TypeResolver; | |
import graphql.schema.idl.RuntimeWiring; | |
import graphql.schema.idl.SchemaGenerator; | |
import graphql.schema.idl.SchemaParser; | |
import graphql.schema.idl.SchemaPrinter; | |
import graphql.schema.idl.TypeDefinitionRegistry; | |
import org.dataloader.BatchLoaderWithContext; | |
import org.dataloader.DataLoader; | |
import org.dataloader.DataLoaderOptions; | |
import org.dataloader.DataLoaderRegistry; | |
import java.time.Duration; | |
import java.time.temporal.ChronoUnit; | |
import java.util.ArrayList; | |
import java.util.Deque; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ConcurrentLinkedDeque; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Semaphore; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Supplier; | |
import java.util.stream.Collectors; | |
import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring; | |
@SuppressWarnings("UnnecessaryLocalVariable") | |
public class RequestConcurrencyPOC { | |
static class ConcurrentWorkManager { | |
class WorkJob { | |
final PerRequestWorkPermits workPermits; | |
final Supplier<CompletableFuture<?>> codeToRun; | |
final CompletableFuture<?> cfToComplete; | |
public WorkJob(PerRequestWorkPermits workPermits, CompletableFuture<?> cfToComplete, Supplier<CompletableFuture<?>> codeToRun) { | |
this.workPermits = workPermits; | |
this.codeToRun = codeToRun; | |
this.cfToComplete = cfToComplete; | |
} | |
} | |
ExecutorService executorService; | |
Deque<WorkJob> workJobs; | |
public ConcurrentWorkManager(ExecutorService executorService) { | |
this.executorService = executorService; | |
this.workJobs = new ConcurrentLinkedDeque<>(); | |
} | |
public <T> CompletableFuture<T> execute(PerRequestWorkPermits requestWorkPermits, Supplier<CompletableFuture<?>> codeToRun) { | |
CompletableFuture<T> cfToBeDone = new CompletableFuture<>(); | |
this.workJobs.offer(new WorkJob(requestWorkPermits, cfToBeDone, codeToRun)); | |
executorService.submit(serviceQ()); | |
return cfToBeDone; | |
} | |
@SuppressWarnings("unchecked") | |
public Runnable serviceQ() { | |
return () -> { | |
WorkJob workJob = workJobs.poll(); | |
if (workJob == null) { | |
return; | |
} | |
//System.out.printf("Dequeued work for %s\n", workJob.workPermits); | |
boolean proceed = false; | |
try { | |
proceed = workJob.workPermits.tryAcquire(1); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
return; | |
} | |
if (!proceed) { | |
// we could not get a permit to proceed - that is their per request concurrency was exceeded | |
// so enqueue the job again and try later | |
//System.out.printf("\t===== No permits available for %s\n", workJob.workPermits); | |
workJobs.offer(workJob); | |
executorService.submit(serviceQ()); | |
} else { | |
// they have a permit - run their code which supplies an CF | |
try { | |
System.out.printf("\t-- Running code for %s\n", workJob.workPermits); | |
CompletableFuture<?> resultCF = workJob.codeToRun.get(); | |
resultCF.whenComplete((data, throwable) -> { | |
CompletableFuture<Object> cfToComplete = (CompletableFuture<Object>) workJob.cfToComplete; | |
if (throwable != null) { | |
cfToComplete.completeExceptionally(throwable); | |
} else { | |
cfToComplete.complete(data); | |
} | |
}); | |
} finally { | |
workJob.workPermits.release(1); | |
} | |
} | |
}; | |
} | |
} | |
static class PerRequestWorkPermits { | |
private final String name; | |
private final Semaphore semaphore; | |
public PerRequestWorkPermits(String name, int permits) { | |
this.name = name; | |
semaphore = new Semaphore(permits, true); | |
} | |
public boolean tryAcquire(int permits) throws InterruptedException { | |
return semaphore.tryAcquire(permits, 0, TimeUnit.MILLISECONDS); | |
} | |
public void release(int permits) { | |
semaphore.release(permits); | |
} | |
public int availablePermits() { | |
return semaphore.availablePermits(); | |
} | |
@Override | |
public String toString() { | |
return name + ":" + semaphore.availablePermits(); | |
} | |
} | |
public static void randomSnoozeMs(int minMs, int maxMs) { | |
Duration duration = Duration.of(getRandomNumberInRange(minMs, maxMs), ChronoUnit.MILLIS); | |
try { | |
Thread.sleep(duration.toMillis()); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
private static int getRandomNumberInRange(int min, int max) { | |
if (min >= max) { | |
throw new IllegalArgumentException("max must be greater than min"); | |
} | |
Random r = new Random(); | |
return r.nextInt((max - min) + 1) + min; | |
} | |
public static void main(String[] args) { | |
ExecutorService oneHundredThreads = Executors.newFixedThreadPool(100); | |
ConcurrentWorkManager workManager = new ConcurrentWorkManager(oneHundredThreads); | |
// the batch loader is async | |
BatchLoaderWithContext<String, Object> characterBatchLoader = (keys, loadingEnv) -> { | |
String firstKey = keys.get(0); | |
PerRequestWorkPermits workPermits = (PerRequestWorkPermits) loadingEnv.getKeyContexts().get(firstKey); | |
return workManager.execute(workPermits, () -> { | |
// | |
// this batch load is going to simulate some delay -e g work being done | |
System.out.println("\t\tworking on behalf of " + workPermits.toString()); | |
randomSnoozeMs(200, 5000); | |
List<Object> characters = keys.stream().map(StarWarsData::getCharacter).collect(Collectors.toList()); | |
return CompletableFuture.completedFuture(characters); | |
}); | |
}; | |
DataFetcher humanDF = env -> { | |
DataLoader<String, Object> dl = env.getDataLoader("characters"); | |
PerRequestWorkPermits perRequestWorkPermits = env.getContext(); | |
return dl.load(env.getArgument("id"), perRequestWorkPermits); | |
}; | |
TypeResolver characterTR = env -> env.getSchema().getObjectType("Human"); | |
RuntimeWiring runtimeWiring = RuntimeWiring.newRuntimeWiring() | |
.type(newTypeWiring("QueryType").dataFetcher("human", humanDF)) | |
.type(newTypeWiring("Character").typeResolver(characterTR)) | |
.build(); | |
String sdl = new SchemaPrinter().print(StarWarsSchema.starWarsSchema); | |
TypeDefinitionRegistry typeRegistry = new SchemaParser().parse(sdl); | |
GraphQLSchema graphQLSchema = new SchemaGenerator().makeExecutableSchema(typeRegistry, runtimeWiring); | |
GraphQL graphQL = GraphQL.newGraphQL(graphQLSchema).build(); | |
// | |
// now execute | |
System.out.print("====== LETS GO LIMIT SOME CLIENTS\n"); | |
int NUM_CLIENTS = 5; | |
int NUM_OF_QUERIES_PER_CLIENT = 10; | |
List<CompletableFuture<?>> allResults = new ArrayList<>(); | |
for (int client = 0; client < NUM_CLIENTS; client++) { | |
// each client here gets 3 permits for work even through we are going to N lots of work for them | |
PerRequestWorkPermits perRequestWorkPermits = new PerRequestWorkPermits("client" + client, 3); | |
// and now we do N bits of work for them > permits | |
for (int i = 0; i < NUM_OF_QUERIES_PER_CLIENT; i++) { | |
int clientNum = client; | |
int iter = i; | |
// we dont cache to ensure this DataLoader / BatchLoader gets some real work! | |
DataLoaderOptions noCaching = DataLoaderOptions.newOptions().setCachingEnabled(false); | |
DataLoader<String, Object> characterDL = DataLoader.newDataLoader(characterBatchLoader, noCaching); | |
DataLoaderRegistry dataLoaderRegistry = new DataLoaderRegistry(); | |
dataLoaderRegistry.register("characters", characterDL); | |
String query = "{ human(id:\"1000\") { name } }"; // luke | |
ExecutionInput ei = ExecutionInput.newExecutionInput() | |
.context(perRequestWorkPermits) | |
.dataLoaderRegistry(dataLoaderRegistry) | |
.query(query) | |
.build(); | |
CompletableFuture<ExecutionResult> cfResult = graphQL.executeAsync(ei); | |
cfResult.whenComplete((data, throwable) -> { | |
System.out.printf("\t\tCompleted client %s-%d - data %s - errors %s\n", clientNum, iter, data.getData(), data.getErrors()); | |
}); | |
allResults.add(cfResult); | |
} | |
} | |
final CompletableFuture<Void> allOf = CompletableFuture.allOf(allResults.toArray(new CompletableFuture[0])); | |
// wait for all results to complete | |
allOf.join(); | |
System.out.println("All futures completed"); | |
// shutdown the threadpool | |
System.out.println("Shutting down the thread pool"); | |
oneHundredThreads.shutdown(); | |
System.out.println("Done"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment