Skip to content

Instantly share code, notes, and snippets.

@aikar
Created October 13, 2015 06:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aikar/efd37cb9023f6b329907 to your computer and use it in GitHub Desktop.
Save aikar/efd37cb9023f6b329907 to your computer and use it in GitHub Desktop.
/*
* Copyright (c) 2015. Starlis LLC / dba Empire Minecraft
*
* This source code is proprietary software and must not be redistributed without Starlis LLC's approval
*
*/
package com.empireminecraft.util;
import org.bukkit.Bukkit;
import org.spigotmc.timings.Timing;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
/**
* Facilitates Control Flow for the Bukkit Scheduler to easily jump between
* Async and Sync tasks without deeply nested callbacks, passing the response of the
* previous task to the next task to use.
*
* Usage example:
* @see #example
*/
@SuppressWarnings("unused")
public class TaskChain2 <T> {
/**
* A useless example of registering multiple task signatures and states
*/
/* public static void example() {
TaskChain2.newChain()
.delay(20 * 10) // Wait 20s to start any task
.async(() -> Util.log("This ran async - with no input or return"))
.<Integer>asyncFirstCallback(next -> {
// Use a callback to provide result
Util.log("this also ran async, but will call next task in 3 seconds.");
BukkitUtil.scheduleTask(() -> next.accept(3), 60);
})
.sync(input -> { // Will be ran 3s later but didn't use .delay()
Util.log("should of got 3: " + input);
return 5 + input;
})
.syncLast(input2 -> Util.log(String.valueOf(input2))) // Consumes last result, but doesn't pass a new one
.delay(20) // Wait 1s until next
.sync(() -> Util.log("Generic")) // no input expected, no output, run sync
.asyncFirst(() -> 3) // Run task async and return 3
.delay(5 * 20) // Wait 5s
.asyncLast(input1 -> Util.log("final async value 5s later: " + input1)) // Run async again, with value of 3
.sync(TaskChain2::abort)
.sync(() -> Util.log("Shouldn't be called"))
.execute();
}*/
/**
* =============================================================================================
*/
private final ConcurrentLinkedQueue<TaskHolder<?,?>> chainQueue = new ConcurrentLinkedQueue<>();
private boolean executed = false;
private Object previous;
private boolean async = !Bukkit.isPrimaryThread();
private TaskHolder<?, ?> currentHolder;
/**
* =============================================================================================
*/
/**
* Starts a new chain.
* @return
*/
public static <T> TaskChain2<T> newChain() {
return new TaskChain2<>();
}
/**
* Call to abort execution of the chain.
*/
public static void abort() throws AbortChainException {
throw new AbortChainException();
}
/**
* =============================================================================================
*/
/**
* Adds a delay to the chain execution.
*
* @param ticks # of ticks to delay before next task (20 = 1 second)
* @return
*/
public TaskChain2<T> delay(final int ticks) {
return syncCallback((input, next) -> BukkitUtil.scheduleTask(() -> next.accept(input), ticks));
}
/**
* Execute task on main thread, with no input, returning an output
* @param task
* @param <R>
* @return
*/
public <R> TaskChain2<R> syncFirst(FirstTask<R> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* Execute a task on the main thread, with no previous input, and a callback to return the response to.
*
* It's important you don't perform blocking operations in this method. Only use this if
* the task will be scheduling a different sync operation outside of the TaskChains scope.
*
* Usually you could achieve the same design with a blocking API by switching to an async task
* for the next task and running it there.
*
* This method would primarily be for cases where you need to use an API that ONLY provides
* a callback style API.
*
* @param task
* @param <R>
* @return
*/
public <R> TaskChain2<R> syncFirstCallback(AsyncExecutingFirstTask<R> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* Execute task on main thread, with the last returned input, returning an output
* @param task
* @param <R>
* @return
*/
public <R> TaskChain2<R> sync(Task<R, T> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* Execute a task on the main thread, with the last output, and a callback to return the response to.
*
* It's important you don't perform blocking operations in this method. Only use this if
* the task will be scheduling a different sync operation outside of the TaskChains scope.
*
* Usually you could achieve the same design with a blocking API by switching to an async task
* for the next task and running it there.
*
* This method would primarily be for cases where you need to use an API that ONLY provides
* a callback style API.
*
* @param task
* @param <R>
* @return
*/
public <R> TaskChain2<R> syncCallback(AsyncExecutingTask<R, T> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* Execute task on main thread, with no input or output
* @param task
* @return
*/
public TaskChain2<?> sync(GenericTask task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* Execute task on main thread, with the last output, and no furthur output
* @param task
* @return
*/
public TaskChain2<?> syncLast(LastTask<T> task) {
return add0(new TaskHolder<>(this, false, task));
}
/**
* @see #syncFirst(FirstTask) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain2<R> asyncFirst(FirstTask<R> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncFirstCallback(AsyncExecutingFirstTask) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain2<R> asyncFirstCallback(AsyncExecutingFirstTask<R> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #sync(Task) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain2<R> async(Task<R, T> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncCallback(AsyncExecutingTask) but ran off main thread
* @param task
* @param <R>
* @return
*/
public <R> TaskChain2<R> asyncCallback(AsyncExecutingTask<R, T> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #sync(GenericTask) but ran off main thread
* @param task
* @return
*/
public TaskChain2<?> async(GenericTask task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* @see #syncLast(LastTask) but ran off main thread
* @param task
* @return
*/
public TaskChain2<?> asyncLast(LastTask<T> task) {
return add0(new TaskHolder<>(this, true, task));
}
/**
* Finished adding tasks, begins executing them.
*/
public void execute() {
synchronized (this) {
if (this.executed) {
throw new RuntimeException("Already executed");
}
this.executed = true;
}
nextTask();
}
@SuppressWarnings("rawtypes")
private TaskChain2 add0(TaskHolder<?,?> task) {
synchronized (this) {
if (this.executed) {
throw new RuntimeException("TaskChain2 is executing");
}
}
this.chainQueue.add(task);
return this;
}
/**
* Fires off the next task, and switches between Async/Sync as necessary.
*/
private void nextTask() {
this.currentHolder = this.chainQueue.poll();
if (this.currentHolder == null) {
this.previous = null;
// All Done!
return;
}
if (this.currentHolder.async) {
if (this.async) {
this.currentHolder.run();
} else {
BukkitUtil.runTaskAsync(() -> {
this.async = true;
this.currentHolder.run();
});
}
} else {
if (this.async) {
BukkitUtil.runTask(() -> {
this.async = false;
this.currentHolder.run();
});
} else {
this.currentHolder.run();
}
}
}
/**
* Provides foundation of a task with what the previous task type should return
* to pass to this and what this task will return.
* @param <R> Return Type
* @param <A> Argument Type Expected
*/
@SuppressWarnings("AccessingNonPublicFieldOfAnotherObject")
private static class TaskHolder<R, A> {
private final TaskChain2<?> chain;
private final Task<R, A> task;
public final boolean async;
private boolean executed = false;
private boolean aborted = false;
private TaskHolder(TaskChain2<?> chain, boolean async, Task<R, A> task) {
this.task = task;
this.chain = chain;
this.async = async;
}
/**
* Called internally by Task Chain to facilitate executing the task and then the next task.
*/
private void run() {
final Object arg = this.chain.previous;
this.chain.previous = null;
final R res;
try (Timing timing = Util.startTimingIfSync(this.async, "Sync Task: " + (!this.async ? this.task.getClass().getName() : ""))) {
try {
if (this.task instanceof AsyncExecutingTask) {
((AsyncExecutingTask<R, A>) this.task).runAsync((A) arg, this::next);
} else {
next(this.task.run((A) arg));
}
} catch (AbortChainException ignored) {
this.abort();
}
}
}
/**
* Abort the chain, and clear tasks for GC.
*/
private synchronized void abort() {
this.aborted = true;
this.chain.previous = null;
this.chain.chainQueue.clear();
}
/**
* Accepts result of previous task and executes the next
*/
private void next(Object resp) {
synchronized (this) {
if (this.aborted) {
return;
}
if (this.executed) {
throw new RuntimeException("This task has already been executed.");
}
this.executed = true;
}
this.chain.async = !Bukkit.isPrimaryThread(); // We don't know where the task called this from.
this.chain.previous = resp;
this.chain.nextTask();
}
}
private static class AbortChainException extends Throwable {}
/**
* Generic task with synchronous return (but may execute on any thread)
* @param <R>
* @param <A>
*/
public interface Task <R, A> {
R run(A input) throws AbortChainException;
}
public interface AsyncExecutingTask<R, A> extends Task<R, A> {
@Override
default R run(A input) throws AbortChainException {
// unused
return null;
}
void runAsync(A input, Consumer<R> next) throws AbortChainException;
}
public interface FirstTask <R> extends Task<R, Object> {
@Override
default R run(Object input) throws AbortChainException {
return run();
}
R run() throws AbortChainException;
}
public interface AsyncExecutingFirstTask<R> extends AsyncExecutingTask<R, Object> {
@Override
default R run(Object input) throws AbortChainException {
// Unused
return null;
}
@Override
default void runAsync(Object input, Consumer<R> next) throws AbortChainException {
run(next);
}
void run(Consumer<R> next) throws AbortChainException;
}
public interface LastTask <A> extends Task<Object, A> {
@Override
default Object run(A input) throws AbortChainException {
runLast(input);
return null;
}
void runLast(A input) throws AbortChainException;
}
public interface GenericTask extends Task<Object, Object> {
@Override
default Object run(Object input) throws AbortChainException {
runGeneric();
return null;
}
void runGeneric() throws AbortChainException;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment