Skip to content

Instantly share code, notes, and snippets.

@dimsuz
Created March 14, 2016 13:34
Show Gist options
  • Save dimsuz/a942288ab27d518ba437 to your computer and use it in GitHub Desktop.
Save dimsuz/a942288ab27d518ba437 to your computer and use it in GitHub Desktop.
package com.example.observable_queue;
import android.support.annotation.Nullable;
import java.util.Arrays;
import rx.Observable;
import rx.Scheduler;
import rx.android.schedulers.AndroidSchedulers;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import timber.log.Timber;
/**
* Contains functions for scheduling Observable execution in serial order - one after another
*/
public final class ObservableQueue {
private final SerializedSubject<Operation, Operation> operationQueue = PublishSubject.<Operation>create().toSerialized();
private final SerializedSubject<int[], int[]> nextOperations = BehaviorSubject.create(new int[0]).toSerialized();
private final SerializedSubject<CompletedOperation, CompletedOperation> completedOperations = BehaviorSubject.<CompletedOperation>create().toSerialized();
public ObservableQueue(Scheduler operationScheduler) {
operationQueue
.concatMap(operation -> nextOperations
.filter(idQueue -> idQueue.length > 0 && idQueue[0] == operation.id)
.take(1) // (!! otherwise will stay continuously listening)
.map(v -> operation))
.subscribe(operation -> {
Timber.d("running next operation in queue: %s", operation);
operation.observable
.subscribeOn(operationScheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
v -> {
completedOperations.onNext(new CompletedOperation(operation.id, operation.tag, v));
removeFirstFromIdQueue(operation.id);
},
e -> {
completedOperations.onNext(new CompletedOperation(operation.id, operation.tag, e));
removeFirstFromIdQueue(operation.id);
});
}
);
// debug output
completedOperations
.subscribe(op -> {
if (op.error != null) {
Timber.d("operation %s completed with ERROR", op);
} else {
Timber.d("operation %s completed", op);
//noinspection unchecked
}
});
}
private void removeFirstFromIdQueue(int operationId) {
nextOperations.take(1).subscribe(idQueue -> {
if (idQueue.length == 0 || idQueue[0] != operationId) {
throw new RuntimeException("expected first id in queue to be " + operationId);
}
int[] newQueue;
if (idQueue.length == 1) {
newQueue = new int[0];
} else {
newQueue = Arrays.copyOfRange(idQueue, 1, idQueue.length);
}
nextOperations.onNext(newQueue);
});
}
private void addOperationToIdQueue(int operationId) {
nextOperations
.take(1)
.subscribe(idQueue -> {
int[] newQueue = Arrays.copyOf(idQueue, idQueue.length + 1);
newQueue[newQueue.length - 1] = operationId;
nextOperations.onNext(newQueue);
});
}
/**
* Schedules <em>observable</em> to be performed on operations scheduler.
* Subscription to this observable will be delayed until there are no other observables executing
*
* @return observable which will emit result or error. It is not necessary to subscribe to it for operation
* to be completed, it is provided only when result needs to be obtained.
*/
public <T> Observable<T> scheduleOperation(String tag, Observable<T> observable) {
Operation operation = new Operation(tag, observable);
addOperationToIdQueue(operation.id);
operationQueue.onNext(operation);
return completedOperations
.takeFirst(op -> op.id == operation.id)
.flatMap(op -> {
//noinspection unchecked
return op.error == null ? Observable.just((T) op.result) : Observable.error(op.error);
});
}
private static class Operation {
private static int NEXT_ID = 0;
final int id;
final String tag;
final Observable<?> observable;
Operation(String tag, Observable<?> observable) {
this.id = NEXT_ID++;
this.tag = tag;
this.observable = observable;
}
public String toString() {
return "[" + tag + "-" + id + "]";
}
}
private static class CompletedOperation {
final int id;
final String tag;
final Object result;
@Nullable
final Throwable error;
CompletedOperation(int id, String tag, Object result) {
this.id = id;
this.tag = tag;
this.result = result;
this.error = null;
}
@SuppressWarnings("NullableProblems")
CompletedOperation(int id, String tag, Throwable error) {
this.id = id;
this.tag = tag;
this.result = null;
this.error = error;
}
public String toString() {
return "[" + tag + "-" + id + "]";
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment