Skip to content

Instantly share code, notes, and snippets.

@willmenn
Created March 28, 2019 06:59
Show Gist options
  • Save willmenn/d734c956caf703898ffe3a81abfc21b4 to your computer and use it in GitHub Desktop.
Save willmenn/d734c956caf703898ffe3a81abfc21b4 to your computer and use it in GitHub Desktop.
RxJava Fanout with network request
public static void main(String[] args) {
int threadCount = Runtime.getRuntime().availableProcessors();
System.out.println("Thread Count:" + threadCount);
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
Scheduler scheduler = Schedulers.from(executorService);
Observable.fromArray(buildNetworkUriCalls("oi").toArray())
.flatMap(url -> getNetworkFutureAndObservable(url)
.subscribeOn(scheduler)
.map(s -> {
System.out.println(Thread.currentThread().getName() + " | " + s);
return s;
}).map(s -> {
System.out.println("insede thread: " + s);
return s;
}))
.doOnComplete(executorService::shutdown)
.subscribe(System.out::println);//Shutdown the threads.
}
public static List<String> buildNetworkUriCalls(String z) {
List<String> obs = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final String s = z + "-11-" + i;
obs.add(s);
}
return obs;
}
public static Observable getNetworkObservsable(Object s) {
return Observable.just("url -> : " + s);
}
public static Observable getNetworkFutureAndObservable(Object s) {
return Observable.fromFuture(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "url -> : " + s;
}));
}
public static CompletableFuture getNetwork(Object s) {
return CompletableFuture.supplyAsync(() -> "url -> : " + s);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment