Created
March 14, 2016 13:34
-
-
Save dimsuz/a942288ab27d518ba437 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.observable_queue; | |
import android.support.annotation.Nullable; | |
import java.util.Arrays; | |
import rx.Observable; | |
import rx.Scheduler; | |
import rx.android.schedulers.AndroidSchedulers; | |
import rx.subjects.BehaviorSubject; | |
import rx.subjects.PublishSubject; | |
import rx.subjects.SerializedSubject; | |
import timber.log.Timber; | |
/** | |
* Contains functions for scheduling Observable execution in serial order - one after another | |
*/ | |
public final class ObservableQueue { | |
private final SerializedSubject<Operation, Operation> operationQueue = PublishSubject.<Operation>create().toSerialized(); | |
private final SerializedSubject<int[], int[]> nextOperations = BehaviorSubject.create(new int[0]).toSerialized(); | |
private final SerializedSubject<CompletedOperation, CompletedOperation> completedOperations = BehaviorSubject.<CompletedOperation>create().toSerialized(); | |
public ObservableQueue(Scheduler operationScheduler) { | |
operationQueue | |
.concatMap(operation -> nextOperations | |
.filter(idQueue -> idQueue.length > 0 && idQueue[0] == operation.id) | |
.take(1) // (!! otherwise will stay continuously listening) | |
.map(v -> operation)) | |
.subscribe(operation -> { | |
Timber.d("running next operation in queue: %s", operation); | |
operation.observable | |
.subscribeOn(operationScheduler) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe( | |
v -> { | |
completedOperations.onNext(new CompletedOperation(operation.id, operation.tag, v)); | |
removeFirstFromIdQueue(operation.id); | |
}, | |
e -> { | |
completedOperations.onNext(new CompletedOperation(operation.id, operation.tag, e)); | |
removeFirstFromIdQueue(operation.id); | |
}); | |
} | |
); | |
// debug output | |
completedOperations | |
.subscribe(op -> { | |
if (op.error != null) { | |
Timber.d("operation %s completed with ERROR", op); | |
} else { | |
Timber.d("operation %s completed", op); | |
//noinspection unchecked | |
} | |
}); | |
} | |
private void removeFirstFromIdQueue(int operationId) { | |
nextOperations.take(1).subscribe(idQueue -> { | |
if (idQueue.length == 0 || idQueue[0] != operationId) { | |
throw new RuntimeException("expected first id in queue to be " + operationId); | |
} | |
int[] newQueue; | |
if (idQueue.length == 1) { | |
newQueue = new int[0]; | |
} else { | |
newQueue = Arrays.copyOfRange(idQueue, 1, idQueue.length); | |
} | |
nextOperations.onNext(newQueue); | |
}); | |
} | |
private void addOperationToIdQueue(int operationId) { | |
nextOperations | |
.take(1) | |
.subscribe(idQueue -> { | |
int[] newQueue = Arrays.copyOf(idQueue, idQueue.length + 1); | |
newQueue[newQueue.length - 1] = operationId; | |
nextOperations.onNext(newQueue); | |
}); | |
} | |
/** | |
* Schedules <em>observable</em> to be performed on operations scheduler. | |
* Subscription to this observable will be delayed until there are no other observables executing | |
* | |
* @return observable which will emit result or error. It is not necessary to subscribe to it for operation | |
* to be completed, it is provided only when result needs to be obtained. | |
*/ | |
public <T> Observable<T> scheduleOperation(String tag, Observable<T> observable) { | |
Operation operation = new Operation(tag, observable); | |
addOperationToIdQueue(operation.id); | |
operationQueue.onNext(operation); | |
return completedOperations | |
.takeFirst(op -> op.id == operation.id) | |
.flatMap(op -> { | |
//noinspection unchecked | |
return op.error == null ? Observable.just((T) op.result) : Observable.error(op.error); | |
}); | |
} | |
private static class Operation { | |
private static int NEXT_ID = 0; | |
final int id; | |
final String tag; | |
final Observable<?> observable; | |
Operation(String tag, Observable<?> observable) { | |
this.id = NEXT_ID++; | |
this.tag = tag; | |
this.observable = observable; | |
} | |
public String toString() { | |
return "[" + tag + "-" + id + "]"; | |
} | |
} | |
private static class CompletedOperation { | |
final int id; | |
final String tag; | |
final Object result; | |
@Nullable | |
final Throwable error; | |
CompletedOperation(int id, String tag, Object result) { | |
this.id = id; | |
this.tag = tag; | |
this.result = result; | |
this.error = null; | |
} | |
@SuppressWarnings("NullableProblems") | |
CompletedOperation(int id, String tag, Throwable error) { | |
this.id = id; | |
this.tag = tag; | |
this.result = null; | |
this.error = error; | |
} | |
public String toString() { | |
return "[" + tag + "-" + id + "]"; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment