Skip to content

Instantly share code, notes, and snippets.

@aikar
Created November 1, 2018 03:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aikar/00295a7e20ae3e595601eb87f58c26fd to your computer and use it in GitHub Desktop.
Save aikar/00295a7e20ae3e595601eb87f58c26fd to your computer and use it in GitHub Desktop.
commit a825b5c4138e5b3a048b045d6fe65400c1900ae7
Author: Aikar <aikar@aikar.co>
Date: Wed Oct 31 20:44:51 2018 -0400
x
diff --git a/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java b/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java
index e589aa356c..a796af2921 100644
--- a/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java
+++ b/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java
@@ -1,8 +1,5 @@
package com.destroystokyo.paper.util;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import net.minecraft.server.NamedIncrementingThreadFactory;
-
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
@@ -10,7 +7,6 @@ import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -41,12 +37,19 @@ public class PriorityQueuedExecutor extends AbstractExecutorService {
}
public PriorityQueuedExecutor(String name, int threads, RejectionHandler handler) {
- ThreadFactory factory = new ThreadFactoryBuilder()
- .setThreadFactory(new NamedIncrementingThreadFactory(name))
- .setDaemon(true)
- .build();
+ this(name, threads, handler, Thread.NORM_PRIORITY);
+ }
+
+ public PriorityQueuedExecutor(String name, int threads, int threadPriority) {
+ this(name, threads, null, threadPriority);
+ }
+
+ public PriorityQueuedExecutor(String name, int threads, RejectionHandler handler, int threadPriority) {
for (int i = 0; i < threads; i++) {
- final Thread thread = factory.newThread(this::processQueues);
+ ExecutorThread thread = new ExecutorThread(this::processQueues);
+ thread.setDaemon(true);
+ thread.setName(threads == 1 ? name : name + "-" + (i + 1));
+ thread.setPriority(threadPriority);
thread.start();
this.threads.add(thread);
}
@@ -123,28 +126,20 @@ public class PriorityQueuedExecutor extends AbstractExecutorService {
}
public PendingTask<Void> submitTask(Runnable run) {
- return submitTask(createPendingTask(run));
+ return createPendingTask(run).submit();
}
public PendingTask<Void> submitTask(Runnable run, Priority priority) {
- return submitTask(createPendingTask(run, priority));
+ return createPendingTask(run, priority).submit();
}
public <T> PendingTask<T> submitTask(Supplier<T> run) {
- return submitTask(createPendingTask(run));
+ return createPendingTask(run).submit();
}
public <T> PendingTask<T> submitTask(Supplier<T> run, Priority priority) {
- return submitTask(createPendingTask(run, priority));
- }
-
- public <T> PendingTask<T> submitTask(PendingTask<T> task) {
- if (shuttingDown) {
- handler.onRejection(task, this);
- return task;
- }
- task.submit(this);
- return task;
+ PendingTask<T> task = createPendingTask(run, priority);
+ return task.submit();
}
@Override
@@ -152,7 +147,19 @@ public class PriorityQueuedExecutor extends AbstractExecutorService {
submitTask(command);
}
- private Runnable getTask() {
+ public boolean isCurrentThread() {
+ final Thread thread = Thread.currentThread();
+ if (!(thread instanceof ExecutorThread)) {
+ return false;
+ }
+ return ((ExecutorThread) thread).getExecutor() == this;
+ }
+
+ public Runnable getUrgentTask() {
+ return urgent.poll();
+ }
+
+ public Runnable getTask() {
Runnable run = urgent.poll();
if (run != null) {
return run;
@@ -196,6 +203,16 @@ public class PriorityQueuedExecutor extends AbstractExecutorService {
NORMAL, HIGH, URGENT
}
+ public class ExecutorThread extends Thread {
+ public ExecutorThread(Runnable runnable) {
+ super(runnable);
+ }
+
+ public PriorityQueuedExecutor getExecutor() {
+ return PriorityQueuedExecutor.this;
+ }
+ }
+
public class PendingTask <T> implements Runnable {
private final AtomicBoolean hasRan = new AtomicBoolean();
@@ -238,31 +255,35 @@ public class PriorityQueuedExecutor extends AbstractExecutorService {
public void bumpPriority(Priority newPriority) {
for (;;) {
int current = this.priority.get();
- if (current >= newPriority.ordinal()) {
- return;
- }
- if (priority.compareAndSet(current, newPriority.ordinal())) {
+ int ordinal = newPriority.ordinal();
+ if (current >= ordinal || priority.compareAndSet(current, ordinal)) {
break;
}
}
- if (this.executor == null) {
+
+ if (this.submitted.get() == -1 || this.hasRan.get()) {
return;
}
- // If we have already been submitted, resubmit with new priority
- submit(this.executor);
+
+ // Only resubmit if it hasn't run yet and has already been submitted
+ submit();
}
public CompletableFuture<T> onDone() {
return future;
}
- public void submit(PriorityQueuedExecutor executor) {
+ public PendingTask<T> submit() {
+ if (shuttingDown) {
+ handler.onRejection(this, PriorityQueuedExecutor.this);
+ return this;
+ }
for (;;) {
final int submitted = this.submitted.get();
final int priority = this.priority.get();
if (submitted == priority) {
- return;
+ return this;
}
if (this.submitted.compareAndSet(submitted, priority)) {
if (priority == Priority.URGENT.ordinal()) {
@@ -277,11 +298,11 @@ public class PriorityQueuedExecutor extends AbstractExecutorService {
}
}
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (executor) {
+ synchronized (PriorityQueuedExecutor.this) {
// Wake up a thread to take this work
- executor.notify();
+ PriorityQueuedExecutor.this.notify();
}
+ return this;
}
}
public interface RejectionHandler {
diff --git a/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java b/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java
index c334462f20..cb2aa9c493 100644
--- a/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java
+++ b/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java
@@ -25,6 +25,7 @@ package net.minecraft.server;
import com.destroystokyo.paper.PaperConfig;
import com.destroystokyo.paper.util.PriorityQueuedExecutor;
+import com.destroystokyo.paper.util.PriorityQueuedExecutor.ExecutorThread;
import com.destroystokyo.paper.util.PriorityQueuedExecutor.Priority;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
@@ -39,6 +40,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -47,11 +49,11 @@ import java.util.function.Consumer;
@SuppressWarnings("unused")
public class PaperAsyncChunkProvider extends ChunkProviderServer {
- private static final PriorityQueuedExecutor EXECUTOR = new PriorityQueuedExecutor("PaperChunkLoader", PaperConfig.asyncChunks ? PaperConfig.asyncChunkLoadThreads : 0);
- private static final PriorityQueuedExecutor SINGLE_GEN_EXECUTOR = new PriorityQueuedExecutor("PaperChunkGenerator", PaperConfig.asyncChunks && PaperConfig.asyncChunkGeneration && !PaperConfig.asyncChunkGenThreadPerWorld ? 1 : 0);
- private static final ConcurrentLinkedQueue<Runnable> MAIN_THREAD_QUEUE = new ConcurrentLinkedQueue<>();
- private static final ThreadLocal<Boolean> IS_CHUNK_THREAD = ThreadLocal.withInitial(() -> false);
- private static final ThreadLocal<Boolean> IS_CHUNK_GEN_THREAD = ThreadLocal.withInitial(() -> false);
+ private static final int GEN_THREAD_PRIORITY = Integer.getInteger("paper.genthreadpriority", 3);
+ private static final int LOAD_THREAD_PRIORITY = Integer.getInteger("paper.loadthreadpriority", 4);
+ private static final PriorityQueuedExecutor EXECUTOR = new PriorityQueuedExecutor("PaperChunkLoader", PaperConfig.asyncChunks ? PaperConfig.asyncChunkLoadThreads : 0, LOAD_THREAD_PRIORITY);
+ private static final PriorityQueuedExecutor SINGLE_GEN_EXECUTOR = new PriorityQueuedExecutor("PaperChunkGenerator", PaperConfig.asyncChunks && PaperConfig.asyncChunkGeneration && !PaperConfig.asyncChunkGenThreadPerWorld ? 1 : 0, GEN_THREAD_PRIORITY);
+ private static final ConcurrentLinkedDeque<Runnable> MAIN_THREAD_QUEUE = new ConcurrentLinkedDeque<>();
private final PriorityQueuedExecutor generationExecutor;
//private static final PriorityQueuedExecutor generationExecutor = new PriorityQueuedExecutor("PaperChunkGen", 1);
@@ -72,7 +74,7 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
this.chunkLoader = chunkLoader;
String worldName = this.world.getWorld().getName();
this.shouldGenSync = generator instanceof CustomChunkGenerator && !(((CustomChunkGenerator) generator).asyncSupported) || !PaperConfig.asyncChunkGeneration;
- this.generationExecutor = PaperConfig.asyncChunkGenThreadPerWorld ? new PriorityQueuedExecutor("PaperChunkGen-" + worldName, shouldGenSync ? 0 : 1) : SINGLE_GEN_EXECUTOR;
+ this.generationExecutor = PaperConfig.asyncChunkGenThreadPerWorld ? new PriorityQueuedExecutor("PaperChunkGen-" + worldName, shouldGenSync ? 0 : 1, GEN_THREAD_PRIORITY) : SINGLE_GEN_EXECUTOR;
}
static void processChunkLoads(MinecraftServer server) {
@@ -107,11 +109,17 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
}
private boolean processChunkLoads() {
+ return processChunkLoads((CompletableFuture<Chunk>) null);
+ }
+ private boolean processChunkLoads(CompletableFuture<Chunk> pendingRequest) {
Runnable run;
boolean hadLoad = false;
while ((run = MAIN_THREAD_QUEUE.poll()) != null) {
run.run();
hadLoad = true;
+ if (pendingRequest != null && pendingRequest.isDone()) {
+ break;
+ }
}
return hadLoad;
}
@@ -154,35 +162,36 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
// Obtain a PendingChunk
final PendingChunk pending;
final boolean isBlockingMain = consumer == null && server.isMainThread();
+ final Priority taskPriority = calculatePriority(isBlockingMain, priority);
synchronized (pendingChunks) {
PendingChunk pendingChunk = pendingChunks.get(key);
if (pendingChunk == null) {
- pending = new PendingChunk(x, z, key, gen, calculatePriority(isBlockingMain, priority));
+ pending = new PendingChunk(x, z, key, gen, taskPriority);
pendingChunks.put(key, pending);
} else if (pendingChunk.hasFinished && gen && !pendingChunk.canGenerate && pendingChunk.chunk == null) {
// need to overwrite the old
- pending = new PendingChunk(x, z, key, true, calculatePriority(isBlockingMain, priority));
+ pending = new PendingChunk(x, z, key, true, taskPriority);
pendingChunks.put(key, pending);
} else {
pending = pendingChunk;
-
- Priority newPriority = calculatePriority(isBlockingMain, priority);
- if (pending.taskPriority != newPriority) {
- pending.bumpPriority(newPriority);
+ if (pending.taskPriority != taskPriority) {
+ pending.bumpPriority(taskPriority);
}
}
}
+
// Listen for when result is ready
final CompletableFuture<Chunk> future = new CompletableFuture<>();
PendingChunkRequest request = pending.addListener(future, gen);
- if (IS_CHUNK_THREAD.get()) {
- pending.loadTask.run();
+ if (taskPriority != Priority.URGENT && Thread.currentThread() instanceof ExecutorThread) {
+ PriorityQueuedExecutor executor = ((ExecutorThread) Thread.currentThread()).getExecutor();
+ Runnable run;
+ while ((run = executor.getUrgentTask()) != null) {
+ run.run();
+ }
}
-
- if (isBlockingMain && pending.hasFinished) {
- processChunkLoads();
- request.initialReturnChunk = pending.postChunk();
- return request;
+ if (isBlockingMain || isChunkThread()) {
+ pending.loadTask.run();
}
if (isBlockingMain) {
@@ -191,7 +200,7 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
// We aren't done, obtain lock on queue
synchronized (MAIN_THREAD_QUEUE) {
// We may of received our request now, check it
- if (processChunkLoads()) {
+ if (processChunkLoads(future)) {
// If we processed SOMETHING, don't wait
continue;
}
@@ -202,7 +211,7 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
}
}
// Queue has been notified or timed out, process it
- processChunkLoads();
+ processChunkLoads(future);
}
// We should be done AND posted into chunk map now, return it
request.initialReturnChunk = future.join();
@@ -313,6 +322,17 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
}
}
+ private boolean isLoadThread() {
+ return EXECUTOR.isCurrentThread();
+ }
+
+ private boolean isGenThread() {
+ return generationExecutor.isCurrentThread();
+ }
+ private boolean isChunkThread() {
+ return isLoadThread() || isGenThread();
+ }
+
private class PendingChunk implements Runnable {
private final int x;
private final int z;
@@ -365,11 +385,6 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
}
}
- private Chunk generateChunkExecutor() {
- IS_CHUNK_THREAD.set(true);
- IS_CHUNK_GEN_THREAD.set(true);
- return generateChunk();
- }
private Chunk generateChunk() {
synchronized (this) {
if (requests.get() <= 0) {
@@ -460,7 +475,11 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
// Don't post here, even if on main, it must enter the queue so we can exit any open batch
// schedulers, as post stage may trigger a new generation and cause errors
synchronized (MAIN_THREAD_QUEUE) {
- MAIN_THREAD_QUEUE.add(this::postChunk);
+ if (this.taskPriority == Priority.URGENT) {
+ MAIN_THREAD_QUEUE.addFirst(this::postChunk);
+ } else {
+ MAIN_THREAD_QUEUE.addLast(this::postChunk);
+ }
MAIN_THREAD_QUEUE.notify();
}
}
@@ -527,24 +546,18 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
if (loadTask == null) {
// Take care of a race condition in that a request could be cancelled after the synchronize
// on pendingChunks, but before a listener is added, which would erase these pending tasks.
- if (shouldGenSync) {
- genTask = generationExecutor.createPendingTask(this::generateChunk, taskPriority);
- } else {
- genTask = generationExecutor.createPendingTask(this::generateChunkExecutor, taskPriority);
- }
+ genTask = generationExecutor.createPendingTask(this::generateChunk, taskPriority);
loadTask = EXECUTOR.createPendingTask(this, taskPriority);
- if (!IS_CHUNK_THREAD.get()) {
+ if (!isChunkThread()) {
// We will execute it outside of the synchronized context immediately after
- EXECUTOR.submitTask(loadTask);
+ loadTask.submit();
}
}
return new PendingChunkRequest(this, gen);
}
-
@Override
public void run() {
- IS_CHUNK_THREAD.set(true);
try {
if (!loadFinished(loadChunk(x, z))) {
return;
@@ -560,18 +573,23 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
if (shouldGenSync) {
synchronized (this) {
setStatus(PendingStatus.GENERATION_PENDING);
- MAIN_THREAD_QUEUE.add(() -> generateFinished(this.generateChunk()));
+ if (this.taskPriority == Priority.URGENT) {
+ MAIN_THREAD_QUEUE.addFirst(() -> generateFinished(this.generateChunk()));
+ } else {
+ MAIN_THREAD_QUEUE.addLast(() -> generateFinished(this.generateChunk()));
+ }
+
}
synchronized (MAIN_THREAD_QUEUE) {
MAIN_THREAD_QUEUE.notify();
}
} else {
- if (IS_CHUNK_GEN_THREAD.get()) {
+ if (isGenThread()) {
// ideally we should never run into 1 chunk generating another chunk...
// but if we do, let's apply same solution
genTask.run();
} else {
- generationExecutor.submitTask(genTask);
+ genTask.submit();
}
}
}
@@ -581,6 +599,10 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer {
}
void bumpPriority(Priority newPriority) {
+ if (taskPriority.ordinal() >= newPriority.ordinal()) {
+ return;
+ }
+
this.taskPriority = newPriority;
PriorityQueuedExecutor.PendingTask<Void> loadTask = this.loadTask;
PriorityQueuedExecutor.PendingTask<Chunk> genTask = this.genTask;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment