Skip to content

Instantly share code, notes, and snippets.

@aikar
Last active August 9, 2022 11:20
Show Gist options
  • Save aikar/77f8caee3c153074c99b to your computer and use it in GitHub Desktop.
Save aikar/77f8caee3c153074c99b to your computer and use it in GitHub Desktop.
TaskChain v2.6 - Java 8 Version - Released Open Source Under MIT - Pre Java8 version here: https://gist.github.com/aikar/9010136 - learn about TC: https://aikar.co/2015/07/26/async-development-java-control-flow-for-bukkit/
/*
* TaskChain for Bukkit
*
* Written by Aikar <aikar@aikar.co>
* https://aikar.co
* https://starlis.com
*
* @license MIT
*/
package com.empireminecraft.util;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.bukkit.Bukkit;
import org.bukkit.ChatColor;
import org.bukkit.entity.Player;
import org.bukkit.plugin.Plugin;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.logging.Logger;
/**
* TaskChain v2.6 - by Daniel Ennis <aikar@aikar.co>
*
* Facilitates Control Flow for the Bukkit Scheduler to easily jump between
* Async and Sync tasks without deeply nested callbacks, passing the response of the
* previous task to the next task to use.
*
* REQUIREMENT: Must call TaskChain.initialize() in your plugin onEnable like so:
*
* TaskChain.initialize(this);
*
* Find latest updates at https://taskchain.emc.gs
*
* Usage example:
* @see #example
*/
@SuppressWarnings({"unused", "FieldAccessedSynchronizedAndUnsynchronized"})
public class TaskChain <T> {
/**
* A useless example of registering multiple task signatures and states
*/
public static void example() {
log("Starting example");
TaskChain<?> chain = TaskChain.newSharedChain("TEST");
chain
.delay(20 * 3)
.sync(() -> {
Object test = chain.setTaskData("test", 1);
log("==> 1st test");
})
.delay(20)
.async(() -> {
Object test = chain.getTaskData("test");
log("==> 2nd test: " + test + " = should be 1");
})
.sync(TaskChain::abort)
.execute();
// This chain essentially appends onto the previous one, and will not overlap
Bukkit.getScheduler().runTaskAsynchronously(plugin, () -> {
TaskChain<?> chain2 = TaskChain.newSharedChain("TEST");
chain2
.sync(() -> {
Object test = chain2.getTaskData("test");
log("==> 3rd test: " + test + " = should be null");
})
.delay(20)
.async(TaskChain::abort)
.execute();
TaskChain
.newSharedChain("TEST")
.async(() -> log("==> 4th test - should print"))
.returnData("notthere")
.abortIfNull()
.syncLast((val) -> log("Shouldn't execute due to null abort"))
.execute();
});
TaskChain
.newSharedChain("TEST2")
.delay(20 * 3)
.sync(() -> log("this should run at same time as 1st test"))
.delay(20)
.async(() -> log("this should run at same time as 2nd test"))
.execute();
TaskChain
.newChain()
.sync(() -> log("THE FIRST!"))
.delay(20 * 10) // Wait 20s to start any task
.async(() -> log("This ran async - with no input or return"))
.<Integer>asyncFirstCallback(next -> {
// Use a callback to provide result
log("this also ran async, but will call next task in 3 seconds.");
Bukkit.getScheduler().scheduleSyncDelayedTask(plugin, () -> next.accept(3), 60);
})
.sync(input -> { // Will be ran 3s later but didn't use .delay()
log("should of got 3: " + input);
return 5 + input;
})
.storeAsData("Test1")
.syncLast(input2 -> log("should be 8: " + input2)) // Consumes last result, but doesn't pass a new one
.delay(20) // Wait 1s until next
.sync(() -> log("Generic 1s later")) // no input expected, no output, run sync
.asyncFirst(() -> 3) // Run task async and return 3
.delay(5 * 20) // Wait 5s
.asyncLast(input1 -> log("async last value 5s later: " + input1)) // Run async again, with value of 3
.<Integer>returnData("Test1")
.asyncLast((val) -> log("Should of got 8 back from data: " + val))
.sync(TaskChain::abort)
.sync(() -> log("Shouldn't be called"))
.execute();
}
/**
* Util method for example logging
* @param log
*/
private static void log(String log) {
for (String s : log.split("\n")) {
Logger.getGlobal().info(s);
}
}
public static void logError(String log) {
for (String s : log.split("\n")) {
Logger.getGlobal().severe(s);
}
}
/**
* =============================================================================================
*/
private static Plugin plugin;
private static final Map<String, TaskChain<?>> sharedChains = new HashMap<>();
private static final ThreadLocal<TaskChain<?>> currentChain = new ThreadLocal<>();
private boolean shared = false;
private boolean done = false;
private boolean executed = false;
private boolean async;
private String sharedName;
private Object previous;
private final Map<String, Object> taskMap = new HashMap<>(0);
private TaskHolder<?, ?> currentHolder;
@SuppressWarnings("WeakerAccess") // IDE is wrong, can't be private
protected Runnable doneCallback;
protected BiConsumer<Exception, Task<?, ?>> errorHandler;
private final ConcurrentLinkedQueue<TaskHolder<?,?>> chainQueue = new ConcurrentLinkedQueue<>();
/**
* =============================================================================================
*/
/**
* Initializes TaskChain with the owning plugin. Must be called before any use.
* @param owner
*/
public static void initialize(Plugin owner) {
plugin = owner;
}
/**
* Starts a new chain.
* @return
*/
public static <T> TaskChain<T> newChain() {
if (plugin == null) {
throw new IllegalStateException("TaskChain has not been configured. Please call TaskChain.initialize(plugin);");
}
return new TaskChain<>();
}
/**
* Allows re-use of a Chain by giving it a name. This lets you keep adding Tasks to
* an already executing chain. This allows you to assure a sequence of events to only
* execute one at a time, but may be registered and executed from multiple execution points
* or threads.
*
* Task Data is not shared between chains of the same name. The only thing that is shared
* is execution order, in that 2 sequences of events can not run at the same time.
*
* If 2 chains are created at same time under same name, the first chain will execute fully before the 2nd chain will start, no matter how long
*
* @param name
* @param <T>
* @return
*/
public static synchronized <T> TaskChain<T> newSharedChain(String name) {
TaskChain<?> chain;
synchronized (sharedChains) {
chain = sharedChains.get(name);
}
if (chain != null) {
synchronized (chain) {
if (chain.done) {
chain = null;
}
}
}
if (chain == null) {
chain = newChain();
chain.shared = true;
chain.sharedName = name;
sharedChains.put(name, chain);
}
return new SharedTaskChain<>((TaskChain<T>) chain);
}
/**
* Creates a shared chain bound to a specific player with default name
* @see #newSharedChain(String) for full documentation
* @param player
* @param <T>
* @return
*/
public static <T> TaskChain<T> newSharedChain(Player player) {
return newSharedChain(player, "__MAIN__");
}
/**
* Creates a shared chain bound to a specific player with specified name
* @see #newSharedChain(String) for full documentation
* @param player
* @param name
* @param <T>
* @return
*/
public static <T> TaskChain<T> newSharedChain(Player player, String name) {
return newSharedChain(player.getUniqueId() + "__PlayerChain__" + name);
}
/**
* Call to abort execution of the chain.
*/
public static void abort() throws AbortChainException {
throw new AbortChainException();
}
/**
* =============================================================================================
*/
/**
* Checks if the chain has a value saved for the specified key.
* @param key
* @return
*/
public boolean hasTaskData(String key) {
return taskMap.containsKey(key);
}
/**
* Retrieves a value relating to a specific key, saved by a previous task.
*
* @param key
* @param <R>
* @return
*/
public <R> R getTaskData(String key) {
return (R) taskMap.get(key);
}
/**
* Saves a value for this chain so that a task furthur up the chain can access it.
*
* Useful for passing multiple values to the next (or furthur) tasks.
*
* @param key
* @param val
* @param <R>
* @return
*/
public <R> R setTaskData(String key, Object val) {
return (R) taskMap.put(key, val);
}
/**
* Removes a saved value on the chain.
*
* @param key
* @param <R>
* @return
*/
public <R> R removeTaskData(String key) {
return (R) taskMap.remove(key);
}
/**
* =============================================================================================
*/
/**
* Checks if the previous task return was null.
*
* If not null, the previous task return will forward to the next task.
* @return
*/
public TaskChain<T> abortIfNull() {
return abortIfNull(null, null);
}
/**
* Checks if the previous task return was null, and aborts if it was, optionally
* sending a message to the player.
*
* If not null, the previous task return will forward to the next task.
* @param player
* @param msg
* @return
*/
public TaskChain<T> abortIfNull(Player player, String msg) {
return current((obj) -> {
if (obj == null) {
if (msg != null && player != null) {
player.sendMessage(ChatColor.translateAlternateColorCodes('&', msg));
}
abort();
return null;
}
return obj;
});
}
/**
* Takes the previous tasks return value, stores it to the specified key
* as Task Data, and then forwards that value to the next task.
*
* @param key
* @return
*/
public TaskChain<T> storeAsData(String key) {
return current((val) -> {
setTaskData(key, val);
return val;
});
}
/**
* Reads the specified key from Task Data, and passes it to the next task.
*
* Will need to pass expected type such as chain.<Foo>returnData("key")
*
* @param key
* @param <R>
* @return
*/
public <R> TaskChain<R> returnData(String key) {
return currentFirst(() -> (R) getTaskData(key));
}
public TaskChain<TaskChain<?>> returnChain() {
return currentFirst(() -> this);
}
/**
* Adds a delay to the chain execution.
*
* @param ticks # of ticks to delay before next task (20 = 1 second)
* @return
*/
public TaskChain<T> delay(final int ticks) {
//noinspection CodeBlock2Expr
return currentCallback((input, next) -> {
Bukkit.getScheduler().scheduleSyncDelayedTask(plugin, () -> next.accept(input), ticks);
});
}
/**
* Execute a task on the main thread, with no previous input, and a callback to return the response to.
*
* It's important you don't perform blocking operations in this method. Only use this if
* the task will be scheduling a different sync operation outside of the TaskChains scope.
*
* Usually you could achieve the same design with a blocking API by switching to an async task
* for the next task and running it there.
*
* This method would primarily be for cases where you need to use an API that ONLY provides
* a callback style API.
*
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> syncFirstCallback(AsyncExecutingFirstTask<R> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #syncFirstCallback(AsyncExecutingFirstTask) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> asyncFirstCallback(AsyncExecutingFirstTask<R> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncFirstCallback(AsyncExecutingFirstTask) but ran on current thread the Chain was created on
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> currentFirstCallback(AsyncExecutingFirstTask<R> task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* Execute a task on the main thread, with the last output, and a callback to return the response to.
*
* It's important you don't perform blocking operations in this method. Only use this if
* the task will be scheduling a different sync operation outside of the TaskChains scope.
*
* Usually you could achieve the same design with a blocking API by switching to an async task
* for the next task and running it there.
*
* This method would primarily be for cases where you need to use an API that ONLY provides
* a callback style API.
*
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> syncCallback(AsyncExecutingTask<R, T> task) {
return add0(new TaskHolder<>(this, false, task));
}
public TaskChain<?> syncCallback(AsyncExecutingGenericTask task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #syncCallback(AsyncExecutingTask) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> asyncCallback(AsyncExecutingTask<R, T> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncCallback(AsyncExecutingTask) but ran off main thread
* @param task
* @return
*/
public TaskChain<?> asyncCallback(AsyncExecutingGenericTask task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncCallback(AsyncExecutingTask) but ran on current thread the Chain was created on
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> currentCallback(AsyncExecutingTask<R, T> task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* @see #syncCallback(AsyncExecutingTask) but ran on current thread the Chain was created on
* @param task
* @return
*/
public TaskChain<?> currentCallback(AsyncExecutingGenericTask task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* Execute task on main thread, with no input, returning an output
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> syncFirst(FirstTask<R> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #syncFirst(FirstTask) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> asyncFirst(FirstTask<R> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncFirst(FirstTask) but ran on current thread the Chain was created on
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> currentFirst(FirstTask<R> task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* Execute task on main thread, with the last returned input, returning an output
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> sync(Task<R, T> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* Execute task on main thread, with no input or output
* @param task
* @return
*/
public TaskChain<?> sync(GenericTask task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #sync(Task) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> async(Task<R, T> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #sync(GenericTask) but ran off main thread
* @param task
* @return
*/
public TaskChain<?> async(GenericTask task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #sync(Task) but ran on current thread the Chain was created on
* @param task
* @param <R>
* @return
*/
public <R> TaskChain<R> current(Task<R, T> task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* @see #sync(GenericTask) but ran on current thread the Chain was created on
* @param task
* @return
*/
public TaskChain<?> current(GenericTask task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* Execute task on main thread, with the last output, and no furthur output
* @param task
* @return
*/
public TaskChain<?> syncLast(LastTask<T> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #syncLast(LastTask) but ran off main thread
* @param task
* @return
*/
public TaskChain<?> asyncLast(LastTask<T> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncLast(LastTask) but ran on current thread the Chain was created on
* @param task
* @return
*/
public TaskChain<?> currentLast(LastTask<T> task) {
return add0(new TaskHolder<>(this, null, task));
}
/**
* Finished adding tasks, begins executing them.
*/
public void execute() {
execute0();
}
protected void execute0() {
synchronized (this) {
if (this.executed) {
if (this.shared) {
return;
}
throw new RuntimeException("Already executed and not a shared chain");
}
this.executed = true;
}
async = !Bukkit.isPrimaryThread();
nextTask();
}
public void executeNext() {
Bukkit.getScheduler().scheduleSyncDelayedTask(plugin, this::execute, 1);
}
public void execute(Runnable done) {
this.doneCallback = done;
execute();
}
public void execute(BiConsumer<Exception, Task<?, ?>> errorHandler) {
this.errorHandler = errorHandler;
execute();
}
public void execute(Runnable done, BiConsumer<Exception, Task<?, ?>> errorHandler) {
this.doneCallback = done;
this.errorHandler = errorHandler;
execute();
}
protected void done() {
this.done = true;
if (this.shared) {
synchronized (sharedChains) {
sharedChains.remove(this.sharedName);
}
}
if (this.doneCallback != null) {
this.doneCallback.run();
}
}
@SuppressWarnings("rawtypes")
protected TaskChain add0(TaskHolder<?,?> task) {
synchronized (this) {
if (!this.shared && this.executed) {
throw new RuntimeException("TaskChain is executing and not shared");
}
}
this.chainQueue.add(task);
return this;
}
/**
* Fires off the next task, and switches between Async/Sync as necessary.
*/
private void nextTask() {
synchronized (this) {
this.currentHolder = this.chainQueue.poll();
if (this.currentHolder == null) {
this.done = true; // to ensure its done while synchronized
}
}
if (this.currentHolder == null) {
this.previous = null;
// All Done!
this.done();
return;
}
Boolean isNextAsync = this.currentHolder.async;
if (isNextAsync == null) {
isNextAsync = this.async;
}
if (isNextAsync) {
if (this.async) {
this.currentHolder.run();
} else {
Bukkit.getScheduler().runTaskAsynchronously(plugin, () -> {
this.async = true;
this.currentHolder.run();
});
}
} else {
if (this.async) {
Bukkit.getScheduler().scheduleSyncDelayedTask(plugin, () -> {
this.async = false;
this.currentHolder.run();
});
} else {
this.currentHolder.run();
}
}
}
/**
* Provides foundation of a task with what the previous task type should return
* to pass to this and what this task will return.
* @param <R> Return Type
* @param <A> Argument Type Expected
*/
@SuppressWarnings("AccessingNonPublicFieldOfAnotherObject")
private static class TaskHolder<R, A> {
private final TaskChain<?> chain;
private final Task<R, A> task;
public final Boolean async;
private boolean executed = false;
private boolean aborted = false;
private TaskHolder(TaskChain<?> chain, Boolean async, Task<R, A> task) {
this.task = task;
this.chain = chain;
this.async = async;
}
/**
* Called internally by Task Chain to facilitate executing the task and then the next task.
*/
private void run() {
final Object arg = this.chain.previous;
this.chain.previous = null;
final R res;
try {
currentChain.set(this.chain);
if (this.task instanceof AsyncExecutingTask) {
((AsyncExecutingTask<R, A>) this.task).runAsync((A) arg, this::next);
} else {
next(this.task.run((A) arg));
}
} catch (AbortChainException ignored) {
this.abort();
} catch (Exception e) {
if (this.chain.errorHandler != null) {
this.chain.errorHandler.accept(e, this.task);
} else {
logError("TaskChain Exception on " + this.task.getClass().getName());
logError(ExceptionUtils.getFullStackTrace(e));
}
this.abort();
} finally {
currentChain.remove();
}
}
/**
* Abort the chain, and clear tasks for GC.
*/
private synchronized void abort() {
this.aborted = true;
this.chain.previous = null;
this.chain.chainQueue.clear();
this.chain.done();
}
/**
* Accepts result of previous task and executes the next
*/
private void next(Object resp) {
synchronized (this) {
if (this.aborted) {
this.chain.done();
return;
}
if (this.executed) {
this.chain.done();
throw new RuntimeException("This task has already been executed.");
}
this.executed = true;
}
this.chain.async = !Bukkit.isPrimaryThread(); // We don't know where the task called this from.
this.chain.previous = resp;
this.chain.nextTask();
}
}
@SuppressWarnings("PublicInnerClass,WeakerAccess")
public static class AbortChainException extends Throwable {}
/**
* Generic task with synchronous return (but may execute on any thread)
* @param <R>
* @param <A>
*/
@SuppressWarnings("WeakerAccess")
public interface Task <R, A> {
/**
* Gets the current chain that is executing this task. This method should only be called on the same thread
* that is executing the task.
* @return
*/
public default TaskChain<?> getCurrentChain() {
return currentChain.get();
}
R run(A input) throws AbortChainException;
}
@SuppressWarnings("WeakerAccess")
public interface AsyncExecutingTask<R, A> extends Task<R, A> {
/**
* Gets the current chain that is executing this task. This method should only be called on the same thread
* that is executing the task.
*
* Since this is an AsyncExecutingTask, You must call this method BEFORE passing control to another thread.
* @return
*/
default TaskChain<?> getCurrentChain() {
return currentChain.get();
}
@Override
default R run(A input) throws AbortChainException {
// unused
return null;
}
void runAsync(A input, Consumer<R> next) throws AbortChainException;
}
@SuppressWarnings("WeakerAccess")
public interface FirstTask <R> extends Task<R, Object> {
@Override
default R run(Object input) throws AbortChainException {
return run();
}
R run() throws AbortChainException;
}
@SuppressWarnings("WeakerAccess")
public interface AsyncExecutingFirstTask<R> extends AsyncExecutingTask<R, Object> {
@Override
default R run(Object input) throws AbortChainException {
// Unused
return null;
}
@Override
default void runAsync(Object input, Consumer<R> next) throws AbortChainException {
run(next);
}
void run(Consumer<R> next) throws AbortChainException;
}
@SuppressWarnings("WeakerAccess")
public interface LastTask <A> extends Task<Object, A> {
@Override
default Object run(A input) throws AbortChainException {
runLast(input);
return null;
}
void runLast(A input) throws AbortChainException;
}
@SuppressWarnings("WeakerAccess")
public interface GenericTask extends Task<Object, Object> {
@Override
default Object run(Object input) throws AbortChainException {
runGeneric();
return null;
}
void runGeneric() throws AbortChainException;
}
@SuppressWarnings("WeakerAccess")
public interface AsyncExecutingGenericTask extends AsyncExecutingTask<Object, Object> {
@Override
default Object run(Object input) throws AbortChainException {
return null;
}
@Override
default void runAsync(Object input, Consumer<Object> next) throws AbortChainException {
run(() -> next.accept(null));
}
void run(Runnable next) throws AbortChainException;
}
private static class SharedTaskChain<R> extends TaskChain<R> {
private final TaskChain<R> backingChain;
private SharedTaskChain(TaskChain<R> backingChain) {
this.backingChain = backingChain;
}
@Override
public void execute() {
synchronized (backingChain) {
// This executes SharedTaskChain.execute(Runnable), which says execute
// my wrapped chains queue of events, but pass a done callback for when its done.
// We then use the backing chain callback method to not execute the next task in the
// backing chain until the current one is fully done.
SharedTaskChain<R> sharedChain = this;
backingChain.currentCallback((AsyncExecutingGenericTask) sharedChain::execute);
backingChain.execute();
}
}
@Override
public void execute(Runnable done) {
this.doneCallback = done;
execute0();
}
}
}
@CommandAlias("wastereset")
public static void doWasteReset(Player player){
Confirmation.confirm(player, "&cWasteland Reset",
input -> {
Wastelands.RESETTING = true;
WorldEditHelper.setAsync(player, false);
TaskChain
.newChain()
.sync( () -> Bukkit.broadcastMessage(Util.color(
"&cWARNING: Wastelands Reset commencing in 5 seconds."
)))
.delay(TimeUtil.SECOND.inTicks(5))
.syncCallback(recreateWorld(player, "wastelands", "normal"))
.syncCallback(recreateWorld(player, "wastelands_nether", "nether"))
.syncCallback(recreateWorld(player, "wastelands_the_end", "end"))
.sync( () -> {
WorldEditHelper.setAsync(player, true);
Wastelands.RESETTING = false;
Bukkit.broadcastMessage(Util.color("&aWastelands has finished resetting. You may enter now."));
}).execute();
}, "&4&lWARNING!!!",
"&cYou are RESETTING the WASTELANDS");
}
public static AsyncExecutingGenericTask recreateWorld(Player player, String world, String type) {
return next -> {
Worlds.rescueAll(Bukkit.getWorld(world));
BukkitUtil.scheduleTask( () -> {
Util.performCommand(player, "mv delete " + world);
Util.performCommand(player, "mvconfirm");
Util.performCommand(player, "dynmap purgeworld " + world);
Util.performCommand(player, "mv create " + world + " " + type);
Util.performCommand(player, "mvm set difficulty normal " + world);
// The following method is async retuning (but still runs on main).
seedWorld(player, Bukkit.getWorld(world), next);
}, 10);
};
}
static void seedWorld(final Player player, final World world, Runnable next) {
if (!world.getName().startsWith("wastelands")) {
player.sendMessage("Must be in wastelands");
next.run();
return;
}
world.getWorldBorder().setCenter(0.0, 0.0);
world.getWorldBorder().setSize(16000);
if ("wastelands_the_end".equalsIgnoreCase(world.getName())) {
setupEnd(player, world, next);
return;
}
final WorldEdit we = WorldEdit.getInstance();
final BukkitPlayer wePlayer = WorldEditHelper.getPlayer(player);
if (ResidenceManager.getByName(world.getName()) != null) {
ResidenceManager.removeResidence(world.getName());
}
final Queue<WastelandOutpost> outpostQueue = new ArrayDeque<>();
Collections.addAll(outpostQueue, WastelandOutpost.values());
final Runnable stage2 = new Runnable() {
@Override
public void run() {
WastelandOutpost outpost = outpostQueue.poll();
if (outpost == null) {
Util.sendMsg(player, "&aSeeding done.");
next.run();
return;
}
Util.sendMsg(player, "&aSeeding &b" + outpost.name);
final Location loc = outpost.getLocation(world);
final EditSession editSession = WorldEditHelper.getEditSession(world, player);
BukkitUtil.runTaskNextTick(this);
seed(editSession, wePlayer, we, player, loc);
if (outpost == WastelandOutpost.CENTER) {
Location spawnLoc = outpost.getSpawnLocation(world);
Worlds.setSpawn(world, spawnLoc);
}
}
};
for (WastelandOutpost outpost : WastelandOutpost.values()) {
buildChunkQueue(outpost.getLocation(world), 8, 4);
}
Util.sendMsg(player, "&aPregenning outpost chunks for " + world.getName() + ": " + highPrioQueue.size());
highPrioQueue.add(() -> {
Util.sendMsg(player, "&aPregenning " + world.getName() + " done");
stage2.run();
});
schedulePregenTask(player);
}
private static void schedulePregenTask(final Player player) {
final Runnable stage1 = new Runnable() {
@Override
public void run() {
boolean isHighPrio = !highPrioQueue.isEmpty();
final int rate = isHighPrio ? 6 : 2;
Queue<Runnable> queue = isHighPrio ? highPrioQueue : lowPrioQueue;
String prefix = "Pregen (" + (isHighPrio ? "high" : "low") +") ";
for (int i = 0; i < rate; i++) {
Runnable run = queue.poll();
if (run == null) {
if (isHighPrio) {
break;
} else {
taskRunning = false;
return;
}
}
run.run();
if (!queue.isEmpty() && queue.size() % (isHighPrio ? 110 : 250) == 0) {
Util.sendMsg(isHighPrio ? player : null, prefix + queue.size() + " remaining.");
}
}
BukkitUtil.runTaskNextTick(this);
}
};
if (!taskRunning) {
taskRunning = true;
BukkitUtil.runTaskNextTick(stage1);
}
}
private static void buildChunkQueue(Location center, int radius, int lowStartRadius) {
World world = center.getWorld();
// pregen the chunks
final int x = center.getBlockX();
final int z = center.getBlockZ();
for (int cx = -radius; cx <= radius; cx++) {
for (int cz = -radius; cz <= radius; cz++) {
final int x1 = (x + (16 * cx)) >> 4;
final int z1 = (z + (16 * cz)) >> 4;
if (
cx > -lowStartRadius && cz > -lowStartRadius
&&
cx < lowStartRadius && cz < lowStartRadius
) {
highPrioQueue.add(() -> world.getChunkAt(x1, z1));
} else {
lowPrioQueue.add(() -> world.getChunkAt(x1, z1));
}
}
}
}
public static void processLogin(Player player, EmpireUser user, Long currentEpoch) {
TaskChain
.newChain()
.asyncFirst(friendsList.getOnlineFriendsTask())
.asyncLast((friends) -> friends.forEach(onlineFriend -> {
final EmpireServer server = onlineFriend.getServer();
if (onlineFriend.getFriend().isNotifyingLogins() && (server != EmpireServer.getServer() || EmpireServer.isDev)) {
CommandQueue.queueServerCommand(server, notifyCommand);
}
})).execute();
}
public static void openMail(Player player, EmpireUser user, Long mailId) {
TaskChain.newSharedChain(player, "MAIL")
.asyncFirst(getMessageTask(user, mailId))
.abortIfNull(player, MailLang.MAIL_NOT_FOUND)
.storeAsData("message")
.sync(Mail::retrieveMail)
.abortIfNull()
.<Message>returnData("message")
.asyncLast(Message::delete)
.execute();
}
public TaskChain.FirstTask<List<OnlineFriend>> getOnlineFriendsTask() {
return () -> {
List<OnlineFriend> friends = Lists.newArrayList();
try {
for (DbRow row : EmpireDb.getResults("SELECT s.user_id, s.server_id FROM user_friends f " +
" JOIN session s ON s.user_id = f.friend_user_id " +
" WHERE f.user_id = ? AND f.pending = 0 " +
" GROUP BY s.server_id ",
user.userId
)) {
final Friend friend = this.friends.get(row.get("user_id"));
final EmpireServer server = EmpireServer.getServerById(row.get("server_id"));
friends.add(new OnlineFriend(friend, server));
}
} catch (SQLException e) {
Util.printException(e);
}
return friends;
};
}
@aikar
Copy link
Author

aikar commented Jun 23, 2016

TaskChain 2.4 Changelog:

  • Added Task.getCurrentChain() - get the currently executing chain in a task. Useful for generic tasks to get execution context.

@aikar
Copy link
Author

aikar commented Jun 23, 2016

TaskChain 2.4.1 Changelog:

  • Added .returnChain() to the chain to also provide another way to access the chain in tasks, by passing the chain itself as an arg to the next task. chain.returnChain().async((chain) -> Object foo = chain.getTaskData("foo"); }).execute();

@aikar
Copy link
Author

aikar commented Jul 6, 2016

TaskChain 2.5 Changelog:

  • Fixed bug where Shared chains always executed next tick, breaking ability to execute synchronously
  • While now unused internally, executeNext() will properly use 1 instead of 0 for next tick, as 0 can mean 'this tick' if scheduled inside of another bukkit scheduler task.
  • Added Error Handler arguments to .execute(), so you can act on uncaught exceptions by your tasks.

@aikar
Copy link
Author

aikar commented Jul 6, 2016

TaskChain 2.6 Changelog:

  • Fixed bug where an exception in a task would not properly abort the chain, resulting in Shared Chains being forever stuck from executing.
  • Changed the recent Error Handler to a BiConsumer to also pass the task that triggered the Exception (reason for minor bump)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment