Created
October 15, 2017 16:57
-
-
Save willmenn/2ff844c31d624e6d2dfc39a328b5ed43 to your computer and use it in GitHub Desktop.
RxJava Multiples Threads to process a List
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.willmenn.rxjava; | |
import org.springframework.util.StopWatch; | |
import rx.Observable; | |
import rx.Scheduler; | |
import rx.schedulers.Schedulers; | |
import java.util.List; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import static com.google.common.collect.Lists.newArrayList; | |
public class RxJavaMultipleArrays { | |
public static void main(String[] args) throws InterruptedException { | |
List<List<Integer>> integerListList = getListsOfLists(); | |
StopWatch watch = new StopWatch(); | |
watch.start("array"); | |
int threadCount = Runtime.getRuntime().availableProcessors(); | |
System.out.println(threadCount); | |
ExecutorService executorService = Executors.newFixedThreadPool(threadCount); | |
Scheduler scheduler = Schedulers.from(executorService); | |
Observable<List<Integer>> from = Observable.from(integerListList); | |
from | |
.flatMap(val -> Observable.just(val) | |
.flatMap(integers -> Observable.from(integers)) | |
.subscribeOn(scheduler) | |
.map(i -> calculation(i)) | |
) | |
.subscribe(val -> System.out.println(val)); | |
watch.stop(); | |
System.out.println("\n"); | |
System.out.println(watch.getLastTaskInfo().getTimeSeconds()); | |
} | |
public static List<List<Integer>> getListsOfLists() { | |
List<List<Integer>> integerListList = newArrayList(); | |
for (int i = 0; i < 100; i++) { | |
List<Integer> integerList = newArrayList(); | |
for (int j = 0; j < 100; j++) { | |
integerList.add(j); | |
} | |
integerListList.add(integerList); | |
} | |
return integerListList; | |
} | |
public static int calculation(int i) { | |
try { | |
System.out.println("Calculating " + i + | |
" on " + Thread.currentThread().getName()); | |
Thread.sleep(100); | |
return i; | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.willmenn.rxjava; | |
import org.springframework.util.StopWatch; | |
import rx.Observable; | |
import rx.Scheduler; | |
import rx.schedulers.Schedulers; | |
import java.util.List; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import static com.google.common.collect.Lists.newArrayList; | |
public class RxJavaTest { | |
public static void main(String[] args) throws InterruptedException { | |
List<Integer> integerList = createList(); //Creating a List | |
StopWatch watch = new StopWatch(); //from Spring | |
int threadCount = Runtime.getRuntime().availableProcessors(); //Get how many Threads I can run with. | |
System.out.println(threadCount); | |
watch.start("array"); | |
ExecutorService executorService = Executors.newFixedThreadPool(threadCount); | |
Scheduler scheduler = Schedulers.from(executorService); //Creating a Scheduler | |
Observable<Integer> just = Observable.from(integerList); | |
just.flatMap(val -> Observable.just(val) | |
.observeOn(scheduler) | |
.map(i -> calculation(i)) | |
).doOnCompleted(() -> { | |
watch.stop(); //Stopping the watch. | |
executorService.shutdown(); //Shutdown the threads. | |
}).toList().toBlocking().single(); | |
System.out.println("\n"); | |
if (!watch.isRunning()) { | |
System.out.println(watch.getLastTaskInfo().getTimeSeconds()); // there is a possibility that the watch isn't close. | |
} | |
} | |
public static List<Integer> createList() { | |
List<Integer> integerList = newArrayList(); | |
for (int i = 0; i < 100; i++) { | |
integerList.add(i); | |
} | |
return integerList; | |
} | |
// Simulating a heavy calculation | |
public static int calculation(int i) { | |
try { | |
System.out.println("Calculating " + i + | |
" on " + Thread.currentThread().getName()); | |
Thread.sleep(100); | |
return i; | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment