Skip to content

Instantly share code, notes, and snippets.

@Twister915
Last active July 28, 2021 04:59
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Twister915/bd053583ffbb7a4d1e7f to your computer and use it in GitHub Desktop.
Save Twister915/bd053583ffbb7a4d1e7f to your computer and use it in GitHub Desktop.
package tech.rayline.core.rx;
import org.bukkit.plugin.Plugin;
import rx.Observable;
public abstract class BaseStreamer {
protected final Plugin plugin;
protected final RxBukkitScheduler syncScheduler, asyncScheduler;
public BaseStreamer(Plugin plugin, RxBukkitScheduler syncScheduler, RxBukkitScheduler asyncScheduler) {
this.plugin = plugin;
this.syncScheduler = syncScheduler;
this.asyncScheduler = asyncScheduler;
}
public <T> Observable.Transformer<T, T> getSyncTransformer() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
return tObservable.subscribeOn(syncScheduler);
}
};
}
public <T> Observable.Transformer<T, T> getAsyncTransformer() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
return tObservable.subscribeOn(asyncScheduler);
}
};
}
public Plugin getPlugin() {
return this.plugin;
}
public RxBukkitScheduler getSyncScheduler() {
return this.syncScheduler;
}
public RxBukkitScheduler getAsyncScheduler() {
return this.asyncScheduler;
}
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof BaseStreamer)) return false;
final BaseStreamer other = (BaseStreamer) o;
if (!other.canEqual((Object) this)) return false;
final Object this$plugin = this.plugin;
final Object other$plugin = other.plugin;
if (this$plugin == null ? other$plugin != null : !this$plugin.equals(other$plugin)) return false;
final Object this$syncScheduler = this.syncScheduler;
final Object other$syncScheduler = other.syncScheduler;
if (this$syncScheduler == null ? other$syncScheduler != null : !this$syncScheduler.equals(other$syncScheduler))
return false;
final Object this$asyncScheduler = this.asyncScheduler;
final Object other$asyncScheduler = other.asyncScheduler;
if (this$asyncScheduler == null ? other$asyncScheduler != null : !this$asyncScheduler.equals(other$asyncScheduler))
return false;
return true;
}
public int hashCode() {
final int PRIME = 59;
int result = 1;
final Object $plugin = this.plugin;
result = result * PRIME + ($plugin == null ? 0 : $plugin.hashCode());
final Object $syncScheduler = this.syncScheduler;
result = result * PRIME + ($syncScheduler == null ? 0 : $syncScheduler.hashCode());
final Object $asyncScheduler = this.asyncScheduler;
result = result * PRIME + ($asyncScheduler == null ? 0 : $asyncScheduler.hashCode());
return result;
}
protected boolean canEqual(Object other) {
return other instanceof BaseStreamer;
}
public String toString() {
return "tech.rayline.core.rx.BaseStreamer(plugin=" + this.plugin + ", syncScheduler=" + this.syncScheduler + ", asyncScheduler=" + this.asyncScheduler + ")";
}
}
package tech.rayline.core.rx;
public enum ConcurrencyMode {
SYNC,
ASYNC
}
package tech.rayline.core.rx;
import org.bukkit.Bukkit;
import org.bukkit.event.*;
import org.bukkit.event.server.PluginDisableEvent;
import org.bukkit.plugin.EventExecutor;
import org.bukkit.plugin.Plugin;
import org.bukkit.plugin.PluginManager;
import rx.Observable;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
public final class EventStreamer extends BaseStreamer {
public EventStreamer(Plugin plugin, RxBukkitScheduler syncScheduler, RxBukkitScheduler asyncScheduler) {
super(plugin, syncScheduler, asyncScheduler);
}
@SafeVarargs
public final <T extends Event> Observable<T> observeEvent(Class<? extends T>... events) {
return observeEvent(EventPriority.NORMAL, events);
}
@SafeVarargs
public final <T extends Event> Observable<T> observeEvent(EventPriority priority, Class<? extends T>... events) {
return observeEvent(priority, false, events);
}
@SafeVarargs
public final <T extends Event> Observable<T> observeEvent(final EventPriority priority, final boolean ignoreCancelled, final Class<? extends T>... events) {
//creates an observer which...
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
//creates an empty listener
final Listener listener = new Listener() {
};
//creates an event executor
@SuppressWarnings("unchecked")
EventExecutor executor = new EventExecutor() {
@Override
public void execute(Listener listener1, Event event) throws EventException {
//check to make sure the event bukkit sent us can apply to what the observer is expecting (type T)
//strangely, they'll send things like the EntityDamageEvent when we want to cast to EntityDamageByEntity
//so it's best if we manually check here to make sure it can apply to any class
boolean canAssign = false; //track a "canAssign" value
Class<? extends Event> eventClass = event.getClass(); //get the class of the event
for (Class<? extends T> aClass : events) { //go through all classes we're concerned with emitting (T is their mutual supertype)
if (aClass.isAssignableFrom(eventClass)) { //if one of the classes (from the events classes arg) is assignable to the event class (is a superclass or equal to)
canAssign = true; //then we're good, and can break
break;
}
}
//if we never discovered a class which is assignable, this is one of those weird and rare cases where Bukkit is stupid
if (!canAssign)
return; //so we return
try {
//noinspection unchecked
subscriber.onNext((T) event);
} catch (Throwable t) {
Exceptions.throwOrReport(t, subscriber);
}
}
};
//registers all the event types to that listener
PluginManager pluginManager = Bukkit.getPluginManager();
for (Class<? extends T> event : events)
pluginManager.registerEvent(event, listener, priority, executor, plugin, ignoreCancelled);
//and registers a HandlerList.unregisterAll call as the unsubscribe action
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
HandlerList.unregisterAll(listener);
}
}));
//also needs to unsubscribe when the plugin disables
pluginManager.registerEvent(PluginDisableEvent.class, listener, EventPriority.MONITOR, new EventExecutor() {
@Override
public void execute(Listener l, Event event) throws EventException {
PluginDisableEvent disableEvent = (PluginDisableEvent) event;
if (disableEvent.getPlugin().equals(plugin))
subscriber.onCompleted();
}
}, plugin, false);
}
}).compose(this.<T>getSyncTransformer());
}
}
package tech.rayline.core.rx;
import org.bukkit.plugin.java.JavaPlugin;
import org.bukkit.scheduler.BukkitTask;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import tech.rayline.core.util.RunnableShorthand;
import java.util.concurrent.TimeUnit;
public final class RxBukkitScheduler extends Scheduler {
private final JavaPlugin plugin;
private final ConcurrencyMode concurrencyMode;
public RxBukkitScheduler(JavaPlugin plugin, ConcurrencyMode concurrencyMode) {
this.plugin = plugin;
this.concurrencyMode = concurrencyMode;
}
private BukkitTask actualSchedule(final Action0 action, int ticksDelay) {
RunnableShorthand with = RunnableShorthand.forPlugin(plugin).with(new Runnable() {
@Override
public void run() {
action.call();
}
});
if (concurrencyMode == ConcurrencyMode.ASYNC)
with.async();
return with.later(ticksDelay);
}
@Override
public Worker createWorker() {
return new BukkitWorker();
}
public JavaPlugin getPlugin() {
return this.plugin;
}
public ConcurrencyMode getConcurrencyMode() {
return this.concurrencyMode;
}
public String toString() {
return "tech.rayline.core.rx.RxBukkitScheduler(plugin=" + this.plugin + ", concurrencyMode=" + this.concurrencyMode + ")";
}
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof RxBukkitScheduler)) return false;
final RxBukkitScheduler other = (RxBukkitScheduler) o;
if (!other.canEqual((Object) this)) return false;
final Object this$plugin = this.getPlugin();
final Object other$plugin = other.getPlugin();
if (this$plugin == null ? other$plugin != null : !this$plugin.equals(other$plugin)) return false;
final Object this$concurrencyMode = this.getConcurrencyMode();
final Object other$concurrencyMode = other.getConcurrencyMode();
if (this$concurrencyMode == null ? other$concurrencyMode != null : !this$concurrencyMode.equals(other$concurrencyMode))
return false;
return true;
}
public int hashCode() {
final int PRIME = 59;
int result = 1;
final Object $plugin = this.getPlugin();
result = result * PRIME + ($plugin == null ? 0 : $plugin.hashCode());
final Object $concurrencyMode = this.getConcurrencyMode();
result = result * PRIME + ($concurrencyMode == null ? 0 : $concurrencyMode.hashCode());
return result;
}
protected boolean canEqual(Object other) {
return other instanceof RxBukkitScheduler;
}
private final class BukkitWorker extends Worker {
private final CompositeSubscription allSubscriptions = new CompositeSubscription();
@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
final BukkitTask bukkitTask = actualSchedule(action, (int) Math.round((double) unit.toMillis(delayTime) / 50D));
ScheduledAction scheduledAction = new ScheduledAction(action, allSubscriptions);
scheduledAction.add(Subscriptions.create(new Action0() {
@Override
public void call() {
bukkitTask.cancel();
}
}));
return scheduledAction;
}
public void unsubscribe() {
this.allSubscriptions.unsubscribe();
}
public boolean isUnsubscribed() {
return this.allSubscriptions.isUnsubscribed();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment