Skip to content

Instantly share code, notes, and snippets.

@NikitaKozlov
Last active August 31, 2017 21:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NikitaKozlov/52f47898b920132e418899b42f87ad2f to your computer and use it in GitHub Desktop.
Save NikitaKozlov/52f47898b920132e418899b42f87ad2f to your computer and use it in GitHub Desktop.
inputStream.groupBy(Event::getItemId)
.subscribe(substream ->
substream.onBackpressureLatest()
.subscribe(new Subscriber<Event>() {
public void onNext(Event event) {
if (shouldSkipEvent(event)) {
outputStream.onNext(Response.createSkippedResponse());
request(1);
} else {
saveLastLaunchedEvent(event);
launchNetworkCall(event).doOnSuccess(outputStream::onNext)
.subscribe(ignored -> request(1));
}
}
}
}));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment