Skip to content

Instantly share code, notes, and snippets.

@joaocsousa
Last active January 29, 2018 09:37
Show Gist options
  • Save joaocsousa/f3453026954930c203fa7a22a495a9b7 to your computer and use it in GitHub Desktop.
Save joaocsousa/f3453026954930c203fa7a22a495a9b7 to your computer and use it in GitHub Desktop.
Shared observable that runs a task. Every subscriber that subscribes while the task is running shares the same observable and receives the same result. Once the task finishes, the next subscriber triggers a new task.
[1] subscribed at 224ms
-> Started Task at 229ms
[2] subscribed at 1230ms
[3] subscribed at 2233ms
-> Finished Task at 3231ms. Value To Deliver: 40
[1] Received result. Value: 40
[2] Received result. Value: 40
[3] Received result. Value: 40
[4] subscribed at 3239ms
-> Started Task at 3239ms
[5] subscribed at 4243ms
-> Finished Task at 6243ms. Value To Deliver: 84
[4] Received result. Value: 84
[5] Received result. Value: 84
/**
 * Demo driver for {@link SingletonTask}.
 *
 * <p>Subscribes five numbered observers to the shared observable, one second apart, and logs
 * when each subscription was issued and which value each observer eventually received.
 */
public class Main {

    /** Wall-clock time captured when this class is initialised; logged timestamps are relative to it. */
    public static final long START_TIME = System.currentTimeMillis();

    /** Number of observers attached by the demo. */
    private static final int SUBSCRIBER_COUNT = 5;

    /** Pause between two consecutive subscriptions, in milliseconds. */
    private static final long SUBSCRIBE_INTERVAL_MS = 1000;

    /** Final pause that keeps {@code main} alive so pending results can still be printed. */
    private static final long FINAL_WAIT_MS = 20000;

    /**
     * Runs the demo.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while pausing
     */
    public static void main(String[] args) throws InterruptedException {
        SingletonTask singletonTask = SingletonTask.singletonTask();

        for (int id = 1; id <= SUBSCRIBER_COUNT; id++) {
            subscribeAndLog(singletonTask, id);
            if (id < SUBSCRIBER_COUNT) {
                Thread.sleep(SUBSCRIBE_INTERVAL_MS);
            }
        }

        Thread.sleep(FINAL_WAIT_MS);
    }

    /**
     * Attaches one numbered observer to the shared observable and logs the subscription time.
     *
     * <p>An error consumer is supplied so that a failure of the task is reported for this
     * subscriber instead of being raised as an unhandled-error exception by the library.
     *
     * @param task holder of the shared observable
     * @param id   number printed in this observer's log lines
     */
    private static void subscribeAndLog(SingletonTask task, int id) {
        task.getObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.single())
                .subscribe(
                        result -> System.out.println("[" + id + "] Received result. Value: " + result),
                        error -> System.err.println("[" + id + "] Task failed: " + error));
        System.out.println("[" + id + "] subscribed at " + (System.currentTimeMillis() - START_TIME) + "ms");
    }
}
/**
 * Single-instance holder of a shared, long-running task.
 *
 * <p>The task picks a random integer below 100, sleeps for three seconds and then emits that
 * integer followed by completion. The observable is wrapped with {@code share()}, so observers
 * that subscribe while a run is in progress receive that run's value; a subscription made after
 * the run has completed starts a new run.
 */
public enum SingletonTask {
    INSTANCE;

    /** Source of the values delivered by the task, seeded with the current time. */
    private final Random random = new Random(System.currentTimeMillis());

    /** The shared task; start and finish are logged relative to {@code Main.START_TIME}. */
    private final Observable<Integer> sharedTask = Observable.<Integer>create(emitter -> {
        int valueToDeliver = random.nextInt(100);
        System.out.println("-> Started Task at " + elapsedMillis() + "ms");
        // Stand-in for the actual long-running work.
        Thread.sleep(3000);
        System.out.println("-> Finished Task at " + elapsedMillis() + "ms. Value To Deliver: " + valueToDeliver);
        emitter.onNext(valueToDeliver);
        emitter.onComplete();
    }).share();

    /**
     * Returns the sole instance of this enum.
     *
     * @return {@link #INSTANCE}
     */
    public static SingletonTask singletonTask() {
        return INSTANCE;
    }

    /**
     * Returns the shared observable; the same object is returned on every call.
     *
     * @return the shared task observable
     */
    public Observable<Integer> getObservable() {
        return sharedTask;
    }

    /** Milliseconds elapsed since {@code Main.START_TIME}. */
    private static long elapsedMillis() {
        return System.currentTimeMillis() - Main.START_TIME;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment