Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@jemshit
Last active October 1, 2017 08:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jemshit/f29fadfa5198bc05df28ce8b472ddb76 to your computer and use it in GitHub Desktop.
Save jemshit/f29fadfa5198bc05df28ce8b472ddb76 to your computer and use it in GitHub Desktop.
import com.jakewharton.rxrelay2.BehaviorRelay;
import com.jakewharton.rxrelay2.Relay;
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class ColdReactiveDataExpiration {
// Problems to solve:
// 1- UI components should be notified whenever the data is updated.
// 2- Offline support: the presentation layer always listens to the local data source, so whenever the DB is updated, the UI should be notified.
// 3- Error propagation to the UI when ... (condition left unspecified by the author)
// 4- Sometimes fresh data has to be fetched from the API.
/**
 * In-memory stand-in for a local (offline) store that holds a single profile string.
 *
 * <p>In this file the value is written by the periodic updater started in {@code main} and read
 * by whichever thread subscribes to the repository. RxJava's timed operators run on scheduler
 * threads by default, so reads and writes can happen on different threads; the field is
 * therefore {@code volatile} so that a reader always observes the most recent write.
 */
private static class LocalDataSource {
    // volatile: written and read from different threads without any other synchronization.
    private volatile String profile = "John";

    /**
     * Returns an observable that emits the profile value as it is at the moment this method is
     * called (the value is captured eagerly, not at subscription time).
     */
    Observable<String> getProfile() {
        return Observable.just(profile);
    }

    /** Replaces the stored profile. Does not notify anyone by itself. */
    void setProfile(String profile) {
        this.profile = profile;
    }
}
/** Stand-in for a remote API: always answers with the same hard-coded profile. */
private static class RemoteDataSource {
    private final String remoteProfile = "David";

    /** Returns an observable emitting the fixed remote profile. */
    Observable<String> getProfile() {
        final Observable<String> response = Observable.just(remoteProfile);
        return response;
    }
}
/**
 * Combines the local and remote data sources and broadcasts profile changes through a relay.
 *
 * <p>Subscribers of {@link #getProfile()} first receive one initial value (local, or remote when
 * the local value is marked as expired) and afterwards every value pushed into the relay.
 */
private static class ProfileRepository {
    // Sentinel used in this demo to mark the local copy as stale.
    private static final String EXPIRED_MARKER = "expired";

    private final LocalDataSource localDataSource = new LocalDataSource();
    private final RemoteDataSource remoteDataSource = new RemoteDataSource();
    private final Relay<String> relay = BehaviorRelay.<String>create().toSerialized();

    /**
     * Returns a cold observable of the profile.
     *
     * <p>On each subscription the local source is queried first and the remote source second;
     * the first value that is not the expiry marker is pushed into the relay, and the subscriber
     * is then switched over to the relay itself. Consecutive duplicates are suppressed per
     * subscriber.
     *
     * <p>Note that the expiry filter applies only to this initial lookup: values published later
     * via {@link #updateProfileLocalOnly(String)} reach subscribers unfiltered.
     */
    Observable<String> getProfile() {
        return Observable.concat(
                        Observable.defer(() -> localDataSource.getProfile()),
                        Observable.defer(() -> remoteDataSource.getProfile()))
                // Simulates an expiry check on the local data source.
                .filter(profile -> !profile.equalsIgnoreCase(EXPIRED_MARKER))
                // Only the first source that passes the filter is used.
                .take(1)
                // Publish the initial value so that every current subscriber sees it too.
                .doOnNext(relay::accept)
                // Continue with the relay; skip a state identical to the previous one.
                .flatMap(profile -> relay.hide().distinctUntilChanged());
    }

    /**
     * Stores {@code profile} in the local data source and publishes it to all subscribers.
     *
     * @param profile the new profile value; must not be {@code null}
     * @throws NullPointerException if {@code profile} is {@code null}
     */
    void updateProfileLocalOnly(String profile) {
        // RxJava 2 streams do not carry null items. Reject null before touching any state so
        // the local store is never left holding a value that could not be published.
        if (profile == null) {
            throw new NullPointerException("profile == null");
        }
        localDataSource.setProfile(profile);
        relay.accept(profile);
    }
}
/**
 * Demo driver. Wires up one immediate observer, a periodic writer and one late observer against
 * the same repository, then keeps the JVM alive long enough for all timers to fire.
 */
public static void main(String[] args) throws InterruptedException {
    final ProfileRepository profiles = new ProfileRepository();

    // Observer 1: starts listening right away.
    final Observable<String> firstStream = profiles.getProfile();
    firstStream.subscribe(value -> System.out.println("Observer1 got profile: " + value));

    // Writer: five ticks (0..4), one per second after an initial one-second delay.
    Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS)
            .doOnNext(tick -> System.out.println("Updating profile from some place"))
            .subscribe(tick -> {
                // Tick 3 simulates the local data source expiring; every other tick is a
                // regular data update.
                final String nextProfile = (tick == 3) ? "expired" : "Agent " + tick;
                profiles.updateProfileLocalOnly(nextProfile);
            });

    // Observer 2: subscribes to the same data four seconds later. It is expected to receive the
    // latest available item followed by all subsequent updates.
    Observable.defer(profiles::getProfile)
            .delaySubscription(4, TimeUnit.SECONDS)
            .subscribe(value -> System.out.println("Observer2 got profile: " + value));

    // Block the main thread so the timed work above gets a chance to run.
    Thread.sleep(7000);

    // Output recorded by the author:
    /*
       Observer1 got profile: John
       Updating profile from some place
       Observer1 got profile: Agent 0
       Updating profile from some place
       Observer1 got profile: Agent 1
       Updating profile from some place
       Observer1 got profile: Agent 2
       Updating profile from some place
       Observer1 got profile: expired
       Observer1 got profile: David
       Observer2 got profile: David
       Updating profile from some place
       Observer1 got profile: Agent 4
       Observer2 got profile: Agent 4
    */
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment