Created
March 28, 2020 00:53
-
-
Save aikar/97cf37b53e48baba54e6a6453987e3c5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java | |
index 78bd238f4c..24d4902929 100644 | |
--- a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java | |
+++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java | |
@@ -72,7 +72,10 @@ public class PrioritizedTaskQueue<T extends PrioritizedTaskQueue.PrioritizedTask | |
* This can also be thrown if the queue has shutdown. | |
*/ | |
public void add(final T task) throws IllegalStateException { | |
- task.onQueue(this); | |
+ add(task, false); | |
+ } | |
+ public void add(final T task, boolean allowReenqueue) throws IllegalStateException { | |
+ task.onQueue(this, allowReenqueue); | |
this.queues[task.getPriority()].add(task); | |
if (this.shutdown.get()) { | |
// note: we're not actually sure at this point if our task will go through | |
@@ -251,7 +254,10 @@ public class PrioritizedTaskQueue<T extends PrioritizedTaskQueue.PrioritizedTask | |
} | |
void onQueue(final PrioritizedTaskQueue queue) { | |
- if (this.queue.getAndSet(queue) != null) { | |
+ onQueue(queue, false); | |
+ } | |
+ void onQueue(final PrioritizedTaskQueue queue, boolean allowReenqueue) { | |
+ if (this.queue.getAndSet(queue) != null && !allowReenqueue) { | |
throw new IllegalStateException("Already queued!"); | |
} | |
} | |
diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java | |
index 715a2dd8d2..923d30b586 100644 | |
--- a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java | |
+++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java | |
@@ -34,7 +34,9 @@ public final class ChunkTaskManager { | |
private final PrioritizedTaskQueue<ChunkTask> chunkTasks = new PrioritizedTaskQueue<>(); // used if async chunks are disabled in config | |
protected static QueueExecutorThread<ChunkTask>[] globalWorkers; | |
+ protected static QueueExecutorThread<ChunkTask> globalUrgentWorker; | |
protected static PrioritizedTaskQueue<ChunkTask> globalQueue; | |
+ protected static PrioritizedTaskQueue<ChunkTask> globalUrgentQueue; | |
protected static final ConcurrentLinkedQueue<Runnable> CHUNK_WAIT_QUEUE = new ConcurrentLinkedQueue<>(); | |
@@ -116,6 +118,7 @@ public final class ChunkTaskManager { | |
globalWorkers = new QueueExecutorThread[threads]; | |
globalQueue = new PrioritizedTaskQueue<>(); | |
+ globalUrgentQueue = new PrioritizedTaskQueue<>(); | |
for (int i = 0; i < threads; ++i) { | |
globalWorkers[i] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms | |
@@ -127,6 +130,15 @@ public final class ChunkTaskManager { | |
globalWorkers[i].start(); | |
} | |
+ | |
+ globalUrgentWorker = new QueueExecutorThread<>(globalUrgentQueue, (long)0.10e6); //0.1ms | |
+ globalUrgentWorker.setName("Paper Async Chunk Urgent Task Thread"); | |
+ globalUrgentWorker.setPriority(Thread.NORM_PRIORITY+1); | |
+ globalUrgentWorker.setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> { | |
+ PaperFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable); | |
+ }); | |
+ | |
+ globalUrgentWorker.start(); | |
} | |
/** | |
@@ -377,6 +389,7 @@ public final class ChunkTaskManager { | |
worker.flush(); | |
} | |
} | |
+ globalUrgentWorker.flush(); | |
// flush again since tasks we execute async saves | |
drainChunkWaitQueue(); | |
@@ -409,20 +422,30 @@ public final class ChunkTaskManager { | |
public void raisePriority(final int chunkX, final int chunkZ, final int priority) { | |
final Long chunkKey = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)); | |
- ChunkSaveTask chunkSaveTask = this.chunkSaveTasks.get(chunkKey); | |
+ ChunkTask chunkSaveTask = this.chunkSaveTasks.get(chunkKey); | |
if (chunkSaveTask != null) { | |
- final boolean raised = chunkSaveTask.raisePriority(priority); | |
- if (chunkSaveTask.isScheduled() && raised) { | |
- // only notify if we're in queue to be executed | |
- this.internalScheduleNotify(); | |
- } | |
+ // don't bump save into urgent queue | |
+ raiseTaskPriority(chunkSaveTask, priority != PrioritizedTaskQueue.HIGHEST_PRIORITY ? priority : PrioritizedTaskQueue.HIGH_PRIORITY); | |
} | |
ChunkLoadTask chunkLoadTask = this.chunkLoadTasks.get(chunkKey); | |
if (chunkLoadTask != null) { | |
- final boolean raised = chunkLoadTask.raisePriority(priority); | |
- if (chunkLoadTask.isScheduled() && raised) { | |
- // only notify if we're in queue to be executed | |
+ raiseTaskPriority(chunkLoadTask, priority); | |
+ } | |
+ } | |
+ | |
+ private void raiseTaskPriority(ChunkTask task, int priority) { | |
+ final boolean raised = task.raisePriority(priority); | |
+ if (task.isScheduled() && raised) { | |
+ // only notify if we're in queue to be executed | |
+ if (priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) { | |
+ // was in another queue but became urgent later, add to urgent queue and the previous | |
+ // queue will just have to ignore this task if it has already been started. | |
+ // Ultimately, we now have 2 potential queues that can pull it out whoever gets it first | |
+ // but the urgent queue has dedicated thread(s) so it's likely to win.... | |
+ globalUrgentQueue.add(task, true); | |
+ this.internalScheduleNotifyUrgent(); | |
+ } else { | |
this.internalScheduleNotify(); | |
} | |
} | |
@@ -436,8 +459,14 @@ public final class ChunkTaskManager { | |
// It's important we order the task to be executed before notifying. Avoid a race condition where the worker thread | |
// wakes up and goes to sleep before we actually schedule (or it's just about to sleep) | |
- this.queue.add(task); | |
- this.internalScheduleNotify(); | |
+ if (task.getPriority() == PrioritizedTaskQueue.HIGHEST_PRIORITY) { | |
+ globalUrgentQueue.add(task, true); | |
+ this.internalScheduleNotifyUrgent(); | |
+ } else { | |
+ this.queue.add(task); | |
+ this.internalScheduleNotify(); | |
+ } | |
+ | |
} | |
protected void internalScheduleNotify() { | |
@@ -452,4 +481,12 @@ public final class ChunkTaskManager { | |
} | |
} | |
+ | |
+ protected void internalScheduleNotifyUrgent() { | |
+ if (globalUrgentWorker == null) { | |
+ return; | |
+ } | |
+ globalUrgentWorker.notifyTasks(); | |
+ } | |
+ | |
} | |
diff --git a/src/main/java/net/minecraft/server/ChunkProviderServer.java b/src/main/java/net/minecraft/server/ChunkProviderServer.java | |
index 1dcd0980ec..1873f7b7dd 100644 | |
--- a/src/main/java/net/minecraft/server/ChunkProviderServer.java | |
+++ b/src/main/java/net/minecraft/server/ChunkProviderServer.java | |
@@ -227,6 +227,7 @@ public class ChunkProviderServer extends IChunkProvider { | |
} | |
private long asyncLoadSeqCounter; | |
+ public static boolean IS_CHUNK_LOAD_BLOCKING_MAIN = false; | |
public void getChunkAtAsynchronously(int x, int z, boolean gen, java.util.function.Consumer<Chunk> onComplete) { | |
if (Thread.currentThread() != this.serverThread) { | |
@@ -384,7 +385,12 @@ public class ChunkProviderServer extends IChunkProvider { | |
} | |
gameprofilerfiller.c("getChunkCacheMiss"); | |
+ // Paper start - Async chunks | |
+ boolean prevBlocking = IS_CHUNK_LOAD_BLOCKING_MAIN; | |
+ IS_CHUNK_LOAD_BLOCKING_MAIN = true; | |
CompletableFuture<Either<IChunkAccess, PlayerChunk.Failure>> completablefuture = this.getChunkFutureMainThread(i, j, chunkstatus, flag); | |
+ IS_CHUNK_LOAD_BLOCKING_MAIN = prevBlocking; | |
+ // Paper end | |
if (!completablefuture.isDone()) { // Paper | |
// Paper start - async chunk io/loading | |
diff --git a/src/main/java/net/minecraft/server/PlayerChunkMap.java b/src/main/java/net/minecraft/server/PlayerChunkMap.java | |
index f9e843288a..b6db0fc1dc 100644 | |
--- a/src/main/java/net/minecraft/server/PlayerChunkMap.java | |
+++ b/src/main/java/net/minecraft/server/PlayerChunkMap.java | |
@@ -705,13 +705,13 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { | |
}; | |
CompletableFuture<NBTTagCompound> chunkSaveFuture = this.world.asyncChunkTaskManager.getChunkSaveFuture(chunkcoordintpair.x, chunkcoordintpair.z); | |
+ boolean isBlockingMain = MCUtil.isMainThread() && ChunkProviderServer.IS_CHUNK_LOAD_BLOCKING_MAIN; | |
+ int priority = isBlockingMain ? com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY : com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY; | |
if (chunkSaveFuture != null) { | |
- this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, | |
- com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY, chunkHolderConsumer, false, chunkSaveFuture); | |
- this.world.asyncChunkTaskManager.raisePriority(chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY); | |
+ this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, priority, chunkHolderConsumer, isBlockingMain, chunkSaveFuture); | |
+ this.world.asyncChunkTaskManager.raisePriority(chunkcoordintpair.x, chunkcoordintpair.z, priority); | |
} else { | |
- this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, | |
- com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY, chunkHolderConsumer, false); | |
+ this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, priority, chunkHolderConsumer, isBlockingMain); | |
} | |
return ret; | |
// Paper end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment