Skip to content

Instantly share code, notes, and snippets.

@sumew
Last active February 7, 2017 14:46
Show Gist options
  • Save sumew/1002ee30b2fde7035003082e42699979 to your computer and use it in GitHub Desktop.
Save sumew/1002ee30b2fde7035003082e42699979 to your computer and use it in GitHub Desktop.
import javaslang.control.Try;
import rx.Observable;
import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Created by mamekuri on 2/7/17.
*/
public class ParallelTask {
int counter = 0;
List<Integer> integers = IntStream.range(0, 900).boxed().collect(Collectors.toList());
public List<Integer> top100() {
if (counter < 3) {
return integers.subList(counter * 100, counter++ * 100 + 100);
}
return new ArrayList<Integer>();
}
public void runBatchJobInParallel() {
List<Integer> fromDb = top100();
if (fromDb.isEmpty() ){
return;
}
System.out.println(String.format("running batch starting with %s, on thread %s", fromDb.get(0), Thread.currentThread().getName()));
Observable.from(fromDb)
.flatMap(s3Info -> Observable.just(s3Info).subscribeOn(Schedulers.computation())
.map(integer -> ioCall(integer))).subscribe(
i -> System.out.println(String.format("Recieved %s on %s",i, Thread.currentThread().getName())),
throwable -> new RuntimeException(throwable),
() -> runBatchJobInParallel());
}
public int ioCall(Integer value) {
System.out.println(String.format("doing intense i/o call with argument %s on thread %s", value, Thread.currentThread().getName()));
Try.run(() -> Thread.sleep(500));
if(value != 0 && value % 33 == 0)
throw new RuntimeException("it broke");
return value;
}
public static void main(String[] args) {
System.out.println("Starting job");
ParallelTask parallelTask = new ParallelTask();
parallelTask.runBatchJobInParallel();
Try.run(() -> Thread.sleep(100000l));
System.out.println("done");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment