Skip to content

Instantly share code, notes, and snippets.

@amaranth
Forked from aikar/TaskChain.java
Created June 23, 2016 19:24
Show Gist options
  • Save amaranth/ad9aa9461b16902fef3601f4aae3e6a8 to your computer and use it in GitHub Desktop.
Save amaranth/ad9aa9461b16902fef3601f4aae3e6a8 to your computer and use it in GitHub Desktop.
TaskChain v2.3.2 - Java 8 Version - Released Open Source Under MIT - Pre Java8 version here: https://gist.github.com/aikar/9010136 - learn about TC: https://aikar.co/2015/07/26/async-development-java-control-flow-for-bukkit/
/*
* Copyright (c) 2015. Starlis LLC / dba Empire Minecraft
*
* MIT License:
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
*/
package com.empireminecraft.util;
import org.bukkit.Bukkit;
import org.bukkit.ChatColor;
import org.bukkit.entity.Player;
import org.bukkit.plugin.Plugin;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.logging.Logger;
/**
* TaskChain v2.3.2 - by Daniel Ennis <aikar@aikar.co>
*
* Facilitates Control Flow for the Bukkit Scheduler to easily jump between
* Async and Sync tasks without deeply nested callbacks, passing the response of the
* previous task to the next task to use.
*
* REQUIREMENT: Must call TaskChain.initialize() in your plugin onEnable like so:
*
* TaskChain.initialize(this);
*
* Usage example:
* @see #example
*/
@SuppressWarnings({"unused", "FieldAccessedSynchronizedAndUnsynchronized"})
public class TaskChain <T> {
/**
* A useless example of registering multiple task signatures and states
*/
public static void example() {
log("Starting example");
TaskChain<?> chain = TaskChain.newSharedChain("TEST");
chain
.delay(20 * 3)
.sync(() -> {
Object test = chain.setTaskData("test", 1);
log("==> 1st test");
})
.delay(20)
.async(() -> {
Object test = chain.getTaskData("test");
log("==> 2nd test: " + test + " = should be 1");
})
.sync(TaskChain::abort)
.execute();
// This chain essentially appends onto the previous one, and will not overlap
Bukkit.getScheduler().runTaskAsynchronously(plugin, () -> {
TaskChain<?> chain2 = TaskChain.newSharedChain("TEST");
chain2
.sync(() -> {
Object test = chain2.getTaskData("test");
log("==> 3rd test: " + test + " = should be null");
})
.delay(20)
.async(TaskChain::abort)
.execute();
TaskChain
.newSharedChain("TEST")
.async(() -> log("==> 4th test - should print"))
.returnData("notthere")
.abortIfNull()
.syncLast((val) -> log("Shouldn't execute due to null abort"))
.execute();
});
TaskChain
.newSharedChain("TEST2")
.delay(20 * 3)
.sync(() -> log("this should run at same time as 1st test"))
.delay(20)
.async(() -> log("this should run at same time as 2nd test"))
.execute();
TaskChain
.newChain()
.sync(() -> log("THE FIRST!"))
.delay(20 * 10) // Wait 20s to start any task
.async(() -> log("This ran async - with no input or return"))
.<Integer>asyncFirstCallback(next -> {
// Use a callback to provide result
log("this also ran async, but will call next task in 3 seconds.");
Bukkit.getScheduler().scheduleSyncDelayedTask(plugin, () -> next.accept(3), 60);
})
.sync(input -> { // Will be ran 3s later but didn't use .delay()
log("should of got 3: " + input);
return 5 + input;
})
.storeAsData("Test1")
.syncLast(input2 -> log("should be 8: " + input2)) // Consumes last result, but doesn't pass a new one
.delay(20) // Wait 1s until next
.sync(() -> log("Generic 1s later")) // no input expected, no output, run sync
.asyncFirst(() -> 3) // Run task async and return 3
.delay(5 * 20) // Wait 5s
.asyncLast(input1 -> log("async last value 5s later: " + input1)) // Run async again, with value of 3
.<Integer>returnData("Test1")
.asyncLast((val) -> log("Should of got 8 back from data: " + val))
.sync(TaskChain::abort)
.sync(() -> log("Shouldn't be called"))
.execute();
}
/**
* Util method for example logging
* @param log
*/
private static void log(String log) {
Logger.getGlobal().info(log);
}
/**
* =============================================================================================
*/
private static Plugin plugin;
private static final Map<String, TaskChain<?>> sharedChains = new HashMap<>();
private boolean shared = false;
private boolean done = false;
private boolean executed = false;
private boolean async = !Bukkit.isPrimaryThread();
private String sharedName;
private Object previous;
private final Map<String, Object> taskMap = new HashMap<>(0);
private TaskHolder<?, ?> currentHolder;
protected Runnable doneCallback;
protected final ConcurrentLinkedQueue<TaskHolder<?,?>> chainQueue = new ConcurrentLinkedQueue<>();
/**
* =============================================================================================
*/
/**
* Initializes TaskChain with the owning plugin. Must be called before any use.
* @param owner
*/
public static void initialize(Plugin owner) {
plugin = owner;
}
/**
* Starts a new chain.
* @return
*/
public static <T> TaskChain<T> newChain() {
if (plugin == null) {
throw new IllegalStateException("TaskChain has not been configured. Please call TaskChain.initialize(plugin);");
}
return new TaskChain<>();
}
/**
* Allows re-use of a Chain by giving it a name. This lets you keep adding Tasks to
* an already executing chain. This allows you to assure a sequence of events to only
* execute one at a time, but may be registered and executed from multiple execution points
* or threads.
*
* Task Data is not shared between chains of the same name. The only thing that is shared
* is execution order, in that 2 sequences of events can not run at the same time.
*
* If 2 chains are created at same time under same name, the first chain will execute fully before the 2nd chain will start, no matter how long
*
* @param name
* @param <T>
* @return
*/
public static synchronized <T> TaskChain<T> newSharedChain(String name) {
TaskChain<?> chain;
synchronized (sharedChains) {
chain = sharedChains.get(name);
}
if (chain != null) {
synchronized (chain) {
if (chain.done) {
chain = null;
}
}
}
if (chain == null) {
chain = newChain();
chain.shared = true;
chain.sharedName = name;
sharedChains.put(name, chain);
}
return new SharedTaskChain<>((TaskChain<T>) chain);
}
/**
* Creates a shared chain bound to a specific player with default name
* @see #newSharedChain(String) for full documentation
* @param player
* @param <T>
* @return
*/
public static <T> TaskChain<T> newSharedChain(Player player) {
return newSharedChain(player, "__MAIN__");
}
/**
* Creates a shared chain bound to a specific player with specified name
* @see #newSharedChain(String) for full documentation
* @param player
* @param name
* @param <T>
* @return
*/
public static <T> TaskChain<T> newSharedChain(Player player, String name) {
return newSharedChain(player.getUniqueId() + "__PlayerChain__" + name);
}
/**
* Call to abort execution of the chain.
*/
public static void abort() throws AbortChainException {
throw new AbortChainException();
}
/**
* =============================================================================================
*/
/**
* Checks if the chain has a value saved for the specified key.
* @param key
* @return
*/
public boolean hasTaskData(String key) {
return taskMap.containsKey(key);
}
/**
* Retrieves a value relating to a specific key, saved by a previous task.
*
* @param key
* @param <R>
* @return
*/
public <R> R getTaskData(String key) {
return (R) taskMap.get(key);
}
/**
* Saves a value for this chain so that a task furthur up the chain can access it.
*
* Useful for passing multiple values to the next (or furthur) tasks.
*
* @param key
* @param val
* @param <R>
* @return
*/
public <R> R setTaskData(String key, Object val) {
return (R) taskMap.put(key, val);
}
/**
* Removes a saved value on the chain.
*
* @param key
* @param <R>
* @return
*/
public <R> R removeTaskData(String key) {
return (R) taskMap.remove(key);
}
/**
* =============================================================================================
*/
/**
* Checks if the previous task return was null.
*
* If not null, the previous task return will forward to the next task.
* @return
*/
public TaskChain<T> abortIfNull() {
return abortIfNull(null, null);
}
/**
* Checks if the previous task return was null, and aborts if it was, optionally
* sending a message to the player.
*
* If not null, the previous task return will forward to the next task.
* @param player
* @param msg
* @return
*/
public TaskChain<T> abortIfNull(Player player, String msg) {
return current((obj) -> {
if (obj == null) {
if (msg != null && player != null) {
player.sendMessage(ChatColor.translateAlternateColorCodes('&', msg));
}
abort();
return null;
}
return obj;
});
}
/**
* Takes the previous tasks return value, stores it to the specified key
* as Task Data, and then forwards that value to the next task.
*
* @param key
* @return
*/
public TaskChain<T> storeAsData(String key) {
return current((val) -> {
setTaskData(key, val);
return val;
});
}
/**
* Reads the specified key from Task Data, and passes it to the next task.
*
* Will need to pass expected type such as chain.<Foo>returnData("key")
*
* @param key
* @param <R>
* @return
*/
public <R> TaskChain<R> returnData(String key) {
return currentFirst(() -> (R) getTaskData(key));
}
/**
* Adds a delay to the chain execution.
*
* @param ticks # of ticks to delay before next task (20 = 1 second)
* @return
*/
public TaskChain<T> delay(final int ticks) {
//noinspection CodeBlock2Expr
return currentCallback((input, next) -> {
Bukkit.getScheduler().scheduleSyncDelayedTask(plugin, () -> next.accept(input), ticks);
});
}
/**
* Execute a task on the main thread, with no previous input, and a callback to return the response to.
*
* It's important you don't perform blocking operations in this method. Only use this if
* the task will be scheduling a different sync operation outside of the TaskChains scope.
*
* Usually you could achieve the same design with a blocking API by switching to an async task
* for the next task and running it there.
*
* This method would primarily be for cases where you need to use an API that ONLY provides
* a callback style API.
*
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> syncFirstCallback(AsyncExecutingFirstTask<R> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #syncFirstCallback(AsyncExecutingFirstTask) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> asyncFirstCallback(AsyncExecutingFirstTask<R> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncFirstCallback(AsyncExecutingFirstTask) but ran on current thread the Chain was created on
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> currentFirstCallback(AsyncExecutingFirstTask<R> task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* Execute a task on the main thread, with the last output, and a callback to return the response to.
*
* It's important you don't perform blocking operations in this method. Only use this if
* the task will be scheduling a different sync operation outside of the TaskChains scope.
*
* Usually you could achieve the same design with a blocking API by switching to an async task
* for the next task and running it there.
*
* This method would primarily be for cases where you need to use an API that ONLY provides
* a callback style API.
*
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> syncCallback(AsyncExecutingTask<R, T> task) {
return add0(new TaskHolder<>(this, false, task));
}
public TaskChain<?> syncCallback(AsyncExecutingGenericTask task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #syncCallback(AsyncExecutingTask) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> asyncCallback(AsyncExecutingTask<R, T> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncCallback(AsyncExecutingTask) but ran off main thread
* @param task
* @return
*/
public TaskChain<?> asyncCallback(AsyncExecutingGenericTask task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncCallback(AsyncExecutingTask) but ran on current thread the Chain was created on
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> currentCallback(AsyncExecutingTask<R, T> task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* @see #syncCallback(AsyncExecutingTask) but ran on current thread the Chain was created on
* @param task
* @return
*/
public TaskChain<?> currentCallback(AsyncExecutingGenericTask task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* Execute task on main thread, with no input, returning an output
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> syncFirst(FirstTask<R> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #syncFirst(FirstTask) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> asyncFirst(FirstTask<R> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncFirst(FirstTask) but ran on current thread the Chain was created on
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> currentFirst(FirstTask<R> task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* Execute task on main thread, with the last returned input, returning an output
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> sync(Task<R, T> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* Execute task on main thread, with no input or output
* @param task
* @return
*/
public TaskChain<?> sync(GenericTask task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #sync(Task) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> async(Task<R, T> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #sync(GenericTask) but ran off main thread
* @param task
* @return
*/
public TaskChain<?> async(GenericTask task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #sync(Task) but ran on current thread the Chain was created on
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> current(Task<R, T> task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* @see #sync(GenericTask) but ran on current thread the Chain was created on
* @param task
* @return
*/
public TaskChain<?> current(GenericTask task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* Execute task on main thread, with the last output, and no furthur output
* @param task
* @return
*/
public TaskChain<?> syncLast(LastTask<T> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #syncLast(LastTask) but ran off main thread
* @param task
* @return
*/
public TaskChain<?> asyncLast(LastTask<T> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncLast(LastTask) but ran on current thread the Chain was created on
* @param task
* @return
*/
public TaskChain<?> currentLast(LastTask<T> task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* Finished adding tasks, begins executing them.
*/
public void execute() {
execute0();
}
protected void execute0() {
synchronized (this) {
if (this.executed) {
if (this.shared) {
return;
}
throw new RuntimeException("Already executed and not a shared chain");
}
this.executed = true;
}
nextTask();
}
public void executeNext() {
Bukkit.getScheduler().scheduleSyncDelayedTask(plugin, this::execute, 0);
}
public void execute(Runnable done) {
this.doneCallback = done;
execute();
}
protected void done() {
this.done = true;
if (this.shared) {
synchronized (sharedChains) {
sharedChains.remove(this.sharedName);
}
}
if (this.doneCallback != null) {
this.doneCallback.run();
}
}
@SuppressWarnings("rawtypes")
protected TaskChain add0(TaskHolder<?,?> task) {
synchronized (this) {
if (!this.shared && this.executed) {
throw new RuntimeException("TaskChain is executing and not shared");
}
}
this.chainQueue.add(task);
return this;
}
/**
* Fires off the next task, and switches between Async/Sync as necessary.
*/
private void nextTask() {
synchronized (this) {
this.currentHolder = this.chainQueue.poll();
if (this.currentHolder == null) {
this.done = true; // to ensure its done while synchronized
}
}
if (this.currentHolder == null) {
this.previous = null;
// All Done!
this.done();
return;
}
Boolean isNextAsync = this.currentHolder.async;
if (isNextAsync == null) {
isNextAsync = this.async;
}
if (isNextAsync) {
if (this.async) {
this.currentHolder.run();
} else {
Bukkit.getScheduler().runTaskAsynchronously(plugin, () -> {
this.async = true;
this.currentHolder.run();
});
}
} else {
if (this.async) {
Bukkit.getScheduler().scheduleSyncDelayedTask(plugin, () -> {
this.async = false;
this.currentHolder.run();
});
} else {
this.currentHolder.run();
}
}
}
/**
* Provides foundation of a task with what the previous task type should return
* to pass to this and what this task will return.
* @param <R> Return Type
* @param <A> Argument Type Expected
*/
@SuppressWarnings("AccessingNonPublicFieldOfAnotherObject")
private static class TaskHolder<R, A> {
private final TaskChain<?> chain;
private final Task<R, A> task;
public final Boolean async;
private boolean executed = false;
private boolean aborted = false;
private TaskHolder(TaskChain<?> chain, Boolean async, Task<R, A> task) {
this.task = task;
this.chain = chain;
this.async = async;
}
/**
* Called internally by Task Chain to facilitate executing the task and then the next task.
*/
private void run() {
final Object arg = this.chain.previous;
this.chain.previous = null;
final R res;
try {
if (this.task instanceof AsyncExecutingTask) {
((AsyncExecutingTask<R, A>) this.task).runAsync((A) arg, this::next);
} else {
next(this.task.run((A) arg));
}
} catch (AbortChainException ignored) {
this.abort();
}
}
/**
* Abort the chain, and clear tasks for GC.
*/
private synchronized void abort() {
this.aborted = true;
this.chain.previous = null;
this.chain.chainQueue.clear();
this.chain.done();
}
/**
* Accepts result of previous task and executes the next
*/
private void next(Object resp) {
synchronized (this) {
if (this.aborted) {
this.chain.done();
return;
}
if (this.executed) {
this.chain.done();
throw new RuntimeException("This task has already been executed.");
}
this.executed = true;
}
this.chain.async = !Bukkit.isPrimaryThread(); // We don't know where the task called this from.
this.chain.previous = resp;
this.chain.nextTask();
}
}
@SuppressWarnings("PublicInnerClass")
public static class AbortChainException extends Throwable {}
/**
* Generic task with synchronous return (but may execute on any thread)
* @param <R>
* @param <A>
*/
public interface Task <R, A> {
R run(A input) throws AbortChainException;
}
public interface AsyncExecutingTask<R, A> extends Task<R, A> {
@Override
default R run(A input) throws AbortChainException {
// unused
return null;
}
void runAsync(A input, Consumer<R> next) throws AbortChainException;
}
public interface FirstTask <R> extends Task<R, Object> {
@Override
default R run(Object input) throws AbortChainException {
return run();
}
R run() throws AbortChainException;
}
public interface AsyncExecutingFirstTask<R> extends AsyncExecutingTask<R, Object> {
@Override
default R run(Object input) throws AbortChainException {
// Unused
return null;
}
@Override
default void runAsync(Object input, Consumer<R> next) throws AbortChainException {
run(next);
}
void run(Consumer<R> next) throws AbortChainException;
}
public interface LastTask <A> extends Task<Object, A> {
@Override
default Object run(A input) throws AbortChainException {
runLast(input);
return null;
}
void runLast(A input) throws AbortChainException;
}
public interface GenericTask extends Task<Object, Object> {
@Override
default Object run(Object input) throws AbortChainException {
runGeneric();
return null;
}
void runGeneric() throws AbortChainException;
}
public interface AsyncExecutingGenericTask extends AsyncExecutingTask<Object, Object> {
@Override
default Object run(Object input) throws AbortChainException {
return null;
}
@Override
default void runAsync(Object input, Consumer<Object> next) throws AbortChainException {
run(() -> next.accept(null));
}
void run(Runnable next) throws AbortChainException;
}
private static class SharedTaskChain<R> extends TaskChain<R> {
private final TaskChain<R> backingChain;
private SharedTaskChain(TaskChain<R> backingChain) {
this.backingChain = backingChain;
}
@Override
public void execute() {
synchronized (backingChain) {
// This executes SharedTaskChain.execute(Runnable), which says execute
// my wrapped chains queue of events, but pass a done callback for when its done.
// We then use the backing chain callback method to not execute the next task in the
// backing chain until the current one is fully done.
SharedTaskChain<R> sharedChain = this;
backingChain.currentCallback((next) -> {
sharedChain.execute(next::run);
});
backingChain.executeNext();
}
}
@Override
public void execute(Runnable done) {
this.doneCallback = done;
execute0();
}
}
}
@CommandAlias("wastereset")
public static void doWasteReset(Player player){
Confirmation.confirm(player, "&cWasteland Reset",
input -> {
Wastelands.RESETTING = true;
WorldEditHelper.setAsync(player, false);
TaskChain
.newChain()
.sync( () -> Bukkit.broadcastMessage(Util.color(
"&cWARNING: Wastelands Reset commencing in 5 seconds."
)))
.delay(TimeUtil.SECOND.inTicks(5))
.syncCallback(recreateWorld(player, "wastelands", "normal"))
.syncCallback(recreateWorld(player, "wastelands_nether", "nether"))
.syncCallback(recreateWorld(player, "wastelands_the_end", "end"))
.sync( () -> {
WorldEditHelper.setAsync(player, true);
Wastelands.RESETTING = false;
Bukkit.broadcastMessage(Util.color("&aWastelands has finished resetting. You may enter now."));
}).execute();
}, "&4&lWARNING!!!",
"&cYou are RESETTING the WASTELANDS");
}
public static AsyncExecutingGenericTask recreateWorld(Player player, String world, String type) {
return next -> {
Worlds.rescueAll(Bukkit.getWorld(world));
BukkitUtil.scheduleTask( () -> {
Util.performCommand(player, "mv delete " + world);
Util.performCommand(player, "mvconfirm");
Util.performCommand(player, "dynmap purgeworld " + world);
Util.performCommand(player, "mv create " + world + " " + type);
Util.performCommand(player, "mvm set difficulty normal " + world);
// The following method is async retuning (but still runs on main).
seedWorld(player, Bukkit.getWorld(world), next);
}, 10);
};
}
static void seedWorld(final Player player, final World world, Runnable next) {
if (!world.getName().startsWith("wastelands")) {
player.sendMessage("Must be in wastelands");
next.run();
return;
}
world.getWorldBorder().setCenter(0.0, 0.0);
world.getWorldBorder().setSize(16000);
if ("wastelands_the_end".equalsIgnoreCase(world.getName())) {
setupEnd(player, world, next);
return;
}
final WorldEdit we = WorldEdit.getInstance();
final BukkitPlayer wePlayer = WorldEditHelper.getPlayer(player);
if (ResidenceManager.getByName(world.getName()) != null) {
ResidenceManager.removeResidence(world.getName());
}
final Queue<WastelandOutpost> outpostQueue = new ArrayDeque<>();
Collections.addAll(outpostQueue, WastelandOutpost.values());
final Runnable stage2 = new Runnable() {
@Override
public void run() {
WastelandOutpost outpost = outpostQueue.poll();
if (outpost == null) {
Util.sendMsg(player, "&aSeeding done.");
next.run();
return;
}
Util.sendMsg(player, "&aSeeding &b" + outpost.name);
final Location loc = outpost.getLocation(world);
final EditSession editSession = WorldEditHelper.getEditSession(world, player);
BukkitUtil.runTaskNextTick(this);
seed(editSession, wePlayer, we, player, loc);
if (outpost == WastelandOutpost.CENTER) {
Location spawnLoc = outpost.getSpawnLocation(world);
Worlds.setSpawn(world, spawnLoc);
}
}
};
for (WastelandOutpost outpost : WastelandOutpost.values()) {
buildChunkQueue(outpost.getLocation(world), 8, 4);
}
Util.sendMsg(player, "&aPregenning outpost chunks for " + world.getName() + ": " + highPrioQueue.size());
highPrioQueue.add(() -> {
Util.sendMsg(player, "&aPregenning " + world.getName() + " done");
stage2.run();
});
schedulePregenTask(player);
}
private static void schedulePregenTask(final Player player) {
final Runnable stage1 = new Runnable() {
@Override
public void run() {
boolean isHighPrio = !highPrioQueue.isEmpty();
final int rate = isHighPrio ? 6 : 2;
Queue<Runnable> queue = isHighPrio ? highPrioQueue : lowPrioQueue;
String prefix = "Pregen (" + (isHighPrio ? "high" : "low") +") ";
for (int i = 0; i < rate; i++) {
Runnable run = queue.poll();
if (run == null) {
if (isHighPrio) {
break;
} else {
taskRunning = false;
return;
}
}
run.run();
if (!queue.isEmpty() && queue.size() % (isHighPrio ? 110 : 250) == 0) {
Util.sendMsg(isHighPrio ? player : null, prefix + queue.size() + " remaining.");
}
}
BukkitUtil.runTaskNextTick(this);
}
};
if (!taskRunning) {
taskRunning = true;
BukkitUtil.runTaskNextTick(stage1);
}
}
private static void buildChunkQueue(Location center, int radius, int lowStartRadius) {
World world = center.getWorld();
// pregen the chunks
final int x = center.getBlockX();
final int z = center.getBlockZ();
for (int cx = -radius; cx <= radius; cx++) {
for (int cz = -radius; cz <= radius; cz++) {
final int x1 = (x + (16 * cx)) >> 4;
final int z1 = (z + (16 * cz)) >> 4;
if (
cx > -lowStartRadius && cz > -lowStartRadius
&&
cx < lowStartRadius && cz < lowStartRadius
) {
highPrioQueue.add(() -> world.getChunkAt(x1, z1));
} else {
lowPrioQueue.add(() -> world.getChunkAt(x1, z1));
}
}
}
}
public static void processLogin(Player player, EmpireUser user, Long currentEpoch) {
TaskChain
.newChain()
.asyncFirst(friendsList.getOnlineFriendsTask())
.asyncLast((friends) -> friends.forEach(onlineFriend -> {
final EmpireServer server = onlineFriend.getServer();
if (onlineFriend.getFriend().isNotifyingLogins() && (server != EmpireServer.getServer() || EmpireServer.isDev)) {
CommandQueue.queueServerCommand(server, notifyCommand);
}
})).execute();
}
public static void openMail(Player player, EmpireUser user, Long mailId) {
TaskChain.newSharedChain(player, "MAIL")
.asyncFirst(getMessageTask(user, mailId))
.abortIfNull(player, MailLang.MAIL_NOT_FOUND)
.storeAsData("message")
.sync(Mail::retrieveMail)
.abortIfNull()
.<Message>returnData("message")
.asyncLast(Message::delete)
.execute();
}
public TaskChain.FirstTask<List<OnlineFriend>> getOnlineFriendsTask() {
return () -> {
List<OnlineFriend> friends = Lists.newArrayList();
try {
for (DbRow row : EmpireDb.getResults("SELECT s.user_id, s.server_id FROM user_friends f " +
" JOIN session s ON s.user_id = f.friend_user_id " +
" WHERE f.user_id = ? AND f.pending = 0 " +
" GROUP BY s.server_id ",
user.userId
)) {
final Friend friend = this.friends.get(row.get("user_id"));
final EmpireServer server = EmpireServer.getServerById(row.get("server_id"));
friends.add(new OnlineFriend(friend, server));
}
} catch (SQLException e) {
Util.printException(e);
}
return friends;
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment