Skip to content

Instantly share code, notes, and snippets.

@benjchristensen
Last active February 24, 2022 03:02
Show Gist options
  • Star 61 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save benjchristensen/04eef9ca0851f3a5d7bf to your computer and use it in GitHub Desktop.
Save benjchristensen/04eef9ca0851f3a5d7bf to your computer and use it in GitHub Desktop.
EventBus.java
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
/**
 * Simple pass-thru event bus with error handling and reconnect.
 *
 * <p>{@link #main(String[])} attaches three independent subscribers to one
 * {@link MyEventBus} and then pushes a mix of integers and strings through it.
 */
public class EventBus {

    /**
     * Demo driver: subscribes a number printer, a string printer and a
     * deliberately fragile Integer-only classifier, then sends six events,
     * printing a separator line after each one.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyEventBus bus = new MyEventBus();

        bus.toObservable().filter(EventBus::IS_NUMBER).forEach(n -> System.out.println("Got number: " + n));
        bus.toObservable().filter(EventBus::IS_STRING).forEach(System.out::println);

        // something that can fail (it assumes Integer): a String event makes the
        // cast throw, which terminates this subscription with an error
        bus.toObservable()
                .map(o -> ((Integer) o) > 10 ? "Greater than 10" : "Less than or equal to 10")
                .doOnError(e -> System.err.println(e.getMessage()))
                .retry() // reconnects to bus if an error occurs
                .forEach(System.out::println);

        Object[] events = {1, 11, 28, "hello", 5, "world"};
        for (Object event : events) {
            bus.send(event);
            System.out.println("-----------------------");
        }
    }

    /**
     * Predicate used to select numeric events.
     *
     * @param o the event to test (may be {@code null})
     * @return {@code true} if {@code o} is an instance of {@link Number}
     */
    public static boolean IS_NUMBER(Object o) {
        return o instanceof Number;
    }

    /**
     * Predicate used to select textual events.
     *
     * @param o the event to test (may be {@code null})
     * @return {@code true} if {@code o} is an instance of {@link String}
     */
    public static boolean IS_STRING(Object o) {
        return o instanceof String;
    }

    /**
     * Minimal event bus backed by a {@link PublishSubject}.
     *
     * <p>If multiple threads are going to emit events to this then it must be
     * made thread-safe by declaring the field like this instead:
     * {@code private final Subject<Object, Object> bus =
     * new SerializedSubject<Object, Object>(PublishSubject.create());}
     */
    public static class MyEventBus {

        private final PublishSubject<Object> bus = PublishSubject.create();

        /**
         * Publishes an event to the bus.
         *
         * @param o the event to publish
         */
        public void send(Object o) {
            bus.onNext(o);
        }

        /**
         * Returns the stream of events sent through this bus.
         *
         * @return an {@link Observable} view of the bus
         */
        public Observable<Object> toObservable() {
            // Hand out a plain Observable rather than the subject itself, so
            // subscribers cannot cast the result back to a Subject and emit
            // or terminate events behind send()'s back.
            return bus.asObservable();
        }
    }
}
import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
/**
 * Simple pass-thru event bus with parallel processing of events and error handling.
 *
 * <p>{@link #main(String[])} attaches three subscribers to one {@link MyEventBus};
 * each one wraps every incoming event in its own inner {@code Observable} that is
 * subscribed on a scheduler, and merges the results back with {@code flatMap}.
 */
public class EventBusWithParallelProcessing {

    /**
     * Demo driver: wires up the IO, computation and error-handling pipelines,
     * sends six events, then waits for the asynchronous work to finish.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyEventBus bus = new MyEventBus();

        // perform IO in parallel for each event
        bus.toObservable().flatMap(o -> {
            return Observable.just(o).map(v -> {
                // simulate latent network call
                pause(400);
                return "IO-Response_" + v;
            }).subscribeOn(Schedulers.io());
        }).forEach(result -> System.out.println("IO => " + result));

        // perform computation in parallel for each event
        bus.toObservable().flatMap(o -> {
            return Observable.just(o).map(v -> {
                // simulate expensive computation
                pause(200);
                return "Computed_" + v;
            }).subscribeOn(Schedulers.computation());
        }).forEach(result -> System.out.println("Computation => " + result));

        // perform work for each event that sometimes results in errors
        bus.toObservable().flatMap(o -> {
            return Observable.just(o).map(v -> {
                if (v.equals("hello")) {
                    throw new RuntimeException("Simulated error processing -> " + v);
                }
                return "Processed_" + v;
            })
            // the fallback is applied to the inner per-event Observable, so a
            // failure replaces only that event's result
            .onErrorResumeNext(Observable.just("DefaultValueFor_" + o))
            .subscribeOn(Schedulers.computation());
        }).forEach(result -> System.out.println("Processed => " + result));

        bus.send(1);
        bus.send(11);
        bus.send(28);
        bus.send("hello");
        bus.send(5);
        bus.send("world");

        // Since we're doing work asynchronously above we need to wait on it
        // (There are other more "idiomatic" ways of doing this without sleeping
        // but they require changing to ReplaySubject or using CountdownLatches. I
        // chose to stay simple and obvious for this and leave the rest of the code
        // as intended for the example).
        pause(2000);
    }

    /**
     * Sleeps the current thread for the given time. If the sleep is interrupted
     * the method returns early and re-asserts the thread's interrupt flag
     * instead of discarding it.
     *
     * @param millis how long to sleep, in milliseconds
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt status when it throws; restore
            // it so the interruption is not silently lost.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Predicate that selects numeric events.
     *
     * @param o the event to test (may be {@code null})
     * @return {@code true} if {@code o} is an instance of {@link Number}
     */
    public static boolean IS_NUMBER(Object o) {
        return o instanceof Number;
    }

    /**
     * Predicate that selects textual events.
     *
     * @param o the event to test (may be {@code null})
     * @return {@code true} if {@code o} is an instance of {@link String}
     */
    public static boolean IS_STRING(Object o) {
        return o instanceof String;
    }

    /**
     * Minimal event bus backed by a {@link PublishSubject}.
     *
     * <p>If multiple threads are going to emit events to this then it must be
     * made thread-safe by declaring the field like this instead:
     * {@code private final Subject<Object, Object> bus =
     * new SerializedSubject<Object, Object>(PublishSubject.create());}
     */
    public static class MyEventBus {

        private final PublishSubject<Object> bus = PublishSubject.create();

        /**
         * Publishes an event to the bus.
         *
         * @param o the event to publish
         */
        public void send(Object o) {
            bus.onNext(o);
        }

        /**
         * Returns the stream of events sent through this bus.
         *
         * @return an {@link Observable} view of the bus
         */
        public Observable<Object> toObservable() {
            // Hand out a plain Observable rather than the subject itself, so
            // subscribers cannot cast the result back to a Subject and emit
            // or terminate events behind send()'s back.
            return bus.asObservable();
        }
    }
}
@benjchristensen
Copy link
Author

EventBus.java Outputs:

Got number: 1
Less than or equal to 10
-----------------------
Got number: 11
Greater than 10
-----------------------
Got number: 28
Greater than 10
-----------------------
hello
java.lang.String cannot be cast to java.lang.Integer
-----------------------
Got number: 5
Less than or equal to 10
-----------------------
world
java.lang.String cannot be cast to java.lang.Integer
-----------------------

@benjchristensen
Copy link
Author

EventBusWithParallelProcessing.java Outputs:

Processed => Processed_1
Processed => Processed_11
Processed => Processed_28
Processed => Processed_5
Processed => Processed_world
Processed => DefaultValueFor_hello
Computation => Computed_1
Computation => Computed_11
Computation => Computed_28
Computation => Computed_hello
IO => IO-Response_1
Computation => Computed_5
IO => IO-Response_28
IO => IO-Response_11
Computation => Computed_world
IO => IO-Response_hello
IO => IO-Response_5
IO => IO-Response_world

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment