Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package com.mirth.stevek.rxtest;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func3;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
public class FutureTest1 {
public static void run() throws Exception {
final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
try {
Future<String> f1 = executor.submit(new CallToRemoteServiceA());
Observable<String> f1Observable = Observable.from(f1);
Observable<String> f3Observable = f1Observable
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
System.out.println("Observed from f1: " + s);
Future<String> f3 = executor.submit(new CallToRemoteServiceC(s));
return Observable.from(f3);
}
});
Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
Observable<Integer> f2Observable = Observable.from(f2);
Observable<Integer> f4Observable = f2Observable
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
System.out.println("Observed from f2: " + integer);
Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer));
return Observable.from(f4);
}
});
Observable<Integer> f5Observable = f2Observable
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
System.out.println("Observed from f2: " + integer);
Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer));
return Observable.from(f5);
}
});
Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
@Override
public Map<String, String> call(String s, Integer integer, Integer integer2) {
Map<String, String> map = new HashMap<String, String>();
map.put("f3", s);
map.put("f4", String.valueOf(integer));
map.put("f5", String.valueOf(integer2));
return map;
}
}).subscribe(new Action1<Map<String, String>>() {
@Override
public void call(Map<String, String> map) {
System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5"))));
}
});
} finally {
executor.shutdownNow();
}
}
public static void main(String args[]) {
try {
long start = System.currentTimeMillis();
run();
System.out.println("Finished in: " + (System.currentTimeMillis() - start) + "ms");
} catch (Exception e) {
e.printStackTrace();
}
}
private static final class CallToRemoteServiceA implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("A called");
// simulate fetching data from remote service
Thread.sleep(100);
return "responseA";
}
}
private static final class CallToRemoteServiceB implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("B called");
// simulate fetching data from remote service
Thread.sleep(40);
return 100;
}
}
private static final class CallToRemoteServiceC implements Callable<String> {
private final String dependencyFromA;
public CallToRemoteServiceC(String dependencyFromA) {
this.dependencyFromA = dependencyFromA;
}
@Override
public String call() throws Exception {
System.out.println("C called");
// simulate fetching data from remote service
Thread.sleep(60);
return "responseB_" + dependencyFromA;
}
}
private static final class CallToRemoteServiceD implements Callable<Integer> {
private final Integer dependencyFromB;
public CallToRemoteServiceD(Integer dependencyFromB) {
this.dependencyFromB = dependencyFromB;
}
@Override
public Integer call() throws Exception {
System.out.println("D called");
// simulate fetching data from remote service
Thread.sleep(140);
return 40 + dependencyFromB;
}
}
private static final class CallToRemoteServiceE implements Callable<Integer> {
private final Integer dependencyFromB;
public CallToRemoteServiceE(Integer dependencyFromB) {
this.dependencyFromB = dependencyFromB;
}
@Override
public Integer call() throws Exception {
System.out.println("E called");
// simulate fetching data from remote service
Thread.sleep(55);
return 5000 + dependencyFromB;
}
}
}
@pollux-

This comment has been minimized.

Copy link

pollux- commented Feb 18, 2016

Awesome !!!
Thanks ..

@shriduttkothari

This comment has been minimized.

Copy link

shriduttkothari commented May 18, 2016

Wonderful !!!

@chenanze

This comment has been minimized.

Copy link

chenanze commented Jun 3, 2016

Nice !!!

@scratchboom

This comment has been minimized.

Copy link

scratchboom commented Jul 5, 2016

Is it possible to zip f1Observable and f2Observable too with f3Observable, f4Observable and f5Observable?

@finch001

This comment has been minimized.

Copy link

finch001 commented Jan 10, 2017

Amazing!!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.