Last active
February 24, 2022 03:02
-
-
Save benjchristensen/04eef9ca0851f3a5d7bf to your computer and use it in GitHub Desktop.
EventBus.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.subjects.PublishSubject; | |
import rx.subjects.SerializedSubject; | |
import rx.subjects.Subject; | |
/** | |
* Simple pass-thru event bus with error handling and reconnect. | |
*/ | |
public class EventBus { | |
public static void main(String[] args) { | |
MyEventBus bus = new MyEventBus(); | |
bus.toObservable().filter(EventBus::IS_NUMBER).forEach(n -> System.out.println("Got number: " + n)); | |
bus.toObservable().filter(EventBus::IS_STRING).forEach(System.out::println); | |
// something that can fail (it assumes Integer) | |
bus.toObservable().map(o -> { | |
if (((Integer) o) > 10) { | |
return "Greater than 10"; | |
} else { | |
return "Less than or equal to 10"; | |
} | |
}).doOnError(e -> System.err.println(e.getMessage())) | |
.retry() // reconnects to bus if an error occurs | |
.forEach(System.out::println); | |
bus.send(1); | |
System.out.println("-----------------------"); | |
bus.send(11); | |
System.out.println("-----------------------"); | |
bus.send(28); | |
System.out.println("-----------------------"); | |
bus.send("hello"); | |
System.out.println("-----------------------"); | |
bus.send(5); | |
System.out.println("-----------------------"); | |
bus.send("world"); | |
System.out.println("-----------------------"); | |
} | |
public static boolean IS_NUMBER(Object o) { | |
if (o instanceof Number) { | |
return true; | |
} else { | |
return false; | |
} | |
} | |
public static boolean IS_STRING(Object o) { | |
if (o instanceof String) { | |
return true; | |
} else { | |
return false; | |
} | |
} | |
public static class MyEventBus { | |
private final PublishSubject<Object> bus = PublishSubject.create(); | |
/** | |
* If multiple threads are going to emit events to this then it must be made thread-safe like this instead: | |
*/ | |
// private final Subject<Object, Object> bus = new SerializedSubject<Object, Object>(PublishSubject.create()); | |
public void send(Object o) { | |
bus.onNext(o); | |
} | |
public Observable<Object> toObservable() { | |
return bus; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.schedulers.Schedulers; | |
import rx.subjects.PublishSubject; | |
/** | |
* Simple pass-thru event bus with parallel processing of events and error handling. | |
*/ | |
public class EventBusWithParallelProcessing { | |
public static void main(String[] args) { | |
MyEventBus bus = new MyEventBus(); | |
// perform IO in parallel for each event | |
bus.toObservable().flatMap(o -> { | |
return Observable.just(o).map(v -> { | |
// simulate latent network call | |
try { Thread.sleep(400); } catch (Exception e) {} | |
return "IO-Response_" + v; | |
}).subscribeOn(Schedulers.io()); | |
}).forEach(result -> System.out.println("IO => " + result)); | |
// perform computation in parallel for each event | |
bus.toObservable().flatMap(o -> { | |
return Observable.just(o).map(v -> { | |
// simulate expensive computation | |
try { Thread.sleep(200); } catch (Exception e) {} | |
return "Computed_" + v; | |
}).subscribeOn(Schedulers.computation()); | |
}).forEach(result -> System.out.println("Computation => " + result)); | |
// perform work for each event that sometimes results in errors | |
bus.toObservable().flatMap(o -> { | |
return Observable.just(o).map(v -> { | |
if(v.equals("hello")) { | |
throw new RuntimeException("Simulated error processing -> " + v); | |
} | |
return "Processed_" + v; | |
}).onErrorResumeNext(Observable.just("DefaultValueFor_" + o)).subscribeOn(Schedulers.computation()); | |
}).forEach(result -> System.out.println("Processed => " + result)); | |
bus.send(1); | |
bus.send(11); | |
bus.send(28); | |
bus.send("hello"); | |
bus.send(5); | |
bus.send("world"); | |
// Since we're doing work asynchronously above we need to wait on it | |
// (There are other more "idiomatic" ways of doing this without sleeping | |
// but they require changing to ReplaySubject or using CountdownLatches. I | |
// chose to stay simple and obvious for this and leave the rest of the code | |
// as intended for the example). | |
try { Thread.sleep(2000); } catch (Exception e) {} | |
} | |
public static boolean IS_NUMBER(Object o) { | |
if (o instanceof Number) { | |
return true; | |
} else { | |
return false; | |
} | |
} | |
public static boolean IS_STRING(Object o) { | |
if (o instanceof String) { | |
return true; | |
} else { | |
return false; | |
} | |
} | |
public static class MyEventBus { | |
private final PublishSubject<Object> bus = PublishSubject.create(); | |
/** | |
* If multiple threads are going to emit events to this then it must be made thread-safe like this instead: | |
*/ | |
// private final Subject<Object, Object> bus = new SerializedSubject<Object, Object>(PublishSubject.create()); | |
public void send(Object o) { | |
bus.onNext(o); | |
} | |
public Observable<Object> toObservable() { | |
return bus; | |
} | |
} | |
} |
EventBusWithParallelProcessing.java Outputs:
Processed => Processed_1
Processed => Processed_11
Processed => Processed_28
Processed => Processed_5
Processed => Processed_world
Processed => DefaultValueFor_hello
Computation => Computed_1
Computation => Computed_11
Computation => Computed_28
Computation => Computed_hello
IO => IO-Response_1
Computation => Computed_5
IO => IO-Response_28
IO => IO-Response_11
Computation => Computed_world
IO => IO-Response_hello
IO => IO-Response_5
IO => IO-Response_world
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
EventBus.java Outputs: