Last active
October 1, 2017 08:09
-
-
Save jemshit/f29fadfa5198bc05df28ce8b472ddb76 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.jakewharton.rxrelay2.BehaviorRelay; | |
import com.jakewharton.rxrelay2.Relay; | |
import io.reactivex.Observable; | |
import java.util.concurrent.TimeUnit; | |
public class ColdReactiveDataExpiration { | |
// Problems to solve: | |
// 1- UI components should be notified when data is updated | |
// 2- Offline support, presentation layer always listens to local data source. So whenever DB is updated, UI should be notified. | |
// 3- Error propagation to UI when ... | |
// 4- Sometimes get fresh data from API | |
private static class LocalDataSource { | |
private String profile = "John"; | |
Observable<String> getProfile() { | |
return Observable.just(profile); | |
} | |
void setProfile(String profile) { | |
this.profile = profile; | |
} | |
} | |
private static class RemoteDataSource { | |
private String profile = "David"; | |
Observable<String> getProfile() { | |
return Observable.just(profile); | |
} | |
} | |
private static class ProfileRepository { | |
private LocalDataSource localDataSource = new LocalDataSource(); | |
private RemoteDataSource remoteDataSource = new RemoteDataSource(); | |
private Relay<String> relay = BehaviorRelay.<String>create().toSerialized(); | |
Observable<String> getProfile() { | |
return Observable.concat(Observable.defer(() -> localDataSource.getProfile()), Observable.defer(() -> remoteDataSource.getProfile())) | |
.filter(profile -> !profile.equalsIgnoreCase("expired")) // simulating if local data source is expired | |
.take(1) // get only from first Observable source which passes filter | |
.compose(upstream -> upstream.doOnNext(profile -> relay.accept(profile))) | |
.flatMap(profile -> relay.hide().distinctUntilChanged()); // to not to get same item (state) twice | |
} | |
void updateProfileLocalOnly(String profile) { | |
localDataSource.setProfile(profile); | |
relay.accept(profile); | |
} | |
} | |
public static void main(String[] args) throws InterruptedException { | |
ProfileRepository repository = new ProfileRepository(); | |
// Listen to profile | |
repository.getProfile() | |
.subscribe(profile -> System.out.println("Observer1 got profile: " + profile)); | |
// Update profile from somewhere | |
Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS) // start, count, initialDelay, period, unit | |
.doOnNext(integer -> System.out.println("Updating profile from some place")) | |
.subscribe(integer -> { | |
if (integer == 3) | |
repository.updateProfileLocalOnly("expired"); // simulate expiration of local data source | |
else | |
repository.updateProfileLocalOnly("Agent " + integer); // update data | |
}); | |
// Second Observer listens to the same data after a delay. Should get last item emitted + new updates | |
Observable.defer(() -> repository.getProfile()) | |
.delaySubscription(4, TimeUnit.SECONDS) // delays subscription (for only cold observable i guess) | |
.subscribe(profile -> System.out.println("Observer2 got profile: " + profile)); | |
Thread.sleep(7000); | |
// Prints: | |
/* Observer1 got profile: John | |
Updating profile from some place | |
Observer1 got profile: Agent 0 | |
Updating profile from some place | |
Observer1 got profile: Agent 1 | |
Updating profile from some place | |
Observer1 got profile: Agent 2 | |
Updating profile from some place | |
Observer1 got profile: expired | |
Observer1 got profile: David | |
Observer2 got profile: David | |
Updating profile from some place | |
Observer1 got profile: Agent 4 | |
Observer2 got profile: Agent 4 | |
*/ | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment