@gjesse
Last active August 29, 2015 14:22
using RxJava to implement downtime alarms
/**
 * As long as new Observations are pushed to _lastSeen, a CLEARED state is
 * emitted for each one. If alarm.interval() seconds pass without an
 * Observation, BREACHED states are emitted once per interval, up to 10
 * times or until new data arrives, whichever comes first.
 *
 * Assumes RxJava 1.x: rx.Observable, rx.subjects.PublishSubject,
 * rx.observers.SerializedObserver, java.util.concurrent.TimeUnit.
 */
PublishSubject<Observation> lastSeen = PublishSubject.create();
// producers push incoming events through _lastSeen.onNext(); the
// SerializedObserver wrapper serializes concurrent calls
_lastSeen = new SerializedObserver<>(lastSeen);
Observable<Observation> share = lastSeen.share();
share
    // for each observation, emit a CLEARED state downstream
    .doOnNext(obs -> emitState(clear(obs)))
    // debounce passes an observation through only when no newer one has
    // arrived within the interval, i.e. when the source has gone quiet
    .debounce(alarm.interval(), TimeUnit.SECONDS)
    .flatMap(obs ->
        // map the quiet-period event to a repeating stream of BREACHED
        // states: at most 10, or until the main stream emits again.
        // (Later RxJava 1.x releases deprecate this timer overload in
        // favor of Observable.interval(0, period, unit).)
        Observable.timer(0, alarm.interval(), TimeUnit.SECONDS)
            .map(i -> breached(obs))
            .take(10)
            .takeUntil(share)
    )
    .subscribe(state -> {
        LOG.info("DownTime alarm triggered: {}", state);
        this.emitState(state);
    });
}
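
The timing behaviour of the pipeline above can be hard to check by eye, so here is a rough plain-JDK model of it. This is not the gist's RxJava code and does not use RxJava at all: it swaps the schedulers for an explicit clock value passed in by the caller. The class name `DowntimeAlarmSketch`, its `observe` and `tick` methods, and the `State` enum are all hypothetical names made up for illustration. It assumes the first BREACHED is due one interval after the last observation (the debounce firing), with further ones every interval after that, capped at a maximum count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, clock-driven model of the downtime alarm; not RxJava.
public class DowntimeAlarmSketch {
    public enum State { CLEARED, BREACHED }

    private final long intervalMillis;
    private final int maxBreaches;
    private long lastSeenMillis = -1; // -1 means no observation yet
    private int breachesEmitted = 0;

    public DowntimeAlarmSketch(long intervalMillis, int maxBreaches) {
        this.intervalMillis = intervalMillis;
        this.maxBreaches = maxBreaches;
    }

    // Record an observation: always CLEARED, and any breach run is reset
    // (the role played by takeUntil(share) in the pipeline).
    public State observe(long nowMillis) {
        lastSeenMillis = nowMillis;
        breachesEmitted = 0;
        return State.CLEARED;
    }

    // Advance the clock and return every BREACHED state that became due.
    // The k-th breach is due at lastSeen + k * interval, for k = 1..maxBreaches.
    public List<State> tick(long nowMillis) {
        List<State> due = new ArrayList<>();
        if (lastSeenMillis < 0) {
            return due; // nothing to debounce before the first observation
        }
        while (breachesEmitted < maxBreaches
                && lastSeenMillis + (breachesEmitted + 1) * intervalMillis <= nowMillis) {
            breachesEmitted++;
            due.add(State.BREACHED);
        }
        return due;
    }

    public static void main(String[] args) {
        DowntimeAlarmSketch alarm = new DowntimeAlarmSketch(1000, 10);
        System.out.println(alarm.observe(0));    // CLEARED
        System.out.println(alarm.tick(999));     // []
        System.out.println(alarm.tick(1000));    // [BREACHED]
        System.out.println(alarm.tick(3500));    // [BREACHED, BREACHED]
        System.out.println(alarm.observe(3600)); // CLEARED
        System.out.println(alarm.tick(4000));    // []
    }
}
```

Passing the clock in explicitly keeps the sketch deterministic, which is the main reason to prefer it over real timers when reasoning about the edge cases (first breach exactly at the interval, the cap, and the reset on new data).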