Skip to content

Instantly share code, notes, and snippets.

@willmenn
Created October 15, 2017 16:57
Show Gist options
  • Save willmenn/2ff844c31d624e6d2dfc39a328b5ed43 to your computer and use it in GitHub Desktop.
Save willmenn/2ff844c31d624e6d2dfc39a328b5ed43 to your computer and use it in GitHub Desktop.
RxJava Multiples Threads to process a List
package com.willmenn.rxjava;
import org.springframework.util.StopWatch;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.google.common.collect.Lists.newArrayList;
public class RxJavaMultipleArrays {
public static void main(String[] args) throws InterruptedException {
List<List<Integer>> integerListList = getListsOfLists();
StopWatch watch = new StopWatch();
watch.start("array");
int threadCount = Runtime.getRuntime().availableProcessors();
System.out.println(threadCount);
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
Scheduler scheduler = Schedulers.from(executorService);
Observable<List<Integer>> from = Observable.from(integerListList);
from
.flatMap(val -> Observable.just(val)
.flatMap(integers -> Observable.from(integers))
.subscribeOn(scheduler)
.map(i -> calculation(i))
)
.subscribe(val -> System.out.println(val));
watch.stop();
System.out.println("\n");
System.out.println(watch.getLastTaskInfo().getTimeSeconds());
}
public static List<List<Integer>> getListsOfLists() {
List<List<Integer>> integerListList = newArrayList();
for (int i = 0; i < 100; i++) {
List<Integer> integerList = newArrayList();
for (int j = 0; j < 100; j++) {
integerList.add(j);
}
integerListList.add(integerList);
}
return integerListList;
}
public static int calculation(int i) {
try {
System.out.println("Calculating " + i +
" on " + Thread.currentThread().getName());
Thread.sleep(100);
return i;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
package com.willmenn.rxjava;
import org.springframework.util.StopWatch;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.google.common.collect.Lists.newArrayList;
public class RxJavaTest {
public static void main(String[] args) throws InterruptedException {
List<Integer> integerList = createList(); //Creating a List
StopWatch watch = new StopWatch(); //from Spring
int threadCount = Runtime.getRuntime().availableProcessors(); //Get how many Threads I can run with.
System.out.println(threadCount);
watch.start("array");
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
Scheduler scheduler = Schedulers.from(executorService); //Creating a Scheduler
Observable<Integer> just = Observable.from(integerList);
just.flatMap(val -> Observable.just(val)
.observeOn(scheduler)
.map(i -> calculation(i))
).doOnCompleted(() -> {
watch.stop(); //Stopping the watch.
executorService.shutdown(); //Shutdown the threads.
}).toList().toBlocking().single();
System.out.println("\n");
if (!watch.isRunning()) {
System.out.println(watch.getLastTaskInfo().getTimeSeconds()); // there is a possibility that the watch isn't close.
}
}
public static List<Integer> createList() {
List<Integer> integerList = newArrayList();
for (int i = 0; i < 100; i++) {
integerList.add(i);
}
return integerList;
}
// Simulating a heavy calculation
public static int calculation(int i) {
try {
System.out.println("Calculating " + i +
" on " + Thread.currentThread().getName());
Thread.sleep(100);
return i;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment