Skip to content

Instantly share code, notes, and snippets.

@aikar
Created March 28, 2020 07:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aikar/00b51b642d578061fdbc897174f5c515 to your computer and use it in GitHub Desktop.
Save aikar/00b51b642d578061fdbc897174f5c515 to your computer and use it in GitHub Desktop.
commit 26d9deef41eb09dd70394e15124c26f2c546252f
Author: Aikar <aikar@aikar.co>
Date: Fri Mar 27 20:57:32 2020 -0400
Improve behavior of main thread blocking chunk load/gens
diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
index 78bd238f4c..f28e563a43 100644
--- a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
+++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
@@ -72,8 +72,14 @@ public class PrioritizedTaskQueue<T extends PrioritizedTaskQueue.PrioritizedTask
* This can also be thrown if the queue has shutdown.
*/
public void add(final T task) throws IllegalStateException {
- task.onQueue(this);
- this.queues[task.getPriority()].add(task);
+ add(task, false);
+ }
+ public void add(final T task, boolean allowReenqueue) throws IllegalStateException {
+ int priority = task.getPriority();
+ if (priority != COMPLETING_PRIORITY) {
+ task.onQueue(this, allowReenqueue);
+ this.queues[priority].add(task);
+ }
if (this.shutdown.get()) {
// note: we're not actually sure at this point if our task will go through
throw new IllegalStateException("Queue has shutdown, refusing to execute task " + IOUtil.genericToString(task));
@@ -251,7 +257,10 @@ public class PrioritizedTaskQueue<T extends PrioritizedTaskQueue.PrioritizedTask
}
void onQueue(final PrioritizedTaskQueue queue) {
- if (this.queue.getAndSet(queue) != null) {
+ onQueue(queue, false);
+ }
+ void onQueue(final PrioritizedTaskQueue queue, boolean allowReenqueue) {
+ if (this.queue.getAndSet(queue) != null && !allowReenqueue) {
throw new IllegalStateException("Already queued!");
}
}
diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
index 715a2dd8d2..aaef3c426d 100644
--- a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
+++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
@@ -3,10 +3,12 @@ package com.destroystokyo.paper.io.chunk;
import com.destroystokyo.paper.io.PaperFileIOThread;
import com.destroystokyo.paper.io.IOUtil;
import com.destroystokyo.paper.io.PrioritizedTaskQueue;
+import com.destroystokyo.paper.io.PrioritizedTaskQueue.PrioritizedTask;
import com.destroystokyo.paper.io.QueueExecutorThread;
import net.minecraft.server.ChunkRegionLoader;
import net.minecraft.server.IAsyncTaskHandler;
import net.minecraft.server.IChunkAccess;
+import net.minecraft.server.MCUtil;
import net.minecraft.server.MinecraftServer;
import net.minecraft.server.NBTTagCompound;
import net.minecraft.server.WorldServer;
@@ -34,7 +36,9 @@ public final class ChunkTaskManager {
private final PrioritizedTaskQueue<ChunkTask> chunkTasks = new PrioritizedTaskQueue<>(); // used if async chunks are disabled in config
protected static QueueExecutorThread<ChunkTask>[] globalWorkers;
+ protected static QueueExecutorThread<ChunkTask> globalUrgentWorker;
protected static PrioritizedTaskQueue<ChunkTask> globalQueue;
+ protected static PrioritizedTaskQueue<ChunkTask> globalUrgentQueue;
protected static final ConcurrentLinkedQueue<Runnable> CHUNK_WAIT_QUEUE = new ConcurrentLinkedQueue<>();
@@ -116,6 +120,7 @@ public final class ChunkTaskManager {
globalWorkers = new QueueExecutorThread[threads];
globalQueue = new PrioritizedTaskQueue<>();
+ globalUrgentQueue = new PrioritizedTaskQueue<>();
for (int i = 0; i < threads; ++i) {
globalWorkers[i] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms
@@ -127,6 +132,15 @@ public final class ChunkTaskManager {
globalWorkers[i].start();
}
+
+ globalUrgentWorker = new QueueExecutorThread<>(globalUrgentQueue, (long)0.10e6); //0.1ms
+ globalUrgentWorker.setName("Paper Async Chunk Urgent Task Thread");
+ globalUrgentWorker.setPriority(Thread.NORM_PRIORITY+1);
+ globalUrgentWorker.setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
+ PaperFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable);
+ });
+
+ globalUrgentWorker.start();
}
/**
@@ -234,7 +248,12 @@ public final class ChunkTaskManager {
if (!failed) {
chunkData.chunkData = data;
}
- ChunkTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here
+ if (false && priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
+ System.out.println("exec load on main " + world.getWorld().getName() + ":" + chunkX + "," + chunkZ);
+ MinecraftServer.getServer().execute(ret);
+ } else {
+ ChunkTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here
+ }
}, true, failed, intendingToBlock); // read data off disk if the future fails
});
@@ -290,7 +309,12 @@ public final class ChunkTaskManager {
PaperFileIOThread.Holder.INSTANCE.loadChunkDataAsync(world, chunkX, chunkZ, priority, (final PaperFileIOThread.ChunkData chunkData) -> {
ret.chunkData = chunkData;
- ChunkTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here
+ if (false && priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
+ System.out.println("exec load on main " + world.getWorld().getName() + ":" + chunkX + "," + chunkZ);
+ MinecraftServer.getServer().execute(ret);
+ } else {
+ ChunkTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here
+ }
}, true, true, intendingToBlock);
return ret;
@@ -377,6 +401,7 @@ public final class ChunkTaskManager {
worker.flush();
}
}
+ globalUrgentWorker.flush();
// flush again since tasks we execute async saves
drainChunkWaitQueue();
@@ -409,20 +434,30 @@ public final class ChunkTaskManager {
public void raisePriority(final int chunkX, final int chunkZ, final int priority) {
final Long chunkKey = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ));
- ChunkSaveTask chunkSaveTask = this.chunkSaveTasks.get(chunkKey);
+ ChunkTask chunkSaveTask = this.chunkSaveTasks.get(chunkKey);
if (chunkSaveTask != null) {
- final boolean raised = chunkSaveTask.raisePriority(priority);
- if (chunkSaveTask.isScheduled() && raised) {
- // only notify if we're in queue to be executed
- this.internalScheduleNotify();
- }
+ // don't bump save into urgent queue
+ raiseTaskPriority(chunkSaveTask, priority != PrioritizedTaskQueue.HIGHEST_PRIORITY ? priority : PrioritizedTaskQueue.HIGH_PRIORITY);
}
ChunkLoadTask chunkLoadTask = this.chunkLoadTasks.get(chunkKey);
if (chunkLoadTask != null) {
- final boolean raised = chunkLoadTask.raisePriority(priority);
- if (chunkLoadTask.isScheduled() && raised) {
- // only notify if we're in queue to be executed
+ raiseTaskPriority(chunkLoadTask, priority);
+ }
+ }
+
+ private void raiseTaskPriority(ChunkTask task, int priority) {
+ final boolean raised = task.raisePriority(priority);
+ if (task.isScheduled() && raised) {
+ // only notify if we're in queue to be executed
+ if (priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
+ // The task was in another queue but became urgent later, so also add it to the urgent queue. The previous
+ // queue will just have to ignore this task if it has already been started.
+ // Ultimately, we now have 2 potential queues that can pull it out, and whichever gets it first wins,
+ // but the urgent queue has dedicated thread(s), so it's likely to win.
+ globalUrgentQueue.add(task, true);
+ this.internalScheduleNotifyUrgent();
+ } else {
this.internalScheduleNotify();
}
}
@@ -436,8 +471,14 @@ public final class ChunkTaskManager {
// It's important we order the task to be executed before notifying. Avoid a race condition where the worker thread
// wakes up and goes to sleep before we actually schedule (or it's just about to sleep)
- this.queue.add(task);
- this.internalScheduleNotify();
+ if (task.getPriority() == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
+ globalUrgentQueue.add(task, true);
+ this.internalScheduleNotifyUrgent();
+ } else {
+ this.queue.add(task);
+ this.internalScheduleNotify();
+ }
+
}
protected void internalScheduleNotify() {
@@ -452,4 +493,12 @@ public final class ChunkTaskManager {
}
}
+
+ protected void internalScheduleNotifyUrgent() {
+ if (globalUrgentWorker == null) {
+ return;
+ }
+ globalUrgentWorker.notifyTasks();
+ }
+
}
diff --git a/src/main/java/net/minecraft/server/ChunkProviderServer.java b/src/main/java/net/minecraft/server/ChunkProviderServer.java
index 1dcd0980ec..afc255f794 100644
--- a/src/main/java/net/minecraft/server/ChunkProviderServer.java
+++ b/src/main/java/net/minecraft/server/ChunkProviderServer.java
@@ -227,6 +227,7 @@ public class ChunkProviderServer extends IChunkProvider {
}
private long asyncLoadSeqCounter;
+ public static boolean IS_CHUNK_LOAD_BLOCKING_MAIN = false;
public void getChunkAtAsynchronously(int x, int z, boolean gen, java.util.function.Consumer<Chunk> onComplete) {
if (Thread.currentThread() != this.serverThread) {
@@ -384,7 +385,11 @@ public class ChunkProviderServer extends IChunkProvider {
}
gameprofilerfiller.c("getChunkCacheMiss");
+ // Paper start - Async chunks
+ boolean prevBlocking = IS_CHUNK_LOAD_BLOCKING_MAIN;
+ IS_CHUNK_LOAD_BLOCKING_MAIN = true;
CompletableFuture<Either<IChunkAccess, PlayerChunk.Failure>> completablefuture = this.getChunkFutureMainThread(i, j, chunkstatus, flag);
+ // Paper end
if (!completablefuture.isDone()) { // Paper
// Paper start - async chunk io/loading
@@ -393,10 +398,13 @@ public class ChunkProviderServer extends IChunkProvider {
// Paper end
com.destroystokyo.paper.io.SyncLoadFinder.logSyncLoad(this.world, x, z); // Paper - sync load info
this.world.timings.chunkAwait.startTiming(); // Paper
+ System.out.println("waiting for chunk" + this.getWorld().getWorld().getName() + ":" + x + "," + z);
this.serverThreadQueue.awaitTasks(completablefuture::isDone);
+ System.out.println("chunk ready" + this.getWorld().getWorld().getName() + ":" + x + "," + z);
com.destroystokyo.paper.io.chunk.ChunkTaskManager.popChunkWait(); // Paper - async chunk debug
this.world.timings.chunkAwait.stopTiming(); // Paper
} // Paper
+ IS_CHUNK_LOAD_BLOCKING_MAIN = prevBlocking;// Paper
ichunkaccess = (IChunkAccess) ((Either) completablefuture.join()).map((ichunkaccess1) -> {
return ichunkaccess1;
}, (playerchunk_failure) -> {
diff --git a/src/main/java/net/minecraft/server/PlayerChunkMap.java b/src/main/java/net/minecraft/server/PlayerChunkMap.java
index f9e843288a..d064784a44 100644
--- a/src/main/java/net/minecraft/server/PlayerChunkMap.java
+++ b/src/main/java/net/minecraft/server/PlayerChunkMap.java
@@ -2,6 +2,7 @@ package net.minecraft.server;
import co.aikar.timings.Timing; // Paper
import com.destroystokyo.paper.PaperWorldConfig; // Paper
+import com.destroystokyo.paper.io.chunk.ChunkTaskManager;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.ComparisonChain; // Paper
@@ -705,13 +706,16 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
};
CompletableFuture<NBTTagCompound> chunkSaveFuture = this.world.asyncChunkTaskManager.getChunkSaveFuture(chunkcoordintpair.x, chunkcoordintpair.z);
+ boolean isBlockingMain = MCUtil.isMainThread() && ChunkProviderServer.IS_CHUNK_LOAD_BLOCKING_MAIN;
+ int priority = isBlockingMain ? com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGHEST_PRIORITY : com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY;
+ if (isBlockingMain) {
+ System.out.println("blocking main load " + chunkcoordintpair);
+ }
if (chunkSaveFuture != null) {
- this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z,
- com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY, chunkHolderConsumer, false, chunkSaveFuture);
- this.world.asyncChunkTaskManager.raisePriority(chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY);
+ this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, priority, chunkHolderConsumer, isBlockingMain, chunkSaveFuture);
+ this.world.asyncChunkTaskManager.raisePriority(chunkcoordintpair.x, chunkcoordintpair.z, priority);
} else {
- this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z,
- com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY, chunkHolderConsumer, false);
+ this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, priority, chunkHolderConsumer, isBlockingMain);
}
return ret;
// Paper end
@@ -749,7 +753,14 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
return CompletableFuture.completedFuture(Either.right(playerchunk_failure));
});
}, (runnable) -> {
- this.mailboxWorldGen.a(ChunkTaskQueueSorter.a(playerchunk, runnable)); // CraftBukkit - decompile error
+ // Paper start
+ if (ChunkProviderServer.IS_CHUNK_LOAD_BLOCKING_MAIN) {
+ System.out.println("exec main " + world.getWorld().getName() + ":" + chunkcoordintpair);
+ this.executor.execute(runnable);
+ } else {
+ // Paper end
+ this.mailboxWorldGen.a(ChunkTaskQueueSorter.a(playerchunk, runnable)); // CraftBukkit - decompile error
+ }
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment