Skip to content

Instantly share code, notes, and snippets.

View murki's full-sized avatar

Miguel Juárez López murki

View GitHub Profile
// This assumes both 'repositories' return Observable<Data>
Observable<Data> results = Observable.merge(
diskRepository.getData().subscribeOn(Schedulers.io()),
networkRepository.getData().subscribeOn(Schedulers.io())
)
// DiskRepository.java
@RxLogObservable
public Observable<Data> getData() {
// implementation
}
// NetworkRepository.java
@RxLogObservable
public Observable<Data> getData() {
// implementation
// DiskRepository.java
public void saveData(Data data) {
// implementation
}
// DomainService.java
@RxLogObservable
public Observable<Data> getMergedData() {
return Observable.merge(
diskRepository.getData().subscribeOn(Schedulers.io()),
// NetworkRepository.java - no changes
@RxLogObservable
public Observable<Data> getData() {
// implementation
}
// DiskRepository.java
@RxLogObservable
public Observable<Timestamped<Data>> getData() {
// implementation
// DomainService.java
@RxLogObservable
public Observable<Data> getData() {
return getMergedData()
.onErrorReturn(new Func1<Throwable, Data>() {
@Override
public Data call(Throwable throwable) {
Log.e(CLASSNAME, "Error while fetching data. Swallowing the exception.", throwable);
return null; // We return null since we know our filter will ignore null values.
}
// We assume the repositories getData() methods will return null if no data is available
// DomainService.java
@RxLogObservable
public Observable<Data> getData() {
return getMergedData()
.filter(
new Func1<Data, Boolean>() {
@Override
public Boolean call(Data data) {
// wrapping synchronous operation in an RxJava Observable
Observable<Boolean> wipeContents(final SharedPreferences sharedPreferences) {
return Observable.fromCallable(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return sharedPreferences.edit().clear().commit();
}
});
}
public class SensorActivity extends Activity {
private static final String LOG_TAG = SensorActivity.class.getName();
private SensorManager sensorManager;
private Sensor accelerometer;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
sensorManager = (SensorManager)getSystemService(SENSOR_SERVICE);
accelerometer = sensorManager.getDefaultSensor(Sensor.TYPE_ACCELEROMETER);
Observable<SensorEvent> naiveObserveSensorChanged(final SensorManager sensorManager, final Sensor sensor, final int samplingPreiodUs) {
return Observable.create(new Observable.OnSubscribe<SensorEvent>() {
@Override
public void call(final Subscriber<? super SensorEvent> subscriber) {
SensorEventListener sensorEventListener = new SensorEventListener() {
@Override
public void onSensorChanged(SensorEvent event) {
subscriber.onNext(event);
}
public Observable<SensorEvent> naiveObserveSensorChanged(final SensorManager sensorManager, final Sensor sensor, final int samplingPreiodUs) {
return Observable.create(new Observable.OnSubscribe<SensorEvent>() {
@Override
public void call(final Subscriber<? super SensorEvent> subscriber) {
final SensorEventListener sensorEventListener = new SensorEventListener() {
@Override
public void onSensorChanged(SensorEvent event) {
// (3) - checking for subscribers before emitting values
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(event);