-
-
Save Twister915/bd053583ffbb7a4d1e7f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tech.rayline.core.rx; | |
import org.bukkit.plugin.Plugin; | |
import rx.Observable; | |
public abstract class BaseStreamer { | |
protected final Plugin plugin; | |
protected final RxBukkitScheduler syncScheduler, asyncScheduler; | |
public BaseStreamer(Plugin plugin, RxBukkitScheduler syncScheduler, RxBukkitScheduler asyncScheduler) { | |
this.plugin = plugin; | |
this.syncScheduler = syncScheduler; | |
this.asyncScheduler = asyncScheduler; | |
} | |
public <T> Observable.Transformer<T, T> getSyncTransformer() { | |
return new Observable.Transformer<T, T>() { | |
@Override | |
public Observable<T> call(Observable<T> tObservable) { | |
return tObservable.subscribeOn(syncScheduler); | |
} | |
}; | |
} | |
public <T> Observable.Transformer<T, T> getAsyncTransformer() { | |
return new Observable.Transformer<T, T>() { | |
@Override | |
public Observable<T> call(Observable<T> tObservable) { | |
return tObservable.subscribeOn(asyncScheduler); | |
} | |
}; | |
} | |
public Plugin getPlugin() { | |
return this.plugin; | |
} | |
public RxBukkitScheduler getSyncScheduler() { | |
return this.syncScheduler; | |
} | |
public RxBukkitScheduler getAsyncScheduler() { | |
return this.asyncScheduler; | |
} | |
public boolean equals(Object o) { | |
if (o == this) return true; | |
if (!(o instanceof BaseStreamer)) return false; | |
final BaseStreamer other = (BaseStreamer) o; | |
if (!other.canEqual((Object) this)) return false; | |
final Object this$plugin = this.plugin; | |
final Object other$plugin = other.plugin; | |
if (this$plugin == null ? other$plugin != null : !this$plugin.equals(other$plugin)) return false; | |
final Object this$syncScheduler = this.syncScheduler; | |
final Object other$syncScheduler = other.syncScheduler; | |
if (this$syncScheduler == null ? other$syncScheduler != null : !this$syncScheduler.equals(other$syncScheduler)) | |
return false; | |
final Object this$asyncScheduler = this.asyncScheduler; | |
final Object other$asyncScheduler = other.asyncScheduler; | |
if (this$asyncScheduler == null ? other$asyncScheduler != null : !this$asyncScheduler.equals(other$asyncScheduler)) | |
return false; | |
return true; | |
} | |
public int hashCode() { | |
final int PRIME = 59; | |
int result = 1; | |
final Object $plugin = this.plugin; | |
result = result * PRIME + ($plugin == null ? 0 : $plugin.hashCode()); | |
final Object $syncScheduler = this.syncScheduler; | |
result = result * PRIME + ($syncScheduler == null ? 0 : $syncScheduler.hashCode()); | |
final Object $asyncScheduler = this.asyncScheduler; | |
result = result * PRIME + ($asyncScheduler == null ? 0 : $asyncScheduler.hashCode()); | |
return result; | |
} | |
protected boolean canEqual(Object other) { | |
return other instanceof BaseStreamer; | |
} | |
public String toString() { | |
return "tech.rayline.core.rx.BaseStreamer(plugin=" + this.plugin + ", syncScheduler=" + this.syncScheduler + ", asyncScheduler=" + this.asyncScheduler + ")"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tech.rayline.core.rx; | |
/**
 * Selects how work is dispatched: {@link #SYNC} or {@link #ASYNC}.
 */
public enum ConcurrencyMode {
    /** Synchronous dispatch. */
    SYNC,
    /** Asynchronous dispatch. */
    ASYNC
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tech.rayline.core.rx; | |
import org.bukkit.Bukkit; | |
import org.bukkit.event.*; | |
import org.bukkit.event.server.PluginDisableEvent; | |
import org.bukkit.plugin.EventExecutor; | |
import org.bukkit.plugin.Plugin; | |
import org.bukkit.plugin.PluginManager; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.exceptions.Exceptions; | |
import rx.functions.Action0; | |
import rx.subscriptions.Subscriptions; | |
public final class EventStreamer extends BaseStreamer { | |
public EventStreamer(Plugin plugin, RxBukkitScheduler syncScheduler, RxBukkitScheduler asyncScheduler) { | |
super(plugin, syncScheduler, asyncScheduler); | |
} | |
@SafeVarargs | |
public final <T extends Event> Observable<T> observeEvent(Class<? extends T>... events) { | |
return observeEvent(EventPriority.NORMAL, events); | |
} | |
@SafeVarargs | |
public final <T extends Event> Observable<T> observeEvent(EventPriority priority, Class<? extends T>... events) { | |
return observeEvent(priority, false, events); | |
} | |
@SafeVarargs | |
public final <T extends Event> Observable<T> observeEvent(final EventPriority priority, final boolean ignoreCancelled, final Class<? extends T>... events) { | |
//creates an observer which... | |
return Observable.create(new Observable.OnSubscribe<T>() { | |
@Override | |
public void call(final Subscriber<? super T> subscriber) { | |
//creates an empty listener | |
final Listener listener = new Listener() { | |
}; | |
//creates an event executor | |
@SuppressWarnings("unchecked") | |
EventExecutor executor = new EventExecutor() { | |
@Override | |
public void execute(Listener listener1, Event event) throws EventException { | |
//check to make sure the event bukkit sent us can apply to what the observer is expecting (type T) | |
//strangely, they'll send things like the EntityDamageEvent when we want to cast to EntityDamageByEntity | |
//so it's best if we manually check here to make sure it can apply to any class | |
boolean canAssign = false; //track a "canAssign" value | |
Class<? extends Event> eventClass = event.getClass(); //get the class of the event | |
for (Class<? extends T> aClass : events) { //go through all classes we're concerned with emitting (T is their mutual supertype) | |
if (aClass.isAssignableFrom(eventClass)) { //if one of the classes (from the events classes arg) is assignable to the event class (is a superclass or equal to) | |
canAssign = true; //then we're good, and can break | |
break; | |
} | |
} | |
//if we never discovered a class which is assignable, this is one of those weird and rare cases where Bukkit is stupid | |
if (!canAssign) | |
return; //so we return | |
try { | |
//noinspection unchecked | |
subscriber.onNext((T) event); | |
} catch (Throwable t) { | |
Exceptions.throwOrReport(t, subscriber); | |
} | |
} | |
}; | |
//registers all the event types to that listener | |
PluginManager pluginManager = Bukkit.getPluginManager(); | |
for (Class<? extends T> event : events) | |
pluginManager.registerEvent(event, listener, priority, executor, plugin, ignoreCancelled); | |
//and registers a HandlerList.unregisterAll call as the unsubscribe action | |
subscriber.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
HandlerList.unregisterAll(listener); | |
} | |
})); | |
//also needs to unsubscribe when the plugin disables | |
pluginManager.registerEvent(PluginDisableEvent.class, listener, EventPriority.MONITOR, new EventExecutor() { | |
@Override | |
public void execute(Listener l, Event event) throws EventException { | |
PluginDisableEvent disableEvent = (PluginDisableEvent) event; | |
if (disableEvent.getPlugin().equals(plugin)) | |
subscriber.onCompleted(); | |
} | |
}, plugin, false); | |
} | |
}).compose(this.<T>getSyncTransformer()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tech.rayline.core.rx; | |
import org.bukkit.plugin.java.JavaPlugin; | |
import org.bukkit.scheduler.BukkitTask; | |
import rx.Scheduler; | |
import rx.Subscription; | |
import rx.functions.Action0; | |
import rx.internal.schedulers.ScheduledAction; | |
import rx.subscriptions.CompositeSubscription; | |
import rx.subscriptions.Subscriptions; | |
import tech.rayline.core.util.RunnableShorthand; | |
import java.util.concurrent.TimeUnit; | |
public final class RxBukkitScheduler extends Scheduler { | |
private final JavaPlugin plugin; | |
private final ConcurrencyMode concurrencyMode; | |
public RxBukkitScheduler(JavaPlugin plugin, ConcurrencyMode concurrencyMode) { | |
this.plugin = plugin; | |
this.concurrencyMode = concurrencyMode; | |
} | |
private BukkitTask actualSchedule(final Action0 action, int ticksDelay) { | |
RunnableShorthand with = RunnableShorthand.forPlugin(plugin).with(new Runnable() { | |
@Override | |
public void run() { | |
action.call(); | |
} | |
}); | |
if (concurrencyMode == ConcurrencyMode.ASYNC) | |
with.async(); | |
return with.later(ticksDelay); | |
} | |
@Override | |
public Worker createWorker() { | |
return new BukkitWorker(); | |
} | |
public JavaPlugin getPlugin() { | |
return this.plugin; | |
} | |
public ConcurrencyMode getConcurrencyMode() { | |
return this.concurrencyMode; | |
} | |
public String toString() { | |
return "tech.rayline.core.rx.RxBukkitScheduler(plugin=" + this.plugin + ", concurrencyMode=" + this.concurrencyMode + ")"; | |
} | |
public boolean equals(Object o) { | |
if (o == this) return true; | |
if (!(o instanceof RxBukkitScheduler)) return false; | |
final RxBukkitScheduler other = (RxBukkitScheduler) o; | |
if (!other.canEqual((Object) this)) return false; | |
final Object this$plugin = this.getPlugin(); | |
final Object other$plugin = other.getPlugin(); | |
if (this$plugin == null ? other$plugin != null : !this$plugin.equals(other$plugin)) return false; | |
final Object this$concurrencyMode = this.getConcurrencyMode(); | |
final Object other$concurrencyMode = other.getConcurrencyMode(); | |
if (this$concurrencyMode == null ? other$concurrencyMode != null : !this$concurrencyMode.equals(other$concurrencyMode)) | |
return false; | |
return true; | |
} | |
public int hashCode() { | |
final int PRIME = 59; | |
int result = 1; | |
final Object $plugin = this.getPlugin(); | |
result = result * PRIME + ($plugin == null ? 0 : $plugin.hashCode()); | |
final Object $concurrencyMode = this.getConcurrencyMode(); | |
result = result * PRIME + ($concurrencyMode == null ? 0 : $concurrencyMode.hashCode()); | |
return result; | |
} | |
protected boolean canEqual(Object other) { | |
return other instanceof RxBukkitScheduler; | |
} | |
private final class BukkitWorker extends Worker { | |
private final CompositeSubscription allSubscriptions = new CompositeSubscription(); | |
@Override | |
public Subscription schedule(Action0 action) { | |
return schedule(action, 0, TimeUnit.MILLISECONDS); | |
} | |
@Override | |
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { | |
final BukkitTask bukkitTask = actualSchedule(action, (int) Math.round((double) unit.toMillis(delayTime) / 50D)); | |
ScheduledAction scheduledAction = new ScheduledAction(action, allSubscriptions); | |
scheduledAction.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
bukkitTask.cancel(); | |
} | |
})); | |
return scheduledAction; | |
} | |
public void unsubscribe() { | |
this.allSubscriptions.unsubscribe(); | |
} | |
public boolean isUnsubscribed() { | |
return this.allSubscriptions.isUnsubscribed(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment