Skip to content

Instantly share code, notes, and snippets.

@aikar
Created March 28, 2020 00:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aikar/97cf37b53e48baba54e6a6453987e3c5 to your computer and use it in GitHub Desktop.
Save aikar/97cf37b53e48baba54e6a6453987e3c5 to your computer and use it in GitHub Desktop.
diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
index 78bd238f4c..24d4902929 100644
--- a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
+++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
@@ -72,7 +72,10 @@ public class PrioritizedTaskQueue<T extends PrioritizedTaskQueue.PrioritizedTask
* This can also be thrown if the queue has shutdown.
*/
public void add(final T task) throws IllegalStateException {
- task.onQueue(this);
+ add(task, false);
+ }
+ public void add(final T task, boolean allowReenqueue) throws IllegalStateException {
+ task.onQueue(this, allowReenqueue);
this.queues[task.getPriority()].add(task);
if (this.shutdown.get()) {
// note: we're not actually sure at this point if our task will go through
@@ -251,7 +254,10 @@ public class PrioritizedTaskQueue<T extends PrioritizedTaskQueue.PrioritizedTask
}
void onQueue(final PrioritizedTaskQueue queue) {
- if (this.queue.getAndSet(queue) != null) {
+ onQueue(queue, false);
+ }
+ void onQueue(final PrioritizedTaskQueue queue, boolean allowReenqueue) {
+ if (this.queue.getAndSet(queue) != null && !allowReenqueue) {
throw new IllegalStateException("Already queued!");
}
}
diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
index 715a2dd8d2..923d30b586 100644
--- a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
+++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
@@ -34,7 +34,9 @@ public final class ChunkTaskManager {
private final PrioritizedTaskQueue<ChunkTask> chunkTasks = new PrioritizedTaskQueue<>(); // used if async chunks are disabled in config
protected static QueueExecutorThread<ChunkTask>[] globalWorkers;
+ protected static QueueExecutorThread<ChunkTask> globalUrgentWorker;
protected static PrioritizedTaskQueue<ChunkTask> globalQueue;
+ protected static PrioritizedTaskQueue<ChunkTask> globalUrgentQueue;
protected static final ConcurrentLinkedQueue<Runnable> CHUNK_WAIT_QUEUE = new ConcurrentLinkedQueue<>();
@@ -116,6 +118,7 @@ public final class ChunkTaskManager {
globalWorkers = new QueueExecutorThread[threads];
globalQueue = new PrioritizedTaskQueue<>();
+ globalUrgentQueue = new PrioritizedTaskQueue<>();
for (int i = 0; i < threads; ++i) {
globalWorkers[i] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms
@@ -127,6 +130,15 @@ public final class ChunkTaskManager {
globalWorkers[i].start();
}
+
+ globalUrgentWorker = new QueueExecutorThread<>(globalUrgentQueue, (long)0.10e6); //0.1ms
+ globalUrgentWorker.setName("Paper Async Chunk Urgent Task Thread");
+ globalUrgentWorker.setPriority(Thread.NORM_PRIORITY+1);
+ globalUrgentWorker.setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
+ PaperFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable);
+ });
+
+ globalUrgentWorker.start();
}
/**
@@ -377,6 +389,7 @@ public final class ChunkTaskManager {
worker.flush();
}
}
+ globalUrgentWorker.flush();
// flush again since tasks we execute async saves
drainChunkWaitQueue();
@@ -409,20 +422,30 @@ public final class ChunkTaskManager {
public void raisePriority(final int chunkX, final int chunkZ, final int priority) {
final Long chunkKey = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ));
- ChunkSaveTask chunkSaveTask = this.chunkSaveTasks.get(chunkKey);
+ ChunkTask chunkSaveTask = this.chunkSaveTasks.get(chunkKey);
if (chunkSaveTask != null) {
- final boolean raised = chunkSaveTask.raisePriority(priority);
- if (chunkSaveTask.isScheduled() && raised) {
- // only notify if we're in queue to be executed
- this.internalScheduleNotify();
- }
+ // don't bump save into urgent queue
+ raiseTaskPriority(chunkSaveTask, priority != PrioritizedTaskQueue.HIGHEST_PRIORITY ? priority : PrioritizedTaskQueue.HIGH_PRIORITY);
}
ChunkLoadTask chunkLoadTask = this.chunkLoadTasks.get(chunkKey);
if (chunkLoadTask != null) {
- final boolean raised = chunkLoadTask.raisePriority(priority);
- if (chunkLoadTask.isScheduled() && raised) {
- // only notify if we're in queue to be executed
+ raiseTaskPriority(chunkLoadTask, priority);
+ }
+ }
+
+ private void raiseTaskPriority(ChunkTask task, int priority) {
+ final boolean raised = task.raisePriority(priority);
+ if (task.isScheduled() && raised) {
+ // only notify if we're in queue to be executed
+ if (priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
+ // was in another queue but became urgent later, add to urgent queue and the previous
+ // queue will just have to ignore this task if it has already been started.
+ // Ultimately, we now have 2 potential queues that can pull it out whoever gets it first
+ // but the urgent queue has dedicated thread(s) so it's likely to win....
+ globalUrgentQueue.add(task, true);
+ this.internalScheduleNotifyUrgent();
+ } else {
this.internalScheduleNotify();
}
}
@@ -436,8 +459,14 @@ public final class ChunkTaskManager {
// It's important we order the task to be executed before notifying. Avoid a race condition where the worker thread
// wakes up and goes to sleep before we actually schedule (or it's just about to sleep)
- this.queue.add(task);
- this.internalScheduleNotify();
+ if (task.getPriority() == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
+ globalUrgentQueue.add(task, true);
+ this.internalScheduleNotifyUrgent();
+ } else {
+ this.queue.add(task);
+ this.internalScheduleNotify();
+ }
+
}
protected void internalScheduleNotify() {
@@ -452,4 +481,12 @@ public final class ChunkTaskManager {
}
}
+
+ protected void internalScheduleNotifyUrgent() {
+ if (globalUrgentWorker == null) {
+ return;
+ }
+ globalUrgentWorker.notifyTasks();
+ }
+
}
diff --git a/src/main/java/net/minecraft/server/ChunkProviderServer.java b/src/main/java/net/minecraft/server/ChunkProviderServer.java
index 1dcd0980ec..1873f7b7dd 100644
--- a/src/main/java/net/minecraft/server/ChunkProviderServer.java
+++ b/src/main/java/net/minecraft/server/ChunkProviderServer.java
@@ -227,6 +227,7 @@ public class ChunkProviderServer extends IChunkProvider {
}
private long asyncLoadSeqCounter;
+ public static boolean IS_CHUNK_LOAD_BLOCKING_MAIN = false;
public void getChunkAtAsynchronously(int x, int z, boolean gen, java.util.function.Consumer<Chunk> onComplete) {
if (Thread.currentThread() != this.serverThread) {
@@ -384,7 +385,12 @@ public class ChunkProviderServer extends IChunkProvider {
}
gameprofilerfiller.c("getChunkCacheMiss");
+ // Paper start - Async chunks
+ boolean prevBlocking = IS_CHUNK_LOAD_BLOCKING_MAIN;
+ IS_CHUNK_LOAD_BLOCKING_MAIN = true;
CompletableFuture<Either<IChunkAccess, PlayerChunk.Failure>> completablefuture = this.getChunkFutureMainThread(i, j, chunkstatus, flag);
+ IS_CHUNK_LOAD_BLOCKING_MAIN = prevBlocking;
+ // Paper end
if (!completablefuture.isDone()) { // Paper
// Paper start - async chunk io/loading
diff --git a/src/main/java/net/minecraft/server/PlayerChunkMap.java b/src/main/java/net/minecraft/server/PlayerChunkMap.java
index f9e843288a..b6db0fc1dc 100644
--- a/src/main/java/net/minecraft/server/PlayerChunkMap.java
+++ b/src/main/java/net/minecraft/server/PlayerChunkMap.java
@@ -705,13 +705,13 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
};
CompletableFuture<NBTTagCompound> chunkSaveFuture = this.world.asyncChunkTaskManager.getChunkSaveFuture(chunkcoordintpair.x, chunkcoordintpair.z);
+ boolean isBlockingMain = MCUtil.isMainThread() && ChunkProviderServer.IS_CHUNK_LOAD_BLOCKING_MAIN;
+ int priority = isBlockingMain ? com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY : com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY;
if (chunkSaveFuture != null) {
- this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z,
- com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY, chunkHolderConsumer, false, chunkSaveFuture);
- this.world.asyncChunkTaskManager.raisePriority(chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY);
+ this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, priority, chunkHolderConsumer, isBlockingMain, chunkSaveFuture);
+ this.world.asyncChunkTaskManager.raisePriority(chunkcoordintpair.x, chunkcoordintpair.z, priority);
} else {
- this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z,
- com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY, chunkHolderConsumer, false);
+ this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, priority, chunkHolderConsumer, isBlockingMain);
}
return ret;
// Paper end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment