Skip to content

Instantly share code, notes, and snippets.

@clydebarrow
Created May 18, 2021 10:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clydebarrow/0cdd53754096942f8b242f9b0e64b2d4 to your computer and use it in GitHub Desktop.
Save clydebarrow/0cdd53754096942f8b242f9b0e64b2d4 to your computer and use it in GitHub Desktop.
Main thread scheduler for RxJava on iOS. Used with RoboVM.
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactivex.ios.schedulers
import io.reactivex.rxjava3.core.Scheduler
import org.robovm.apple.foundation.NSOperationQueue
import reactivex.ios.plugins.RxIosPlugins
/**
 * Entry point for RxJava [Scheduler]s backed by iOS [NSOperationQueue]s (RoboVM).
 *
 * The class cannot be instantiated; use the functions on the companion object.
 */
class IosSchedulers private constructor() {

    init {
        throw AssertionError("No instances.")
    }

    /** Holds the default main-queue scheduler so it is only built when first referenced. */
    private object MainHolder {
        val DEFAULT: Scheduler = QueueScheduler(NSOperationQueue.getMainQueue())
    }

    companion object {
        // Resolved once through the plugin hook, which may substitute its own scheduler.
        private val MAIN_THREAD: Scheduler =
            RxIosPlugins.initMainThreadScheduler { MainHolder.DEFAULT }

        /**
         * A [Scheduler] which executes actions on the main operation queue, after being
         * passed through the [RxIosPlugins.onMainThreadScheduler] hook.
         */
        fun mainThread(): Scheduler = RxIosPlugins.onMainThreadScheduler(MAIN_THREAD)

        /**
         * A [Scheduler] which executes actions on the given operation [queue].
         *
         * @throws NullPointerException if [queue] is `null`
         */
        fun from(queue: NSOperationQueue?): Scheduler =
            QueueScheduler(queue ?: throw NullPointerException("queue == null"))
    }
}
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactivex.ios.schedulers
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.plugins.RxJavaPlugins
import org.robovm.apple.dispatch.DispatchQueue
import org.robovm.apple.foundation.NSBlockOperation
import org.robovm.apple.foundation.NSOperationQueue
import java.util.concurrent.TimeUnit
/**
 * A [Scheduler] that executes work on an [NSOperationQueue].
 *
 * Delays are timed with `DispatchQueue.after` on the main dispatch queue. When the delay
 * elapses the task, wrapped in an [NSBlockOperation], is added to [operationQueue].
 */
internal class QueueScheduler(private val operationQueue: NSOperationQueue) : Scheduler() {
    // Only used as the timer for the delayed hand-off; the task itself runs on operationQueue.
    private val dispatchQueue: DispatchQueue = DispatchQueue.getMainQueue()

    /**
     * Schedules [run] to be added to the operation queue after [delay].
     *
     * @return a [Disposable] that cancels the task if it has not started yet
     */
    override fun scheduleDirect(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
        val scheduled = ScheduledRunnable(operationQueue, RxJavaPlugins.onSchedule(run))
        dispatchQueue.after(delay, unit, scheduled)
        return scheduled
    }

    override fun createWorker(): Worker = HandlerWorker(operationQueue, dispatchQueue)

    /**
     * Worker whose tasks all stop being executed once the worker is disposed.
     */
    private class HandlerWorker(
        private val queue: NSOperationQueue,
        private val dispatchQueue: DispatchQueue
    ) : Worker() {
        @Volatile
        private var disposed = false

        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            if (disposed) return Disposable.disposed()
            val decorated = RxJavaPlugins.onSchedule(run)
            // The task re-checks the worker flag right before it runs, so dispose() also
            // stops tasks that were already handed to the timer or the operation queue.
            val scheduled = ScheduledRunnable(queue, Runnable { if (!disposed) decorated.run() })
            dispatchQueue.after(delay, unit, scheduled)
            // Re-check disposed state in case we were racing a call to dispose().
            if (disposed) {
                scheduled.dispose()
                return Disposable.disposed()
            }
            return scheduled
        }

        override fun dispose() {
            disposed = true
        }

        override fun isDisposed(): Boolean = disposed
    }

    /**
     * A single scheduled task. [run] is invoked by the dispatch timer and enqueues the
     * block operation; [dispose] prevents the enqueue and cancels the operation.
     */
    private class ScheduledRunnable(
        private val queue: NSOperationQueue,
        delegate: Runnable
    ) : Runnable, Disposable {
        @Volatile
        private var disposed = false

        // The delegate executes later inside the operation, outside the try/catch in run(),
        // so its failures are caught and reported here.
        private val blockOperation: NSBlockOperation = NSBlockOperation(Runnable {
            try {
                delegate.run()
            } catch (t: Throwable) {
                reportFatal(t)
            }
        })

        override fun run() {
            if (disposed) return
            try {
                queue.addOperation(blockOperation)
            } catch (t: Throwable) {
                reportFatal(t)
            }
        }

        override fun dispose() {
            disposed = true
            blockOperation.cancel()
        }

        override fun isDisposed(): Boolean = disposed

        /** Reports [t] to the RxJava error hook and to the current thread's uncaught handler. */
        private fun reportFatal(t: Throwable) {
            val ie = IllegalStateException("Fatal Exception thrown on Scheduler.", t)
            RxJavaPlugins.onError(ie)
            val thread = Thread.currentThread()
            thread.uncaughtExceptionHandler.uncaughtException(thread, ie)
        }
    }
}
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactivex.ios.plugins;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Function;
import java.util.concurrent.Callable;
/**
 * Utility class to inject handlers into the creation and lookup of the iOS main thread
 * {@link Scheduler}.
 */
public final class RxIosPlugins {
    private static volatile Function<Callable<Scheduler>, Scheduler> onInitMainThreadHandler;
    private static volatile Function<Scheduler, Scheduler> onMainThreadHandler;

    private RxIosPlugins() {
        throw new AssertionError("No instances.");
    }

    /**
     * Installs the hook applied when the main thread scheduler is initialised.
     * @param handler the hook function, {@code null} removes the hook
     */
    public static void setInitMainThreadSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
        onInitMainThreadHandler = handler;
    }

    /**
     * Returns the current hook function.
     * @return the hook function, may be null
     */
    public static Function<Callable<Scheduler>, Scheduler> getInitMainThreadSchedulerHandler() {
        return onInitMainThreadHandler;
    }

    /**
     * Installs the hook applied each time the main thread scheduler is requested.
     * @param handler the hook function, {@code null} removes the hook
     */
    public static void setMainThreadSchedulerHandler(Function<Scheduler, Scheduler> handler) {
        onMainThreadHandler = handler;
    }

    /**
     * Returns the current hook function.
     * @return the hook function, may be null
     */
    public static Function<Scheduler, Scheduler> getOnMainThreadSchedulerHandler() {
        return onMainThreadHandler;
    }

    /**
     * Produces the main thread scheduler, either from the installed init hook or, when no
     * hook is set, by calling the supplied {@code scheduler} callable directly.
     * @param scheduler supplier of the default scheduler, must not be null
     * @return the resulting scheduler, never null
     * @throws NullPointerException if {@code scheduler} is null or the result is null
     */
    public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
        if (scheduler == null) {
            throw new NullPointerException("scheduler == null");
        }
        final Function<Callable<Scheduler>, Scheduler> hook = onInitMainThreadHandler;
        return hook != null
                ? applyRequireNonNull(hook, scheduler)
                : callRequireNonNull(scheduler);
    }

    /**
     * Passes {@code scheduler} through the installed main thread hook, if any.
     * @param scheduler the scheduler to hand out, must not be null
     * @return the hook's result, or {@code scheduler} itself when no hook is set
     * @throws NullPointerException if {@code scheduler} is null
     */
    public static Scheduler onMainThreadScheduler(Scheduler scheduler) {
        if (scheduler == null) {
            throw new NullPointerException("scheduler == null");
        }
        final Function<Scheduler, Scheduler> hook = onMainThreadHandler;
        return hook != null ? apply(hook, scheduler) : scheduler;
    }

    /**
     * Removes all handlers and resets the default behavior.
     */
    public static void reset() {
        onInitMainThreadHandler = null;
        onMainThreadHandler = null;
    }

    /** Calls {@code s} and rejects a null result; any failure is rethrown via {@link Exceptions#propagate}. */
    static Scheduler callRequireNonNull(Callable<Scheduler> s) {
        try {
            final Scheduler result = s.call();
            if (result != null) {
                return result;
            }
            throw new NullPointerException("Scheduler Callable returned null");
        } catch (Throwable ex) {
            throw Exceptions.propagate(ex);
        }
    }

    /** Applies {@code f} to {@code s} and rejects a null result. */
    static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
        final Scheduler result = apply(f, s);
        if (result != null) {
            return result;
        }
        throw new NullPointerException("Scheduler Callable returned null");
    }

    /** Applies {@code f} to {@code t}; any failure is rethrown via {@link Exceptions#propagate}. */
    static <T, R> R apply(Function<T, R> f, T t) {
        try {
            return f.apply(t);
        } catch (Throwable ex) {
            throw Exceptions.propagate(ex);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment