Skip to content

Instantly share code, notes, and snippets.

@stephanenicolas
Last active May 6, 2017 07:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stephanenicolas/3d0674cca56f0acf327ae2e3e6bf5a72 to your computer and use it in GitHub Desktop.
Save stephanenicolas/3d0674cca56f0acf327ae2e3e6bf5a72 to your computer and use it in GitHub Desktop.
Updates an adapter concurrently using Rx-managed Redux
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.subjects.BehaviorSubject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.exceptions.OnErrorNotImplementedException;
import static io.reactivex.Observable.merge;
import static io.reactivex.schedulers.Schedulers.computation;
import static io.reactivex.schedulers.Schedulers.single;
public class Main {
/**
 * Runs the demo: creates ten feature controllers and one {@link Adapter}, then pushes every
 * value emitted by the store through all controllers and applies the resulting events to the
 * adapter.
 *
 * <p>The store is subscribed on the computation scheduler and the adapter is updated on the
 * single scheduler. Errors are forwarded to {@link #crash(Throwable)}.
 *
 * @param args command-line arguments, unused
 */
public static void main(String[] args) {
    final int controllerCount = 10;
    final int demoDurationMillis = 10000;
    FeatureController[] featureControllers = createControllers(controllerCount);
    Adapter adapter = new Adapter();
    Observable<Long> storeObservable = createStore();
    Disposable subscription = storeObservable
        .doOnSubscribe(registerFeatureControllers(adapter, featureControllers))
        .compose(buildItemsConcurrently(featureControllers))
        .subscribeOn(computation())
        .observeOn(single())
        .subscribe(updateItems(adapter), Main::crash);
    // The pipeline works on scheduler threads, so the main thread has to wait for it.
    sleep(demoDurationMillis);
    // Cancel the subscription explicitly instead of leaving it running until the JVM exits.
    subscription.dispose();
}
/**
 * Demo feature controller that derives a small item list from a store value and its own id.
 */
public static class FeatureController {
    /** Identifier of this controller; fixed at construction time. */
    private final int id;

    /**
     * @param id identifier of this controller
     */
    public FeatureController(int id) {
        this.id = id;
    }

    /**
     * Builds the items of this feature for the given store value.
     *
     * <p>The contract allows {@code null} to signal that there is no update to be performed,
     * so callers must handle it. This demo implementation always returns two items:
     * {@code bigModel * 2 + id} and {@code bigModel * 2 + id + 1}.
     *
     * @param bigModel the current store value
     * @return the new items of this feature, or {@code null} when nothing has to be updated
     */
    public List<Object> updateItemList(long bigModel) {
        return Arrays.asList(bigModel * 2 + id, bigModel * 2 + id + 1);
    }
}
/**
 * Immutable event carrying the new items produced by one feature controller.
 *
 * <p>The fields are {@code final} so that an event cannot be modified after it has been
 * created, which matters because events are handed from one scheduler to another.
 */
public static class FeatureItemsUpdateEvent {
    /** Id of the feature controller that produced {@link #items}. */
    public final int featureControllerId;
    /** The new items of that feature; stored as given, without copying. */
    public final List<Object> items;

    /**
     * @param featureControllerId id of the controller that produced the items
     * @param items the new items of that feature
     */
    public FeatureItemsUpdateEvent(int featureControllerId, List<Object> items) {
        this.featureControllerId = featureControllerId;
        this.items = items;
    }
}
/**
 * Keeps one list of items per feature controller, indexed by controller id, and prints the
 * whole content to standard output after every update.
 */
public static class Adapter {
    /** Items of each feature; a slot stays {@code null} until its feature is updated. */
    List<List<Object>> itemList = new ArrayList<>();

    /**
     * Makes sure there is at least one slot per given controller. Existing slots are kept and
     * new slots are filled with {@code null}.
     *
     * @param featureControllers the controllers to reserve slots for
     */
    public void registerFeatureControllers(FeatureController... featureControllers) {
        int missingSlots = featureControllers.length - itemList.size();
        for (int added = 0; added < missingSlots; added++) {
            itemList.add(null);
        }
    }

    /**
     * Replaces the items stored in the slot of the given controller, then prints all slots.
     *
     * @param featureControllerId index of the slot to replace; it must already exist
     * @param items the new items for that slot
     */
    public void updateFeatureItems(int featureControllerId, List<Object> items) {
        itemList.set(featureControllerId, items);
        display();
    }

    /**
     * Prints every slot as {@code "<index> - <items>"}, one per line, followed by an empty line.
     */
    public void display() {
        int slot = 0;
        for (List<Object> slotItems : itemList) {
            System.out.println(slot + " - " + slotItems);
            slot++;
        }
        System.out.println();
    }
}
/**
 * Creates a subscription hook that registers the given controllers with the adapter.
 *
 * @param adapter the adapter that receives the controllers
 * @param featureControllers the controllers to register
 * @return a consumer that ignores the disposable it receives and performs the registration
 */
private static Consumer<? super Disposable> registerFeatureControllers(Adapter adapter, FeatureController... featureControllers) {
    return unusedDisposable -> {
        adapter.registerFeatureControllers(featureControllers);
    };
}
/**
 * Creates a transformer that feeds every store value to each of the given controllers and
 * emits the update events they produce.
 *
 * @param featureControllers the controllers that build items from store values
 * @return a transformer from store values to feature update events
 */
private static ObservableTransformer<Long, FeatureItemsUpdateEvent> buildItemsConcurrently(FeatureController... featureControllers) {
    return upstream -> {
        // Generic arrays cannot be created directly, hence the raw array and the suppression.
        @SuppressWarnings("unchecked")
        Function<Long, MaybeSource<FeatureItemsUpdateEvent>>[] transformers = new Function[featureControllers.length];
        for (int index = 0; index < featureControllers.length; index++) {
            transformers[index] = toFeatureItemsUpdateEventObservable(featureControllers[index]);
        }
        return upstream.compose(composeConcurrently(transformers));
    };
}
/**
 * Creates a transformer that applies every given function to each upstream item and merges
 * all the results into one stream.
 *
 * <p>The upstream is shared through {@code publish}, each function is applied with
 * {@code flatMapMaybe}, and the per-function streams are combined with {@code merge}. This
 * method applies no scheduler itself.
 *
 * @param transformers functions that map an upstream item to a {@link MaybeSource}; an empty
 *     result simply contributes nothing for that item
 * @param <T> type of the upstream items
 * @param <R> type of the merged results
 * @return a transformer emitting the results of all functions
 */
@SafeVarargs // the varargs array is only read, never written to or exposed
private static <T, R> ObservableTransformer<T, R> composeConcurrently(Function<? super T, ? extends MaybeSource<R>>... transformers) {
    return source -> source.publish(shared -> {
        List<Observable<R>> list = new ArrayList<>(transformers.length);
        for (Function<? super T, ? extends MaybeSource<R>> transformer : transformers) {
            list.add(shared.flatMapMaybe(transformer));
        }
        return merge(list);
    });
}
/**
 * Wraps a controller into a function that turns a store value into an optional update event.
 *
 * @param featureController the controller that builds the items
 * @return a function delegating to {@code justFeatureItemsUpdateEvent} for that controller
 */
private static Function<Long, MaybeSource<FeatureItemsUpdateEvent>> toFeatureItemsUpdateEventObservable(FeatureController featureController) {
    return storeValue -> {
        return justFeatureItemsUpdateEvent(storeValue, featureController);
    };
}
/**
 * Asks the controller for its items and wraps them into an update event.
 *
 * <p>A controller may return {@code null} to signal that nothing has to be updated; in that
 * case the returned {@link Maybe} completes without emitting an event.
 *
 * @param bigModel the current store value
 * @param featureController the controller that builds the items
 * @return a Maybe emitting one event, or completing empty when the controller returns null
 */
private static Maybe<FeatureItemsUpdateEvent> justFeatureItemsUpdateEvent(Long bigModel, FeatureController featureController) {
    // fromCallable treats a null result as "complete without a value". Maybe.just(null) would
    // throw a NullPointerException before any null filter could run.
    return Maybe //
        .<List<Object>>fromCallable(() -> featureController.updateItemList(bigModel)) //
        .map(items -> new FeatureItemsUpdateEvent(featureController.id, items));
}
/**
 * Creates the consumer that applies update events to the adapter.
 *
 * @param adapter the adapter to update
 * @return a consumer passing the controller id and items of each event to the adapter
 */
private static Consumer<FeatureItemsUpdateEvent> updateItems(Adapter adapter) {
    return update -> {
        int slot = update.featureControllerId;
        List<Object> freshItems = update.items;
        adapter.updateFeatureItems(slot, freshItems);
    };
}
/**
 * Creates the demo store: an observable emitting an increasing number once per second.
 *
 * <p>{@code interval} is a static factory declared on {@link Observable}, so it is called on
 * that class directly. The result is a plain observable, not a subject.
 *
 * @return an observable of store values
 */
private static Observable<Long> createStore() {
    return Observable.interval(1, TimeUnit.SECONDS);
}
/**
 * Creates the requested number of controllers, each using its array index as id.
 *
 * @param controllerCount number of controllers to create
 * @return an array of {@code controllerCount} controllers with ids 0 to controllerCount - 1
 */
private static FeatureController[] createControllers(int controllerCount) {
    FeatureController[] controllers = new FeatureController[controllerCount];
    // The generator receives the index of the slot, which doubles as the controller id.
    Arrays.setAll(controllers, FeatureController::new);
    return controllers;
}
/**
 * Error handler of the demo: rethrows any stream error wrapped in an
 * {@link OnErrorNotImplementedException}.
 *
 * @param error the error signalled by the stream
 */
private static void crash(Throwable error) {
    final OnErrorNotImplementedException wrapped = new OnErrorNotImplementedException(error);
    throw wrapped;
}
/**
 * Blocks the calling thread for the given duration.
 *
 * <p>If the thread is interrupted while sleeping, the method returns early and restores the
 * interrupt flag so that callers can still observe the interruption.
 *
 * @param millis how long to sleep, in milliseconds
 */
private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        // Thread.sleep clears the flag when it throws; set it again instead of swallowing it.
        Thread.currentThread().interrupt();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment