Skip to content

Instantly share code, notes, and snippets.

@alacambra
Created December 20, 2016 05:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alacambra/19d43a280e139f9d4b588a7cf1ce45dd to your computer and use it in GitHub Desktop.
Save alacambra/19d43a280e139f9d4b588a7cf1ce45dd to your computer and use it in GitHub Desktop.
import rx.Emitter;
import rx.Observable;
import rx.observables.GroupedObservable;
import java.util.*;
import java.util.function.Consumer;
/**
* Created by alacambra on 19/12/2016.
*/
public class Test {
static int t = 0;
static int generated = 0;
public static class Generator {
Collection<Consumer<Integer>> consumers = Collections.synchronizedCollection(new ArrayList<>());
Random r = new Random();
public void addConsumer(Consumer<Integer> consumer) {
consumers.add(consumer);
}
public Generator() {
TimerTask task = new TimerTask() {
public void run() {
int i = generated++;
System.out.println("Generating.... " + i);
consumers.forEach(c -> c.accept(i));
}
};
Timer timer = new Timer(false);
timer.schedule(task, 0, 2000);
}
}
public static void main(String[] args) {
Generator g = new Generator();
Observable<Integer> s = Observable.fromEmitter(emitter -> {
Consumer<Integer> ci = integer -> emitter.onNext(integer);
g.addConsumer(ci);
}, Emitter.BackpressureMode.BUFFER);
Observable<GroupedObservable<Boolean, Integer>> g1 = s.groupBy(i -> i % 2 == 0);
TimerTask task = new TimerTask() {
public void run() {
int id = t;
System.out.println("new subscriber " + id);
g1.subscribe(i -> {
i.subscribe(v -> System.out.println("receiving number " + id + ":" + i.getKey() + "," + v));
});
t++;
}
};
Timer timer = new Timer(false);
timer.schedule(task, 5000, 10000);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment