Created
November 1, 2018 03:54
-
-
Save aikar/00295a7e20ae3e595601eb87f58c26fd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit a825b5c4138e5b3a048b045d6fe65400c1900ae7 | |
Author: Aikar <aikar@aikar.co> | |
Date: Wed Oct 31 20:44:51 2018 -0400 | |
x | |
diff --git a/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java b/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java | |
index e589aa356c..a796af2921 100644 | |
--- a/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java | |
+++ b/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java | |
@@ -1,8 +1,5 @@ | |
package com.destroystokyo.paper.util; | |
-import com.google.common.util.concurrent.ThreadFactoryBuilder; | |
-import net.minecraft.server.NamedIncrementingThreadFactory; | |
- | |
import javax.annotation.Nonnull; | |
import java.util.ArrayList; | |
import java.util.List; | |
@@ -10,7 +7,6 @@ import java.util.concurrent.AbstractExecutorService; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.RejectedExecutionException; | |
-import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
@@ -41,12 +37,19 @@ public class PriorityQueuedExecutor extends AbstractExecutorService { | |
} | |
public PriorityQueuedExecutor(String name, int threads, RejectionHandler handler) { | |
- ThreadFactory factory = new ThreadFactoryBuilder() | |
- .setThreadFactory(new NamedIncrementingThreadFactory(name)) | |
- .setDaemon(true) | |
- .build(); | |
+ this(name, threads, handler, Thread.NORM_PRIORITY); | |
+ } | |
+ | |
+ public PriorityQueuedExecutor(String name, int threads, int threadPriority) { | |
+ this(name, threads, null, threadPriority); | |
+ } | |
+ | |
+ public PriorityQueuedExecutor(String name, int threads, RejectionHandler handler, int threadPriority) { | |
for (int i = 0; i < threads; i++) { | |
- final Thread thread = factory.newThread(this::processQueues); | |
+ ExecutorThread thread = new ExecutorThread(this::processQueues); | |
+ thread.setDaemon(true); | |
+ thread.setName(threads == 1 ? name : name + "-" + (i + 1)); | |
+ thread.setPriority(threadPriority); | |
thread.start(); | |
this.threads.add(thread); | |
} | |
@@ -123,28 +126,20 @@ public class PriorityQueuedExecutor extends AbstractExecutorService { | |
} | |
public PendingTask<Void> submitTask(Runnable run) { | |
- return submitTask(createPendingTask(run)); | |
+ return createPendingTask(run).submit(); | |
} | |
public PendingTask<Void> submitTask(Runnable run, Priority priority) { | |
- return submitTask(createPendingTask(run, priority)); | |
+ return createPendingTask(run, priority).submit(); | |
} | |
public <T> PendingTask<T> submitTask(Supplier<T> run) { | |
- return submitTask(createPendingTask(run)); | |
+ return createPendingTask(run).submit(); | |
} | |
public <T> PendingTask<T> submitTask(Supplier<T> run, Priority priority) { | |
- return submitTask(createPendingTask(run, priority)); | |
- } | |
- | |
- public <T> PendingTask<T> submitTask(PendingTask<T> task) { | |
- if (shuttingDown) { | |
- handler.onRejection(task, this); | |
- return task; | |
- } | |
- task.submit(this); | |
- return task; | |
+ PendingTask<T> task = createPendingTask(run, priority); | |
+ return task.submit(); | |
} | |
@Override | |
@@ -152,7 +147,19 @@ public class PriorityQueuedExecutor extends AbstractExecutorService { | |
submitTask(command); | |
} | |
- private Runnable getTask() { | |
+ public boolean isCurrentThread() { | |
+ final Thread thread = Thread.currentThread(); | |
+ if (!(thread instanceof ExecutorThread)) { | |
+ return false; | |
+ } | |
+ return ((ExecutorThread) thread).getExecutor() == this; | |
+ } | |
+ | |
+ public Runnable getUrgentTask() { | |
+ return urgent.poll(); | |
+ } | |
+ | |
+ public Runnable getTask() { | |
Runnable run = urgent.poll(); | |
if (run != null) { | |
return run; | |
@@ -196,6 +203,16 @@ public class PriorityQueuedExecutor extends AbstractExecutorService { | |
NORMAL, HIGH, URGENT | |
} | |
+ public class ExecutorThread extends Thread { | |
+ public ExecutorThread(Runnable runnable) { | |
+ super(runnable); | |
+ } | |
+ | |
+ public PriorityQueuedExecutor getExecutor() { | |
+ return PriorityQueuedExecutor.this; | |
+ } | |
+ } | |
+ | |
public class PendingTask <T> implements Runnable { | |
private final AtomicBoolean hasRan = new AtomicBoolean(); | |
@@ -238,31 +255,35 @@ public class PriorityQueuedExecutor extends AbstractExecutorService { | |
public void bumpPriority(Priority newPriority) { | |
for (;;) { | |
int current = this.priority.get(); | |
- if (current >= newPriority.ordinal()) { | |
- return; | |
- } | |
- if (priority.compareAndSet(current, newPriority.ordinal())) { | |
+ int ordinal = newPriority.ordinal(); | |
+ if (current >= ordinal || priority.compareAndSet(current, ordinal)) { | |
break; | |
} | |
} | |
- if (this.executor == null) { | |
+ | |
+ if (this.submitted.get() == -1 || this.hasRan.get()) { | |
return; | |
} | |
- // If we have already been submitted, resubmit with new priority | |
- submit(this.executor); | |
+ | |
+ // Only resubmit if it hasnt ran yet and has been submitted | |
+ submit(); | |
} | |
public CompletableFuture<T> onDone() { | |
return future; | |
} | |
- public void submit(PriorityQueuedExecutor executor) { | |
+ public PendingTask<T> submit() { | |
+ if (shuttingDown) { | |
+ handler.onRejection(this, PriorityQueuedExecutor.this); | |
+ return this; | |
+ } | |
for (;;) { | |
final int submitted = this.submitted.get(); | |
final int priority = this.priority.get(); | |
if (submitted == priority) { | |
- return; | |
+ return this; | |
} | |
if (this.submitted.compareAndSet(submitted, priority)) { | |
if (priority == Priority.URGENT.ordinal()) { | |
@@ -277,11 +298,11 @@ public class PriorityQueuedExecutor extends AbstractExecutorService { | |
} | |
} | |
- //noinspection SynchronizationOnLocalVariableOrMethodParameter | |
- synchronized (executor) { | |
+ synchronized (PriorityQueuedExecutor.this) { | |
// Wake up a thread to take this work | |
- executor.notify(); | |
+ PriorityQueuedExecutor.this.notify(); | |
} | |
+ return this; | |
} | |
} | |
public interface RejectionHandler { | |
diff --git a/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java b/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java | |
index c334462f20..cb2aa9c493 100644 | |
--- a/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java | |
+++ b/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java | |
@@ -25,6 +25,7 @@ package net.minecraft.server; | |
import com.destroystokyo.paper.PaperConfig; | |
import com.destroystokyo.paper.util.PriorityQueuedExecutor; | |
+import com.destroystokyo.paper.util.PriorityQueuedExecutor.ExecutorThread; | |
import com.destroystokyo.paper.util.PriorityQueuedExecutor.Priority; | |
import it.unimi.dsi.fastutil.longs.Long2ObjectMap; | |
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps; | |
@@ -39,6 +40,7 @@ import java.util.ArrayList; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.concurrent.CompletableFuture; | |
+import java.util.concurrent.ConcurrentLinkedDeque; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
@@ -47,11 +49,11 @@ import java.util.function.Consumer; | |
@SuppressWarnings("unused") | |
public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
- private static final PriorityQueuedExecutor EXECUTOR = new PriorityQueuedExecutor("PaperChunkLoader", PaperConfig.asyncChunks ? PaperConfig.asyncChunkLoadThreads : 0); | |
- private static final PriorityQueuedExecutor SINGLE_GEN_EXECUTOR = new PriorityQueuedExecutor("PaperChunkGenerator", PaperConfig.asyncChunks && PaperConfig.asyncChunkGeneration && !PaperConfig.asyncChunkGenThreadPerWorld ? 1 : 0); | |
- private static final ConcurrentLinkedQueue<Runnable> MAIN_THREAD_QUEUE = new ConcurrentLinkedQueue<>(); | |
- private static final ThreadLocal<Boolean> IS_CHUNK_THREAD = ThreadLocal.withInitial(() -> false); | |
- private static final ThreadLocal<Boolean> IS_CHUNK_GEN_THREAD = ThreadLocal.withInitial(() -> false); | |
+ private static final int GEN_THREAD_PRIORITY = Integer.getInteger("paper.genthreadpriority", 3); | |
+ private static final int LOAD_THREAD_PRIORITY = Integer.getInteger("paper.loadthreadpriority", 4); | |
+ private static final PriorityQueuedExecutor EXECUTOR = new PriorityQueuedExecutor("PaperChunkLoader", PaperConfig.asyncChunks ? PaperConfig.asyncChunkLoadThreads : 0, LOAD_THREAD_PRIORITY); | |
+ private static final PriorityQueuedExecutor SINGLE_GEN_EXECUTOR = new PriorityQueuedExecutor("PaperChunkGenerator", PaperConfig.asyncChunks && PaperConfig.asyncChunkGeneration && !PaperConfig.asyncChunkGenThreadPerWorld ? 1 : 0, GEN_THREAD_PRIORITY); | |
+ private static final ConcurrentLinkedDeque<Runnable> MAIN_THREAD_QUEUE = new ConcurrentLinkedDeque<>(); | |
private final PriorityQueuedExecutor generationExecutor; | |
//private static final PriorityQueuedExecutor generationExecutor = new PriorityQueuedExecutor("PaperChunkGen", 1); | |
@@ -72,7 +74,7 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
this.chunkLoader = chunkLoader; | |
String worldName = this.world.getWorld().getName(); | |
this.shouldGenSync = generator instanceof CustomChunkGenerator && !(((CustomChunkGenerator) generator).asyncSupported) || !PaperConfig.asyncChunkGeneration; | |
- this.generationExecutor = PaperConfig.asyncChunkGenThreadPerWorld ? new PriorityQueuedExecutor("PaperChunkGen-" + worldName, shouldGenSync ? 0 : 1) : SINGLE_GEN_EXECUTOR; | |
+ this.generationExecutor = PaperConfig.asyncChunkGenThreadPerWorld ? new PriorityQueuedExecutor("PaperChunkGen-" + worldName, shouldGenSync ? 0 : 1, GEN_THREAD_PRIORITY) : SINGLE_GEN_EXECUTOR; | |
} | |
static void processChunkLoads(MinecraftServer server) { | |
@@ -107,11 +109,17 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
} | |
private boolean processChunkLoads() { | |
+ return processChunkLoads((CompletableFuture<Chunk>) null); | |
+ } | |
+ private boolean processChunkLoads(CompletableFuture<Chunk> pendingRequest) { | |
Runnable run; | |
boolean hadLoad = false; | |
while ((run = MAIN_THREAD_QUEUE.poll()) != null) { | |
run.run(); | |
hadLoad = true; | |
+ if (pendingRequest != null && pendingRequest.isDone()) { | |
+ break; | |
+ } | |
} | |
return hadLoad; | |
} | |
@@ -154,35 +162,36 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
// Obtain a PendingChunk | |
final PendingChunk pending; | |
final boolean isBlockingMain = consumer == null && server.isMainThread(); | |
+ final Priority taskPriority = calculatePriority(isBlockingMain, priority); | |
synchronized (pendingChunks) { | |
PendingChunk pendingChunk = pendingChunks.get(key); | |
if (pendingChunk == null) { | |
- pending = new PendingChunk(x, z, key, gen, calculatePriority(isBlockingMain, priority)); | |
+ pending = new PendingChunk(x, z, key, gen, taskPriority); | |
pendingChunks.put(key, pending); | |
} else if (pendingChunk.hasFinished && gen && !pendingChunk.canGenerate && pendingChunk.chunk == null) { | |
// need to overwrite the old | |
- pending = new PendingChunk(x, z, key, true, calculatePriority(isBlockingMain, priority)); | |
+ pending = new PendingChunk(x, z, key, true, taskPriority); | |
pendingChunks.put(key, pending); | |
} else { | |
pending = pendingChunk; | |
- | |
- Priority newPriority = calculatePriority(isBlockingMain, priority); | |
- if (pending.taskPriority != newPriority) { | |
- pending.bumpPriority(newPriority); | |
+ if (pending.taskPriority != taskPriority) { | |
+ pending.bumpPriority(taskPriority); | |
} | |
} | |
} | |
+ | |
// Listen for when result is ready | |
final CompletableFuture<Chunk> future = new CompletableFuture<>(); | |
PendingChunkRequest request = pending.addListener(future, gen); | |
- if (IS_CHUNK_THREAD.get()) { | |
- pending.loadTask.run(); | |
+ if (taskPriority != Priority.URGENT && Thread.currentThread() instanceof ExecutorThread) { | |
+ PriorityQueuedExecutor executor = ((ExecutorThread) Thread.currentThread()).getExecutor(); | |
+ Runnable run; | |
+ while ((run = executor.getUrgentTask()) != null) { | |
+ run.run(); | |
+ } | |
} | |
- | |
- if (isBlockingMain && pending.hasFinished) { | |
- processChunkLoads(); | |
- request.initialReturnChunk = pending.postChunk(); | |
- return request; | |
+ if (isBlockingMain || isChunkThread()) { | |
+ pending.loadTask.run(); | |
} | |
if (isBlockingMain) { | |
@@ -191,7 +200,7 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
// We aren't done, obtain lock on queue | |
synchronized (MAIN_THREAD_QUEUE) { | |
// We may of received our request now, check it | |
- if (processChunkLoads()) { | |
+ if (processChunkLoads(future)) { | |
// If we processed SOMETHING, don't wait | |
continue; | |
} | |
@@ -202,7 +211,7 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
} | |
} | |
// Queue has been notified or timed out, process it | |
- processChunkLoads(); | |
+ processChunkLoads(future); | |
} | |
// We should be done AND posted into chunk map now, return it | |
request.initialReturnChunk = future.join(); | |
@@ -313,6 +322,17 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
} | |
} | |
+ private boolean isLoadThread() { | |
+ return EXECUTOR.isCurrentThread(); | |
+ } | |
+ | |
+ private boolean isGenThread() { | |
+ return generationExecutor.isCurrentThread(); | |
+ } | |
+ private boolean isChunkThread() { | |
+ return isLoadThread() || isGenThread(); | |
+ } | |
+ | |
private class PendingChunk implements Runnable { | |
private final int x; | |
private final int z; | |
@@ -365,11 +385,6 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
} | |
} | |
- private Chunk generateChunkExecutor() { | |
- IS_CHUNK_THREAD.set(true); | |
- IS_CHUNK_GEN_THREAD.set(true); | |
- return generateChunk(); | |
- } | |
private Chunk generateChunk() { | |
synchronized (this) { | |
if (requests.get() <= 0) { | |
@@ -460,7 +475,11 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
// Don't post here, even if on main, it must enter the queue so we can exit any open batch | |
// schedulers, as post stage may trigger a new generation and cause errors | |
synchronized (MAIN_THREAD_QUEUE) { | |
- MAIN_THREAD_QUEUE.add(this::postChunk); | |
+ if (this.taskPriority == Priority.URGENT) { | |
+ MAIN_THREAD_QUEUE.addFirst(this::postChunk); | |
+ } else { | |
+ MAIN_THREAD_QUEUE.addLast(this::postChunk); | |
+ } | |
MAIN_THREAD_QUEUE.notify(); | |
} | |
} | |
@@ -527,24 +546,18 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
if (loadTask == null) { | |
// Take care of a race condition in that a request could be cancelled after the synchronize | |
// on pendingChunks, but before a listener is added, which would erase these pending tasks. | |
- if (shouldGenSync) { | |
- genTask = generationExecutor.createPendingTask(this::generateChunk, taskPriority); | |
- } else { | |
- genTask = generationExecutor.createPendingTask(this::generateChunkExecutor, taskPriority); | |
- } | |
+ genTask = generationExecutor.createPendingTask(this::generateChunk, taskPriority); | |
loadTask = EXECUTOR.createPendingTask(this, taskPriority); | |
- if (!IS_CHUNK_THREAD.get()) { | |
+ if (!isChunkThread()) { | |
// We will execute it outside of the synchronized context immediately after | |
- EXECUTOR.submitTask(loadTask); | |
+ loadTask.submit(); | |
} | |
} | |
return new PendingChunkRequest(this, gen); | |
} | |
- | |
@Override | |
public void run() { | |
- IS_CHUNK_THREAD.set(true); | |
try { | |
if (!loadFinished(loadChunk(x, z))) { | |
return; | |
@@ -560,18 +573,23 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
if (shouldGenSync) { | |
synchronized (this) { | |
setStatus(PendingStatus.GENERATION_PENDING); | |
- MAIN_THREAD_QUEUE.add(() -> generateFinished(this.generateChunk())); | |
+ if (this.taskPriority == Priority.URGENT) { | |
+ MAIN_THREAD_QUEUE.addFirst(() -> generateFinished(this.generateChunk())); | |
+ } else { | |
+ MAIN_THREAD_QUEUE.addLast(() -> generateFinished(this.generateChunk())); | |
+ } | |
+ | |
} | |
synchronized (MAIN_THREAD_QUEUE) { | |
MAIN_THREAD_QUEUE.notify(); | |
} | |
} else { | |
- if (IS_CHUNK_GEN_THREAD.get()) { | |
+ if (isGenThread()) { | |
// ideally we should never run into 1 chunk generating another chunk... | |
// but if we do, let's apply same solution | |
genTask.run(); | |
} else { | |
- generationExecutor.submitTask(genTask); | |
+ genTask.submit(); | |
} | |
} | |
} | |
@@ -581,6 +599,10 @@ public class PaperAsyncChunkProvider extends ChunkProviderServer { | |
} | |
void bumpPriority(Priority newPriority) { | |
+ if (taskPriority.ordinal() >= newPriority.ordinal()) { | |
+ return; | |
+ } | |
+ | |
this.taskPriority = newPriority; | |
PriorityQueuedExecutor.PendingTask<Void> loadTask = this.loadTask; | |
PriorityQueuedExecutor.PendingTask<Chunk> genTask = this.genTask; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment